《一篇文了解分布式队列编程:从模型、实战到优化》要点:
本文介绍了一篇文了解分布式队列编程:从模型、实战到优化,希望对您有用。如果有疑问,可以联系我们。
本文由美团点评技术团队出品,一篇文助你掌握分布式队列编程的要义.从模型到实战再到优化,基本涵盖你可能踩的坑与其解决办法.
作为一种基础的抽象数据结构,队列被广泛应用在各类编程中.大数据时代对跨进程、跨机器的通讯提出了更高的要求,和以往相比,分布式队列编程的运用几乎已无处不在.但是,这种常见的基础性的事物往往容易被忽视,使用者往往会忽视两点:
使用分布式队列的时候,没有意识到它是队列.
有具体需求的时候,忘记了分布式队列的存在.
文章首先从最基础的需求出发,详细剖析分布式队列编程模型的需求来源、定义、结构以及其变化多样性.通过这一部分的讲解,作者期望能在两方面帮助读者:一方面,提供一个系统性的思考方法,使读者能够将具体需求关联到分布式队列编程模型,具备进行分布式队列架构的能力;另一方面,通过全方位的讲解,让读者能够快速识别工作中碰到的各种分布式队列编程模型.
文章的第二部分实战篇.根据作者在新美大实际工作经验,给出了队列式编程在分布式环境下的一些具体应用.这些例子的基础模型并非首次出现在互联网的文档中,但是所有的例子都是按照挑战、构思、架构三个步骤进行讲解的.这种讲解方式能给读者一个“从需求出发去构架分布式队列编程”的旅程.
老司机介绍
刘丁,新美大广告平台CRM系统技术负责人,曾就职于Amazon、Tripadvisor.2014年加入美团,先后负责美团推荐系统、智能筛选系统架构,作为技术负责人主导了美团广告系统的开发和上线.目前致力于推进新美大广告运营的标准化、自动化和智能化.
新美大广告平台是美团、大众点评双平台的营销推广平台,帮助商户推广店铺品牌及提升客流量.
模型篇从基础的需求出发,去思考何时以及如何使用分布式队列编程模型.建模环节非常重要,因为大部分中高级工程师面临的都是具体的需求,接到需求后的第一个步骤就是建模.通过本篇的讲解,希望读者能够建立起从需求到分布式队列编程模型之间的桥梁.
通信是人们最基本的需求,同样也是计算机最基本的需求.对于工程师而言,在编程和技术选型的时候,更容易进入大脑的概念是RPC、RESTful、Ajax、Kafka.在这些具体的概念后面,最本质的东西是“通讯”.
所以,大部分建模和架构都需要从“通信”这个基本概念开始.当确定系统之间有通讯需求的时候,工程师们需要做很多的决策和平衡,这直接影响工程师们是否会选择分布式队列编程模型作为架构.从这个角度出发,影响建模的因素有四个:When、Who、Where、How.
通信的一个基本问题是:发出去的消息什么时候需要被接收到?这个问题引出了两个基础概念:“同步通讯”和“异步通讯”.根据理论抽象模型,同步通信和异步通信最本质的差别来自于时钟机制的有无.同步通信的双方需要一个校准的时钟,异步通信的双方不需要时钟.
现实的情况是,没有完全校准的时钟,所以没有绝对的同步通信.同样,绝对异步通信意味着无法控制一个发出去的消息被接收到的时间点,无期限的等待一个消息显然毫无实际意义.
所以,实际编程中所有的通信既不是“同步通信”也不是“异步通信”;或者说,既是“同步通信”也是“异步通信”.特别是对于应用层的通信,其底层架构可能既包含“同步机制”也包含“异步机制”.判断“同步”和“异步”消息的标准问题太深,而不适合继续展开.作者这里给一些启发式的建议:
发出去的消息是否需要确认,如果不需要确认,更像是异步通信,这种通信有时候也称为单向通信(One-WayCommunication).
如果需要确认,可以根据需要确认的时间长短进行判断.时间长的更像是异步通信,时间短的更像是同步通信.当然时间长短的概念是纯粹的主观概念,不是客观标准.
发出去的消息是否阻塞下一个指令的执行,如果阻塞,更像是同步,否则,更像是异步.
无论如何,工程师们不能生活在混沌之中,不做决定往往是最坏的决定.当分析一个通信需求或者进行通信构架的时候,工程师们被迫作出“同步”还是“异步”的决定.当决策的结论是“异步通信”的时候,分布式队列编程模型就是一个备选项.
在进行通信需求分析的时候,需要回答的另外一个基本问题是:消息的发送方是否关心谁来接收消息,或者反过来,消息接收方是否关心谁来发送消息.如果工程师的结论是:消息的发送方和接收方不关心对方是谁、以及在哪里,分布式队列编程模型就是一个备选项.因为在这种场景下,分布式队列架构所带来的解耦能给系统架构带来这些好处:
无论是发送方还是接收方,只需要跟消息中间件通信,接口统一.统一意味着降低开发成本.
在不影响性能的前提下,同一套消息中间件部署,可以被不同业务共享.共享意味着降低运维成本.
发送方或者接收方单方面的部署拓扑的变化不影响对应的另一方.解藕意味着灵活和可扩展.
在进行通信发送方设计的时候,令工程师们苦恼的问题是:如果消息无法被迅速处理掉而产生堆积怎么办、能否被直接抛弃?如果根据需求分析,确认存在消息积存,并且消息不应该被抛弃,就应该考虑分布式队列编程模型构架,因为队列可以暂存消息.
对通信需求进行架构,一系列的基础挑战会迎面而来,这包括:
可用性,如何保障通信的高可用.
可靠性,如何保证消息被可靠地传递.
持久化,如何保证消息不会丢失.
吞吐量和响应时间.
跨平台兼容性.
除非工程师对造轮子有足够的兴趣,并且有充足的时间,采用一个满足各项指标的分布式队列编程模型就是一个简单的选择.
很难给出分布式队列编程模型的精确定义,由于本文偏重于应用,作者并不打算完全参照某个标准的模型.总体而言:分布式队列编程模型包含三类角色:发送者(Sender)、分布式队列(Queue)、接收者(Receiver).发送者和接收者分别指的是生产消息和接收消息的应用程序或服务.
需要重点明确的概念是分布式队列,它是提供以下功能的应用程序或服务:
接收“发送者”产生的消息实体;
传输、暂存该实体;
为“接收者”提供读取该消息实体的功能.
特定的场景下,它当然可以是Kafka、RabbitMQ等消息中间件.但它的展现形式并不限于此,例如:
队列可以是一张数据库的表,发送者将消息写入表,接收者从数据表里读消息.
如果一个程序把数据写入Redis等内存Cache里面,另一个程序从Cache里面读取,缓存在这里就是一种分布式队列.
流式编程里面的的数据流传输也是一种队列.
典型的MVC(Model–view–controller)设计模式里面,如果Model的变化需要导致View的变化,也可以通过队列进行传输.这里的分布式队列可以是数据库,也可以是某台服务器上的一块内存.
最基础的分布式队列编程抽象模型是点对点模型,其他抽象构架模型居于改基本模型上各角色的数量和交互变化所导致的不同拓扑图.具体而言,不同数量的发送者、分布式队列以及接收者组合形成了不同的分布式队列编程模型.记住并理解典型的抽象模型结构对需求分析和建模而言至关重要,同时也会有助于学习和深入理解开源框架以及别人的代码.
基础模型中,只有一个发送者、一个接收者和一个分布式队列.如下图所示:
如果发送者和接收者都可以有多个部署实例,甚至不同的类型;但是共用同一个队列,这就变成了标准的生产者消费者模型.在该模型,三个角色一般称为生产者(Producer)、分布式队列(Queue)、消费者(Consumer).
如果只有一类发送者,发送者将产生的消息实体按照不同的主题(Topic)分发到不同的逻辑队列.每种主题队列对应于一类接收者.这就变成了典型的发布订阅模型.在该模型,三个角色一般称为发布者(Publisher),分布式队列(Queue),订阅者(Subscriber).
MVC模型
如果发送者和接收者存在于同一个实体中,但是共享一个分布式队列.这就很像经典的MVC模型.
为了让读者更好地理解分布式队列编程模式概念,这里将其与一些容易混淆的概念做一些对比 .
分布式队列编程模型的通讯机制一般是采用异步机制,但是它并不等同于异步编程.
首先,并非所有的异步编程都需要引入队列的概念,例如:大部分的操作系统异步I/O操作都是通过硬件中断( Hardware Interrupts)来实现的.
其次,异步编程并不一定需要跨进程,所以其应用场景并不一定是分布式环境.
最后,分布式队列编程模型强调发送者、接收者和分布式队列这三个角色共同组成的架构.这三种角色与异步编程没有太多关联.
随着Spark Streaming,Apache Storm等流式框架的广泛应用,流式编程成了当前非常流行的编程模式.但是本文所阐述的分布式队列编程模型和流式编程并非同一概念.
首先,本文的队列编程模式不依赖于任何框架,而流式编程是在具体的流式框架内的编程.
其次,分布式队列编程模型是一个需求解决方案,关注如何根据实际需求进行分布式队列编程建模.流式框架里的数据流一般都通过队列传递,不过,流式编程的关注点比较聚焦,它关注如何从流式框架里获取消息流,进行map、reduce、 join等转型(Transformation)操作、生成新的数据流,最终进行汇总、统计.
这里所有的项目都是作者在新美大工作的真实案例.实战篇的关注点是训练建模思路,所以这些例子都按照挑战、构思、架构三个步骤进行讲解.受限于保密性要求,有些细节并未给出,但这些细节并不影响讲解的完整性.
另一方面,特别具体的需求容易让人费解,为了使讲解更加顺畅,作者也会采用一些更通俗易懂的例子.通过本篇的讲解,希望和读者一起去实践“如何从需求出发去构架分布式队列编程模型”.
需要声明的是,这里的解决方案并不是所处场景的最优方案.但是,任何一个稍微复杂的问题,都没有最优解决方案,更谈不上唯一的解决方案.实际上,工程师每天所追寻的只是在满足一定约束条件下的可行方案.当然不同的约束会导致不同的方案,约束的松弛度决定了工程师的可选方案的宽广度.
信息采集处理应用广泛,例如:广告计费、用户行为收集等.作者碰到的具体项目是为广告系统设计一套高可用的采集计费系统.
典型的广告CPC、CPM计费原理是:收集用户在客户端或者网页上的点击和浏览行为,按照点击和浏览进行计费.计费业务有如下典型特征:
计费业务的典型特征给我们带来了如下挑战:
采集的高可用性意味着我们需要多台服务器同时采集,为了避免单IDC故障,采集服务器需要部署在多IDC里面.
实现一个高可用、高吞吐量、高一致性的信息传递系统显然是一个挑战,为了控制项目开发成本,采用开源的消息中间件进行消息传输就成了必然选择.
完整性约束要求集中进行计费,所以计费系统发生在核心IDC.
计费服务并不关心采集点在哪里,采集服务也并不关心谁进行计费.
根据以上构思,我们认为采集计费符合典型的“生产者消费者模型”.
采集计费系统架构图如下:
采用此架构,我们可以在如下方面做进一步优化:
缓存是一个非常宽泛的概念,几乎存在于系统各个层级.典型的缓存访问流程如下:
对于已经存入缓存的数据,其更新时机和更新频率是一个经典问题,即缓存更新机制(Cache Replacement Algorithms ).典型的缓存更新机制包括:近期最少使用算法(LRU)、最不经常使用算法(LFU).
这两种缓存更新机制的典型实现是:启动一个后台进程,定期清理最近没有使用的,或者在一段时间内最少使用的数据.由于存在缓存驱逐机制,当一个请求在没有命中缓存时,业务层需要从持久层中获取信息并更新缓存,提高一致性.
分布式缓存给缓存更新机制带来了新的问题:
根据上面的分析,分布式缓存需要解决的问题是:在保证读取性能的前提下,尽可能地提高老数据的一致性和新数据的可用性.如果仍然假定最近被访问的键值最有可能被再次访问(这是LRU或者LFU成立的前提),键值每次被访问后触发一次异步更新就是提高可用性和一致性最早的时机.
无论是高性能要求还是业务解耦都要求缓存读取和缓存更新分开,所以我们应该构建一个单独的集中的缓存更新服务.集中进行缓存更新的另外一个好处来自于频率控制.
由于在一段时间内,很多类型访问键值的数量满足高斯分布,短时间内重复对同一个键值进行更新Cache并不会带来明显的好处,甚至造成缓存性能的下降.通过控制同一键值的更新频率可以大大缓解该问题,同时有利于提高整体数据的一致性,参见“排重优化”.
综上所述,业务访问方需要把请求键值快速传输给缓存更新方,它们之间不关心对方的业务.要快速、高性能地实现大量请求键值消息的传输,高性能分布式消息中间件就是一个可选项.这三方一起组成了一个典型的分布式队列编程模型.
如下图,所有的业务请求方作为生产者,在返回业务代码处理之前将请求键值写入高性能队列.Cache Updater作为消费者从队列中读取请求键值,将持久层中数据更新到缓存中.
采用此架构,我们可以在如下方面做进一步优化:
典型的后台任务处理应用包括工单处理、火车票预订系统、机票选座等.我们所面对的问题是为运营人员创建工单.一次可以为多个运营人员创建多个工单.这个应用场景和火车票购买非常类似.工单相对来说更加抽象,所以,下文会结合火车票购买和运营人员工单分配这两种场景同时讲解.
典型的工单创建要经历两个阶段:数据筛选阶段、工单创建阶段.例如,在火车票预订场景,数据筛选阶段用户选择特定时间、特定类型的火车,而在工单创建阶段,用户下单购买火车票.
工单创建往往会面临如下挑战:
如果将用户筛选的最终规则做为消息存储下来,并发送给工单创建系统.此时,工单创建系统将具备创建工单所需的全局信息,具备在满足各种约束的条件下进行统筹优化的能力.如果工单创建阶段采用单实例部署,就可以避免数据锁定问题,同时也意味着没有锁冲突,所以也不会有死锁或任务延迟问题.
居于以上思路,在多工单处理系统的模型中,筛选阶段的规则创建系统将充当生产者角色,工单创建系统将充当消费者角色,筛选规则将作为消息在两者之间进行传递.这就是典型的分布式队列编程架构.根据工单创建量的不同,可以采用数据库或开源的分布式消息中间件作为分布式队列.
该架构流程如下图:
采用该架构,我们在数据锁定、运筹优化、原子性问题都能得到比较好成果:
接下来重点阐述工程师运用分布式队列编程构架的时候,在生产者、分布式队列以及消费者这三个环节的注意点以及优化建议.
确定采用分布式队列编程模型之后,主体架构就算完成了,但工程师的工作还远远未结束.天下事必做于细,细节是一个不错的架构向一个优秀的系统进阶的关键因素.优化篇选取了作者以及其同事在运用分布式队列编程模型架构时所碰到的典型问题和解决方案.
这里些问题出现的频率较高,如果你经验不够,很可能会“踩坑”.希望通过这些讲解,帮助读者降低分布式队列编程模型的使用门槛.本文将对分布式队列编程模型的三种角色:生产者(Producer),分布式队列(Queue),消费者(Consumer)分别进行优化讨论.
在分布式队列编程中,生产者往往并非真正的生产源头,只是整个数据流中的一个节点,这种生产者的操作是处理-转发(Process-Forward)模式.
这种模式给工程师们带来的第一个问题是吞吐量问题.这种模式下运行的生产者,一边接收上游的数据,一边将处理完的数据发送给下游.本质上,它是一个非常经典的数学问题,其抽象模型是一些没有盖子的水箱,每个水箱接收来自上一个水箱的水,进行处理之后,再将水发送到下一个水箱.
工程师需要预测水源的流量、每个环节水箱的处理能力、水龙头的排水速度,最终目的是避免水溢出水箱,或者尽可能地减小溢出事件的概率.实际上流式编程框架以及其开发者花了大量的精力去处理和优化这个问题.下文的缓存优化和批量写入优化都是针对该问题的解决方案.
第二个需要考虑的问题是持久化.由于各种原因,系统总是会宕机.如果信息比较敏感,例如计费信息、火车票订单信息等,工程师们需要考虑系统宕机所带来的损失,找到让损失最小化的解决方案.持久化优化重点解决这一类问题.
处于“处理-转发”模式下运行的生产者往往被设计成请求驱动型的服务,即每个请求都会触发一个处理线程,线程处理完后将结果写入分布式队列.如果由于某种原因队列服务不可用,或者性能恶化,随着新请求的到来,生产者的处理线程就会产生堆积.这可能会导致如下两个问题:
缓解这类问题的思路来自于CAP理论,即通过降低一致性来提高可用性.生产者接收线程在收到请求之后第一时间不去处理,直接将请求缓存在内存中(牺牲一致性),而在后台启动多个处理线程从缓存中读取请求、进行处理并写入分布式队列.
与线程所占用的内存开销相比,大部分的请求所占内存几乎可以忽略.通过在接收请求和处理请求之间增加一层内存缓存,可以大大提高系统的处理吞吐量和可扩展性.这个方案本质上是一个内存生产者消费者模型.
如果生产者的请求过大,写分布式队列可能成为性能瓶颈,有如下几个因素:
如果在处理请求和写队列之间添加一层缓存,消息写入程序批量将消息写入队列,可以大大提高系统的吞吐量.原因如下:
通过添加缓存,消费者服务的吞吐量和可用性都得到了提升.但缓存引入了一个新问题——内存数据丢失.对于敏感数据,工程师需要考虑如下两个潜在问题:
所以缓存中的数据需要定期被持久化到磁盘等持久层设备中,典型的持久化触发策略主要有两种:
分布式队列不等同于各种开源的或者收费的消息中间件,甚至在一些场景下完全不需要使用消息中间件.但是,消息中间件产生的目的就是解决消息传递问题,这为分布式队列编程架构提供了很多的便利.在实际工作中,工程师们应该将成熟的消息中间件作为队列的首要备选方案.
本节对消息中间件的功能、模型进行阐述,并给出一些消息中间件选型、部署的具体建议.
明白一个系统的每个具体功能是设计和架构一个系统的基础.典型的消息中间件主要包含如下几个功能:
抽象的消息中间件模型包含如下几个角色:
要完整的描述消息中间件各个方面非常困难,大部分良好的消息中间件都有完善的文档,这些文档的长度远远超过本文的总长度.但如下几个标准是工程师们在进行消息中间件选型时经常需要考虑和权衡的.
性能主要有两个方面需要考虑:吞吐量(Throughput)和响应时间(Latency).
不同的消息队列中间件的吞吐量和响应时间相差甚远,在选型时可以去网上查看一些性能对比报告.
对于同一种中间件,不同的配置方式也会影响性能.主要有如下几方面的配置:
可靠性主要包含:可用性、持久化、确认机制等.
高可用性的消息中间件应该具备如下特征:
高可靠的消息中间件应该确保从发送者接收到的消息不会丢失.中间件代理服务器的宕机并不是小概率事件,所以保存在内存中的消息很容易发生丢失.大部分的消息中间件都依赖于消息的持久化去降低消息丢失损失,即将接收到的消息写入磁盘.即使提供持久化,仍有两个问题需要考虑:
确认机制本质上是通讯的握手机制(Handshaking).如果没有该机制,消息在传输过程中丢失将不会被发现.高敏感的消息要求选取具备确认机制的消息中间件.当然如果没有接收到消息中间件确认完成的指令,应用程序需要决定如何处理.典型的做法有两个:
采用现存消息中间件就意味着避免重复造轮子.如果某个消息中间件未能提供对应语言的客户端接口,则意味着极大的成本和兼容性问题.
投递策略指的是一个消息会被发送几次.主要包含三种策略:最多一次(At most Once )、最少一次(At least Once)、仅有一次(Exactly Once).
在实际应用中,只考虑消息中间件的投递策略并不能保证业务的投递策略,因为接收者在确认收到消息和处理完消息并持久化之间存在一个时间窗口.例如,即使消息中间件保证仅有一次(Exactly Once),如果接收者先确认消息,在持久化之前宕机,则该消息并未被处理.
从应用的角度,这就是最多一次(At most Once).反之,接收者先处理消息并完成持久化,但在确认之前宕机,消息就要被再次发送,这就是最少一次(At least Once). 如果消息投递策略非常重要,应用程序自身也需要仔细设计.
消费者是分布式队列编程中真正的数据处理方,数据处理方最常见的挑战包括:有序性、串行化(Serializability)、频次控制、完整性和一致性等.
在很多场景下,如何保证队列信息的有序处理是一个棘手的问题.如下图,假定分布式队列保证请求严格有序,请求ri2和ri1都是针对同一数据记录的不同状态,ri2的状态比ri1的状态新.T1、T2、T3和T4代表各个操作发生的时间,并且 T1 < T2 < T3 < T4(”<“代表早于).
采用多消费者架构,这两条记录被两个消费者(Consumer1和Consumer2)处理后更新到数据库里面.Consumer1虽然先读取ri1但是却后写入数据库,这就导致,新的状态被老的状态覆盖,所以多消费者不保证数据的有序性.
很多场景下,串行化是数据处理的一个基本需求,这是保证数据完整性、可恢复性、事务原子性等的基础.为了在并行计算系统里实现串行化,一系列的相关理论和实践算法被提出.对于分布式队列编程架构,要在在多台消费者实现串行化非常复杂,无异于重复造轮子.
有时候,消费者的消费频次需要被控制,可能的原因包括:
完整性和一致性是所有多线程和多进程的代码都面临的问题.在多线程或者多进程的系统中考虑完整性和一致性往往会大大地增加代码的复杂度和系统出错的概率.
几乎所有串行化理论真正解决的问题只有一个:性能. 所以,在性能允许的前提下,对于消费者角色,建议采用单实例部署.通过单实例部署,有序性、串行化、完整性和一致性问题自动获得了解决.另外,单实例部署的消费者拥有全部所需信息,它可以在频次控制上采取很多优化策略.
天下没有免费的午餐.同样,单实例部署并非没有代价,它意味着系统可用性的降低,很多时候,这是无法接受的.解决可用性问题的最直接的思路就是冗余(Redundancy).最常用的冗余方案是Master-slave架构,不过大部分的Master-slave架构都是Active/active模式,即主从服务器都提供服务.
例如,数据库的Master-slave架构就是主从服务器都提供读服务,只有主服务器提供写服务.大部分基于负载均衡设计的Master-slave集群中,主服务器和从服务器同时提供相同的服务.这显然不满足单例服务优化需求.
有序性和串行化需要Active/passive架构,即在某一时刻只有主实例提供服务,其他的从服务等待主实例失效.这是典型的领导人选举架构,即只有获得领导权的实例才能充当实际消费者,其他实例都在等待下一次选举.采用领导人选举的Active/passive架构可以大大缓解纯粹的单实例部署所带来的可用性问题.
令人遗憾的是,除非工程师们自己在消费者实例里面实现Paxos等算法,并在每次消息处理之前都执行领导人选举.否则,理论上讲,没有方法可以保障在同一个时刻只有一个领导者.而对每个消息都执行一次领导人选举,显然性能不可行.
实际工作中,最容易出现的问题时机发生在领导人交接过程中,即前任领导人实例变成辅助实例,新部署实例开始承担领导人角色.为了平稳过渡,这两者之间需要有一定的通讯机制,但是,无论是网络分区(Network partition)还是原领导人服务崩溃都会使这种通讯机制变的不可能.
对于完整性和一致性要求很高的系统,我们需要在选举制度和交接制度这两块进行优化.
典型的领导人选举算法有Paxos、ZAB( ZooKeeper Atomic Broadcast protocol).为了避免重复造轮子,建议采用ZooKeeper的分布式锁来实现领导人选举.典型的ZooKeeper实现算法如下:
Let ELECTION be a path of choice of the application. To volunteer to be a leader:
1.Create znode z with path “ELECTION/guid-n_” with both SEQUENCE and EPHEMERAL flags;
2.Let C be the children of “ELECTION”, and i be the sequence number of z;
3.Watch for changes on “ELECTION/guid-n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;
Upon receiving a notification of znode deletion:
1.Let C be the new set of children of ELECTION;
2.If z is the smallest node in C, then execute leader procedure;
3.Otherwise, watch for changes on “ELECTION/guid-n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;
领导人选举的整个过程发生在ZooKeeper集群中,各个消费者实例在这场选举中只充当被告知者角色(Learner).领导人选举算法,只能保证最终只有一个Leader被选举出来,并不保障被告知者对Leader的理解是完全一致的.
本质上,上文的架构里,选举的结果是作为令牌(Token)传递给消费者实例,消费者将自身的ID与令牌进行对比,如果相等,则开始执行消费操作.所以当发生领导人换届的情况,不同的Learner获知新Leader的时间并不同.
例如,前任Leader如果因为网络问题与ZooKeeper集群断开,前任Leader只能在超时后才能判断自己是否不再承担Leader角色了,而新的Leader可能在这之前已经产生.另一方面,即使前任Leader和新Leader同时接收到新Leader选举结果,某些业务的完整性要求迫使前任Leader仍然完成当前未完成的工作.
以上的讲解非常抽象,生活中却给了一些更加具体的例子.众所周知,美国总统候选人在选举结束后并不直接担任美国总统,从选举到最终承担总统角色需要一个过渡期.对于新当选Leader的候选人而言,过渡期间称之为加冕阶段(Inauguration).对于即将卸任的Leader,过渡期称为交接阶段(HandOver).
所以一个基于领导人选举的消费者从加冕到卸任经历三个阶段:Inauguration、Execution、HandOver.在加冕阶段,新领导需要进行一些初始化操作.Execution阶段是真正的队列消息处理阶段.在交接阶段,前任领导需要进行一些清理操作.
类似的,为了解决领导人交接问题,所有的消费者从代码实现的角度都需要实现类似ILeaderCareer接口.这个接口包含三个方发inaugurate(),handOver()和execute().某个部署实例(Learner)在得知自己承担领导人角色后,需要调用inaugurate()方法,进行加冕.主要的消费逻辑通过不停的执行execute()实现,当确认自己不再承担领导人之后,执行handOver()进行交接.
如果承担领导人角色的消费者,在执行execute()阶段得知自己将要下台,根据消息处理的原子性,该领导人可以决定是否提前终止操作.如果整个消息处理是一个原子性事务,直接终止该操作可以快速实现领导人换届.否则,前任领导必须完成当前消息处理后,才进入交接阶段.这意味着新的领导人,在inaugurate()阶段需要进行一定时间的等待.
频次控制是一个经典问题.对于分布式队列编程架构,相同请求重复出现在队列的情况并不少见.如果相同请求在队列中重复太多,排重优化就显得很必要.分布式缓存更新是一个典型例子,所有请求都被发送到队列中用于缓存更新.如果请求符合典型的高斯分布,在一段时间内会出现大量重复的请求,而同时多线程更新同一请求缓存显然没有太大的意义.
排重优化是一个算法,其本质是基于状态机的编程,整个讲解通过模型、构思和实施三个步骤完成.
进行排重优化的前提是大量重复的请求.在模型这一小节,我们首先阐述重复度模型、以及不同重复度所导致的消费模型,最后基于这两个模型去讲解排重状态机.
首先我们给出最小重复长度的概念.同一请求最小重复长度:同一请求在队列中的重复出现的最小间距.例如,请求ri第一次出现在位置3,第二次出现在10,最小重复长度等于7.
是否需要进行排重优化取决于队列中请求的重复度.由于不同请求之间并不存在重复的问题,不失一般性,这里的模型只考了单个请求的重复度,重复度分为三个类:无重复、稀疏重复、高重复.
对于不同的重复度,会有不同的消费模型.
在整个队列处理过程中,所有的请求都不相同,如下图:
当同一请求最小重复长度大于消费者队列长度,如下图.假定有3个消费者,Consumer1将会处理r1,Consumer2将会处理r2,Consumer3将会处理r3,如果每个请求处理的时间严格相等,Consumer1在处理完r1之后,接着处理r4,Consumer2将会处理r2之后会处理r1.虽然r1被再次处理,但是任何时刻,只有这一个消费者在处理r1,不会出现多个消费者同时处理同一请求的场景.
如下图,仍然假定有3个消费者,队列中前面4个请求都是r1,它会同时被3个消费者线程处理:
显然,对于无重复和稀疏重复的分布式队列,排重优化并不会带来额外的好处.排重优化所针对的对象是高重复消费模型,特别是对于并行处理消费者比较多的情况,重复处理同一请求,资源消耗极大.
排重优化的主要对象是高重复的队列,多个消费者线程或进程同时处理同一个幂等请求只会浪费计算资源并延迟其他待请求处理.所以,排重状态机的一个目标是处理唯一性,即:同一时刻,同一个请求只有一个消费者处理.
如果消费者获取一条请求消息,但发现其他消费者正在处理该消息,则当前消费者应该处于等待状态.如果对同一请求,有一个消费者在处理,一个消费者在等待,而同一请求再次被消费者读取,再次等待则没有意义.
所以,状态机的第二个目标是等待唯一性,即:同一时刻,同一个请求最多只有一个消费者处于等待状态.总上述,状态机的目标是:处理唯一性和等待唯一性.我们把正在处理的请求称为头部请求,正在等待的请求称为尾部请求.
由于状态机的处理单元是请求,所以需要针对每一个请求建立一个排重状态机.基于以上要求,我们设计的排重状态机包含4个状态Init,Process,Block,Decline.各个状态之间转化过程如下图:
状态机描述的是针对单个请求操作所引起状态变化,排重优化需要解决队列中所有请求的排重问题,需要对所有请求的状态机进行管理.这里只考虑单虚拟机内部对所有请求状态机的管理,对于跨虚拟机的管理可以采用类似的方法.对于多状态机管理主要包含三个方面:一致性问题、完整性问题和请求缓存驱逐问题.
一致性在这里要求同一请求的不同消费者只会操作一个状态机.由于每个请求都产生一个状态机,系统将会包含大量的状态机.为了兼顾性能和一致性,我们采用ConcurrentHashMap保存所有的状态机.用ConcurrentHashMap而不是对整个状态机队列进行加锁,可以提高并行处理能力,使得系统可以同时操作不同状态机.
为了避免处理同一请求的多消费者线程同时对ConcurrentHashMap进行插入所导致状态机不一致问题,我们利用了ConcurrentHashMap的putIfAbsent()方法.代码方案如下,key2Status用于存储所有的状态机.
消费者在处理请求之前,从状态机队列中读取排重状态机TrafficAutomate.如果没有找到,则创建一个新的状态机,并通过putIfAbsent()方法插入到状态机队列中.
完整性要求保障状态机Init,Process,Block,Decline四种状态正确、状态之间的转换也正确.由于状态机的操作非常轻量级,兼顾完整性和降低代码复杂度,我们对状态机的所有方法进行加锁.
如果不同请求的数量太多,内存永久保存所有请求的状态机的内存开销太大.所以,某些状态机需要在恰当的时候被驱逐出内存.这里有两个思路:
每个请求对应于一个状态机,不同的状态机采用不同的请求进行识别.
对于同一状态机的不同消费者,在单虚拟机方案中,我们采用线程id进行标识.
排重优化的主要功能都是通过排重状态机(TrafficAutomate)和状态机队列(QueueCoordinator)来实施的.排重状态机描述的是针对单个请求的排重问题,状态机队列解决所有请求状态机的排重问题.
根据状态机模型,其主要操作为enQueue和deQueue,其状态由头部请求和尾部请求的状态共同决定,所以需要定义两个变量为head和tail,用于表示头部请求和尾部请求.为了确保多线程操作下状态机的完整性(Integraty),所有的操作都将加上锁.
当一个消费者执行enQueue操作时:如果此时尾部请求不为空,根据等待唯一性要求,返回DECLINE,当前消费者应该抛弃该请求;如果头部请求为空,返回ACCPET,当前消费者应该立刻处理该消息;否则,返回BLOCK,该消费者应该等待,并不停的查看状态机的状态,一直到头部请求处理完成.enQueue代码如下:
对于deQueue操作,首先将尾部请求赋值给头部请求,并将尾部请求置为无效.deQueue代码如下:
状态机队列集中管理所有请求的排重状态机,所以其操作和单个状态机一样,即enQueue和deQueuqe接口.这两个接口的实现需要识别特定请求的状态机,所以它们的入参应该是请求.为了兼容不同类型的请求消息,我们采用了Java泛型编程.接口定义如下:
enQueue操作过程如下:
首先,根据传入的请求key值,获取状态机, 如果不存在则创建一个新的状态机,并保存在ConcurrentHashMap中.
接下来,获取线程id作为该消费者的唯一标识,并对对应状态机进行enQueue操作.
如果状态机返回值为ACCEPT或者DECLINE,返回业务层处理代码,ACCEPT意味着业务层需要处理该消息,DECLINE表示业务层可以抛弃当前消息.如果状态机返回值为Block,则该线程保持等待状态.
在某些情况下,头部请求线程可能由于异常,未能对状态机进行deQueue操作(作为组件提供方,不能假定所有的规范被使用方实施).为了避免处于阻塞状态的消费者无期限地等待,建议对状态机设置安全超时时限.超过了一定时间后,状态机强制清空头部请求,返回到业务层,业务层开始处理该请求.
代码如下:
deQueue操作首先从ConcurrentHashMap获取改请求所对应的状态机,接着获取该线程的线程id,对状态机进行deQueue操作.
enQueue代码如下:
完整源代码可以在QueueCoordinator获取.链接:
https://github.com/dinglau2008/QueueCoordinator/tree/master/src
文章出处:美团点评技术团队
转载请注明本页网址:
http://www.vephp.com/jiaocheng/4455.html