前言
说起Spark,大家就会自然而然地想到Flink,而且会不自觉地将这两种主流的大数据实时处理技术进行比较。然后最终得出结论:Flink实时性大于Spark。
的确,Flink中的数据计算是以事件为驱动的,所以来一条数据就会触发一次计算,而Spark基于数据集RDD计算,RDD最小生成间隔就是50毫秒,所以Spark就被定义为亚实时计算。
窗口Window
这里的RDD就是“天然的窗口”,将RDD生成的时间间隔设置成1min,那么这个RDD就可以理解为“1min窗口”。所以如果想要窗口计算,首选Spark。
但当需要对即临近时间窗口进行计算时,必须借助滑动窗口的算子来实现。
临近时间如何理解
例如“3分钟内”这种时间范围描述。这种时间范围的计算,需要计算历史的数据。例如1 ~ 3是3min,2 ~ 4也是3min,这里就重复使用了2和3的数据,依次类推,3 ~ 5也是3min,同样也重复使用了3和4。
如果使用普通窗口,就无法满足“最近3分钟内”这种时间概念。
很多窗口都丢失了临近时间,例如第3个RDD的临近时间其实是第二个RDD,但是他们就没法在一起计算,这就是为什么不用普通窗口的原因。
滑动窗口
滑动窗口三要素:RDD的生成时间、窗口的长度、滑动的步长。
我在本次实践中,将RDD的时间间隔设置为10s,窗口长度为30s、滑动步长为10s。也就是说每10s就会生成一个窗口,计算最近30s内的数据,每个窗口由3个RDD组成。
数据源构建
1. 数据规范
假设我们采集了设备的指标信息,这里我们只关注吞吐量和响应时间,在采集之前定义数据字段和规范[throughput, response_time],这里都定义成int类型,响应时间单位这里定义成毫秒ms。
实际情况中,我们不可能只采集一台设备,如果我们想要得出每台或者每个种类设备的指标监控,就要在采集数据的时候对每个设备加上唯一ID或者TypeID。
我这里的想法是对每台设备的指标进行分析,所以我给每个设备都增加了一个唯一ID,最终字段[id, throughput, response_time],所以我们就按照这个数据格式,在SparkStreaming中构建数据源读取部分。
2. 读取kafka
代码语言:scala
复制
val conf = new SparkConf().setAppName("aqi").setMaster("local[1]") val ssc = new StreamingContext(conf, Seconds(10)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "121.91.168.193:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "aqi", "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (true: java.lang.Boolean) ) val topics = Array("evt_monitor") val stream: DStream[String] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ).map(_.value)
这里我们将一个RDD时间间隔设置为10S,因为使用的是笔记本跑,所以这里要将Master设置为local,表示本地运行模式,1代表使用1个线程。
我们使用Kafka作为数据源,在读取时就要构建Consumer的config,像bootstrap.servers这些基本配置没有什么好说的,关键的是auto.offset.reset和enable.auto.commit,
这两个参数分表控制读取topic消费策略和是否提交offset。这里的earliest会从topic中现存最早的数据开始消费,latest是最新的位置开始消费。
当重启程序时,这两种消费模式又被enable.auto.commit控制,设置true提交offset时,earliest和latest不再生效,都是从消费组记录的offset进行消费。设置为false不提交offset,offset不被提交记录earliest还是从topic中现存最早的数据开始消费,latest还是从最新的数据消费。
最后就是设置要读取的topic和创建Kafka的DStream数据流。至此,整个数据源的读取就已经完成了,下面就是对数据处理逻辑的开发。
3. 指标聚合计算
代码语言:scala
复制
stream.map(x => { val s = x.split(",") (s(0), (s(2).toInt, 1)) }).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) .reduceByKeyAndWindow((x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2), Seconds(30), Seconds(10)) .foreachRDD(rdd => { rdd.foreach(x => { val id = x._1 val responseTimes = x._2._1 val num = x._2._2 val responseTime_avg = responseTimes / num println(id, responseTime_avg) }) })
我们从自身需求出发,来构思程序逻辑的开发。从需求看,关键字无非是最近一段时间内、平均值。想要取一段时间内的数据,就要使用滑动窗口,以当前时间为基准,向前圈定时间范围。
而平均值,无非就是将时间范围内,即窗口所有的响应时间加起来,然后除以数据条数即可。想要把所有的响应时间加起来,这里使用reduceByKey() 将窗口内相同ID的设备时间相加,将数据条数进行相加。
所以我在第一步切分数据的时候,就将数据切分成KV的元组形式,V有两个字段,第一个是响应时间,第二个1表示一条数据。reduceByKey一共分为两步,第一是RDD内的reduceByKey,这也算是数据的预处理,RDD的数据只会计算一次,当这个RDD被多个窗口使用,就不会重复计算了。第二步是基于窗口的reduceByKey,将窗口所有RDD的数据再一次聚合,最后在foreachRDD中获取输出
4. 验证结果
我们向kafka的evt_monitor这个topic中写入数据。
备注:(最后11那个id是终端显示问题,其实是1),然后可以输出平均值。
验证结果是没有问题的,换个角度,我们也可以从DAG来看。
这个窗口一共计算了3个RDD,其中左侧的两个是灰色的,上面是skipped标识,代表着这两个RDD在上一个窗口已经计算完成了,在这个窗口只需要计算当前的RDD,然后再一起对RDD的结果数据进行窗口计算。
结语
本篇文章主要是利用Spark的滑动窗口,做了一个计算平均响应时长的应用场景,以Kafka作为数据源、通过滑动窗口和reduceByKey算子得以实现。同时,开发Spark还是强烈推荐scala,整个程序看起来没有任何多余的部分。
最后对于Spark和Flink的选型看法,Spark的确是在实时性上比Flink差一些,但是Spark对于窗口计算还是有优势的。所以对于每种技术,也不用人云亦云,适合自己的才是最好的。
审核编辑 黄宇
-
RDD
+关注
关注
0文章
7浏览量
7972 -
实时监控
+关注
关注
1文章
88浏览量
13577 -
SPARK
+关注
关注
1文章
105浏览量
19893
发布评论请先 登录
相关推荐
评论