Google Millwheel

插曲

google millwheel 《MillWheel: Fault-Tolerant Stream Processing at Internet Scale》, 论文应该2012年就推出来了, 以前总是快速扫一遍, 每次阅读都有一些不同的收获, 这次终于仔细拜读一下, 顺便将它翻译了一遍。

设计目标

  • 提供一种编程模型,可以满足复杂的计算逻辑而无需使用者是分布式计算专家
    • framework能处理任何一条边或一个节点发生故障
    • 可以保证数据处理仅被处理一次, 采用冥等的方式
    • 用一种合适的粒度进行checkpoint, 从而可以避免外部sender 缓冲数据的buffer在多个checkpoint 之间长时间等待
  • 能够同时高效满足scalability和容错性
  • 按照论文描述, 可以支持多语言

个人总结:

这篇论文,大篇幅介绍的如何做容错和扩展性, 扩展性很多时候是依赖容错性来做系统伸缩。 整体而言,扩展性和容错性应该非常不错, 但成本比较高,因为对持久化层需求很大, 而且比较偏好mini-batch设计,非常倾向去做window的aggregate, 另外对这个系统时延,感觉会在秒级以上。微软scope streaming 感觉和这个系统有很大的相似性, 不过scope 比较强调确定性(determinacy)。
整个设计里面一些亮点:

  • 整个dag中,组件(compuation)之间是完全解耦, 因此,可以自由对一个computation做迁移,扩容,合并。不过,这些都依赖底层的exactly-once 框架。
  • 系统数据传输都是通过rpc, 而非消息中间件, 并且组件(computation)之间可以自由订阅, 这一点超越了samza和scope。
  • 数据结构是(key, bytes[], timestamp), 这个数据结构在后来的dataflow中发生了变化
    • key 非常重要, 目前是对key 做group by, 相同的key保证在一个进程中串行执行, 并且每个用户自己写key-extractor 函数。 并且后续的所有操作/context,都是基于对应的key, 比如checkpoint,状态更新, 发送/更新 timer/low wwatermark。
    • bytes[], 用户自己选择序列化和反序列化
    • timestamp 完全由用户来确定,
  • exactly-once
    • 当timer 来到时, 会做checkpoint, 这个checkpoint, 会在一个原子操作中, 把这次checkpoint的输入数据id 和输出数据 和状态更新, timer 全部更新到bigtable 中的一行, 如果有外部状态更新,用户需要自己保证冥等
    • 后台存储更新需要一个sequencer, 拥有了sequencer token后,才能做数据库更新操作, 保证一个key,永远只有一个single-writer。
    • 接收数据后,需要ack给发送方, 这样发送方可以做 input record id的清理工作。
    • 系统会对输入消息进行去重
  • low watermark
    • 通过一个外部全局系统injector来做lower watermark, 会订阅injector 来获取lower watermark。
    • 每个worker 从injector处获取输入源的low watermark,然后根据自己的工作状态,计算出自己的low watermark, 然后汇报给injector

概述

基于模型的流式系统, 类似异常探测器,依赖于基于历史数据的预测,他们的模型必须当数据来临时,时刻更新. 按某个维度进行扩容这些系统不应该带来同等增量的成本增加。

Millwheel 定位于这样一个编程模型,定制于流式,低延时系统。用户实现dag节点中的应用逻辑, 这些dag可以用来定义任意并且动态topology,数据在dag中持续流动, millwheel 保证任何节点或dag中的任何边发生故障时,数据依然正确。作为系统容错性的一部分, 保证每一条record发送到它的消费者. Millwheel 提供api,通过一种幂等性方式,从用户的视角,保证record exactly-once。Millwhell用一个合适粒度(频率)对过程进行checkpoint, 消灭因为checkpoint之间长时间间隔,需要在外部senders上cache发送数据buffer的需求。

其他的流式系统并不提供容错,versatility(多功能性),scalability的结合。Spark streaming和sonora 在高效checkpoint上做的很出色,但限制了用户代码可使用的operator的空间。S4 并不提供一定容错性。Storm 不能保证exactly-once, trident 要求严格的事务顺序。尝试对一些批处理模型,比如hadoop mapreduce为了达到低延时,会牺牲flexibility, 比如某些operator依赖的spark streaming rdd。Streaming sql 系统提供简洁的solution来解决常见的stream问题, 但要想直观抽象或复杂的应用逻辑还是必须使用特定语言而不是描叙语言比如sql.

我们提供一种流式系统编程模型和millwheel 的实现:

  • 定义一种编程模型, 允许用户创建一个复杂的流式系统而不需要丰富分布式系统的经验
  • millwheel的高效实现了scalable和容错性, 持久化状态。
  • 按照论文描述, 可以支持多语言

设计需求

google的zeitgeist 用于跟踪web queries的趋势。为了展示millwhell的feature, 我们examine the zeitgeist的要求。Zeitgeist 接受不间断的search queries, 执行异常探测, 尽可能快的输出哪些queries是突发(尖峰刺,spike),哪些queries是快速下滑(dip)。 系统为每个query建立一个历史模型,并对traffic做一个预期判断, 这个判断并不会引起反面影响。尽可能快的识别突发(spike)query或跌落(dip)query是非常重要的。Zeitgeist帮助google提供热点跟踪服务,google热点跟踪服务主要依赖新鲜信息。基本流水线如图所示 ![img](http://img3.tbcdn.cn/5476e8b07b923/TB1BoC4NVXXXXckXXXXXXXXXXXX) 图1: 输入是持续不断的查询,输出是spiking或diping的查询

为了实现zeitgeist系统,合并record到每秒间隔的bucket, 并且比较实际traffic 和基于模型推测的预期流量。 假如持续出现数量不一致, 那么我们可以很确定这个query是spiking或dipping。同时, 我们用新数据更新模型并且存储他们以备将来使用。

Persistent storage

注意这个实现需要短期和长期的storage。 一个spike可能仅仅持续几秒钟,因此只依赖一个小时间窗口的state, 然而模型数据会几个月持续不断更新

Low watermarks

一些zeitgeist用户对探测traffic中的dips很感兴趣, 指一个query的容量是非同寻常的低(比如埃及政府关闭了internet)。在一个输入数据来自世界各地的分布式系统中,数据的到达时间并不严格对应数据的产生时间,因此识别出一个t=1296167641的突然查询是因为在线路上delay了,还是根本就不存在。millwhere通过low watermark来跟踪这个问题, low watermark显示所有数据到某个时刻都已经到达。在系统中,low watermark跟踪所有的pending events。在这个例子中, 如果low watermark 提前过去时间t并且没有查询来临, 可以认为这些查询根本就不存在,而不是在网络上的延迟。 这种语义避免了要求在输入数据源上的单调递增,因为在真实环境中,乱序是很正常的。

防止重复

重复的record 会导致误认为错误的spike, 另外有一些计费的用户。millwheel在框架层面提供exactly-once, 而非业务方自己去处理这种问题。

总结 millwheel需求:

  • 数据需要尽可能快的被推送到consumer
  • 持久化状态抽象需要被集成到系统一致性模型中,并可以被用户使用
  • 乱序的数据可以被优雅处理
  • 系统产生单调递增的low watermark
  • 时延不受扩容影响
  • 系统应该提供exactly-once的delivery能力

系统概况

millwheel实际上就是一系列用户定义的transform操作组成的dag 图, 我们称这些transform为compuation操作,任何一个transform都能并行运行在任意的机器上。因此,用户无需考虑在一个合适范围内负载平衡。

在millwheel里面的输入和输出都是用(key, value, timestamp)来表示, key 是含有语义含义的元数据字段,value是任意的字符串,代表整个record。用户代码运行的context被限定到一个特定的key,每个computation根据自己的逻辑为每个输入源定义key。 比如, zeitgeist中某些computation为了计算每个query的统计信息,会选择搜索字段作为key,而另外一些compuation为了统计基于地理位置的统计信息,会选择地理位置为key。 而triple里面的timestamp可以由用户随意定义(但通常都是墙钟时间),而mill wheel根据timestamp来计算低水位。如果一个用户想要aggregate search term的每秒统计值, 他就需要assign timestamp 为这个search被处理的时间。

一个compuation的output可以成为另外一个computation的输入,从而形成一个pipeline。用户可以动态增加或删除一个compuation到这个topology中而无需重启这个topology。一个compuation可以任意处理records,新建,修改,删除,过滤等操作。

millwheel提供框架api来冥等处理record。 用户只需要使用系统提供的通信和state抽象,所有failure和重试都被隐藏在系统内部。这样可以让用户代码简单,并只专注用户自己的逻辑。在一个computation context中,用户可以获取一个per-key和per-computation的持久化store,这样就可以基于key的aggregation。这种设计依赖下面的基本原则:

分发保障:所有因处理record而产生的内部update都会自动基于per-key做checkpoint并保证exactly-once delivery。这种特性不需要依赖外部存储。

核心概念

数据在一个dag中分发,每一个环节都独立操作和emit数据。

computation:

用户代码运行在compuation中。 当接受到数据时,会调用compuation,会触发用户定义的一些操作,比如连接外部存储, 输出数据或操作其它millwhell的数据。如果连接外部存储,则用户来保证操作外部存储的行为具有冥等性。computation在一个单key的context中执行,但key之间是相互不可知的。如下图所示, 另外一个key的所有操作是串行的,但多个不同的key可以并行被处理

key

key是mill wheel中不同record aggregate和comparison最重要的抽象,系统中每一个record,消费者需要定义一个key-extraction 函数, 这个key-extraction函数会分配一个key给这个record。 用户代码运行在某个key的context下,只能允许访问这个key相关的stat。 举例来说, 在zeitgeist系统中,一个好的选择key的方式是使用search text, 从而,我们可以基于query 的text来进行counts和query模型的aggregate。同样不同的系统,可以使用不同的key-extract函数来处理相同的数据源来获取不同的key。

streams:

streams 表示mill wheel内不同computation的分发机制。一个computation订阅零个或多个input,然后产生一个或多个stream, 系统保证分发正确性。每个stream内每个consumer有自己key-extract 函数,一份stream可以被多个consumer订阅. streams 根据名字来唯一标识,任何computation可以消费任何stream, 也可以产生records到任何stream。

persistent stat(持久化状态):

millwheel里面的持久化状态是基于per-key的透明字符串。用户提供序列号和反序列化函数,可以使用类似protocol buffer这种方便的机制来做。persistent stat存在高可用的存储中,从而保证数据一致性,并对终端用户完全透明。常见状态使用,比如一个时间窗口的record或等待join的数据的计数器aggregate。

low watermark(低水位):

computation的low watermark 限制了接受record 的timestamp 的一定范围。

定义: millwheel 提供一个基于数据流水线的low watermarks的迭代定义。 对于一个computation a, 设定为最老的work的timestamp对应最老的未完成的record。定义low watermark为:

1
min(最老a的工作, c的低水位(c 输出数据到a))。

如果没有输入数据,则低水位值等同于oldest work 值。
低水位值由injectors (从外部系统获取数据,并发送到millwheel)进行seed,经常用外部系统监控pending work 作为评估手段, computation 期望少量的late records(小于低水位)。zeitgeist处理late records的方法就是知己丢弃这种数据, 但会跟踪有多少数据被丢弃掉了(一般在0.001%)。 一些流水线当接受到晚到的数据,可以根据这个进行反向矫正。系统保证一个computation的低水位单调递增,即使对于晚到的数据。

通过等待compuation 的低水位(提前一定值),用户可以有一个完整的picture 关于他们的到低水位时间的数据,就像之前zeitgeist的dip 探测系统展示的一样。当分配timestamp到新的或aggregate的record, 取决于用户去选择一个时间戳,只要不小于来源record的timestamp。通过millwheel低水位可以测量进度。如图所示:

上图中,watermark像record一样前进。 pending works在时间轴上面,完成的在时间轴下面。新到的数据为pending work, 带着时间戳值提前于低水位值,数据可以乱序执行,低水位值反映出系统所有的pending work

timers

timers 是一个基于key的编程hook, 由一定的墙钟时间或低水位值进行触发。 timer 由一个computation的context创建并运行。用户来决定是使用墙钟还是低水位,比如邮件提醒系统使用墙钟,或者基于window进行aggregate的分析系统是基于low watermark。一旦设定,保障以时间戳的递增的顺序触发timer。timer会在持久化存储中记录日志并保障当机器故障时能够重启恢复。 当触发一个timer, 它运行一定的用户函数并像普通输入数据一样需要保障exactly-once。 zeitgeist的dips简单实现就是用一个指定时间的bucket的终止时间设置一个低水位timer, 并当监控的流量低于预测的模型时汇报一个dip。
timer的使用是可选的, 用户无timer barrier需求时可以直接跳过。 举例来说, zeitgeist 能够探测spiking 查询而无需timers, 发现一个spike 无需完整的数据视图。 如果观察的流量已经超过预测模型的预测值, 延迟的数据会加到总数据中并增加spike的大小。

api

如下图所示, 提供借口访问所有的抽象(状态,timer和输出)。一旦设定,这些代码会自动在framework中运行。 用户无需构建任何per-key的locking语义, 系统是基于key的序列化执行。

computation api

用户代码的2大入口点是ProcessRecord和ProcessTimer, 当数据来了会触发ProcessRecord,当timer 超时时触发ProcessTimer。这些构成了应用的compation。

在这些hook(ProcessRecord和ProcessTimer)执行过程中, 系统提供api 获取或控制 per-key 状态, 产生record和设置timer。 如下图所示, 展示了这些接口之间交互。 注意并没有错误恢复的逻辑,因为由框架自动进行错误恢复。

injector/low watermark api

在系统层面, 每一个computation会为所有自己的pending work(处理中或队列中等待的deliver)计算一个低水位值。可以分配一个timestamp值给持久化state 。这样系统可以自动roll up,为了提供一种透明的timers 的api 语义, 用户很少直接和低水位值进行交互,但通过分配给record的timestamp间接计算出他们。

injector:injector从外部获取数据,并发送到millwheel中。因为injector会为流水线其他部分seed low watermark,injector可以publish 一个injector low watermark到他的output streams中, 而其他subscriber可以获取他们。举例来说, 一个injector分析日志文件, 可以通过计算未完成文件的最小创建时间来计算low watermark。
一个injector可以跨进程运行,因此这些进程的低水位aggregate值会作为injector的低水位。 用户设定一组injector进程,从而防止injector单点故障。 实际上,常见的类型如日志,pubsub service等都有现成的injector, 用户无需再实现。如果一个injector 违反低水位语义并且发送一个迟于低水位的record, 用户代码可以决定丢弃这个record或者不对它进行现成aggregate的update。

容错性

分发保障

大部分millwheel 编程模型的概念性简洁让用户代码无需冥等,但却可以达到冥等的效果, 让用户不用担心这些问题(数据可靠性问题)。

发送保障

exactly-once delivery

当computation接收到一个record后:

  • record会被检查是否和之前分发的重复,重复的会被丢弃。
  • 用户代码只处理输入的数据, 会产生timer/state/production的变化。
  • pending changes会被提交到backing store
  • 发送者是acked的
  • 会发送pending downstream productions

作为一个优化,上述的操作可能被合并到对多个record的一个checkpoint。发送会不断重试直到他们被ack,这样可以保证at-least-once. 因为机器或网络故障,需要重试机制。然而, 这会带来一种问题, receiver在ack前crash了,即使它已经被成功处理并持久化它的状态。 这种情况下,当sender发送多次时,我们需要防止重复处理数据。
当computation产生一个数据(production)时,系统会分配一个唯一的id 给record。通过这个唯一id 可以识别重复的record。 如果相同record后面重发了, 会把它和jounaled id进行比较,然后扔弃并ack这个重复record。因为我们不能存储所有的重复数据到内存中,我们使用bloom filter来提供一个快速判断。对于boolm filter miss的event, 我们需要从backing store去进一步判断这个record是否是重复的。当所有内部sender完成发送时, 会做record id 垃圾回收。 对于经常发送late data的injector, 系统垃圾回收会额外delay一个对应的slake value。然而,exactly-once的数据会被几分钟被清理掉。

strong production

因为millwheel是乱序处理record, 系统会在发送产生数据前,在原子状态更新里面进行checkpoint 生产数据。称这种checkpoint方式为strong production。举例来说, 一个computation 根据墙钟时间做aggregate并发送count结果给下游; 如果没有checkpoint, 对于一个产生window count 的computation,在存储它的状态前crash。一旦这个computation重启回来, 它可能接受到另外的record在产生相同aggregate,产生一个record在字节上是不同于之前的window但实际上是相同的window。为了正确处理这种逻辑, 下游的消费者需要一个复杂的冲突解决方案。在millwheel使用一个简单可行的solution, 因为用户的逻辑已经被系统保证为冥等来运行。
millwheel 会用bigtable在作为storage 系统,它高效实现blind write(直写, 和read-modify-write相反),像日志一样来进行checkpoint。 当一个进程重启后, checkpoint会被加载到内存中并被replay。 checkpoint一旦数据发送成功后(production successful)会被删除。

weak production 和冥等

通过strong production, exactly-once delivery, token使用从而让计算冥等。然而,一些compuation已经冥等了, 可以不需要这些措施,因为这些措施会消耗资源和加大latency。因此,用户可以自己控制disable strong production或exactly-once delivery 。 系统层面,disable exactly-once可以简单允许重复record 来实现,但禁止strong production需要注意性能影响。

对于weak production, 不是在发送数据前进行checkpoint 产生数据, 在持久化state之前,乐观的发送给下游。这会带来一个新问题, 流水线的完成时间会被大幅翻倍,尤其是连续stage,因为他们在等待下游的ack records。 这种情况会大大增加端到端的latency因为流水线深度增加。 举例来说,我们假设在某个分钟有1%的概率机器会出现故障, 至少出现一次failure的可能性就会按照流水线的深度大大提高,假设流水线深度是5, 那每分钟出现失败的概率就是5%。为了降低这种失败概率, 通过checkpoint 小比例的产生数据(production), 允许这些stage 可以ack 他们的sender (说白了,就是strong production就是每个stage 做checkpoint, week production就是 在发送前做checkpoint,并且要求下游能够做数据去重)。 通过选择性checkpoint 这种方式,millwheel可以即提高端到端的latency并减少整体的资源消耗量。

当流水线是冥等的computation时,上述的方案是可行的, 因为重试不会影响正确性并且downstream production会是重试不可知。 真实的冥等例子就是无状态的filter, 重发的数据不会影响结果。

state manipulation(状态控制)

在millwheel 用户状态存储中, 有2种状态, hard state 会被持久化到backing 存储上,而soft state 则包含任何在内存中cache或aggregate。 millwheel 会提供下面保证:

  • 系统不能丢失数据
  • 更新state必须遵守exactly-once语义
  • 系统中所有的持久化的数据必须是在任何时间点都是一致性的
  • 低水位必须反映系统中所有pending state
  • timer必须按某个key顺序被触发。

为了避免在持久化状态时的不一致, millwheel会封装所有的基于key的update到一个原子操作中。 在任何一个时间点,有可能因为非预期事件或处理失败导致中断处理。就像前面所述, exactly-once 数据在相同的操作中被更新, 增加它到基于per-key 一致性封装中。
因为工作会在不同机器之间迁移, 对于数据一致性的主要威胁是僵死的writer和网络里残留的写操作到backing store。为了跟踪这些问题, 我们attach 一个sequencer token到每一个write, backing store的代理, 在允许commit write前做检查。新worker在开始工作之前使所有现存的sequencer失效, 因此没有残留的更新能够成功。这个sequencer是一种类似lease 加强机制, 类似Centrifuge系统。 因此,我们可以保证, 对于一个指定的key, 在一个时间点,仅仅一个worker能被更新那个key相关内容。
这种single-writer 同样对于soft state非常关键, 但事务无法保证single-writer。 以pending timer来说, 如下图所示:

当僵死进程b 触发一个延迟写的transaction 作为response 给a, 在transaction 开始, 新的b, b-prime 执行初始化扫描timer, 当扫描结束, transaction 执行并且a 接收ack, 这样b-prime 就出在一个非一致状态。 就会永远丢失一个timer, 并且这个timer触发的更新操作就会被延迟, 因此,对于一些延迟敏感的系统,这些是不可接受的。

更进一步, 相同的情况会在checkpoint的production(输出)下会出现, 因为跳过一个backing store的初始化scan使它变的对系统不可知。 这个production将不会对低水位有操作直到它被发现。在一个中间时间,millwheel有可能汇报一个错误的低水位给consumer。 更近一步,因为低水位时单调递增,millwheel不能纠正这个错误值。因为违背low watermark原则,各种检查会出现, 包括发送非完善timer和非结束window 输出。

为了快速从失效状态中恢复, millwheel中每个computation worker 可以以一个合适的粒度做checkpoint。 millwheel的soft state状态一致性可以最小化意外失效。 可以执行异步扫描时,允许computation继续处理input。

系统实现

架构

millwheel是一个分布式系统, 所有的computation运行在一台或多台机器上, 数据流通过rpc进行相互通信。 每台机器上, millwheel 排列输入work并管理进程级别元数据, 分配合适的用户computation到进程上。

由master进行调度或负载平衡, master 将每个computation 切分成一组key intervals,并分配这批key intervals到一组机器上。 当cpu或内存负载大时, 对key intervals做迁移或split,或merge, 每一个interval分配一个sequencer, 当发生改变(迁移,split, merge)时,将老的sequencer失效。

对于持久化状态, 使用bigtable或spanner系统, 提供原子行更新操作。 一个key的timer, pending production(输出)和持久化状态全部存在一行数据中。

当一个interval发生迁移时, millwheel从backing store中扫描元数据从而进行恢复。 初始化扫描存储在内存中,pending timer和checkpointed的production(输出), 从而和后端存储在状态上是一致的。 这种方式 通过single-writer 语义来实现。

low watermark

为了保证数据一致性, 由一个全局可靠的子系统来实现low watermark。 millwheel 通过一个中央控制系统(带授权)来完成lower mark, 它会跟踪所有的lower watermark并打日志到持久化层, 防止进程失效时产生错误值。
每个进程aggregate 当前自己工作的timestamp 信息,并汇报中央控制系统, 它们包括所有的checkpointed或pending的production(输出),pending的timer或持久化状态。 每个进程可以在内存中高效完成这个动作,而无须执行代价昂贵的后段存储查询。 因为进程时基于key interval进行分配的, 因此low watermark也是bucket到key interval中并发送到中央控制系统。
为了正确计算系统的low watermark, 中央控制系统可以访问所有的low watermark信息。 当aggregate per-process的更新时, 它通过build 一个 low watermark的interval map为一个compuation,从而跟踪一个compuation的完成信息。当任何interval丢失时, 对应失效interval的low watermark 不变直到汇报一个新的值。 中央控制系统然后广播low watermark值到系统所有的computation。

消费者可以订阅数据的所有sender的lower watermark,然后计算它所有输入数据的low watermark的最小值。 这个计算最小值的工作在worker中执行,而非中央控制系统,是因为一致性, 中央控制系统的lower watermark是所有worker的最小值,但非输入worker的最小值。同样的,中央控制系统的lower watermark不会修改worker的lower watermark。
为了保持一致性, 所有的low watermark更新需要sequencer, 类似single-writer 到所有跟新到key interval state, 这些sequencer保证仅仅这个key的最新owner才能更新它的lower watermark值。 为了扩展性, 这个授权可以在机器之间share。