本文共 3416 字,大约阅读时间需要 11 分钟。
[酷玩 Spark] Structured Streaming 源码解析系列 ,返回目录请
技术团队荣誉出品
本文内容适用范围:* 2017.07.11 update, Spark 2.2 全系列 √ (已发布:2.2.0)* 2017.10.02 update, Spark 2.1 全系列 √ (已发布:2.1.0, 2.1.1, 2.1.2)
阅读本文前,请一定先阅读 一文,其中概述了 Structured Streaming 的实现思路(包括 StreamExecution, Source, Sink 等在 Structured Streaming 里的作用),有了全局概念后再看本文的细节解释。
Structured Streaming 非常显式地提出了输入(Source)、执行(StreamExecution)、输出(Sink)的 3 个组件,并且在每个组件显式地做到 fault-tolerant,由此得到整个 streaming 程序的 end-to-end exactly-once guarantees.
具体到源码上,Source 是一个抽象的接口 [1],包括了 Structured Streaming 实现 end-to-end exactly-once 处理所一定需要提供的功能(我们将马上详细解析这些方法):
trait Source { /* 方法 (1) */ def schema: StructType /* 方法 (2) */ def getOffset: Option[Offset] /* 方法 (3) */ def getBatch(start: Option[Offset], end: Offset): DataFrame /* 方法 (4) */ def commit(end: Offset) : Unit = {} /* 方法 (5) */ def stop(): Unit}
相比而言,前作 Spark Streaming 的输入 InputDStream 抽象 [2] 并不强制要求可靠和可重放,因而也存在一些不可靠输入源(如 Receiver-based 输入源),在失效情形下丢失源头输入数据;这时即使 Spark Streaming 框架本身能够重做,但由于源头数据已经不存在了,也会导致计算本身不是 exactly-once 的。当然,Spark Streaming 对可靠的数据源如 HDFS, Kafka 等的计算给出的 guarantee 还是 exactly-once。
进化到 Structured Streaming 后,只保留对 可靠数据源 的支持:
在 Structured Streaming 里,由 StreamExecution 作为持续查询的驱动器,分批次不断地:
def getOffset: Option[Offset]
,即方法 (2)getOffset()
实现 ,会通过在 driver 端的一个长时运行的 consumer 从 kafka brokers 处获取到各个 topic 最新的 offsets(注意这里不存在 driver 或 consumer 直接连 zookeeper),比如 topicA_partition1:300, topicB_partition1:50, topicB_partition2:60
,并把 offsets 返回getOffset()
实现,是先扫描一下最新的一组文件,给一个递增的编号并持久化下来,比如 2 -> {c.txt, d.txt}
,然后把编号 2
作为最新的 offset 返回(start, end]
区间范围内的数据 def getBatch(start: Option[Offset], end: Offset): DataFrame
,即方法 (3)def commit(end: Offset): Unit
,即方法 (4)commit()
方法主要是帮助 Source 完成 garbage-collection,如果外部数据源本身即具有 garbage-collection 功能,如 Kafka,那么在 Source 的具体 commit()
实现上即可为空、留给外部数据源去自己管理到此,是解析了 Source 的方法 (2) (3) (4) 在 StreamExecution 的具体批次执行中,所需要实现的语义和被调用的过程。
另外还有方法 (1) 和 (5):
def schema: StructType
def stop(): Unit
我们总结一下截至目前,Source 已有的具体实现:
Sources | 是否可重放 | 原生内置支持 | 注解 |
---|---|---|---|
HDFS-compatible file system | 已支持 | 包括但不限于 text, json, csv, parquet, orc, ... | |
Kafka | 已支持 | Kafka 0.10.0+ | |
RateStream | 已支持 | 以一定速率产生数据 | |
Socket | 已支持 | 主要用途是在技术会议/讲座上做 demo |
这里我们特别强调一下,虽然 Structured Streaming 也内置了 socket
这个 Source,但它并不能可靠重放、因而也不符合 Structured Streaming 的结构体系。它的主要用途只是在技术会议/讲座上做 demo,不应用于线上生产系统。
转载地址:http://xfvmi.baihongyu.com/