用于传感器分析的KSQL UDF。利用KSQL的新的API特性,用Java轻松地构建UDF / UDAF函数,从而使用Apache Kafka进行连续流处理。用例:联网汽车——使用深度学习的实时流媒体分析。
我为混合机器学习基础设施构建了一个场景,利用Apache Kafka作为可伸缩的中枢神经系统。使用公共云在极端尺度下训练分析模型(如通过谷歌ML引擎在谷歌云平台(GCP)上使用TensorFlow和TPUs。预测(即模型推断)是在本地Kafka基础设施的边缘前提下执行的(例如利用Kafka流或KSQL进行流分析)。
这篇文章的重点是在前提部署。我用KSQL UDF创建了一个用于传感器分析的Github项目。它利用KSQL的新API特性轻松地使用Java构建UDF / UDAF函数,对传入事件进行连续流处理。
用例:联网汽车——使用深度学习的实时流媒体分析
连续处理来自连接设备(本例中的汽车传感器)的数百万个事件:
我建立了不同的分析模型。他们在公共云上接受训练,利用TensorFlow、H2O和谷歌ML引擎。模型创建不是这个示例的重点。最终的模型已经准备好投入生产,并可以部署进行实时预测。
模型服务可以通过模型服务器或原生嵌入到流处理应用程序中来完成。查看模型部署中RPC与流处理的权衡和“TensorFlow + gRPC + Kafka流”示例。
演示:使用MQTT、Kafka和KSQL在边缘进行模型推断
Github项目生成汽车传感器数据,通过Confluent MQTT代理将其转发到Kafka集群进行KSQL处理和实时分析。
这个项目主要是通过MQTT将数据输入Kafka,通过KSQL对数据进行处理:
Confluent MQTT代理的一大优点是可以简单地实现物联网场景,而不需要MQTT代理。您可以通过MQTT代理直接将消息从MQTT设备转发到Kafka。这大大减少了工作和成本。如果您“只是”希望在Kafka和MQTT设备之间进行通信,那么这是一个完美的解决方案。
如果你想看这个故事的其他部分(与像Elasticsearch / Grafana这样的sink应用的集成),请看看Github项目“KSQL流物联网数据”。通过Kafka Connect和Elastic connector实现了与ElasticSearch和Grafana的集成。
KSQL UDF 源代码
开发udf非常容易。只需在一个UDF类中实现一个Java方法:
下面是KSQL UDF异常检测的完整源代码。(Anomaly Detection KSQL UDF.)
如何运行与Apache Kafka和MQTT代理演示?
在Github项目中描述了执行演示的所有步骤。
您只需要安装Confluent Platform,然后按照以下步骤部署UDF、创建MQTT事件并通过利用分析模型的KSQL处理它们。
我使用mosquito to生成MQTT消息。当然,您也可以使用任何其他MQTT客户机。这就是开放和标准化协议的最大好处。
责任编辑:pj
-
传感器
+关注
关注
2553文章
51511浏览量
757251 -
物联网
+关注
关注
2914文章
45014浏览量
377903 -
机器学习
+关注
关注
66文章
8453浏览量
133164
发布评论请先 登录
相关推荐
华为云 FlexusX 实例下的 Kafka 集群部署实践与性能优化
![华为云 FlexusX 实例下的 <b class='flag-5'>Kafka</b> 集群部署实践与性能优化](https://file1.elecfans.com//web3/M00/05/20/wKgZPGd88qmAOKzdAAEA43MSbXE480.png)
MQTT协议网关的工作原理及功能特性
![<b class='flag-5'>MQTT</b>协议网关的工作原理及功能特性](https://file1.elecfans.com//web2/M00/07/B1/wKgaombqlp6ARrPnAADgy2-cZI8853.jpg)
MQTT网关:物联网中的关键桥梁
![<b class='flag-5'>MQTT</b>网关:物联网中的关键桥梁](https://file1.elecfans.com//web2/M00/04/B3/wKgaombG15aAcAnfAAEFTeFZDvg122.jpg)
知识科普 MQTT Broker 代理 是什么
![知识科普 <b class='flag-5'>MQTT</b> Broker <b class='flag-5'>代理</b> 是什么](https://file1.elecfans.com/web2/M00/FC/9C/wKgZomaU5NOAeXp5AAHyw44uN4w849.png)
来了解一下MQTT Broker代理
![来了解一下<b class='flag-5'>MQTT</b> Broker<b class='flag-5'>代理</b>](https://file1.elecfans.com/web2/M00/FC/9C/wKgZomaU5NOAeXp5AAHyw44uN4w849.png)
PSoC6上的Wi-fi_mqtt_Client出现MQTT连接错误的原因?
面试官:Kafka会丢消息吗?
![面试官:<b class='flag-5'>Kafka</b>会丢消息吗?](https://file1.elecfans.com/web2/M00/DF/45/wKgaomYvahSAbcCrAAAjPDHR2as033.gif)
云服务器apache如何配置解析php文件?
MQTT协议网关解决方案设计与实施
PSoC6上的Wi-fi_mqtt_Client出现MQTT连接错误怎么解决?
Redis流与Kafka相比如何?
![Redis流与<b class='flag-5'>Kafka</b>相比如何?](https://file1.elecfans.com/web2/M00/C1/64/wKgaomXVsI-AZVEhAAAfnbR6DtI221.png)
评论