5. 高级发布/订阅模式

第 5 章 - 高级发布/订阅模式 #

第 3 章 - 高级请求/回复模式第 4 章 - 可靠请求/回复模式中,我们探讨了 ZeroMQ 请求/回复模式的高级用法。如果你能消化所有这些内容,恭喜你。在本章中,我们将重点关注发布/订阅,并用更高级的模式来扩展 ZeroMQ 的核心发布/订阅模式,以实现性能、可靠性、状态分发和监控。

我们将涵盖

  • 何时使用发布/订阅
  • 如何处理过慢的订阅者 (Suicidal Snail 模式)
  • 如何设计高速订阅者 (Black Box 模式)
  • 如何监控发布/订阅网络 (Espresso 模式)
  • 如何构建共享键值存储 (Clone 模式)
  • 如何使用 Reactor 简化复杂服务器
  • 如何使用 Binary Star 模式为服务器添加故障转移功能

发布/订阅的优缺点 #

ZeroMQ 的底层模式各具特点。发布/订阅解决了 오래된 消息传递问题,即 多播 (multicast) 或 组播 (group messaging)。它具有 ZeroMQ 独特的细致的简洁性和残酷的漠视的结合。理解发布/订阅的权衡、它们如何使我们受益,以及如果需要如何绕过它们,是很有价值的。

首先,PUB 将每条消息发送给“许多中的所有”,而 PUSH 和 DEALER 将消息轮流发送给“许多中的一个”。你不能简单地用 PUB 替换 PUSH 或反过来,然后指望一切正常。这一点值得重申,因为人们似乎经常建议这样做。

更深层地说,发布/订阅旨在实现可伸缩性。这意味着大量数据快速发送给许多接收者。如果你需要每秒向数千个点发送数百万条消息,你将比只需每秒向少数接收者发送几条消息更能体会到发布/订阅的价值。

为了获得可伸缩性,发布/订阅使用了与 push-pull 相同的技巧,即去除回话。这意味着接收者不会回复发送者。有一些例外,例如 SUB 套接字会向 PUB 套接字发送订阅信息,但它是匿名的且不频繁的。

去除回话对于真正的可伸缩性至关重要。在发布/订阅中,这是该模式如何干净地映射到 PGM 多播协议的方式,该协议由网络交换机处理。换句话说,订阅者根本不连接到发布者,他们连接到交换机上的一个多播,发布者将消息发送到该组。

当我们去除回话时,整体消息流会变得 훨씬 简单,这使得我们可以构建更简单的 API、更简单的协议,并且通常能够触达更多人。但我们也消除了协调发送者和接收者的任何可能性。这意味着

  • 发布者无法知道订阅者何时成功连接,无论是在初始连接还是在网络故障后重新连接时。

  • 订阅者无法告知发布者任何信息来让发布者控制他们发送消息的速度。发布者只有一个设置,即全速,订阅者必须跟上,否则就会丢失消息。

  • 发布者无法知道订阅者何时因进程崩溃、网络中断等原因而消失。

缺点是,如果我们想实现可靠的多播,我们就确实需要所有这些功能。当订阅者正在连接时、网络故障发生时,或者仅仅是订阅者或网络无法跟上发布者时,ZeroMQ 发布/订阅模式会任意丢失消息。

好处是,在许多用例中,几乎 可靠的多播已经足够好。当我们需要这种回话时,我们可以切换到使用 ROUTER-DEALER (我在大多数正常流量情况下倾向于这样做),或者我们可以添加一个独立的通道用于同步 (在本章后面我们将看到一个例子)。

发布/订阅就像无线电广播;在你加入之前,你会错过所有内容,然后你收到多少信息取决于你的接收质量。令人惊讶的是,这种模式很有用并且广泛应用,因为它与现实世界的信息分发完美契合。想想 Facebook 和 Twitter、BBC 世界广播以及体育比赛结果。

就像我们在请求/回复模式中所做的那样,让我们根据可能出现的问题来定义可靠性。以下是发布/订阅的经典故障情况:

  • 订阅者加入较晚,因此他们会错过服务器已经发送的消息。
  • 订阅者获取消息速度过慢,导致队列积压并溢出。
  • 订阅者可能断开连接并在断开期间丢失消息。
  • 订阅者可能崩溃并重启,丢失他们已经收到的任何数据。
  • 网络可能过载并丢弃数据 (特别是对于 PGM)。
  • 网络可能变得过慢,导致发布者端的队列溢出,发布者崩溃。

可能出现的问题还有很多,但这些是我们在现实系统中看到的典型故障。自 v3.x 版本起,ZeroMQ 对其内部缓冲区(即所谓的高水位标记或 HWM)强制设置了默认限制,因此发布者崩溃的情况较少见,除非您故意将 HWM 设置为无限大。

所有这些故障情况都有解决方案,尽管并非总是简单的。可靠性需要我们大多数人大多数时候不需要的复杂性,这就是 ZeroMQ 不试图开箱即用提供它的原因(即使存在一种全局的可靠性设计,而实际上并不存在)。

发布/订阅跟踪 (Espresso 模式) #

让我们从查看一种跟踪发布/订阅网络的方法开始本章。在第 2 章 - 套接字与模式中,我们看到了一个使用这些套接字进行传输桥接的简单代理。该 zmq_proxy()方法有三个参数:它桥接在一起的 frontendbackend 套接字,以及一个 capture 套接字,它将所有消息发送到该套接字。

代码看起来很简单

C | C++ | Java | Node.js | Python | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | Tcl | OCaml

Espresso 的工作原理是创建一个监听线程,该线程读取一个 PAIR 套接字并打印其接收到的任何内容。该 PAIR 套接字是管道的一端;另一端(另一个 PAIR)是我们传递给 zmq_proxy()的方法。在实际应用中,您会过滤感兴趣的消息以获取您想要跟踪的核心内容(因此得名该模式)。

订阅者线程订阅“A”和“B”,接收五条消息,然后销毁其套接字。当您运行示例时,监听器打印两条订阅消息、五条数据消息、两条取消订阅消息,然后归于平静。

[002] 0141
[002] 0142
[007] B-91164
[007] B-12979
[007] A-52599
[007] A-06417
[007] A-45770
[002] 0041
[002] 0042

这清楚地展示了当没有订阅者订阅时,发布者套接字如何停止发送数据。发布者线程仍在发送消息。套接字只是默默地丢弃它们。

最新值缓存 #

如果您使用过商业发布/订阅系统,您可能习惯于 ZeroMQ 快速活泼的发布/订阅模式中缺少的一些功能。其中之一是最新值缓存 (LVC)。这解决了新订阅者加入网络时如何追赶数据的问题。理论上,发布者会在新订阅者加入并订阅特定主题时收到通知。然后,发布者可以重新广播这些主题的最新消息。

我已经解释了为什么在有新订阅者时发布者不会收到通知,因为在大型发布/订阅系统中,数据量使得这几乎不可能。要构建真正大规模的发布/订阅网络,您需要像 PGM 这样的协议,它利用高端以太网交换机将数据多播给数千个订阅者的能力。尝试通过 TCP 单播从发布者向数千个订阅者中的每一个发送数据根本无法扩展。你会遇到奇怪的峰值、不公平的分发(一些订阅者比其他人先收到消息)、网络拥塞以及普遍的不愉快。

PGM 是一种单向协议:发布者将消息发送到交换机上的一个多播地址,然后交换机将其重新广播给所有感兴趣的订阅者。发布者永远看不到订阅者何时加入或离开:这一切都发生在交换机中,而我们并不真正想开始重新编程交换机。

然而,在只有几十个订阅者和有限主题的低流量网络中,我们可以使用 TCP,这样 XSUB 和 XPUB 套接字确实会像我们在 Espresso 模式中看到的那样相互通信。

我们能否使用 ZeroMQ 构建一个 LVC?答案是肯定的,如果我们构建一个位于发布者和订阅者之间的代理;它类似于 PGM 交换机,但我们可以自己编程。

我将首先创建一个发布者和订阅者来突出最坏的情况。这个发布者是病态的。它一开始就立即向一千个主题中的每一个发送消息,然后每秒向一个随机主题发送一条更新。一个订阅者连接并订阅一个主题。如果没有 LVC,订阅者平均需要等待 500 秒才能获得任何数据。为了增加一些戏剧性,假设有一个名叫 Gregor 的越狱犯威胁说,如果我们不能解决那 8.3 分钟的延迟,他就要扯掉玩具兔子 Roger 的脑袋。

这是发布者代码。请注意,它有命令行选项可以连接到某个地址,但通常会绑定到一个端点。我们稍后会用它连接到我们的最新值缓存。

C | C++ | Java | Python | Ruby | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Rust | Scala | Tcl | OCaml

这是订阅者代码

C | C++ | Java | Python | Ruby | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Rust | Scala | Tcl | OCaml

尝试构建并运行这些代码:先运行订阅者,再运行发布者。你会看到订阅者按预期报告收到“Save Roger”

./pathosub &
./pathopub

当你运行第二个订阅者时,你就会明白 Roger 的困境。你需要等待相当长的时间,它才会报告收到任何数据。所以,这就是我们的最新值缓存。正如我承诺的,它是一个代理,绑定到两个套接字,然后处理这两个套接字上的消息。

C | C++ | Java | Node.js | Python | Ruby | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Rust | Scala | Tcl | OCaml

现在,运行代理,然后再运行发布者

./lvcache &
./pathopub tcp://localhost:5557

现在,运行任意数量的订阅者实例,每次都连接到端口 5558 上的代理

./pathosub tcp://localhost:5558

每个订阅者都开心地报告收到“Save Roger”,而越狱犯 Gregor 则溜回座位享用晚餐和一杯热牛奶,这才是他真正想要的。

注意一点:默认情况下,XPUB 套接字不会报告重复订阅,这是您天真地将 XPUB 连接到 XSUB 时所期望的行为。我们的示例巧妙地绕过了这个问题,通过使用随机主题,这样它不工作的几率只有百万分之一。在真正的 LVC 代理中,您会希望使用ZMQ_XPUB_VERBOSE选项,我们在第 6 章 - ZeroMQ 社区中将其作为一个练习来实现。

慢速订阅者检测 (Suicidal Snail 模式) #

在实际应用中使用发布/订阅模式时,您会遇到的一个常见问题是慢速订阅者。在理想世界中,我们以全速将数据从发布者传输到订阅者。在现实中,订阅者应用程序通常是用解释型语言编写的,或者只是做了大量工作,或者写得很糟糕,以至于它们无法跟上发布者的速度。

我们如何处理慢速订阅者?理想的解决方案是让订阅者更快,但这可能需要工作和时间。一些处理慢速订阅者的经典策略包括:

  • 在发布者端排队消息。当我不读邮件几个小时时,Gmail 就是这样做的。但在高流量消息传递中,将队列推向上游会导致发布者内存不足并崩溃,结果既惊心动魄又不划算——特别是当订阅者数量很多且出于性能原因无法刷新到磁盘时。

  • 在订阅者端排队消息。这要好得多,如果网络能够跟上,ZeroMQ 默认就是这样做的。如果有人会内存不足并崩溃,那将是订阅者而不是发布者,这是公平的。这对于“峰值”流非常适用,在这种情况下,订阅者可能暂时无法跟上,但在流速变慢时可以赶上。然而,这对于总体上速度过慢的订阅者来说并非解决方案。

  • 一段时间后停止新消息排队。当我的邮箱溢出其宝贵的几 GB 空间时,Gmail 就是这样做的。新消息会被拒绝或丢弃。从发布者的角度来看,这是一个很好的策略,也是 ZeroMQ 在发布者设置 HWM 时所做的事情。然而,这仍然无法帮助我们解决慢速订阅者的问题。现在我们的消息流中只会出现空白。

  • 通过断开连接惩罚慢速订阅者。这是 Hotmail(还记得吗?)在我两周没有登录时做的事情,这也是当我意识到可能有更好的方法时,我已经在使用我的第十五个 Hotmail 账户的原因。这是一种很好的残酷策略,它迫使订阅者振作精神并集中注意力,这会很理想,但 ZeroMQ 不会这样做,并且没有办法在其之上实现这一层,因为订阅者对于发布者应用程序来说是不可见的。

这些经典策略都不适用,所以我们需要创新。与其断开发布者,不如说服订阅者自杀。这就是 Suicidal Snail 模式。当订阅者检测到自己运行速度过慢(其中“过慢”大概是一个配置选项,实际意思是“慢到如果你到达这里,就大声呼救,因为我需要知道,这样我才能修复它!”)时,它就会“呱”一声然后死去。

订阅者如何检测到这一点?一种方法是给消息编号(按顺序编号),并在发布者端使用 HWM。现在,如果订阅者检测到缺口(即编号不连续),它就知道有问题了。然后我们将 HWM 调整到“如果达到此水平就‘呱’一声死去”的程度。

这个解决方案有两个问题。第一,如果我们有多个发布者,如何给消息编号?解决方案是给每个发布者一个唯一的 ID 并添加到编号中。第二,如果订阅者使用ZMQ_SUBSCRIBE过滤器,他们从定义上就会获得缺口。我们宝贵的编号将毫无用处。

有些用例不使用过滤器,编号对它们有效。但更通用的解决方案是发布者为每条消息加上时间戳。当订阅者收到消息时,它检查时间,如果差值超过(比如)一秒,它就会执行“呱”一声死去的动作,可能会先向操作员控制台发送一声尖叫。

自杀蜗牛模式特别适用于订阅者拥有自己的客户端和服务水平协议,并且需要保证一定的最大延迟的情况。中止一个订阅者可能看起来不是一种建设性的方式来保证最大延迟,但这是一种断言模型。今天就中止,问题就会得到解决。允许延迟的数据向下游流动,问题可能会造成更广泛的损害,并且需要更长时间才能被发现。

这里是一个自杀蜗牛的最小示例

C | C++ | Java | Lua | PHP | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Node.js | Objective-C | ooc | Perl | Q | Racket | Ruby | Rust | Scala | OCaml

关于自杀蜗牛示例的一些注意事项如下:

  • 此处的消息仅包含当前系统时钟(表示为毫秒数)。在实际应用中,消息至少应包含带时间戳的消息头和带数据的消息体。

  • 示例将订阅者和发布者作为两个线程放在同一个进程中。实际上,它们应该是独立的进程。使用线程只是为了方便演示。

高速订阅者(黑盒模式) #

现在我们来看一种方法来加快订阅者的速度。发布-订阅的一种常见用例是分发大型数据流,例如来自证券交易所的市场数据。典型的设置是发布者连接到证券交易所,获取报价,然后将其发送给许多订阅者。如果只有少数订阅者,我们可以使用 TCP。如果订阅者数量较多,我们可能会使用可靠的多播,例如 PGM。

图 56 - 简单的黑盒模式

假设我们的数据源平均每秒有 100,000 条 100 字节的消息。这是过滤掉不需要发送给订阅者的市场数据后的典型速率。现在我们决定记录一整天的数据(8 小时大约 250 GB),然后将其回放到一个模拟网络,即一小组订阅者。虽然每秒 10 万条消息对于 ZeroMQ 应用来说很容易,但我们想以快得多的速度回放它。

因此,我们建立了一个包含许多盒子的架构——一个用于发布者,每个订阅者一个。这些是配置良好的盒子——八核,发布者为十二核。

当我们将数据泵入订阅者时,我们注意到两件事

  1. 当我们对消息进行哪怕最少量的工作时,都会让我们的订阅者慢下来,以至于无法再次赶上发布者。

  2. 我们达到了一个上限,无论是在发布者还是订阅者端,大约是每秒 600 万条消息,即使经过仔细优化和 TCP 调优之后也是如此。

我们首先要做的是将订阅者拆分成多线程设计,这样我们就可以在一组线程中处理消息,同时在另一组线程中读取消息。通常,我们不想以相同的方式处理每条消息。相反,订阅者会过滤一些消息,可能通过前缀键进行过滤。当消息匹配某些条件时,订阅者将调用一个 worker 来处理它。用 ZeroMQ 的术语来说,这意味着将消息发送到 worker 线程。

因此,订阅者看起来像是一个队列设备。我们可以使用各种套接字来连接订阅者和 worker。如果我们假定是单向流量且所有 worker 都相同,我们可以使用 PUSH 和 PULL 并将所有路由工作委托给 ZeroMQ。这是最简单快捷的方法。

订阅者通过 TCP 或 PGM 与发布者通信。订阅者通过与位于同一进程中的 worker 通信,通过inproc:@<//>@.

图 57 - 疯狂黑盒模式

现在来打破这个上限。订阅者线程会占用 100% 的 CPU,而且由于它是一个线程,所以无法使用超过一个核心。单个线程总是会达到上限,无论是在每秒 200 万、600 万还是更多消息时。我们希望将工作分配到多个可以并行运行的线程中。

许多高性能产品使用的方法(在此处也适用)是分片(sharding)。使用分片,我们将工作分解为并行且独立的流,例如将一半的主题键分到一个流中,另一半分到另一个流中。我们可以使用许多流,但除非有空闲核心,否则性能不会扩展。那么让我们看看如何分片成两个流。

使用两个流,以全速工作时,我们将按如下方式配置 ZeroMQ

  • 两个 I/O 线程,而不是一个。
  • 两个网络接口(NIC),每个订阅者一个。
  • 每个 I/O 线程绑定到一个特定的 NIC。
  • 两个订阅者线程,绑定到特定的核心。
  • 两个 SUB 套接字,每个订阅者线程一个。
  • 剩余的核心分配给 worker 线程。
  • Worker 线程连接到两个订阅者 PUSH 套接字。

理想情况下,我们希望架构中满载线程的数量与核心数量相匹配。当线程开始争夺核心和 CPU 周期时,增加更多线程的成本将大于收益。例如,创建更多的 I/O 线程不会带来任何好处。

可靠的发布-订阅(克隆模式) #

作为一个更详细的示例,我们将探讨如何构建一个可靠的发布-订阅架构的问题。我们将分阶段开发它。目标是允许一组应用程序共享一些共同的状态。以下是我们的技术挑战

  • 我们有大量的客户端应用程序,例如数千或数万个。
  • 它们将随意加入和离开网络。
  • 这些应用程序必须共享一个最终一致的状态
  • 任何应用程序都可以在任何时候更新状态。

假设更新量相对较低。我们没有实时性要求。整个状态可以放入内存。一些可能的用例包括

  • 一组云服务器共享的配置。
  • 一组玩家共享的游戏状态。
  • 实时更新并可供应用程序使用的汇率数据。

中心化 vs 去中心化 #

我们首先需要决定的问题是是否使用中心服务器。这对最终的设计会产生很大影响。权衡如下

  • 从概念上讲,中心服务器更容易理解,因为网络并非天然对称。使用中心服务器,我们可以避免所有关于服务发现、绑定(bind)与连接(connect)等问题。

  • 通常,完全分布式的架构在技术上更具挑战性,但最终得到的协议更简单。也就是说,每个节点必须以正确的方式扮演服务器和客户端的角色,这很精妙。如果处理得当,结果会比使用中心服务器更简单。我们在第 4 章 - 可靠的请求-回复模式中的 Freelance 模式中看到了这一点。

  • 中心服务器在高容量用例中会成为瓶颈。如果需要处理每秒数百万条消息的规模,我们应该立即考虑去中心化。

  • 具有讽刺意味的是,中心化架构更容易扩展到更多节点,而去中心化架构则不然。也就是说,将 10,000 个节点连接到一个服务器比将它们相互连接更容易。

因此,对于克隆模式,我们将使用一个发布状态更新的服务器和一组代表应用程序的客户端

将状态表示为键值对 #

我们将分阶段开发克隆模式,一次解决一个问题。首先,让我们看看如何在一组客户端之间更新共享状态。我们需要决定如何表示我们的状态以及更新。最简单的可行格式是键值存储,其中一个键值对代表共享状态中的一个原子变化单元。

我们在第 1 章 - 基础知识中有一个简单的发布-订阅示例,即天气服务器和客户端。让我们修改服务器来发送键值对,并让客户端将其存储在哈希表中。这使我们可以使用经典的发布-订阅模型从一个服务器向一组客户端发送更新。

更新可以是新的键值对、现有键的修改值或已删除的键。现在我们可以假设整个存储适合内存,并且应用程序通过键访问它,例如使用哈希表或字典。对于更大的存储和某种持久化需求,我们可能会将状态存储在数据库中,但这与此处无关。

这是服务器端

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

这是客户端

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml
图 58 - 发布状态更新

关于这个第一个模型的一些注意事项如下:

  • 所有繁重的工作都在一个kvmsg类中完成。这个类处理键值消息对象,它们是多部分 ZeroMQ 消息,其结构包含三个帧:一个键(ZeroMQ 字符串)、一个序列号(64 位值,网络字节序)以及一个二进制消息体(包含其他所有内容)。

  • 服务器生成带有随机 4 位数字键的消息,这使我们可以模拟一个大型但不巨大的哈希表(10K 条目)。

  • 此版本中未实现删除功能:所有消息都是插入或更新。

  • 服务器在绑定其套接字后暂停 200 毫秒。这是为了防止慢连接综合征(slow joiner syndrome),指订阅者在连接到服务器套接字时丢失消息的情况。我们会在后续版本的克隆代码中移除此暂停。

  • 我们将在代码中使用术语发布者订阅者来指代套接字。这将在稍后处理多个执行不同任务的套接字时有所帮助。

这是kvmsg类,这是目前可用的最简单形式

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

稍后,我们将创建一个更精密的kvmsg类,它将在实际应用中工作。

服务器和客户端都维护哈希表,但只有在我们先启动所有客户端,然后启动服务器,并且客户端从不崩溃的情况下,这个第一个模型才能正常工作。这非常不切实际。

获取带外快照 #

因此,现在我们面临第二个问题:如何处理延迟加入的客户端或崩溃后重启的客户端。

为了让延迟(或恢复)的客户端追赶上服务器,它必须获取服务器状态的快照。正如我们将“消息”简化为“一个带序列号的键值对”,我们也可以将“状态”简化为“一个哈希表”。要获取服务器状态,客户端打开一个 DEALER 套接字并明确请求。

为了实现这一点,我们必须解决一个时序问题。获取状态快照需要一定时间,如果快照很大,时间可能会相当长。我们需要将更新正确地应用到快照上。但是服务器不知道何时开始向我们发送更新。一种方法是先订阅,获取第一个更新,然后请求“更新 N 的状态”。这将要求服务器为每次更新存储一个快照,这不切实际。

图 59 - 状态复制

因此,我们将在客户端进行同步,步骤如下

  • 客户端首先订阅更新,然后发出状态请求。这保证了状态将比它收到的最旧的更新要新。

  • 客户端等待服务器返回状态,同时将所有更新排队。它通过不读取这些消息来做到这一点:ZeroMQ 将它们保存在套接字队列中。

  • 当客户端收到其状态更新时,它再次开始读取更新。但是,它会丢弃任何比状态更新更旧的更新。因此,如果状态更新包含截至 200 的更新,客户端将丢弃截至 201 的更新。

  • 然后,客户端将更新应用到自己的状态快照上。

这是一个利用 ZeroMQ 自身内部队列的简单模型。这是服务器端

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

这是客户端

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

关于这两个程序,有几点需要注意

  • 服务器使用了两个任务。一个线程(随机地)生成更新并发送到主 PUB 套接字,而另一个线程则在 ROUTER 套接字上处理状态请求。这两个线程通过 PAIR 套接字 over aninproc:@<//>@连接进行通信。

  • 客户端非常简单。在 C 语言中,它只有大约五十行代码。大部分繁重的工作是在kvmsg类中完成的。即便如此,基本的克隆模式实现起来比最初看起来要容易。

  • 我们没有使用任何花哨的方法来序列化状态。哈希表保存了一组kvmsg对象,服务器将这些对象作为一批消息发送给请求状态的客户端。如果多个客户端同时请求状态,每个客户端将获得一个不同的快照。

  • 我们假定客户端恰好有一个服务器进行通信。服务器必须正在运行;我们不尝试解决服务器崩溃时会发生什么的问题。

目前,这两个程序并没有做任何实际的事情,但它们能够正确同步状态。这是一个巧妙的示例,展示了如何混合不同的模式:PAIR-PAIR、PUB-SUB 和 ROUTER-DEALER。

从客户端重新发布更新 #

在我们的第二个模型中,键值存储的更改来自服务器本身。这是一个中心化模型,例如,当我们需要分发一个中心配置文件并在每个节点上进行本地缓存时,这个模型就很有用。一个更有趣的模型是接收来自客户端而不是服务器的更新。因此,服务器成为了一个无状态代理。这为我们带来了一些好处:

  • 我们对服务器的可靠性担忧较少。如果它崩溃,我们可以启动一个新的实例并向其馈送新值。

  • 我们可以使用键值存储在活跃对等方之间共享信息。

要将客户端的更新发送回服务器,我们可以使用多种套接字模式。最简单的可行方案是 PUSH-PULL 组合。

为什么我们不允许客户端之间直接发布更新?虽然这会降低延迟,但会失去一致性保证。如果允许更新的顺序根据接收者而改变,就无法获得一致的共享状态。假设我们有两个客户端,正在更改不同的键。这会工作良好。但如果两个客户端尝试大致同时更改同一个键,它们最终会对该键的值产生不同的看法。

当多个地方同时发生更改时,有几种实现一致性的策略。我们将采用中心化所有更改的方法。无论客户端进行更改的确切时机如何,所有更改都会通过服务器推送,服务器根据接收更新的顺序强制执行一个单一的序列。

图 60 - 重新发布更新

通过中介所有更改,服务器还可以为所有更新添加一个唯一的序列号。有了唯一的序列,客户端就可以检测到更严重的故障,包括网络拥塞和队列溢出。如果客户端发现其接收的消息流有缺失,它可以采取行动。客户端联系服务器并请求缺失的消息似乎是合理的,但在实践中这并没有用。如果存在缺失,它们是由网络压力引起的,给网络增加更多压力只会让情况变得更糟。客户端所能做的就是警告用户它“无法继续”,停止运行,并且在有人手动检查问题原因之前不再重启。

现在我们将在客户端生成状态更新。这是服务器

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

这是客户端

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

关于这第三个设计,有几点需要注意

  • 服务器已精简为一个单独的任务。它管理一个用于接收传入更新的 PULL 套接字,一个用于状态请求的 ROUTER 套接字,以及一个用于发送传出更新的 PUB 套接字。

  • 客户端使用一个简单的无时钟节拍计时器每秒向服务器发送一个随机更新。在实际实现中,我们将从应用代码驱动更新。

处理子树 #

随着客户端数量的增长,共享存储的大小也会增长。将所有内容发送给每个客户端变得不再合理。这是 pub-sub 的经典故事:当客户端数量很少时,可以将每条消息发送给所有客户端。随着架构的增长,这变得低效。客户端会在不同领域进行专业化。

因此,即使在使用共享存储时,一些客户端也只想使用存储的一部分,我们称之为子树。客户端在发出状态请求时必须请求该子树,并且在订阅更新时也必须指定相同的子树。

有几种常用的树结构语法。一种是路径层次结构,另一种是主题树。它们看起来像这样:

  • 路径层次结构/一些/路径/列表
  • 主题树一些.主题.列表

我们将使用路径层次结构,并扩展我们的客户端和服务器,以便客户端可以处理单个子树。一旦了解了如何处理单个子树,如果你的用例需要,你就可以自己扩展它来处理多个子树。

这是实现子树功能的服务器,是模型三的一个小变体

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

这是相应的客户端

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

短暂值 #

短暂值是一种会自动过期除非定期刷新的值。如果你考虑将 Clone 用于注册服务,那么短暂值就可以实现动态值。一个节点加入网络,发布其地址,并定期刷新。如果该节点死亡,其地址最终会被移除。

短暂值的通常抽象是将其附加到会话,并在会话结束时删除。在 Clone 中,会话由客户端定义,并在客户端死亡时结束。一个更简单的替代方案是为短暂值附加一个生存时间 (TTL),服务器用它来使未及时刷新的值过期。

我在可能的情况下会遵循一个好的设计原则:不发明非绝对必要的概念。如果我们有大量短暂值,会话将提供更好的性能。如果我们使用少量短暂值,为每个值设置 TTL 就可以。如果我们使用海量短暂值,将它们附加到会话并批量过期会更高效。这并不是我们在当前阶段面临的问题,可能永远不会面临,所以我们将暂时放弃会话。

现在我们将实现瞬时值(ephemeral values)。首先,我们需要一种方法在键值消息中编码 TTL(存活时间)。我们可以添加一个帧。使用 ZeroMQ 帧来承载属性的问题在于,每当我们想添加一个新属性时,都必须改变消息结构。这会破坏兼容性。因此,让我们在消息中添加一个属性帧,并编写代码来获取和设置属性值。

接下来,我们需要一种方式来表示“删除这个值”。到目前为止,服务器和客户端总是盲目地将新值插入或更新到它们的哈希表中。我们将规定,如果值为空,则意味着“删除这个键”。

以下是kvmsg类的一个更完整版本,它实现了属性帧(并添加了一个我们稍后会需要的 UUID 帧)。它还通过在必要时从哈希表中删除键来处理空值。

C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

模型五客户端与模型四几乎相同。它使用了完整的kvmsg类,并为每条消息设置了一个随机的ttl属性(以秒为单位)


kvmsg_set_prop (kvmsg, "ttl", "%d", randof (30));

使用 Reactor #

到目前为止,我们在服务器中使用了 poll 循环。在服务器的下一个模型中,我们切换到使用 Reactor。在 C 语言中,我们使用 CZMQ 的zloop类。使用 Reactor 会使代码更冗长,但更容易理解和扩展,因为服务器的每个部分都由一个独立的 Reactor 处理器处理。

我们使用单线程并将服务器对象传递给 Reactor 处理器。我们可以将服务器组织成多个线程,每个线程处理一个 socket 或定时器,但这在线程不必共享数据时效果更好。在这种情况下,所有工作都围绕着服务器的哈希表,所以一个线程更简单。

有三个 Reactor 处理器:

  • 一个处理来自 ROUTER socket 的快照请求;
  • 一个处理来自客户端的更新,通过 PULL socket 接收;
  • 一个处理已过 TTL 的瞬时值过期。
C | C++ | Java | Python | Tcl | Ada | Basic | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | OCaml

添加二进制星模式以提高可靠性 #

到目前为止,我们探讨的克隆模型相对简单。现在我们将进入令人不快的复杂领域,这让我需要再来一杯浓缩咖啡。你应该认识到,“可靠”消息传递的实现非常复杂,因此在着手之前,你总需要问一句:“我们真的需要这个吗?”如果你能接受不可靠或“足够好”的可靠性,那么在成本和复杂性方面将是一个巨大的胜利。当然,你可能会不时丢失一些数据。这通常是一个很好的权衡。话虽如此,并且……呷了一口……因为这杯浓缩咖啡确实很棒,让我们开始吧。

当你使用最后一个模型时,你会停止并重启服务器。它可能看起来像恢复了,但实际上它是在一个空状态而不是正确的当前状态上应用更新。任何加入网络的新客户端都只会收到最新的更新,而不是完整的历史记录。

我们想要的是一种服务器从被终止或崩溃中恢复的方法。我们还需要提供备份,以防服务器在任何时间长度内停止运行。当有人要求“可靠性”时,请他们列出他们希望处理的故障。在我们的例子中,这些故障包括:

  • 服务器进程崩溃并自动或手动重启。进程丢失其状态,必须从某个地方获取回来。

  • 服务器机器宕机并在较长时间内离线。客户端必须切换到某个备用服务器。

  • 服务器进程或机器与网络断开连接,例如,交换机损坏或数据中心瘫痪。它可能在某个时候恢复,但在此期间客户端需要一个备用服务器。

我们的第一步是添加第二个服务器。我们可以使用第四章 - 可靠的请求-回复模式中的二进制星(Binary Star)模式将它们组织成主服务器和备份服务器。二进制星是一个 Reactor,所以我们已经将最后一个服务器模型重构为 Reactor 风格是很方便的。

我们需要确保主服务器崩溃时更新不会丢失。最简单的技术是将更新发送给两个服务器。备份服务器可以像客户端一样运行,通过接收所有客户端接收的更新来保持其状态同步。它也会接收来自客户端的新更新。它尚不能将这些更新存储到其哈希表中,但可以暂时保留它们。

因此,模型六在模型五的基础上引入了以下变化:

  • 我们使用发布-订阅(pub-sub)流而不是推-拉(push-pull)流来处理发送给服务器的客户端更新。这负责将更新扇出到两个服务器。否则我们将不得不使用两个 DEALER socket。

  • 我们在服务器更新(发送给客户端)中添加了心跳,以便客户端能够检测到主服务器何时宕机。然后它可以切换到备份服务器。

  • 我们使用二进制星bstarReactor 类连接两个服务器。二进制星依赖于客户端通过向它们认为活动状态的服务器发出明确请求来进行投票。我们将使用快照请求作为投票机制。

  • 我们通过添加一个 UUID 字段使所有更新消息具有唯一标识。客户端生成此 UUID,服务器在重新发布的更新中将其传回。

  • 被动服务器维护一个“待处理列表”,包含它从客户端接收但尚未从活动服务器接收的更新;或者它从活动服务器接收但尚未从客户端接收的更新。该列表按从旧到新的顺序排列,以便轻松地从头部移除更新。

图 61 - 克隆客户端有限状态机

将客户端逻辑设计为有限状态机是很有用的。客户端会经历以下三种状态循环:

  • 客户端打开并连接其 socket,然后向第一个服务器请求快照。为了避免请求风暴,它只会向任何给定的服务器请求两次。丢失一个请求可能是运气不好。丢失两个请求则属于疏忽大意。

  • 客户端等待当前服务器的回复(快照数据),如果收到,则存储它。如果在一定时间内没有收到回复,则故障转移到下一个服务器。

  • 当客户端获得快照后,它会等待并处理更新。同样,如果在一定时间内没有收到服务器的任何消息,它就会故障转移到下一个服务器。

客户端永远循环。在启动或故障转移期间,很可能有些客户端试图与主服务器通信,而另一些客户端试图与备份服务器通信。二进制星状态机会处理这种情况,希望是准确的。证明软件是正确的很难;相反,我们会不断测试它直到无法证明它是错误的。

故障转移过程如下:

  • 客户端检测到主服务器不再发送心跳,并断定它已宕机。客户端连接到备份服务器并请求新的状态快照。

  • 备份服务器开始接收来自客户端的快照请求,并检测到主服务器已失效,因此接管成为主服务器。

  • 备份服务器将其待处理列表应用到其自身的哈希表中,然后开始处理状态快照请求。

当主服务器恢复在线时,它将:

  • 作为被动服务器启动,并像克隆客户端一样连接到备份服务器。

  • 通过其 SUB socket 开始接收来自客户端的更新。

我们做了一些假设:

  • 至少有一个服务器将保持运行。如果两个服务器都崩溃,我们将丢失所有服务器状态,并且无法恢复。

  • 多个客户端不会同时更新相同的哈希键。客户端更新将以不同的顺序到达两个服务器。因此,备份服务器可能会以与主服务器不同(或曾经)的顺序应用其待处理列表中的更新。来自单个客户端的更新总是以相同的顺序到达两个服务器,因此这是安全的。

因此,使用二进制星模式的高可用性服务器对的架构包含两个服务器和一组与两个服务器通信的客户端。

图 62 - 高可用性克隆服务器对

以下是克隆服务器的第六个也是最后一个模型:

C | Java | Python | Ada | Basic | C++ | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | Tcl | OCaml

这个模型只有几百行代码,但花了不少时间才让它正常工作。准确地说,构建模型六花了我整整一周的时间,期间不断冒出“天哪,这对于一个例子来说太复杂了”的想法。我们几乎将所有东西都塞进了这个小程序中,包括故障转移、瞬时值、子树等等。令我惊讶的是,前期的设计相当准确。尽管如此,编写和调试如此多 socket 流的细节仍然颇具挑战性。

基于 Reactor 的设计消除了代码中的大量繁重工作,剩下的部分更简单易懂。我们复用了第四章 - 可靠的请求-回复模式中的 bstar Reactor。整个服务器作为单个线程运行,因此没有线程间的奇怪问题——只有一个结构体指针(self)传递给所有处理器,它们可以愉快地完成自己的工作。使用 Reactor 的一个很好的副作用是,代码与 poll 循环的集成度较低,因此更容易重用。模型六的很大一部分代码来自模型五。

我一块一块地构建它,并在进入下一块之前确保每一块都 正常 工作。因为有四五个主要的 socket 流,这意味着大量的调试和测试。我只是通过将消息转储到控制台来进行调试。不要使用传统的调试器来单步调试 ZeroMQ 应用程序;你需要查看消息流才能理解正在发生的事情。

对于测试,我总是尝试使用 Valgrind,它可以捕获内存泄漏和无效的内存访问。在 C 语言中,这是一个主要问题,因为你不能依赖垃圾回收器。使用 kvmsg 和 CZMQ 等恰当且一致的抽象大大有所帮助。

集群哈希表协议 #

虽然服务器基本上是前一个模型加上二进制星模式的混合体,但客户端要复杂得多。但在讨论客户端之前,让我们先看看最终的协议。我已将其作为规范发布在 ZeroMQ RFC 网站上,名称为集群哈希表协议

大致来说,设计像这样的复杂协议有两种方法。一种是将每个流分离到自己的一组 socket 中。这是我们在这里使用的方法。优点是每个流都很简单清晰。缺点是同时管理多个 socket 流会相当复杂。使用 Reactor 使其更简单,但仍然有很多移动的部分必须正确地协同工作。

设计这种协议的第二种方法是使用一对 socket 来处理所有事情。在这种情况下,我会在服务器端使用 ROUTER,客户端使用 DEALER,然后通过该连接完成所有操作。这会使协议更复杂,但至少复杂性集中在一处。在第七章 - 使用 ZeroMQ 的高级架构中,我们将看到一个通过 ROUTER-DEALER 组合实现的协议示例。

让我们来看看 CHP 规范。请注意,“SHOULD”、“MUST”和“MAY”是我们用于协议规范中表示要求级别的关键词。

目标

CHP 旨在为通过 ZeroMQ 网络连接的客户端集群提供可靠的发布-订阅基础。它定义了一个由键值对组成的“哈希表”抽象。任何客户端可以在任何时间修改任何键值对,并且更改会传播到所有客户端。客户端可以在任何时间加入网络。

架构

CHP 连接了一组客户端应用程序和一组服务器。客户端连接到服务器。客户端彼此不互相见。客户端可以随意加入和离开。

端口和连接

服务器 MUST(必须)打开以下三个端口:

  • 快照端口(ZeroMQ ROUTER socket),端口号 P。
  • 发布者端口(ZeroMQ PUB socket),端口号 P + 1。
  • 收集者端口(ZeroMQ SUB socket),端口号 P + 2。

客户端 SHOULD(应该)打开至少两个连接:

  • 快照连接(ZeroMQ DEALER socket),连接到端口 P。
  • 订阅者连接(ZeroMQ SUB socket),连接到端口 P + 1。

如果客户端想要更新哈希表,MAY(可以)打开第三个连接:

  • 发布者连接(ZeroMQ PUB socket),连接到端口 P + 2。

这个额外的帧未在下面解释的命令中显示。

状态同步

客户端 MUST(必须)首先向其快照连接发送一个 ICANHAZ 命令。该命令包含以下两个帧:

ICANHAZ command
-----------------------------------
Frame 0: "ICANHAZ?"
Frame 1: subtree specification

两个帧都是 ZeroMQ 字符串。子树规范 MAY(可以)为空。如果不为空,它由一个斜杠后跟一个或多个路径段组成,并以斜杠结尾。

服务器 MUST(必须)通过向其快照端口发送零个或多个 KVSYNC 命令来响应 ICANHAZ 命令,其后跟一个 KTHXBAI 命令。服务器 MUST(必须)在每个命令前加上客户端的身份,该身份由 ZeroMQ 在 ICANHAZ 命令中提供。KVSYNC 命令按如下方式指定一个键值对:

KVSYNC command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: value, as blob

序列号没有意义,可以为零。

KTHXBAI 命令形式如下:

KTHXBAI command
-----------------------------------
Frame 0: "KTHXBAI"
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: subtree specification

序列号 MUST(必须)是先前发送的 KVSYNC 命令中的最高序列号。

当客户端收到 KTHXBAI 命令后,它 SHOULD(应该)开始从其订阅者连接接收消息并应用它们。

服务器到客户端的更新

当服务器对其哈希表有更新时,它 MUST(必须)在其发布者 socket 上以 KVPUB 命令广播此更新。KVPUB 命令形式如下:

KVPUB command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob

序列号必须严格递增。客户端必须丢弃任何序列号不严格大于接收到的最后一个 KTHXBAI 或 KVPUB 命令的 KVPUB 命令。

UUID 是可选的,帧 2 可能为空(大小为零)。属性字段格式化为零个或多个“name=value”实例,后跟一个换行符。如果键值对没有属性,则属性字段为空。

如果值为零,客户端应删除具有指定键的键值条目。

在没有其他更新的情况下,服务器应按固定间隔发送 HUGZ 命令,例如每秒一次。HUGZ 命令的格式如下

HUGZ command
-----------------------------------
Frame 0: "HUGZ"
Frame 1: 00000000
Frame 2: <empty>
Frame 3: <empty>
Frame 4: <empty>

客户端可以将没有 HUGZ 命令视为服务器崩溃的指示(参见下面的可靠性)。

客户端到服务器的更新

当客户端对其散列映射有更新时,它可以通过其发布者连接作为 KVSET 命令发送给服务器。KVSET 命令的形式如下

KVSET command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob

序列号没有意义,可以为零。如果使用可靠的服务器架构,UUID 应是一个通用唯一标识符。

如果值为空,服务器必须删除具有指定键的键值条目。

服务器应接受以下属性

  • ttl: 指定生存时间(秒)。如果 KVSET 命令包含一个ttl属性,服务器应删除键值对并广播一个值为空的 KVPUB 命令,以便在 TTL 过期时从所有客户端中删除此条目。

可靠性

CHP 可用于双服务器配置,其中主服务器发生故障时由备用服务器接管。CHP 没有指定用于此故障转移的机制,但二进制星模式可能有所帮助。

为了提高服务器可靠性,客户端可以

  • 在每个 KVSET 命令中设置一个 UUID。
  • 检测一段时间内没有收到 HUGZ 命令,并将其用作当前服务器已发生故障的指示。
  • 连接到备用服务器并重新请求状态同步。

可伸缩性和性能

CHP 设计用于扩展到大量(数千个)客户端,仅受限于代理上的系统资源。由于所有更新都通过单个服务器传递,因此峰值时的总吞吐量将限制在每秒数百万次更新,可能更少。

安全性

CHP 未实现任何身份验证、访问控制或加密机制,不应在需要这些功能的部署中使用。

构建多线程栈和 API #

我们目前使用的客户端栈不够智能,无法正确处理这个协议。一旦我们开始发送心跳,我们就需要一个可以在后台线程中运行的客户端栈。在第 4 章 - 可靠请求-回复模式末尾的 Freelance 模式中,我们使用了多线程 API,但没有详细解释。事实证明,当你开始构建像 CHP 这样更复杂的 ZeroMQ 协议时,多线程 API 非常有用。

图 63 - 多线程 API

如果你构建了一个非简单的协议,并期望应用程序能正确实现它,大多数开发者大部分时候都会搞错。你会遇到很多不高兴的人抱怨你的协议过于复杂、过于脆弱且难以使用。而如果你提供一个简单的 API 供调用,他们接受的可能性就会大得多。

我们的多线程 API 由一个前端对象和一个后台代理组成,它们通过两个 PAIR 套接字连接。像这样连接两个 PAIR 套接字非常有用,以至于你的高级绑定可能应该像 CZMQ 那样做,即封装一个“创建一个新线程,带有一个可用于向其发送消息的管道”的方法。

本书中我们看到的多线程 API 都采用相同的形式

  • 对象的构造函数(clone_new)创建一个上下文并启动一个通过管道连接的后台线程。它持有管道的一端,以便可以向后台线程发送命令。

  • 后台线程启动一个*代理*,它本质上是一个zmq_poll循环,从管道套接字和任何其他套接字(这里是 DEALER 和 SUB 套接字)读取。

  • 主应用程序线程和后台线程现在只通过 ZeroMQ 消息通信。按照约定,前端发送字符串命令,以便类上的每个方法都变成发送到后端代理的消息,如下所示


void
clone_connect (clone_t *self, char *address, char *service)
{
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "CONNECT");
    zmsg_addstr (msg, address);
    zmsg_addstr (msg, service);
    zmsg_send (&msg, self->pipe);
}
  • 如果方法需要返回码,它可以等待代理的回复消息。

  • 如果代理需要将异步事件发送回前端,我们添加一个recv方法到类中,该方法在前端管道上等待消息。

  • 我们可能希望暴露前端管道套接字句柄,以便将类集成到进一步的轮询循环中。否则,任何recv方法都会阻塞应用程序。

clone 类与flcliapi第 4 章 - 可靠请求-回复模式中的类具有相同的结构,并添加了 Clone 客户端最后一个模型的逻辑。如果没有 ZeroMQ,这种多线程 API 设计将需要数周的艰苦工作。有了 ZeroMQ,只需一两天的工作。

clone 类的实际 API 方法非常简单


//  Create a new clone class instance
clone_t *
    clone_new (void);

//  Destroy a clone class instance
void
    clone_destroy (clone_t **self_p);

//  Define the subtree, if any, for this clone class
void
    clone_subtree (clone_t *self, char *subtree);

//  Connect the clone class to one server
void
    clone_connect (clone_t *self, char *address, char *service);

//  Set a value in the shared hashmap
void
    clone_set (clone_t *self, char *key, char *value, int ttl);

//  Get a value from the shared hashmap
char *
    clone_get (clone_t *self, char *key);

因此,这是 Clone 客户端的第六个模型,它现在只是一个使用 clone 类的薄层

C | Java | Python | Ada | Basic | C++ | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | Tcl | OCaml

注意 connect 方法,它指定了一个服务器端点。实际上,我们在后台与三个端口通信。然而,正如 CHP 协议所说,这三个端口是连续的端口号

  • 服务器状态路由器 (ROUTER) 在端口 P。
  • 服务器更新发布者 (PUB) 在端口 P + 1。
  • 服务器更新订阅者 (SUB) 在端口 P + 2。

因此,我们可以将这三个连接合并为一个逻辑操作(我们将其实现为三个单独的 ZeroMQ 连接调用)。

最后来看看 clone 栈的源代码。这是一段复杂的代码,但将其分解为前端对象类和后端代理会更容易理解。前端向代理发送字符串命令(“SUBTREE”、“CONNECT”、“SET”、“GET”),代理处理这些命令并与服务器通信。以下是代理的逻辑

  1. 启动时从第一个服务器获取快照
  2. 获得快照后,切换到从订阅者套接字读取。
  3. 如果没有获得快照,则故障转移到第二个服务器。
  4. 在管道和订阅者套接字上进行轮询。
  5. 如果从管道接收到输入,则处理来自前端对象的控制消息。
  6. 如果从订阅者接收到输入,则存储或应用更新。
  7. 如果在一定时间内没有从服务器收到任何消息,则进行故障转移。
  8. 重复此过程直到被 Ctrl-C 中断。

以下是实际的 clone 类实现

C | Java | Python | Ada | Basic | C++ | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Haxe | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | Tcl | OCaml