插曲
screamscope 论文读了好几个月了,把之前分享,今天发出来。
总结:
论文主要是提出一种设计理念 rVertex/rStream, 这个设计理念其实还非常先进。尤其是在低延迟和容错性上思考很多
这个设计理念换了一种角度来思考常见的分布式计算框架(常见思路,主要是考虑DAG 和节点), 将计算拆解为计算单元(rVertex)和管道(rStream)2种抽象, 尤其是管道(rStream)将上下游的关系进行解藕, 对failure 问题上还是解的很漂亮。
核心一些观点:
- 这套系统起源于批处理系统SCOPE, 是对SCOPE 系统一个扩展,并且将原来一些批处理的任务已经迁移到StreamScope 上。已经部署到2万台机器上,每天处理百亿/数十TB 的数据。另外因为它起源于SCOPE, 它应该用sql 解决应用问题上, 会比其他流式框架要更成熟(这个是个人猜测);
- 用户接口上面, 为用户提供申明式语言(类似sql)和udf,
- 用户提交一段类sql,将它编译成logic plan(含一些udf和各种算子),然后用优化器,选择一个最优路径,然后,转化为physical plan,部署到每台机器上, 最后用job manager 来跟踪所有的状态。 很多组件都是用scope的组件完成或稍加改造。 (现代系统大致思路都是这个逻辑)
- 接口抽象上,接口还是适配性非常强; 节点rVertex 提供3个核心接口Load/Execute/GetSnapshot, 这个模型可以适配到绝大部分计算场景。rStream 接口 Write(seq, e)/Register(seq)/GarbageCollect(seq)
- 在容错性上, 首先通过rStream 来将上下游节点进行解藕, 下游的失败不一定需要影响到上游, 然后提供3种策略来,让用户选择。(读者可以看后面的详细介绍)
- 在exactly-once 要求上, 依赖3大部分: 1. 容错机制; 2. sequence number(每条消息配置单调递增的) 3. 确定性, 明确每个节点在状态相同时,接受相同输入时(无论是多实例同时计算或者同一个实例重启后计算),必须产生相同结果
- 性能上, 1. 通过sequence number来减少ack 数据流;2. rStream 的3层设计 (volatile/Reliable/GC)在保障稳定性同时,可以获得一些好的性能。3. 一次读取,可以获取一个小collection
架构
StreamScope 是 SCOPE 批处理系统的流式扩展的设计与实现。StreamScope 深度考虑SCOPE中的架构, compiler, 优化器和任务管理器, 对这些组件进行调整或redesign 以适应流式计算。通过对批处理和流计算的整合在实践中提供显著好处。
在StreamScope 中, 用户程序类似sql。 这个程序倍编译成一个流式 DAG 如图4 所示
为了产生这种DAG, StreamScope 的compiler 执行如下步骤:
- 程序首先被转化为logical plan(DAG), logical plan 中含有StreamScope runtime operators, 如临时join, window aggregates和用户自定义函数。
- StreamScope optimizer 评估各种执行计划, 选择一个评估最低成本的执行计划, 基于可获得的资源, 数据参数如数据流入速度和内部成本模型
- 创建最终的physical plan(DAG), 映射一个logical 节点到适当的physical 节点上,并发执行。 为每个节点生成code, 并部署到集群上。 这些细节类似SCOPE .
整个执行由streaming job manager 进行精心安排:
- 调度节点到并建立DAG 中的channel到不同的机器上
- 监控进程并跟踪snapshots
- 当发现错误时提供容错,并初始化恢复动作。
不像批处理的job manager, 批处理的job manager在不同的时间上调度节点, 流计算job manager在任务的起始阶段就开始调度DAG中所有的节点 。 为了提供容错性和应付动态变更, rVertex 和rStream 用来实现DAG 中的节点和channel, 同job manager协同工作。
编程模型
本章将介绍一个high-level的编程模型概述, 重点描述重要概念,包括数据模型和查询语言。
连续事件流(continuous event StreamScope)。 在StreamScope中, 数据以事件流的形式来表达, 每一个描述一个潜在的无限事件集合, 每一个事件流有一个定义好的schema。 除此之外, 每一个事件有一个事件间隔[Vs, Ve), 描述事件有效的起始时间和终止事件。
像其他的流计算引擎, StreamScope 支持current time increments(CTI) event, 它可以来保证截止到CTI的Vs的event已经发送完毕; 也就意味着,在CTI event 后再没有消息的timestamp小于Vs。 依赖CTI event的算子决定了当前处理事件从而保证流程继续并丢掉失效的状态信息。
Declarative query language. (申明式查询语言), StreamScope 提供一个陈述式查询语言, 这样用户可以编程他们的应用而不用担心分布式环境的一些细节,比如scalability或容错性。 尤其是, StreamScope 扩展了SCOPE 查询语言 来支撑一个full temporal relational algebra(完整所有关系型代数),通过用户自定义函数, aggregator 和算子来进行扩展。
StreamScope支持一套复杂的关系型算子包括 projection, filter, grouping和join 适配到所有语法。 举例来说, 一个inner join 适用于存在重叠时间窗口的事件。 window是另外一个重要概念。 一个window明确定义时间窗口和在这个窗口内的事件子集, 这样在这个子集上可以做aggregate。 StreamScope支持几种时间窗口,比如hopping/tumbling/snapshot 窗口。 例如, hopping window是按照固定大小H进行jump的窗口(大小为S), 一个新的S大小的窗口会被创建每H 单位时间内。
Example.
Figure 1 展示一个实例程序, 他执行连续分析行为在Process上并Alert 事件流。 一个StreamScope程序包含一系列在事件流上申明式查询操作。 Process 事件记录了每一个process并且关联用户,而Alert event记录了每一个alert的信息, 包括谁产生这个alert。 这个程序首先join 2个数据流并关联用户信息到alert,然后用一个hopping window计算每个用户 alert的数量每5秒钟 。
StreamScope 抽象
StreamScope的程序执行可以用一个DAG 来表述, 每一个vertex 在输入数据流上进行本地计算并产生输出数据流。 每一个数据流描述为一个无限顺序的事件, 每个事件包含一个连续增加的sequence number(序列号)。
图2展示了图1的的DAG, 每一个stage的计算会被分区到多个节点上进行并行执行。StreamScope 根据数据量和计算复杂度来决定每个stage的并发程度。
一个节点可以维护一个本地状态。 它的执行从它最初的状态开始并逐步执行。 在每一步, 节点从它的输入源上中消费下一个事件, 更新状态,并可能产生新的事件到它的输出流。 这个节点的执行可以通过一系列的snapshot来跟踪, 每一个snapshot都是一个三元组,包含当前输入流的sequence numbers, 输出流的sequence numbers和它的当前状态。
图3 展示了节点的处理过程, 从snapshot s0 到 s1 (处理了a1), 然后到s2(处理了b1). StreamScope 引入2个抽象 rStream 和rVertex 来实现流和节点。
实现
rStream 概念
不同于节点直接通过网络进行通信, StreamScope引入一个rStream抽象来解藕上下游的节点 通过一些属性来达到错误恢复。
rStream 包含一系列event,这些event包含连续并单调递增的sequence number, 支持多个writer和reader。 一个writer 调用Write(seq, e) 吧sequence number seq加到event中去。 rStream支持多个writer主要是为了允许同一个vertex的2个实例, 它会非常有用,当处理失败和重复执行中的stragglers。
一个reader可以调用Register(seq) 来显示他的感兴趣的event的起始sequence number seq,并开始通过ReadNext来读取数据,ReadNext 会返回下一个batch的带有sequence numbers的事件并调整读取偏移量。在实现中, event可以被push到一个注册的reader中,而不是被pull。 每一个reader可以异步处理events而不用和同个stream的气体readers或writers同步。一个reader同样可以rewind 一个stream通过重新register sequence number。 rStream 同样支持GarbageCollect(seq)来显示所有的小于seq的event不再需要并可以被丢弃。rStream 保留下面属性
- 唯一性(Uniqueness), 每个sequence number是一个唯一值。 当第一个seq的写成功后, 任何后续的关联到seq的写操作都会被扔弃。
- 有效性(Validity)假设一个ReadNext 返回一个带seq的event, 那必然有一个成功的Write(seq, e)
- 可靠性(Reliability), 假设一个write(seq, e)成功了, 任何ReadNext 读取到seq 位置时,都会返回(seq, e)
唯一性保证了每一个sequence number的一致性, 有效性保证每个sequence number的event value的正确性, 可靠性保证写到流的所有event 都是available。 rStream 可以通过一个带有后台存储的可靠的pub/sub 系统来实现, 但StreamScope采用一个更高效的实现,避免时延变长因为在关键路径上把数据序序列化到存储上, 后面会介绍这个实现。
实现rStream
rStream 抽象提供稳定的管道, 允许receiver 可以从任何指定位置进行read。 一个简单直接的实现是持久化和稳定的写数据到底层的Cosmos 分布式文件系统。 这些在关键路径上的同步写会带入显著的延迟。 StreamScope因此使用一个混合schema,将一些关键路径上的写操作移出单提供稳定可靠的channel的效果: event首先在内存中进行缓存起来,可以直接传输给消费方的节点。 内存中的buffer 会异步刷新到 Cosmos 来容错。 当错误发生时, 内存中的buffer会丢失,但当需要时,可以被重新计算。
为了在失败时,可以重新计算丢失的event, stream 跟踪每条消息的计算,类似TimeStream的依赖跟踪或D-Stream的血统跟踪, job manager跟踪节点的snapshots, 它可以用后来来通知如何重现event到输出流中。 一个节点自己决定什么时候获取snapshot, 保存并汇报给job manager。 举例来说,如图5,
节点v4 发送2个更新到job manager, 第一个更新汇报snapshot s1={<2, 7>, <12>, t1}
, 它表示这个节点这个秒时, 消费到2的event对第一个数据流,消费到7对第二个数据流, 并且在状态t1,产生event 12. 第二个update s2={<5, 10>, <20>, t2}
, 它接受到5的event 在第一个数据流, 在第二个数据流到10, 当前状态为t2. 这种跟踪机制完全对用户透明。 如果输出流中的event 16需要被重新计算, job manager可以找到最大的output sequence number 小于16的snapshot, 这个例子中是s1. 这样他会启动一个新的节点, 加载snapshot s1, 然后持续计算直到产生event 16。这个新实例的执行需要第一个数据流的event 3到5, 第二个数据流的8到10, 这样有可能会触发上游节点重新计算,当这些event也是无法获取时。 当原始输入数据假设为可靠序列化时,这个过程最终会结束(指下游不断触发上游做重新计算,直到需要的原始数据已经持久化好,就不用再触发上游做重新计算)。总的来说, 这种设计移动flush 到可靠持久化存储的过程到关键路径外, 同时,减少需要重新计算的event的数量。 因为rStream 是无限的, 在一个真实的实现中, 垃圾收集是有必要删除失效的event和相关的tracking 信息(用来在错误发生时,重新生成event之用)
图6 展示了rStream 的实现方式。 在本例中, 只有一个writer W 和3个reader R1, R2, R3. 数据流不断增长从左到右。 stream的头(标记为GC)包含失效的event和可以被垃圾回收的event, 紧接着时一系列已经被可靠持久化的event, stream的尾部(最新的event) 是volatile, 当失败发生时,这些数据会丢失。 checkpoint可以定期创建(比如c1,c2, c3, c4, c5), 当volatile 部分的stream 丢失时, it 可以从c4 snapshot开始做重新计算。 在可靠持久化部分的event可以提供给R1 R2而无序重新计算。 在可靠存储中还有部分数据直接重播。 没有更近一步的重叠重新计算需要重新恢复。
垃圾回收
StreamScope 持久化snapshot, StreamScope 和其他跟踪信息(用于恢复错误), 并且需要确定什么时候开始进行垃圾回收。 StreamScope 为节点/StreamScope维护一个low-water-marks。 对于一个stream, 这个low-water-mark指向需要的最小event的sequence number。
对于每个节点, snapshot 根据它的输入流和输出流的sequence number来做完全排序。 举例来说, 假设一个节点有2个输入,1个输出, snapshot s 为(<7, 12>, <5>)
将会比比(<7, 20>, <8>)
要小, 并且不存在(<6, 16>, <4> )
这样的snapshot。
考虑一个节点v , I 作为它的输入数据流, O 做为它的输出流。 节点v 维护一个low-water-marker sequence number lm0 为每个输出流, 初始化为0. 节点v实现GC(o, m)来实现垃圾回收, 显示所有小于m的sequence number的event将不再被下游vertex需要(下游vertex 消费输出流o)。 简单假设,每一个数据流只被一个下游节点消费, 但一般来说, 一个流经常被多个下游节点一起消费。
- 假如 m <= lm0, 返回; // 不需要进一步gc
- 设置lm0到m。 假设s是最新的checkpoint的snapshot, s必须满足条件, 在s中输出流o的sequence number并不高于lm0. 丢掉所有小于s的snapshot
- 对于每个输入的stream, 让vi 是上游节点,它产生输入流i 并且让si是对应s中输入流i的sequence number si, 调用vi GarbageCollect(si), 丢掉所有小于si的event在输入流i中, 递归调用vi GC(i, si)intuitive
直觉来说, GC(o, m) 算出哪些信息将不再被下游节点需要, sequence number小于m。 当最重输出event被持久化或消费,或档任何输出的event被持久化。 图7 显示一个low-water-marks的例子。 尽管递归确定算法, 可以通过相反的拓扑遍历顺序来实现。
rVertex 概念
rVertex 支持如下的操作
Load(s) 节点从snapshot s 开始一个实例。
Execute() 从当前snapshot 开始执行一步
GetSnapshot() 返回当前snapshot
一个节点可以以Load(s0) 开始, s0是起始状态并且所有的流以起始的position。 节点然后可以执行一系列的Execute(), Execute 会读取输入数据, 更新状态,产生新数据。任何时候, 节点可以调用GetSnapshot() 获取当前snapshot并保存它。 当节点失败时, 可以用Load(s)来restart。
确定性(Determinism). 对于一个确定输入流的节点来说, 运行 Execute() 在相同的snapshot上会产生相同的状态(snapshot)和相同的输出结果。
确定性保证了当从失败中进行重放event时,数据的准确性。 它也暗示了执行从不同输入stream获取的event的顺序是确定的; 第4章会介绍StreamScope是怎么做到顺序性而又不引入不必要的delay。 确定性极大保证正确性,并让应用更容易开发和调试。
实现rVertex
实现rVertex的关键是保障如section 3所述的确定性, 它要求function determinism和input determinism。 在StreamScope中, 所有的算子和用户自定义函数必须确定性的。 我们假设一个任务的输入stream是确定性的, event的顺序和值都是确定性的。 唯一的不确定性是穿过不同管道的event的顺序性。因为StreamScope使用CTI event做为标记, 我们插入一个特殊的MERGE 算子在一个接收多个输入的节点的起始位置, 这样可以生成一个确定性的顺序,为后序的操作。通过等待对应的CTI event 来露出并排序event,并按照确定性顺序来emit 它们。 因为, 节点的处理逻辑用相同方式等待CTI event, 这种方式并没有引入额外的dely。
StreamScope 对每个stream的event 以连续单调递增的方式进行打标sequence number。 一个节点可以用sequence number来从所有的流中定位最后的消费/产生 event。 在每个步骤, 一个节点消费输入流的一个event,比如调用Execute(), 它可能改变内部状态,产生新的event 到输出流中, 因此达到一个新的snapshot。 GetSnapshot() 返回当前snapshot, 他可以在一个步骤后暂停执行或者在不中断执行的情况,对一些copy-on-write的数据做一些拷贝动作,来保证一致性。Load(s) 启动一个节点并load s 作为当前snapshot在继续执行前。 为了可以从一个snapshot上可以进行断点恢复, 一个节点会定期做checkpoint。
错误恢复
rStream 抽象解藕了上下游节点,从而允许单个节点从错误中恢复。当一个节点失败时, 可以简单重启节点通过Load(s) 从最近的保存的snapshot 开始执行。 rVertex 抽象表示恢复后的执行同没有错误发生时的执行结果是相同的。rStream抽象保证重启的节点可以reread 输入数据。 第4章介绍 如何实现rVertex和rStream. 不同的错误恢复策略来达到不同的tradeoff
错误恢复策略
StreamScope 必须能够从错误中进行恢复。 rVertex 和rStream 抽象结果下游的节点, 从而很容易查找并恢复错误。 除此之外, 他们还抽象潜在的实现机制, 允许他们共享公共的机制来容错。
已经实现了集中错误恢复策略, 可以根据一些因子来做判断和选择, 普通情况开销(需要资源消耗), 普通情况费用(指延迟), 恢复成本(恢复资源)和恢复时间。
目前有3种策略, 对于rStream 和rVertex, 每个节点可以从错误中独立恢复出来。 因此,这些策略可以用在这些节点上甚至相同job上不同的节点使用不同的策略。
基于checkpoint的恢复。
节点会定期做checkpoint,把snapshot保存到持久化存储上。 当一个节点失败时, 他会加载最近的checkpoint 并继续执行。 checkpoint 的直接实现会引入正常执行的额外开销, 并且在维护一个大的state时并不是很理想。 高级的checkpoint技术会需要特定的数据结构,他们会引入复杂性和额外开销。
基于重播机制的恢复。
常见的流计算是无状态或者因为使用window 算子儿拥有一个短期的内存。也就是说, 当前的内部状态依赖最新window的一些event。 这这鞋情况下, 节点可以抛开显示的checkoint,而采用重新加载那个window的event从而从一个起始状态开始rebuild 状态。 然而这是一种特殊情况, 很常见很有用。 利用这种属性, stream能够简单跟踪输入/输出流的sequence number而不用存储节点的本地状态。 这种策略有可能需要重新加载一个大的window, 但它避免checkpoint的一些开销。
这种策略有一个潜在暗示在垃圾回收机制。 不是从一个snapshot中加载一个状态, 一个节点必须recover从它输入的早起的event, 这些event必须可以获得。
replication机制的recovery。
另外一种策略是对于相同的节点同时有多个instance。 他们连接到相同的输入流和输出流。 rStream 允许多个reader和writer, 自动根据sequence number进行去重。
rVertex的确定性属性让replication 方式可行, 因为这些instance 行为是一致性的。 通过replication, 一个节点可以有instance 轮流进行checkpoint而不会影响latency,因为其他的instance正以正常速度进行允许。 当一个instance fail, 它可以获得另外一个instance的当前snapshot,从而直接加速recovery。 这些好处都是来自于同时运行多个instance。
讨论
StreamScope 在现有的分布式流计算引擎上选择另外一种方式。
minibatch 的带RDD 的流处理。 不是支持连续的流模型, D-Streams 模型一个流计算为一系列minibatch的计算在一些小的时间间隔,并且利用immutable RDD 来做错误恢复。 将流计算拆解为一系列minibatch的方式会比较笨重, 因为一些流算子,比如windowing, join, aggregation, 维护状态到处理event 会高效一些。 拆分这些操作到独立的minibatch计算要求重建计算状态从前一个batch在处理当前batch event之前。 比如图1 的inner join。 这个join的算子需要track 所有的event, 这些event 有可能生成匹配结果从当前或将来batch 以高效的数据结构并且可以仅当接收到CTI event 时retie他们, 这种join状态是潜在复杂的join类型, 并且需要在每个batch或连续的mini-batch中进行重构。 更近一步, D-Streams 不考虑低延迟和容错性, 一个minibatch决定节点计算的粒度, 然而一个imuutable RDD, 主要为了错误恢复, 根据每个minibatch来创建。 这种低延迟要求小的small batch size甚至已经没有必要在那个粒度开启错误恢复。
非确定性。
rVertex 要求确定性来保证正确性和容易debug。 非确定性容易引起不一致性当一个节点失败重新计算时。 非确定性有可能引起重新执行结果偏离起始的执行并导致一种情况, 下游节点使用2种不一致的输出event。 StreamScope 可以扩展去支持不一致性,但以一定代价。
一种方式来避免因为非确定导致的不一致性 是保证节点输出的任何event 不再需要重新计算。 这可以通过在让下游可见event之前,对它做checkpoint并持久化到存储中去。 这就是millwheel的本质。 这个设计会引入显著额外开销,因为在关键路径上的高昂checkpoint。 一种可选的方式是log 非确定性的决定当执行replay是。 log方式的开销会小于checkpoint方式, 但这种方式要求所有的非确定的来源必须被标记,适度log并重放。 StreamScope 并不支持这种实现方式。
乱序event。
event可以不按它吗应用时间的顺序来到达, 比如event来自多个消息。 比如storm或millwheel 分配一个唯一的但非顺序性的id给每个event。 下游节点发送带这些id的acks给上游节点来跟踪状态和处理失败。 StreamScope 解藕 event的logical order 于物理发送或消费顺序。 它借用了流数据库中的CTI 理念 来达到乱序event处理在语言和算子层面。 在系统层面, StreamScope 分配唯一病情顺序性的sequence number给event, 从而轻松可以跟踪处理进程和错误恢复, 从而避免显著的acks而引起性能开销。
生产经验
- 从batch到流, StreamScope 是batch处理系统的扩展,并且大量使用现有的组件。并且大量的批处理应用迁移到StreamScope上, 并且提供迁移支持, 允许batch version来验证流部分的正确性。
- 伸缩性和健壮性波动。 rStream 有效的保证了错误恢复时和扩容时,新节点能更上节奏。
- 简化分布式流计算。 声明式语言让业务方很容易些流应用。StreamScope 提供off-line模式, 有限的数据持久化到存储后, 可以模拟线上数据流, 并且对一个节点进行单独debug
- 回溯, 比如发生数据订正时, 就需要对数据进行回溯,修正之前的数据。 StreamScope会保留所有的checkpoints 和输入channel 到一个全局repository, 它实现了一定的保留策略。
- 当系统维护时不间断执行。其实就是利用rVetex的确定性,当部分节点需要升级维护时,会启动一个备份,当备份节点ready后, 会把停机维护的节点给杀掉。