Flink介绍——实时计算核心论文之Storm论文总结

发布于:2025-04-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

引入

我们通过S4和Storm论文的以下文章,已经对S4和Storm有了较多认识:

本文则会结合这两者的论文内容进行对比总结,去看看为什么Storm能战胜S4成为当时实时处理的顶流。

我们知道S4的实时计算模型,是通过抽象出一个叫做 PE 的单元,然后所有的数据、处理逻辑都是基于PE的,而且整个系统没有Master,是完全对称的架构。

而Storm在流式计算上,虽然也采用了类似的有向无环图(DAG)的逻辑模型,但是在整个系统架构上,Storm却要传统很多。Storm是一个典型的Master+Worker的分布式系统架构,并且将传送的消息和对应消息的处理逻辑做了分离。

计算逻辑模型设计

和S4类似,Storm系统的抽象模型,也是一个有向无环图。在Storm里的有向无环图,叫做 Topology,也就是拓扑图。

整个拓扑图里有这样几个元素:

  • 首先是 Spouts,也就是数据源。Storm并没有像S4一样,把一切东西都定义成PE。而是抽象出了Spout,由它负责从外部去读取数据或者接收数据。就和它的名字一样,Spout好像一个出水管,一旦打开,就会源源不断地有外部的数据灌进来。这在S4里,对应的就是无键PE(Keyless PE)

  • 其次是 Tuple,也就是元组,它是Topology中传输的最小粒度的数据单元。一个Tuple是一个带命名的值的列表,可以把它看成是一个个KV对,不过这个Key只是在定义Tuple的时候出现。但是在数据传输的时候,我们只需要传输对应的值。这个方式,其实有点像Thrift,字段的名称是定义在外部的,实际传输的时候只需要序号、类型和值。这在S4里,Tuple对应的就是事件(Event)

  • 然后是 Streams,也就是数据流,一个流包含了无限多个Tuple的序列,这些Tuple会被系统分布式地并行去处理。

  • 最后就是 Bolts,也就是我们进行计算逻辑处理的地方。Bolt可以处理任意数量的输入流,然后产生任意数量的输出流。对应地,我们要把计算结果写入到外部数据库,也是通过Bolts来进行处理。乍一看,Bolts似乎很像S4里的PE,用于处理对应的分布式计算逻辑,不过实际上,Bolts和PE完全不一样。

可以看到,在Storm抽象的计算模型里,和S4的最大不同就在 Bolts 上。S4的PE,不仅是一个功能逻辑的单元,也是一个KV对的数据。同样类型的事件下,所有相同的Key的数据,都会聚合到同一个PE下。这就使得整个系统里有大量的PE对象,这也是我们提到的,S4最主要的问题之一,大量的PE会占用大量的内存和GC开销。

但是在S4中,这个内存占用又是应用开发人员完全控制不了的。因为系统里有多少PE,以及当前计算节点的内存占用是否过大,是由S4框架控制的。对于应用开发人员来说,能操作的只是单个PE对象,在内存不足的时候,我们原本可以在应用层面有更灵活的操作,比如更频繁地把数据输出到外部的KV数据库里,释放掉内存占用。但是在S4的框架下,我们做不到这一点。

其次就是我们的业务逻辑代码里,混入了控制分布式数据分发的逻辑。在S4中,分布式的分发逻辑,和数据处理逻辑混合在了一起,整个系统是被耦合在一起了。论文中提到的示例代码中,特地将输出的Key变成了(SortID,N)这样的组合,就是为了通过PE里的逻辑处理代码,来设置整个拓扑图的并行度,从而使得Top K的排序能够分布式地并行执行。

然而离谱的是,如果计算的数据量增加了,想要提升并行度,只能修改代码,重新编译部署才行,没办法通过修改参数简单地实现。并且已经处理了(SortID, N)组合PE,在重新部署之后,处理的数据可能是完全不同的,因为我们的N的最大值已经变了。

而Storm的设计就要好很多,Storm里的Bolt更像是MapReduce里的Map或者Reduce函数。我们可以在Topology里面,去设置不同Bolt的并行度,以及设置数据流是如何分组的。但是每个Bolt输出的Tuple本身,却不需要通过生成一个类似于(SortID, N)这样一个特殊的Key,来定义下一层的Bolt的并行度。

在Storm里面,对应的数据流可以进行这样几种分组(Grouping):

  • 随机分组(Shuffle Grouping),也就是每一个Bolt输出的结果,在分发到下游的Bolt的时候,是随机分发给不同的Bolt的,每个Bolt都会收到数量接近的Tuple。

  • 字段分组(Fields Grouping),可以选定Tuple中的某一个字段,按照字段的值进行分组。比如我们如果还是要进行单词出现频率的Top K排序,我们可能有一个Tuple,第一个字段叫做word,存放具体的单词,第二个字段叫做count,存放它出现的频率。那么,我们可以通过word这个字段进行字段分组,这样,相同单词的Tuple就会分发到相同的Bolt里去了。

  • 全部分组(All Grouping),这个类似于数据广播,也就是Bolt输出的Tuple需要向下游的每一个Bolt都发送一份。

  • 全局分组(Global Grouping),这个类似于S4里你看到的终点的MergePE,所有上游的Bolt都会把Tuple发送到唯一一个下游Bolt中。这样在下游,就可以有全局信息来做统计判断。

  • 无分组(None Grouping),这个是说开发人员不关心这个数据怎么分组。在实际实现里面,它和随机分组是一样的。

  • 指向分组(Direct Grouping),这个是指,上游的Bolt可以指定下游由哪一个Bolt来接收对应的Tuple。

  • 本地或随机分组(Local or Shuffle Grouping),也就是当下游的Bolt如果有一个或者多个“任务(Tasks)”,和上游的Bolt在同一个worker进程里,那么Tuple只会分发到这些进程里的任务里。如果没有的话,那就还是按照随机分组的方式发送Tuple。这个主要是为了性能考虑,如果可以在同一个台机器的同一个进程内通信,会大大节省整个集群的网络开销。

在Storm下进行Top K的单词排序,和S4对应处理的数据流向是不一样的,S4里的一个WordCountPE的输出,只会给一个SortPE;而Storm里的WordCountBolt的输出,会发送给多个不同的SortCountBolt,因为同一个WordCountBolt下,会包含很多个不同的单词。

主从架构设计

前面提到,Storm的Bolt很像MapReduce的Map和Reduce函数,其实Storm本身的架构也和MapReduce非常相似。Storm选择了一个典型的主从架构设计。整个Storm集群里,是由 Nimbus+Supervisor+Worker 这样三种类型的进程组成的。

首先是 Nimbus进程,其实也就是Storm集群的Master节点。它的作用,类似于Hadoop里的JobTracker,主要负责资源的分配和任务的调度。

开发人员会直接提交一个Topology给Master。这个Topology就会从一个抽象的有向无环图,转换成实际应用里,类似MapReduce一样的任务,本质就是编译好的程序和对应的配置。只不过与MapReduce的任务不一样是,MapReduce任务执行完了就结束了,而Topology这个任务如果我们不去终止它,它就会一直运行下去。

然后是 Supervisor进程,这个类似于Hadoop里的TaskTracker,也就是MapReduce里的Worker。Supervisor在每一个服务器上都会有一个,它本身不负责执行任务,但是会负责接收Nimbus分配的任务,然后管理本地的Worker进程,让Worker进程来实际执行任务。

最后是 Worker进程,一台服务器上会有多个Worker进程。Storm是使用Clojure写的,跑在JVM上,所以每一个Worker进程就是一个独立的JVM,Worker里面还会通过JVM的Executor来维护一个线程池。然后实际的线程池里,会有很多个Spout/Bolt的任务。因为Java的Executor的实现里会复用线程,所以Spout和Bolt实际上会使用同一个线程。这个也会大大减少整个系统的开销。

而把整个系统拆分成Nimbus、Supervisor和Worker三种进程,就使得Storm的容错能力也大大增强了。

Nimbus和Supervisor之间,并不是直接通信的。因为如果这样的话,显然Nimbus会有单点故障的风险。所以Nimbus是把对应的任务分配写到Zookeeper里。这样任务分配是持久化 的,而且会通过Paxos协议来保障容错能力,而Supervisor也是从Zookeeper里面,去读取对应的任务分配。

Nimbus和Supervisor的职责都非常简单,Nimbus只需要进行Topology的解析和任务调度,而Supervisor只需要接收任务,并且监控Worker进程是否存活。 它们本身不处理数据,而且也不在内存里面保存数据。 即使挂掉了,也只需要简单重启一下进程就好了。

这种的设计思路,通过把一些职责单独拆分出来,让特定的节点足够简单。即使这些节点可能成为单点,但是它们的稳定性,也会远高于要处理复杂逻辑的Worker进程。

可以看到,Storm的整体设计思路和MapReduce很像,各个节点的角色都能在MapReduce的各种节点里找到对应的影子。其实, 各类分布式系统的设计思路都是类似的,特别是这样Master+Worker 组合的模型,那就是Master负责调度,Worker负责实际处理问题。

而为了解决高可用性,往往我们会引入分布式锁,确保任务分配的数据不依赖Master。

另外,为了让整个系统更稳定,我们也会拆分调度任务的进程,和直接执行任务的进程,让每个进程都只有单一的职责。

容错机制

在S4的论文里,并没有说明它的上下游是怎么通信的。但在Storm里,倒是可以很清楚地知道,它是通过 ZeroMQ 这个消息队列,完成两个不同的Worker之间的通信的。

相比于通过一个RPC,消息队列有一个很大的优点,那就是高性能。上游节点不需要等待下游节点返回接收成功,就能发送下一条信息。当然这也会带来一个问题,就是如果在消息发送之后,下游是否成功接收并处理了这条消息,上游是不知道的。可能因为网络超时、也可能因为下游节点的软硬件故障, 在分布式系统里,这类问题是没法避免的。

而且论文里提到的统计Tweet里的单词数量的例子,需要先要从一个TweetSpout里,读取数据流里的Tweet,随机发送给到一个ParseTweetBolt,这个Bolt会解析Tweet成一个个单词,再发送给下游的多个WordCountBolt。而这里下游不只有一个WordCountBolt,而是不同的单词会发送给不同的WordCountBolt,其中任意一个WordCountBolt没有被成功处理,都意味着最终结果的错误。

Storm选择的解决方案,是把从Spout发起的第一个Tuple作为一棵树的根。下游所有衍生出来发送的Tuple,都是这棵树的一部分。任何一个Tuple处理失败或者超时了,那么就从Spout重新发送消息。

而要实现这个操作,Storm在系统里引入了一个特殊的Bolt,叫做AckerBolt。Spout发送出去的消息,同时会通知给到AckerBolt。而Bolt一旦处理完根Tuple 相关的消息,也会通知给到Acker。

Bolt会告诉AckerBolt两个信息,一个是我已经处理完了某一个Tuple,另一个是这个Tuple衍生往下游的哪些Tuple我也已经发送出去了。这样,Acker就有了一开始Spout发出的Tuple的整棵树的完整信息。等到最后一层的Bolt处理完对应的Tuple,然后发送了对应的通知给到AckerBolt,并且告诉它后面没有新的Tuple了,那么AckerBolt就知道,整棵Tuple树已经处理完成了。

但是如果这样搞,那AckerBolt的开销岂不是会非常大?

因为看起来要实现这个,就需要在AckerBolt里面,存上整棵Tuple树。更准确地说,AckerBolt不是要存一棵Tuple树,而是要把所有还在处理中的Tuple都存下来。这就相当于一个AckerBolt,需要存下所有Spout和Bolt在整个处理过程中的内存占用。这样的话,任务一多起来,那集群资源很快就抗不住了。

针对这个,Storm采用了一个很巧妙的办法,那就是利用位运算里的异或(XOR)。Storm给每一个发送出去的Tuple都会分配一个64位的message id。当消息从Spout被发送出去的时候,Storm会给AckerBolt发送这个message-id,告诉它,你要开始追踪这个Tuple树了。Acker里呢,则会维护一个message-id到校验码(checksum)的映射关系。这个校验码,一开始就是拿0和message-id去异或(XOR)一下。

而下游的每一个Bolt,会处理完这个Tuple相关的消息,并且向外发送新的Tuple。每个新发送的Tuple里,都需要带上根Tuple的message-id。在新Tuple发送出去之后,Bolt会通知AckerBolt,通知的内容也很简单,也是一个根message-id到校验码的映射关系。

这里的校验码,就是把当前对外发送的所有消息的message-id,和已经处理完的消息的message-id做一下异或。然后AckerBolt收到这个消息,会把收到的校验码,和本地的校验码也做一下异或,更新成最新的校验码。

因为异或操作,就是当两个数字完全相同的时候,会变成0,也就是A XOR A = 0。而在其他情况下,最后的结果一定不会每一位都是0。我们发送一次消息,并且acking一次消息,相当于在这个校验码上执行了一次A XOR A。所以,只要有Tuple还没有被acking,我们的校验码就不会是0,但是一旦所有的Tuple树上的Tuple都被acking了,那么这个校验码必然就是0。

正是通过message-id加上校验码的方式,Storm只需要16 bytes就能在AckerBolt里维护一个Tuple树是否已经都处理完了。而且所有的Bolt通知Acker最新的执行情况,也只需要发送16 bytes的messsage-id和校验码,既不需要发送Tuple的原始内容,也不需要为向下游发送的每一个Tuple都单独发送一条消息,占用的性能消耗也不会太大。这样自然就没有前面我们担心的集群资源问题了。

不过这个机制只能保障,Spout发出来的Tuple至少被处理一次,也就是At Least Once,但是它避免不了Tuple可能被重复处理的问题。比如拿Top K排序的Topology来说,任何一个单词在某个Bolt里没有被正确处理,就需要重新处理整个句子。这也意味着,一些单词可能会被重复统计。所以,这个通过AckerBolt进行容错重发的机制,并不适合所有的应用场景。用户需要根据自己的实际业务需求,来决定要不要启用这个机制。

有了At Least Once,那你自然会想到还有At Most Once,也就是一条消息最多发一次。这个要求在Storm里自然也很容易做到,只要关掉这里的acking机制就好了。其实也就是上游只需要把消息发出去,下游有没有收到、有没有处理成功,上游就不管了。

然而无论是At Least Once还是At Most Once,都不是我们最理想的进行流式数据处理的方式。我们真正希望的,是每个消息“恰好”被处理一次,也就是“ Exactly Once”。所以从这一点上来说,Storm还不是流式数据处理最终极的解决方案。

总结

我们梳理了Storm里的核心设计,其中最重要的就是它是如何巧妙地通过异或运算,用很小的内存和网络开销,来追踪整个Tuple的生命周期,通过重发消息实现“至少一次”的容错保障。