The Dataflow Model: Google Dataflow 编程模型

插曲

《The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing》 2015年,这篇文章就发布了, 每次快速扫描, 理解上总是有些遗漏, 最近,决定将他翻译一下, 仔细阅读一下.
补充一下,2.4及之后的章节自己看了,但是没有翻译, copy一下同事默岭的翻译(版权归默岭所有哦)。

为什么重新设计dataflow 编程模型

  1. 一份代码运行在不同的引擎, 现有很多google的业务方, 使用lambda架构, 实时部分使用millwheel做一些计算,不能保证完全正确, 然后通过离线flumejava作业不停的矫正数据。 (吐槽一句, millwheel 论文里面说自己的强一致多么牛x, 最后在dataflow 论文里面被打脸, 不过,millwheel里面提到的强一致设计是可以保证强一致,但成本非常高)。 业务方只需要实现一次代码,就可以在不同的引擎上切换,按照自己的业务需求。
  2. 会话窗口需求, 需要支持session window
  3. trigger, accumulate和retraction 需求, 一些业务方使用watermark来标记数据已经完成,但经常有数据延迟到达,因此,需要处理迟到的数据。 整个设计需要考虑: 1. 支持增量处理, accumulate和retraction; 2, trigger机制, 一份相同的数据,根据不同的准确性和扩展性要求,选择不同的模式,从而达到不同的结果。
  4. 水位线百分位触发器, 比如, 部分节点处理特别缓慢, 成为job的长尾, 拖慢了整个项目的进度。 另一个需求是, 很多时候,只需要达到一个很高的准确行就好了,而不是100%准备。 这样,只需要通过percentile watermark trigger, 就可以确定是否要提前终止长尾任务或提前计算。
  5. 时间trigger, 在google 推荐系统中, 使用大量google 基础设施建立用户行为画像, 这些用户行为画像会被用来根据兴趣来做推荐。注意的是, 这些系统使用处理时间进行数据驱动, 因为,这些系统需要经常更新,并且查看局部view 数据比等待直到当watermark 来了计算完整view 更重要。也就是说,对于一些时效要求高的系统, 必须具备根据处理时间来进行驱动的。
  6. 数据驱动 & 组合trigger, 在MillWheel的论文中,我们描述了一种用来检测谷歌网站搜索查询趋势的异常探测系统。当我们为模型设计触发器的时候,这种微分异常探测系统启发我们设计了数据驱动触发器。这种微分探测器检测网站检索流,通过统计学估计来计算搜索查询请求量是否存在一个毛刺。如果系统认为一个毛刺即将产生,系统将发出一个启动型号。当他们认为毛刺已经消除,那么他们会发出一个停止信号。尽管我们可以采用别的方式来触发计算,比如说Trill的Punctuations,但是对于异常探测你可能希望一旦系统确认有异常即将发生,系统应该立即输出这个判断。Punctuations的使用事实上把流处理系统转换成了微批次处理系统,引入了额外的延迟。在调查过一些用户场景后,我们认为Punctuations不完全适合我们。因此我们在模型中引入了可定制化数据驱动触发器。同时这个场景也驱使我们支持触发器组合,因为在现实场景中,一个系统可能在处理多种微分计算,需要根据定义的一组逻辑来支持多种多样的输出。图9中的AtCount触发器是数据驱动触发器的例子,而图10-14使用了组合触发器。

概述

无限,乱序, global-scale 的数据处理需求不断增长, 与此同时,消费者提出复杂的业务需求,如event-time ordering, window特性,并且追求越快越好。 实际上, 一个系统很难完全从正确性,latency和成本上完全满足所有需求, 因此,不同的系统带来不同的解决方案,不同的tradeoff。

数据处理的未来是无边界数据处理。 尽管有边界数据的处理永远都有着重要地位并且有用武之地,但是语义上它会被无边界数据处理模型所涵盖。一方面,无边界数据处理技术发展上步履蹒跚,另一方面对于数据进行处理并消费的要求在不断提高,比如说,需要对按事件发生时间对数据处理,或者支持非对齐窗口等。要发展能够支撑未来业务需要的数据处理系统,当前存在的系统和模型是一个非常好的基础,但我们坚持相信如果要完善地解决用户对无边界数据处理的需求,我们必须根本地改变我们的思维。

根据我们多年在谷歌处理大规模无边界数据的实践经验,我们相信我们提出的模型一个非常好的进展。它支持非对齐,事件发生时间窗口。这些都是当前用户所需要的。它提供了灵活的窗口触发机制,支持窗口累积和撤回,把关注点从寻求等待数据的完整性变为自动适应现实世界中持续变更的数据源。它对批处理,微批次,流处理提供了统一的抽象,允许数据开发人员灵活从三者中选择。同时,它避免了单一系统容易把系统本身的构建蔓延到数据处理抽象层面中去的问题。它的灵活性让数据开发者能根据使用场景恰当地平衡数据处理的准确性,成本和延迟程度。对于处理多样化的场景和需求来说,这一点很关键。最后,通过把数据处理的逻辑划分为计算什么,在哪个事件发生时间范围内计算,在什么处理时间点触发计算,如何用新的结果订正之前的数据处理结果让整个数据处理逻辑透明清晰。我们希望其他人能够认同这个模型并且和我们一起推进这个复杂而又令人着迷的领域的发展。

Dataflow 提出一种基础的方式来满足这些需求。 dataflow 并没有将无限的数据切分成有限的数据集,相反地,假定永远不清楚什么时候数据已经结束, 当新数据来时, 老的数据做retracted, 唯一的解决这个问题的方式是通过一个原则性的抽象来对 正确性/latency/cost 做一个合理的tradeoff。

本文提出一种方式 dataflow model, 详细的语法,核心原则,model validation(实践开发中对模型的检验)。

介绍

现代数据处理系统是一个复杂和令人兴奋的领域, 从最开始的MapReduce 到它的后继者 hadoop,pig,hive,spark,包括各种sql 社区的流引擎,然而,现存的现代计算系统仍然存在一些基础缺点。

举个例子: 一个streaming video提供商 想要统计 他要支付多少money给内容提供商,并对广告商收取多少费用。这个平台需要支持在线和离线的数据, video 提供商需要知道每天收取广告商多少money,同时汇总video和广告的各种统计参数, 同时,他想基于历史数据运行一些离线实验。广告/内容 提供商也想知道 他们哪些video/广告被看过,看的频率,看的时长,以及观看者的分类。更关键,他们也想知道他们收费/支付多少money,他们期望越快越好拿到这些信息, 这样他们可以尽快调整预算,投标,改变策略。 因为涉及到money, 因此准确性是基本要求。

数据处理系统天然很复杂,但video 提供商还是期望一个简单并具备伸缩性的编程模型。 他们同样要求系统能够处理全球 scale的离散数据。

这个use case是每个video的观看时间和观看时长, 观看者是谁, 哪些内容或广告被看过。 然而,现有的系统依旧不能满足这些需求。

  • MR/FlumeJava/Spark/hive/pig/ 延迟不能满足需求
  • Aurora/TelegraphCQ/Niagara/Esper, 在容错性上不行或者代价比较大
  • storm/samza/pulsar 不支持exactly-once 语义, pulsar 具备一定的scalable,并且支持high-level session的非对齐window的概念
  • tigon 不支持window 语义
  • spark-streaming/sonora/trident 只支持处理时间或tuple时间的window
  • sqlstream 必须强顺序性
  • flink, event-time触发机制有些限制,
  • CEDR/Trill 可以提供准时的触发和增量模型, 但window语义不能完全表达session, 另外定期的打标(punctuation)不足够
  • mill wheel 和spark streaming底层是够scalable/fault-tolerant/low-latency,但缺乏应用层编程框架, 在event-time session上还有欠缺
  • lamada架构能够满足这些需求, 但需要维护2套系统
  • summingbird通过提供一套复杂的接口来抽象batch和streaming, 为了让某些类型计算可以在2套系统上运行,增加了大量的限制, 同样需要维护2套系统。

上面这些问题并不是无解问题,随着时间推移, 这些系统的开发者会克服这些缺陷。但有一个观念上的错误, 这些系统对于输入的数据上, 认为输入的数据在某个时间点上是complete。我们认为这是一个基本错误, 现实中存在海量,乱序的dataset冲突和消费者的时效要求。因此需要一个简单但powerful的框架,在解决用户的需求的同时来平衡正确性/时延/成本。我们相信任何一个想要大量应用的解决方案必须 简单但强大, 并且以一个合适的成本很好平衡正确性和latency。 最近,我们是时候超越现有流行计算引擎语义, 正确的设计并构建batch, micro-batch和streaming 系统, 这些系统能够提供相同的正确性,3者并能广泛使用在无限的数据处理中。

本文核心是一个统一的模型:

  • 在一个无限,乱序的数据源, 进行计算, 并支持event-time 顺序的结果, 根据feature 执行window操作, 并优化正确性,latency和成本。
  • 对pipeline 实现以4个维度进行分解, 提供clarity(清晰), composability(可组合性)和flexibility
    • what result are being computed
    • where in event time they are being computed
    • when in processing time they are materialized
    • How earlier results relate to later refinements
  • 将逻辑数据处理与底层的物理实现独立开, 从而通过选择batch, micro-batch或streaming 引擎,来选择不同的侧重点, 正确性/延迟/成本。

具体上, 本文讨论:

  • windowing model, 支持unaligned event-time window, 展示一组简单的api 创建和使用window
  • triggering model, 根据流水线的runtime 特性来决定输出结果次数, 使用一个powerful并且flexible 声明式API 来描述trigger 语义。
  • 增量处理模型incremental processing model, 集成retraction/update到 windowing/triggering 模型上
  • scalable implementation, 在mill wheel 流引擎和flumejava batch 引擎上,重新实现cloud dataflow, 包含一个open-source 的sdk
  • 一组核心原则, 指导这种模型的设计
  • 简单讨论google 在海量,无限,乱序数据处理的经验

最后值得提一下, 这个模型也没有神奇的地方, 现有强一致的batch,micro-batch,streaming或lambda系统中不切实际的东西仍然存在, cpu/ram/disk 限制依旧存在。这个模型是一种框架, 这个框架可以相对简单表达并行计算, 用一种独立底层引擎的方式, 同样它也提供能力切入精确latency, 切入正确性因数据和资源导致的实际问题。 它的目标是简化构建实际,海量数据处理流水线。

unbounded/bounded vs streaming/batch

当描述 无限/有限数据集时, 我们倾向使用字符unbounded/bounded, 而不是stream/batch 上, 因为后面会暗示特定的执行引擎。 batch系统可以重复执行来处理unbounded数据集, 同样好的设计streaming 系统同样可以处理bounded 数据。 这个角度上, streaming和batch系统的区别是不相干的。

windowing

window 将数据集切成有限的块,并作为一组数据来处理。 当处理无限数据(unbounded)时, 一些operation(aggregation, outer join, time-bouunded operation)需要window,然后另外一些operation(filter,mapping, inner join 等)没有需求。 对于有限数据集, window基本上是可选的, 尽管在一些场景中语义上是有帮助。 window一般是基于时间的, 不过一些系统支持基于tuple的window, 它实际上是在逻辑时间领域上的基于时间window, 并且按顺序的元素在逻辑时间上连续增长。 window 有可能是aligned(window time应用在所有数据上), 有可能unalign(window of time 应用在部分数据上)。 下图展示了3种window类型。



Fixed window/tumbling window

定义一个静态的window大小,比如小时或天window。 他们一般是aligned, 每个window 贯穿对应时间的数据。 有的时候,为了分摊window完成时的压力, 系统会shift window以一个随机值。

sliding window

定义一个window size和slide 时间, 比如window为小时,滑动为分钟。当滑动时间小于窗口大小时,会有一些overlap。 划窗(sliding window)一般也是aligned。 fixed window其实就是划窗的一种特殊场景,sliding 大小等于窗口大小。

session window

session window是某段时间,数据是活跃的。 通常会定义个timeout gap。 event 时间跨度小于timeout 时间的event 会组成一个session。 session通常是unaligned。 在本例中, window 2 只应用在key 1, window 3 只应用在key 2, window 1和4 应用在key 3上。

时间领域 time domains

有2种时间领域。

  • event time, event 本身出现的时间, 比如系统clock time, 当系统产生这个event的时间
  • process time 在流水线中 处理的时间, 比如系统当前时间。 注意,并没有假设系统中所有的时间同步。

event time 从来不变,process time 不停会变, 因为event会随着流转整个pipeline而导致时间会不停向前走。

在处理过程中, 因为系统的真实使用(communication dely, 调度算法, 用于处理时间, 流水线序列化 等等)导致这两种时间发生差异并且动态变化。 全局进度metrics比如打标(punctuations)或者watermark,提供一个好方式展示这种差异。 我们采用像millwheel的watermark, 它是一种时间戳,在event time上的下限值,小于这个时间戳的都已经被处理. 完成的概念并不兼容正确性, 我们因此并不依赖watermark(来保证正确性)。 系统常常使用一种有用的方式, 系统会认为截止到一个指定的event time前所有的数据已经接收, 因此应用既可以visualizing skew(可视化时间差),又可以监控系统整体的健康和进度, 同时如果不依赖完全正确性的情况下,可以做一些决策,比如常见的垃圾回收。



在理想情况下, time domain skew(process time和event time 差异)是为0, 只要event一出现,系统会处理event。 现实并非如此美好, 如上图所示, 从12点开始, watermark都滞后一段时间。 上图在分布式系统中是非常正常的现象, 并且在考虑提供准确性和可重复的result时, 必须要考虑这种情况。

dataflow model

这章将介绍dataflow model,并解释为什么语义对batch, micro-batch,streaming是通用的。 我们将展示dataflow java sdk (它从flumejava api 演化而来)

core primitives

开始之前,先想想经典batch 模型的基本元素。 dataflow sdk 有2个核心 transform 操作在(key, value).

ParDo

ParDo 是普通的并发处理。 将输入的数据传递给用户的代码, 每个input可能产生0或多个output elements。 在无限数据中, pardo 操作每个input上元素,可能转化为无限的数据。



GroupByKey



GroupByKey 在发送结果到下游进行reduce前,收集一个key的所有数据。 如果输入数据是无限的,系统不知道什么时候结束。 常见的解决方案是window 这些数据。

windowing

系统重新定义GroupByKey operation 为 GroupByKeyAndWindow. 第一个要做的是支持unaligned window, 因为有可能有2个key 视图。第一个key是它简单把所有window strategies处理为unaligned 并让底层实现对aligned case使用相关优化。 第二个key是window可以切分为2个相关的操作:

  • Set AssignWindows(T datum), 它分配这个元素到0个或多个window上, 这是基本的bucket操作
  • Set MergeWindows(Set windows), 在grouping时 merge window。 当数据到达并被group 一起时, 可以在时间上进行构建数据驱动的window。

对于任何给定的window strategy, 这两种operation是紧密相关。 滑窗assignment需要滑窗merging, session window assignment需要session window merging。

为了天然支持event-time window, 修改数据结构为(key, value, event time, window), 每个元素都会带event-time并被初始化到全局window, covering 所有的event time, 这种方式match 默认的标准batch 方式。

window assignment

window assignment会copy一份新的元素, 每个元素会分配自己的window。 举例来说, 如下图所示,划窗中window为2分钟长度和移动步长为1分钟。



window 同它的元素关联起来, 这表示, 在流水线使用grouping之前的任何地方都可以做window assignment操作。 这很重要, 在一个合成转换中(如Sum.integersPerKey())有可能将grouping 操作隐藏在下游的什么地方。

window merging

windows merging是GroupByKeyAndWindow的一部分。 如下例所示:



上图中, 做session window, session window的timeout为30分钟。 一开始都放在全局window中。 AssignWindows 会把每个元素放到一个window中(在它自己的timestamp 延伸30分钟)。 然后开始GroupByKeyAndWindow, 实际上它进行了5步组合操作:

  • DropTimestamps, 丢掉元素的timestamp,因为window是从现在到后面(30分钟),后续的计算只关系窗口
  • GroupByKey, 按key 进行group(value, window)
  • MergeWindows, 按照一个key merge 一组window。 实际的merge逻辑由window strategy 来决定。 因为v1 和v4 overlap了, 所以merge 它们到一个新的,更大的session
  • GroupAlsoByWindow, 对每个key,按照window进行group 操作。 v1和v4是相同window,因此group在一起
  • ExpandToElements, 将每个group 扩展为(key, value, event time, window),在本例,设置timestamp为window的结束时间。 任何timestamp 大于或等于 window中最早event的timestamp都是有效的,符合watermark正确性。

api

下例中:



在求和之前,增加一个Window.into 来生成时长30分钟的session window

Triggers / Incremental Processing

构建unaligned/event-time window是一种改进,但有2个缺点:

  • 需要支持tuple- 和processing-time-based 的window, 否则会退到和其他系统一样。
  • 需要知道什么时候emit 一个window的result。 因为event-time是乱序, 需要某种signal 告诉我们window 完成了。

下一节介绍如何解决第一个问题。 对于第二个问题解决方案,最初倾向使用如watermark 这样的全局event-time 进度metric。 然后watermark 对于正确性存在2个缺点:

  • 有的时候太快, 意味着有些数据晚于watermark到达。 对于分布式系统,很难获得一个完美的event-time watermark, 如果用户想要100%的正确性,很难通过它来达到。
  • 太慢, 因为它是一个全局进度metric, 因为一个慢的数据可能就拖慢整个pipeline的watermark。 即使是健康的变化少的pipleline, 基线仍然有可能是几分钟甚至更多。 因此,使用watermark作为emit window signal容易导致高的latency。

假定单独watermark是不够的, 一个可用的方式是lambda架构的高效sidesteps(步进), 它并没有解决完成问题,但更快提供正确答案。 他提供像streaming流水线一样的更好的低延迟结果,一旦batch pipleline运行则可以保证最终一致性和正确性。如果我们想要通过单个流水线达到同样的效果, 我们需要为任何一个window提供多个答案。 我们称这种特性为trigger, 设定一个说明,什么时候为任何一个window trigger输出结果。

trigger是一种机制, 当接收到内部或外部的信号输出GroupByKeyAndWindow 的结果。 他们补充window 模型, 他们通过不同的时间轴影响了系统行为:

  • windowing 用event time 决定什么地方进行grouped。
  • triggering 用processing time 来决定什么时候输出结果

dataflow 预定义了一套trigger 实现 来trigger 完成评估(比如 watermark, 包括百分比watermark, 他提供有用的语义来处理在batch和streaming引擎延迟的数据,当用户更关注快速处理小比例的数据而不是最后一块数据)。基于processing time, 基于数据到达情况(记录数,字节数,data punctuations, 数据匹配模式等)。 dataflow 同样支持嵌入trigger到 逻辑联合(and/or), loops,sequences 和其他这种构建。 除此之外, 用户可以定义他们自己的trigger, 可以基于底层的primitives(watermark timer, processing timer, data arrival, composition support)或任何外部相关signal。
除了控制什么时候emit result, trigger系统控制一个window的多个pane如何相互关联, 通过3种定义模型:

  • discarding, window 内容会被丢弃, 后面的结果不会影响前面的结果。 当下游消费者期望大量的trigger是相互独立时(比如,对inject 到系统的数据做一个sum计算), 这种模型是有用的。尽管联合和交换的操作可以潜入到dataflow的Combiner 里面, 这是在缓存数据的最有效方式, dela的效率会最小化。在我们的video session例子中, 这样是不够的, 因为要求下游消费者拼装(stitch)部分数据。
  • accumulating, window的数据会存到存储中, 后面的结果会对之前的结果进行矫正。 当接收一个window的多个结果时, 下游消费者期望后面的结果能够覆盖之前的结果, 这种方式会非常有用。另外在lambda架构中这种方式很高效, streaming 流水线产生的低延迟的结果会被后面batch 流水线运行的结果给覆盖掉。 在我们的vedio session 例子中, 这种方式可能够用, 如果我们简单计算session,然后更新到支持更新的存储中(比如数据库或kv store)
  • accumulating 和retraction, 在accumulating语义上, 一个emitted 拷贝值仍然会存储到持久化存储中。 当后面又触发window trigger, 之前值的retraction会首先发送出去, 紧接着新的计算值。 当流水线中存在多个串行的GroupByKeyAndWindow操作时很有必要这种模式, 一个window产生的多个结果, 因为由一个window产生的多个结果在下游做group的key上结束(没理解这句话什么意思)。那种情况下, 第二个grouping的操作将会产生错误的结果, 除非那些key被通知一个retraction, 从而原始值的影响会被去掉。 dataflow Combiner 的相反操作uncombine 支持retraction。对于video session例子来说,这种方式是最理想的。

example

考虑下例中,做整数求和:



我们假设从某个数据源我们观察到了10个数据点,每个数据点都是一个比较小的整数。我们会考虑有边界输入源和无边界输入源两种情况。为了画图简单,我们假设这些数据点的键是一样的,而生产环境里我们这里所描述的数据处理是多个键并行处理的。图5展示了数据在我们关心的两个时间轴上的分布。X轴是事件发生时间(也就是事件发生的时间),而Y轴是处理时间(即数据管道观测到数据的时间)。(译者注:圆圈里的数值是从源头采样到的数值)除非是另有说明,所有例子假设数据的处理执行都是在流处理引擎上。



很多例子都要考虑水位线,因此我们的图当中也包括了理想的水位线,也包括了实际的水位线。直的虚线代表了理想的水位线,即,事件发生时间和数据处理时间不存在任何延迟,所有的数据一产生就马上消费了。不过考虑到分布式系统的不确定性,这两个时间之间有偏差是非常普遍的。在图5中,实际的水位线(黑色弯曲虚线)很好的说明了这一点。另外注意由于实际的水位线是猜测获得的,因此有一个迟到比较明显的数据点落在了水位线的后面。

如果我们在传统的批处理系统中构建上述的对数据进行求和的数据处理管道,那么我们会等待所有的数据到达,然后聚合成一个批次(因为我们现在假设所有的数据拥有同样的键),再进行求和,得到了结果51。如图6所示黑色的长方形是这个运算的示意图。长方形的区域代表求和运算涵盖的处理时间和参与运算的数据的事件发生时间区间。长方形的上沿代表计算发生,获得结果的管道处理时间点。因为传统的批处理系统不关心数据的事件发生时间,所有的数据被涵盖在一个大的全局性窗口中,因此包含了所有事件发生时间内的数据。而且因为管道的输出在收到所有数据后只计算一次,因此这个输出包含了所有处理时间的数据(译者注:处理时间是数据系统观察到数据的时间,而不是运算发生时的时间)。



注意上图中包含了水位线。尽管在传统批处理系统中不存在水位线的概念,但是在语义上我们仍然可以引入它。批处理的水位线刚开始时一直停留不动。直到系统收到了所有数据并开始处理,水位线近似平行于事件发生时间轴开始平移,然后一直延伸到无穷远处。我们之所以讨论这一点,是因为如果让流处理引擎在收到所有数据之后启动来处理数据,那么水位线进展和传统批处理系统是一模一样的。(译者注:这提示我们其实水位线的概念可以同样适用于批处理)

现在假设我们要把上述的数据处理管道改造成能够接入无边界数据源的管道。在Dataflow模型中,默认的窗口触发方式是当水位线移过窗口时吐出窗口的执行结果。但如果对一个无边界数据源我们使用了全局性窗口,那么窗口就永远不会触发(译者注:因为窗口的大小在不停地扩大)。因此,我们要么用其他的触发器触发计算(而不是默认触发器),或者按某种别的方式开窗,而不是一个唯一的全局性窗口。否则,我们永远不会获得计算结果输出。

我们先来尝试改变窗口触发方式,因为这会帮助我们产生概念上一致的输出(一个全局的包含所有时间的按键进行求和),周期性地输出更新的结果。在这个例子中,我们使用了Window.trigger操作,按处理时间每分钟周期性重复触发窗口的计算。我们使用累积的方式对窗口结果进行修正(假设结果输出到一个数据库或者KV数据库,因而新的结果会持续地覆盖之前的计算结果)。这样,如图7所示,我们每分钟(处理时间)产生更新的全局求和结果。注意图中半透明的输出长方形是相互重叠的,这是因为累积窗格处理机制计算时包含了之前的窗口内容。



如果我们想要求出每分钟的和的增量,那么我们可以使用窗格的抛弃模式,如图8所示。注意这是很多流处理引擎的处理时间窗口的窗口计算模式。窗格不再相互重合,因此窗口的结果包含了相互独立的时间区域内的数据.



另外一种更健壮的处理时间窗口的实现方式,是把数据摄入时的数据到达时间作为数据的事件发生时间,然后使用eventtime window。这样的另一个效果是系统对流入系统的数据的事件发生时间非常清楚,因而能够生成完美的水位线,不会存在迟到的数据。如果数据处理场景中不关心真正的事件发生时间,或者无法获得真正的事件发生时间,那么采用这种方式生成事件发生时间是一种非常低成本且有效的方式。

在我们讨论其他类型的窗口前,我们先来考虑下另外一种触发器。一种常见的窗口模式是基于记录数的窗口。我们可以通过改变触发器为每多少条记录到达触发一次的方式来实现基于记录数的窗口。图9是一个以两条记录为窗口大小的例子。输出是窗口内相邻的两条记录之和。更复杂的记录数窗口(比如说滑动记录数窗口)可以通过定制化的窗口触发器来支持。



我们接下来考虑支持无边界数据源的其他选项,不再仅仅考虑全局窗口。一开始,我们来观察固定的2分钟窗口,累积窗格。



水位线触发器是指当水位线越过窗口底线时窗口被触发。我们这里假设批处理和流处理系统都实现了水位线(详见3.1)。Repeat代表的含义是如何处理迟到的数据。在这里Repeat意味着当有迟于水位线的记录到达时,窗口都会立即触发再次进行计算,因为按定义,此时水位线早已经越过窗口底线了。

图10-12描述了上述窗口在三种不同的数据处理引擎上运行的情况。首先我们来观察下批处理引擎上这个数据处理管道如何执行的。受限于我们当前的实现,我们认为数据源现在是有边界的数据源,而传统的批处理引擎会等待所有的数据到来。之后,我们会根据数据的事件发生时间处理,在模拟的水位线到达后窗口计算触发吐出计算结果。整个过程如图10所示



然后来考虑一下微批次引擎,每分钟做一次批次处理。系统会每分钟收集输入的数据进行处理,反复重复进行。每个批次开始后,水位线会从批次的开始时间迅速上升到批次的结束时间(技术上来看基本上是即刻完成的,取决于一分钟内积压的数据量和数据处理管道的吞吐能力)。这样每轮微批次完成后系统会达到一个新的水位线,窗口的内容每次都可能会不同(因为有迟到的数据加入进来),输出结果也会被更新。这种方案很好的兼顾了低延迟和结果的最终准确性。如图11所示:



接下来考虑数据管道在流处理引擎上的执行情况,如图12所示。大多数窗口在水位线越过它们之后触发执行。注意值为9的那个数据点在水位线之后到达。不管什么原因(移动设备离线,网络故障分区等),系统并没有意识到那一条数据并没有到达,仍然提升了水位线并触发了窗口计算。当值为9的那条记录到达后,窗口会重新触发,计算出一个新的结果值。

如果说我们一个窗口只有一个输出,而且针对迟到的数据仅做一次的修正,那么这个计算方式还是不错的。不过因为窗口要等待水位线进展,整体上的延迟比起微批次系统可能要更糟糕,这就是我们之前在2.3里所说的,单纯依赖水位线可能引起的问题(水位线可能太慢)



如果我们想降低整体的延迟,那么我们可以提供按数据处理时间的触发器进行周期性的触发,这样我们能够尽早得到窗口的计算结果,并且在随后得到周期性的更新,直到水位线越过窗口边界。参见图13。这样我们能够得到比微批次系统更低的延迟,因为数据一到达就进入了窗口随后就可能被触发,而不像在微批次系统里必须等待一个批次数据完全到达。假设微批次系统和流处理系统都是强一致的,那么我们选择哪种引擎,就是在能接受的延迟程度和计算成本之间的选择(对微批次系统也是批大小的选择)。这就是我们这个模型想要达到的目标之一。参见图13:固定窗口,流处理,部分窗格



作为最后一个例子,我们来看一下如何支持之前提到的视频会话需求(为了保持例子之间的一致性,我们继续把求和作为我们的计算内容。改变成其他的聚合函数也是很容易的)。我们把窗口定义为会话窗口,会话超时时间为1分钟,并且支持retraction操作。这个例子也体现了我们把模型的四个维度拆开之后带来的灵活的可组合性(计算什么,在哪段事件发生时间里计算,在哪段处理时间里真正触发计算,计算产生的结果后期如何进行修正)。也演示了对之前的计算结果可以进行撤回是一个非常强力的工具,否则可能会让下游之前接收到的数据无法得到修正。



在这个例子中,我们首先接收到了数据5 和数据7。由于5和7之间事件发生时间大于1分钟,因此被当做了两个会话。在第一次窗口被触发时,产生了两条计算结果,和分别为5和7。在第二个因处理时间引起的窗口触发时,我们接收到了数据3,4,3,并且第一个3和上一个7之间时间大于1分钟,因此被分组到一个新的会话窗口,窗口触发计算并输出了计算结果10。紧接着,数据8到达了。数据8的到达使得数据7,3,4,3,8合并成了一个大窗口。当水位线越过数据点8后,新窗口计算被触发。触发后需要先撤回之前两个小窗口的计算结果,撤回方式是往下游发送两条键为之前的两个会话标记,值为-7和-10的记录,然后发送一个新的值为25的新窗口计算结果。同样,当值为9的记录迟于水位线到达后,之前的所有7条记录都合并成了一个会话,因此要对之前的会话再次进行撤回。值为-5和-25的记录又被发送往下游,新的值为39的会话记录随后也被发往下游。

同样的操作在处理最后3条值为3,8,1的记录时也会发生,先是输出了结果值3,随后回撤了这个计算结果,输出了合并会话后的结果值12。

3. 实现和设计

3.1 实现

我们已经用FlumeJava实现了这个模型,使用MillWheel作为底层的流执行引擎;在本文写作的时候,针对公有云服务Cloud Dataflow的重新实现也接近完成。由于这些系统要么是谷歌的内部系统,要么是共有云服务,因此为简洁起见,实现的细节我们略掉了。可以提及的让人感兴趣的一点是,核心的窗口机制代码,触发机制代码是非常通用的,绝大部分都同时适用于批处理引擎实现和流处理引擎实现。这个实现本身也值得在将来进行更进一步的分析。

3.2 设计原则

尽管我们很多的设计其实是受到3.3节所描述的真实业务场景启发,我们在设计中也遵从了一系列的核心原则。这些原则我们认为是这个模型必须要遵循的。

  • 永远不要依赖任何的数据完整性标记(译者注:如水位标记)
  • 灵活性,要能覆盖已知的多样化的使用用例,并且覆盖将来可能的使用用例
  • 对于每个预期中的执行引擎,(模型抽象)不但要正确合理,而且要有额外的附加价值
  • 鼓励实现的透明性
  • 支持对数据在它们产生的上下文中进行健壮的分析。
    可以这么说,下述的使用案例决定了模型的具体功能,而这些设计原则决定了模型整体的特征和框架。我们认为这两者是我们设计的模型具有完全性,普遍性的根本原因。

3.3 业务场景

在我们设计Dataflow模型的过程中,我们考虑了FlumeJava和MillWheel系统在这些年遇到的各种真实场景。那些良好工作的设计,我们保留到了模型中,而那些工作不那么良好的设计激励我们采用新的方法重新设计。下面我们简单介绍一些影响过我们设计的场景。

3.3.1 大规模数据回写和Lambda架构;统一模型

有一些团队在MillWheel上跑日志链接作业。这其中有一个特别大的日志链接处理作业在MillWheel上按流模式运行,而另外一个单独的FlumeJava批处理作业用来对流处理作业的结果进行大规模的回写。一个更好的设计是使用一个统一的模型,对数据处理逻辑只实现一次,但是能够在流处理引擎和批处理引擎不经修改而同时运行。这是第一个激发我们思考去针对批处理,微批次处理和流处理建立一个统一模型的业务场景。这也是图10-12所展示的。

另外一个激发我们设计统一模型的场景是Lambda架构的使用。尽管谷歌大多数数据处理的场景是由批处理系统和流处理系统分别单独承担的,不过有一个MillWheel的内部客户在弱一致性的模式下运行他们的流处理作业,用一个夜间的MR作业来生产正确的结果。他们发现他们的客户不信任弱一致性的实时结果,被迫重新实现了一个系统来支持强一致性,这样他们就能提供可靠的,低延时的数据处理结果。这个场景进一步激励我们能支持灵活地选择不同的执行引擎。

3.3.2 非对齐窗口:会话

从一开始我们就知道我们需要支持会话;事实上这是我们窗口模型对现有模型而言一个重大的贡献。会话对谷歌来说是一个非常重要的使用场景(也是MillWheel创建的原因之一)。会话窗口在一系列的产品域中都有应用,如搜索,广告,分析,社交和YouTube。基本上任何关心把用户的分散活动记录进行相互关联分析都需要通过会话来进行处理。因此,支持会话成为我们设计中的最重要考虑。如图14所示,支持会话在Dataflow中是非常简单的。

3.3.3 支付:触发器,累加和撤回

有两个在MillWheel上跑支付作业的团队遇到的问题对模型的一部分也有启发作用。当时我们的设计实践是使用水位线作为数据完全到达的指标。然后写额外的逻辑代码来处理迟到的数据或者更改源头数据。由于缺乏一个支持更新和撤回的系统,负责资源利用率方案的团队最终放弃了我们的平台,构建了自己独立的解决方案(他们最后使用的模型和我们同时设计开发的模型事实上非常类似)。另一个支付团队的数据源头有少部分缓慢到达的数据,造成了水位线延迟,这给他们带来了大问题。这些系统上的缺陷成为我们对现有系统需要进行改良设计的重要动因,并且把我们的考虑点从保证数据的完整性转移到了对迟到数据的可适应性。对于这个场景的思考总结带来了两个方面:一个方面是能够精确,灵活地确定何时将窗口内容物化的触发器(如7-14所示),对同样的输入数据集也可以使用多种多样地结果输出模式进行处理。另外一方面是通过累积和撤回能够支持增量处理。(图14)

3.3.4 统计计算:水位线触发器

很多MillWheel作业用来进行汇总统计(如平均延迟)。对这些作业来说,100%的准确性不是必须的,但是在合理的时间范围内得到一个接近完整的统计是必须的。考虑到对于结构化的输入(如日志文件),使用水位线就能达到很高程度的准确度。这些客户发现使用单次的的基于水位线的触发器就可以获得高度准确的统计。水位线触发器如图12所示。

我们有一些滥用检测的作业运行在MillWheel中。滥用检测是另外一种快速处理大部分数据比缓慢处理掉所有数据要远远更有价值的场景。因此,他们会大量地使用水位线百分位触发器。这个场景促使我们在模型中加入了对水位线百分位触发器的支持。

与此相关的,批处理作业中的一个痛点是部分处理节点的缓慢进度会成为执行时间中的长尾,拖慢整个进度。除了可以通过动态平衡作业来缓解这个问题,FlumeJava也支持基于整体完成百分度来选择是否终止长尾节点。用统一模型来描述批处理中遇到的这个场景的时候,水位线百分位触发器可以很自然地进行表达,不需要在引入额外的定制功能、定制接口。

3.3.5 推荐:处理时间触发器

另外一种我们考虑过的场景是从大量的谷歌数据资产中构建用户活动树(本质上是会话树)。这些树用来根据用户的兴趣来做推荐。在这些作业中我们使用处理时间作为触发器。这是因为,对于用户推荐来说,周期性更新的,即便是基于不完备数据的用户活动树比起持续等待水位线越过会话窗口边界(即会话结束)获得完全的数据要有意义的多。这也意味着由于部分少量数据引起的水位线进展延迟不影响基于其他已经到达的数据进行计算并获得有效的用户活动树。考虑到这种场景,我们包含了基于处理时间的触发器(如图7和图8所示)

3.3.6 异常探测:数据驱动和组合触发器

在MillWheel的论文中,我们描述了一种用来检测谷歌网站搜索查询趋势的微分异常探测数据处理管道。当我们为模型设计触发器的时候,这种微分异常探测系统启发我们设计了数据驱动触发器。这种微分探测器检测网站检索流,通过统计学估计来计算搜索查询请求量是否存在一个毛刺。如果系统认为一个毛刺即将产生,系统将发出一个启动型号。当他们认为毛刺已经消除,那么他们会发出一个停止信号(译者注:可能会对接系统自动对系统扩容或缩容)。尽管我们可以采用别的方式来触发计算,比如说Trill的标点符(Punctuations),但是对于异常探测你可能希望一旦系统确认有异常即将发生,系统应该立即输出这个判断。标点符的使用事实上把流处理系统转换成了微批次处理系统,引入了额外的延迟。在调查过一些用户场景后,我们认为标点符不完全适合我们。因此我们在模型中引入了可定制化数据驱动触发器。同时这个场景也驱使我们支持触发器组合,因为在现实场景中,一个系统可能在处理多种微分计算,需要根据定义的一组逻辑来支持多种多样的输出。图9中的AtCount触发器是数据驱动触发器的例子,而图10-14使用了组合触发器。

文章目录
  1. 1. 插曲
  2. 2. 为什么重新设计dataflow 编程模型
  3. 3. 概述
  4. 4. 介绍
    1. 4.1. unbounded/bounded vs streaming/batch
    2. 4.2. windowing
      1. 4.2.1. Fixed window/tumbling window
      2. 4.2.2. sliding window
      3. 4.2.3. session window
    3. 4.3. 时间领域 time domains
  5. 5. dataflow model
    1. 5.1. core primitives
      1. 5.1.1. ParDo
      2. 5.1.2. GroupByKey
    2. 5.2. windowing
      1. 5.2.1. window assignment
      2. 5.2.2. window merging
      3. 5.2.3. api
    3. 5.3. Triggers / Incremental Processing
    4. 5.4. example
  6. 6. 3. 实现和设计
    1. 6.1. 3.1 实现
    2. 6.2. 3.2 设计原则
    3. 6.3. 3.3 业务场景
      1. 6.3.1. 3.3.1 大规模数据回写和Lambda架构;统一模型
      2. 6.3.2. 3.3.2 非对齐窗口:会话
      3. 6.3.3. 3.3.3 支付:触发器,累加和撤回
      4. 6.3.4. 3.3.4 统计计算:水位线触发器
      5. 6.3.5. 3.3.5 推荐:处理时间触发器
      6. 6.3.6. 3.3.6 异常探测:数据驱动和组合触发器