3. 高级请求-应答模式

第三章 - 高级请求-应答模式 #

第二章 - 套接字和模式中,我们通过开发一系列小型应用程序,每次探索 ZeroMQ 的新方面,来学习 ZeroMQ 的基础知识。本章我们将继续采用这种方法,探索基于 ZeroMQ 核心请求-应答模式构建的高级模式。

我们将涵盖:

  • 请求-应答机制如何工作
  • 如何组合使用 REQ、REP、DEALER 和 ROUTER 套接字
  • ROUTER 套接字的工作原理详解
  • 负载均衡模式
  • 构建一个简单的负载均衡消息代理
  • 设计 ZeroMQ 高级 API
  • 构建异步请求-应答服务器
  • 详细的代理间路由示例

请求-应答机制 #

我们已经简单了解了多部分消息。现在让我们看看一个主要的用例,即应答消息信封。信封是一种安全地将数据与地址打包在一起的方式,而无需触碰数据本身。通过将应答地址分离到信封中,我们可以编写通用的中介(如 API 和代理),它们无论消息载荷或结构如何,都能创建、读取和删除地址。

在请求-应答模式中,信封包含了应答的返回地址。这是无状态的 ZeroMQ 网络如何创建往返请求-应答对话的方式。

当你使用 REQ 和 REP 套接字时,你甚至看不到信封;这些套接字会自动处理它们。但对于大多数有趣的请求-应答模式,你会需要理解信封,特别是 ROUTER 套接字。我们将一步步进行。

简单应答信封 #

请求-应答交换包括一个请求消息和一个最终的应答消息。在简单请求-应答模式中,每个请求对应一个应答。在更高级的模式中,请求和应答可以异步流动。然而,应答信封的工作方式始终相同。

ZeroMQ 应答信封正式由零个或多个应答地址,后跟一个空帧(信封分隔符),再后跟消息体(零个或多个帧)组成。信封由链中协同工作的多个套接字创建。我们将详细分解这一点。

我们将从通过 REQ 套接字发送“Hello”开始。REQ 套接字创建了最简单的应答信封,它没有地址,只有一个空的分隔符帧和包含“Hello”字符串的消息帧。这是一个两帧消息。

图 26 - 带有最小信封的请求

REP 套接字完成匹配工作:它剥离信封,直到并包括分隔符帧,保存整个信封,并将“Hello”字符串传递给应用程序。因此,我们最初的 Hello World 示例在内部使用了请求-应答信封,但应用程序从未见过它们。

如果你窥探网络数据流,在hwclienthwserver之间,你会看到:每个请求和每个应答实际上是两个帧,一个空帧,然后是消息体。对于简单的 REQ-REP 对话来说,这似乎没有多大意义。然而,当我们探索 ROUTER 和 DEALER 如何处理信封时,你就会明白其中的原因。

扩展应答信封 #

现在,让我们在 REQ-REP 对中间添加一个 ROUTER-DEALER 代理,看看这如何影响应答信封。这就是我们在第二章 - 套接字和模式中已经见过的扩展请求-应答模式。实际上,我们可以插入任意数量的代理步骤。其机制是相同的。

图 27 - 扩展请求-应答模式

代理执行以下伪代码操作:

prepare context, frontend and backend sockets
while true:
    poll on both sockets
    if frontend had input:
        read all frames from frontend
        send to backend
    if backend had input:
        read all frames from backend
        send to frontend

ROUTER 套接字与其他套接字不同,它会跟踪其所有连接,并将这些连接信息告知调用者。告知调用者的方式是将连接的身份放在接收到的每条消息前面。身份,有时也称为地址,只是一个二进制字符串,其含义仅为“这是连接的唯一句柄”。然后,当你通过 ROUTER 套接字发送消息时,你首先发送一个身份帧。

zmq_socket()手册页如此描述:

接收消息时,ZMQ_ROUTER 套接字应在将消息传递给应用程序之前,在消息前面加上包含发起对端身份的消息部分。接收到的消息会从所有连接的对端中公平地排队。发送消息时,ZMQ_ROUTER 套接字应移除消息的第一部分,并使用它来确定消息应路由到的对端的身份。

历史注:ZeroMQ v2.2 及更早版本使用 UUID 作为身份。ZeroMQ v3.0 及更高版本默认生成一个 5 字节的身份(0 + 一个随机 32 位整数)。这对网络性能有一些影响,但只在使用多个代理跳跃时才会发生,这种情况很少见。主要改动是为了通过移除对 UUID 库的依赖来简化构建libzmq

身份是一个难以理解的概念,但如果你想成为 ZeroMQ 专家,这是必不可少的。ROUTER 套接字会为其与之工作的每个连接发明一个随机身份。如果有三个 REQ 套接字连接到 ROUTER 套接字,它将为每个 REQ 套接字发明三个随机身份。

因此,如果我们继续之前的示例,假设 REQ 套接字有一个 3 字节的身份:ABC。在内部,这意味着 ROUTER 套接字维护一个哈希表,它可以在其中搜索ABC并找到 REQ 套接字的 TCP 连接。

当我们从 ROUTER 套接字接收消息时,我们会得到三个帧。

图 28 - 带有单个地址的请求

代理循环的核心是“从一个套接字读取,写入另一个套接字”,所以我们实际上将这三个帧原封不动地通过 DEALER 套接字发送出去。如果你现在嗅探网络流量,你会看到这三个帧从 DEALER 套接字飞向 REP 套接字。REP 套接字像以前一样,剥离整个信封,包括新的应答地址,然后再次将“Hello”传递给调用者。

顺带一提,REP 套接字一次只能处理一个请求-应答交换,这就是为什么如果你试图读取多个请求或发送多个应答而不遵循严格的接收-发送循环时,它会报错。

现在你应该能够想象回程路径了。当hwserver发送“World”回来时,REP 套接字会用它保存的信封将消息包装起来,然后通过网络发送一个包含三个帧的应答消息到 DEALER 套接字。

图 29 - 带有单个地址的应答

现在 DEALER 读取这三个帧,并将全部三个通过 ROUTER 套接字发送出去。ROUTER 取消息的第一个帧,即ABC身份,并查找与之对应的连接。如果找到,它就会将接下来的两个帧发送到网络上。

图 30 - 带有最小信封的应答

REQ 套接字接收到这条消息,并检查第一个帧是否为空分隔符,确实如此。REQ 套接字丢弃该帧,并将“World”传递给调用应用程序,应用程序打印出来,让第一次接触 ZeroMQ 的我们感到惊奇。

有什么用? #

老实说,严格请求-应答或扩展请求-应答的用例有些受限。例如,当服务器由于有 bug 的应用程序代码而崩溃时,没有简单的方法恢复。我们将在第四章 - 可靠请求-应答模式中看到更多相关内容。然而,一旦你掌握了这四种套接字处理信封的方式以及它们之间如何交互,你就可以做很多有用的事情。我们看到了 ROUTER 如何使用应答信封来决定将应答路由回哪个客户端 REQ 套接字。现在让我们换一种方式来表达:

  • 每次 ROUTER 给你一条消息时,它都会告诉你这条消息来自哪个对端,以身份的形式。
  • 你可以将此信息与哈希表(以身份作为键)结合使用,以跟踪新连接的对端。
  • 如果你将身份作为消息的第一个帧前缀,ROUTER 将异步地将消息路由到与其连接的任何对端。

ROUTER 套接字并不关心整个信封。它们不知道空分隔符的存在。它们只关心那个身份帧,以便确定将消息发送到哪个连接。

请求-应答套接字回顾 #

我们来回顾一下:

  • REQ 套接字在消息数据之前向网络发送一个空分隔符帧。REQ 套接字是同步的。REQ 套接字总是发送一个请求,然后等待一个应答。REQ 套接字一次与一个对端通信。如果将 REQ 套接字连接到多个对端,请求会轮流分配给每个对端,并轮流从每个对端接收应答。

  • REP 套接字读取并保存所有身份帧,直到并包括空分隔符,然后将随后的一个或多个帧传递给调用者。REP 套接字是同步的,一次与一个对端通信。如果将 REP 套接字连接到多个对端,请求会以公平的方式从对端读取,应答总是发送给发出上一个请求的那个对端。

  • DEALER 套接字对应答信封一无所知,并像处理任何多部分消息一样处理它。DEALER 套接字是异步的,并且像 PUSH 和 PULL 的组合。它们将发送的消息分发到所有连接,并从所有连接中公平地排队接收消息。

  • ROUTER 套接字对回复信封一无所知,就像 DEALER 一样。它为其连接创建身份,并将这些身份作为接收到的任何消息的第一个帧传递给调用者。反之,当调用者发送消息时,它使用消息的第一个帧作为身份来查找要发送到的连接。ROUTER 是异步的。

请求-应答组合 #

我们有四种请求-应答套接字,每种都有特定的行为。我们已经了解了它们在简单和扩展请求-应答模式中的连接方式。但这些套接字是你可以用来解决许多问题的构建块。

以下是合法的组合:

  • REQ 到 REP
  • DEALER 到 REP
  • REQ 到 ROUTER
  • DEALER 到 ROUTER
  • DEALER 到 DEALER
  • ROUTER 到 ROUTER

以下组合是无效的(我将解释原因):

  • REQ 到 REQ
  • REQ 到 DEALER
  • REP 到 REP
  • REP 到 ROUTER

这里有一些记住语义的技巧。DEALER 类似于异步 REQ 套接字,而 ROUTER 类似于异步 REP 套接字。在我们使用 REQ 套接字的地方,可以使用 DEALER;我们只需要自己读写信封。在我们使用 REP 套接字的地方,可以使用 ROUTER;我们只需要自己管理身份。

将 REQ 和 DEALER 套接字视为“客户端”,将 REP 和 ROUTER 套接字视为“服务器”。大多数情况下,你会希望绑定 REP 和 ROUTER 套接字,并将 REQ 和 DEALER 套接字连接到它们。这并非总是如此简单,但这是一个清晰且易于记忆的起点。

REQ 与 REP 组合 #

我们已经介绍了 REQ 客户端与 REP 服务器通信的情况,但我们来看看一个方面:REQ 客户端必须启动消息流。REP 服务器不能与尚未首先向其发送请求的 REQ 客户端通信。从技术上讲,这甚至是不可能的,如果你尝试这样做,API 也会返回一个EFSM错误。

DEALER 与 REP 组合 #

现在,让我们用 DEALER 替换 REQ 客户端。这为我们提供了一个可以与多个 REP 服务器通信的异步客户端。如果我们使用 DEALER 重写“Hello World”客户端,我们将能够发送任意数量的“Hello”请求,而无需等待应答。

当我们使用 DEALER 与 REP 套接字通信时,我们必须精确地模拟 REQ 套接字会发送的信封,否则 REP 套接字会将消息丢弃为无效。因此,要发送消息,我们:

  • 发送一个设置了 MORE 标志的空消息帧;然后
  • 发送消息体。

接收消息时,我们:

  • 接收第一个帧,如果它不是空的,则丢弃整个消息;
  • 接收下一个帧并将其传递给应用程序。

REQ 与 ROUTER 组合 #

就像我们可以用 DEALER 替换 REQ 一样,我们可以用 ROUTER 替换 REP。这为我们提供了一个可以同时与多个 REQ 客户端通信的异步服务器。如果我们使用 ROUTER 重写“Hello World”服务器,我们将能够并行处理任意数量的“Hello”请求。我们在第二章 - 套接字和模式中的mtserver示例中看到了这一点。

我们可以以两种不同的方式使用 ROUTER:

  • 作为在前端和后端套接字之间切换消息的代理。
  • 作为读取消息并对其进行操作的应用程序。

第一种情况下,ROUTER 只是简单地读取所有帧,包括人工身份帧,并盲目地将其传递下去。第二种情况下,ROUTER 必须知道发送给它的应答信封的格式。由于另一个对端是 REQ 套接字,ROUTER 将接收身份帧、一个空帧,然后是数据帧。

DEALER 与 ROUTER 组合 #

现在我们可以用 DEALER 和 ROUTER 替换 REQ 和 REP,从而获得最强大的套接字组合,即 DEALER 与 ROUTER 通信。它使异步客户端能够与异步服务器通信,并且双方都可以完全控制消息格式。

由于 DEALER 和 ROUTER 都可以处理任意消息格式,如果你希望安全地使用它们,你就必须稍微扮演一下协议设计者的角色。至少,你必须决定是否希望模仿 REQ/REP 应答信封。这取决于你是否确实需要发送应答。

DEALER 与 DEALER 组合 #

你可以用 ROUTER 替换 REP,但如果 DEALER 只与一个对端通信,你也可以用 DEALER 替换 REP。

当你用 DEALER 替换 REP 时,你的工作者可以突然完全异步,发送任意数量的应答。代价是你必须自己管理应答信封,并且要确保正确无误,否则一切都不会工作。我们稍后会看到一个实例。现在姑且说,DEALER 与 DEALER 组合是比较难以正确实现的模式之一,幸运的是我们很少需要它。

ROUTER 与 ROUTER 组合 #

这听起来很适合 N 对 N 连接,但它是最难使用的组合。在深入学习 ZeroMQ 之前,你应该避免使用它。我们将在第四章 - 可靠请求-应答模式的 Freelance 模式中看到一个例子,并在第八章 - 分布式计算框架中看到一种用于点对点工作的 DEALER 与 ROUTER 替代设计。

无效组合 #

通常来说,试图将客户端连接到客户端,或将服务器连接到服务器是一个糟糕的主意,并且不会奏效。然而,与其给出笼统含糊的警告,我将详细解释原因:

  • REQ 到 REQ:双方都想通过向对方发送消息来开始通信,这只有在你精确地安排时序,使得双方同时交换消息时才可能奏效。光是想想就让人头疼。

  • REQ 到 DEALER:理论上你可以这样做,但如果你添加第二个 REQ,它就会崩溃,因为 DEALER 没有办法将应答发送回原始对端。因此,REQ 套接字会混乱,并且/或者返回原本应发送给其他客户端的消息。

  • REP 到 REP:双方都会等待对方发送第一条消息。

  • REP 到 ROUTER:理论上,ROUTER 套接字可以在知道 REP 套接字已经连接并且知道该连接的身份的情况下发起对话并发送格式正确的请求。但这很混乱,并且相比 DEALER 到 ROUTER 没有任何额外的好处。

这些有效和无效组合分类的共同点是,ZeroMQ 套接字连接总是偏向于一个绑定到端点的对端,以及另一个连接到该端点的对端。此外,哪一方绑定,哪一方连接并非任意的,而是遵循自然模式。我们期望“始终存在”的一方进行绑定:它将是服务器、代理、发布者、收集者。“来来往往”的一方进行连接:它将是客户端和工作者。记住这一点将有助于你设计更好的 ZeroMQ 架构。

探索 ROUTER 套接字 #

让我们更仔细地看看 ROUTER 套接字。我们已经了解了它们通过将单独的消息路由到特定连接来工作的方式。我将更详细地解释我们如何识别这些连接,以及当 ROUTER 套接字无法发送消息时会发生什么。

身份和地址 #

ZeroMQ 中的身份概念特指 ROUTER 套接字及其如何识别与其他套接字的连接。更广泛地说,身份在应答信封中用作地址。在大多数情况下,身份是任意的,并且是 ROUTER 套接字的本地概念:它是哈希表中的查找键。独立于身份,对端可以拥有一个物理地址(如网络端点“tcp://192.168.55.117:5670”)或逻辑地址(如 UUID、电子邮件地址或其他唯一键)。

使用 ROUTER 套接字与特定对端通信的应用程序,如果构建了必要的哈希表,可以将逻辑地址转换为身份。因为 ROUTER 套接字只有在对端发送消息时才会公布该连接(到特定对端)的身份,所以你实际上只能回复消息,而不能自发地与对端通信。

即使你颠倒规则,让 ROUTER 连接对端而不是等待对端连接 ROUTER,这也是成立的。然而,你可以强制 ROUTER 套接字使用逻辑地址作为其身份。的zmq_setsockopt参考页将此称为设置套接字身份。其工作方式如下:

  • 对端应用程序在绑定或连接之前,设置其对端套接字(DEALER 或 REQ)的ZMQ_IDENTITY选项。
  • 通常情况下,对端会连接到已绑定的 ROUTER 套接字。但 ROUTER 也可以连接到对端。
  • 在连接时,对端套接字会告诉路由器套接字:“请对该连接使用此身份”。
  • 如果对端套接字没有指定,路由器会为其连接生成通常的任意随机身份。
  • ROUTER 套接字现在将此逻辑地址作为前缀身份帧提供给应用程序,用于来自该对端的任何消息。
  • ROUTER 也期望逻辑地址作为任何传出消息的前缀身份帧。

这是一个简单的例子,说明两个对端连接到 ROUTER 套接字,其中一个强加了一个逻辑地址“PEER2”:

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Q | Racket | Ruby | Scala | Tcl | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Racket | Rust | OCaml

以下是程序输出:

----------------------------------------
[005] 006B8B4567
[000]
[039] ROUTER uses a generated 5 byte identity
----------------------------------------
[005] PEER2
[000]
[038] ROUTER uses REQ's socket identity

ROUTER 错误处理 #

ROUTER 套接字处理无法发送的消息的方式有些简单粗暴:它们会静默丢弃这些消息。这种处理方式在正常工作的代码中是合理的,但会使调试变得困难。“将身份作为第一个帧发送”的方法本身就很微妙,我们在学习时经常会出错,而 ROUTER 在我们搞砸时的冰冷沉默并不具建设性。

自 ZeroMQ v3.2 起,你可以设置一个套接字选项来捕获此错误:ZMQ_ROUTER_MANDATORY。在 ROUTER 套接字上设置此选项后,当你发送消息时提供一个无法路由的身份,套接字将报告一个EHOSTUNREACH错误。

负载均衡模式 #

现在让我们看一些代码。我们将了解如何将 ROUTER 套接字连接到 REQ 套接字,然后再连接到 DEALER 套接字。这两个例子遵循相同的逻辑,即负载均衡模式。这种模式是我们第一次接触使用 ROUTER 套接字进行有意识的路由,而不仅仅是充当应答通道。

负载均衡模式非常常见,我们将在本书中多次看到它。它解决了简单的轮询路由(如 PUSH 和 DEALER 提供)的主要问题,即如果任务花费的时间差异很大,轮询就会变得效率低下。

这就像邮局的例子。如果你每个柜台都有一个队列,并且有些人购买邮票(一个快速、简单的交易),而有些人开设新账户(一个非常慢的交易),那么你会发现购买邮票的人会不公平地被困在队列中。就像在邮局一样,如果你的消息架构不公平,人们就会感到恼火。

邮局的解决方案是创建一个单一队列,这样即使一两个柜台被缓慢的工作卡住,其他柜台仍将继续以先到先得的方式为客户服务。

PUSH 和 DEALER 使用这种简单方法的其中一个原因是纯粹的性能。如果你抵达美国任何一个主要机场,你会看到排队等待过海关的长队。边境巡逻官员会提前将人们分配到每个柜台前排队,而不是使用单一队列。让人们提前走五十码可以为每位乘客节省一两分钟。而且由于每个护照检查所需的时间大致相同,所以或多或少是公平的。这就是 PUSH 和 DEALER 的策略:提前发送工作负载,以减少传输距离。

这是 ZeroMQ 反复出现的主题:世界上的问题是多样的,你可以通过以正确的方式解决不同的问题来受益。机场不是邮局,一刀切的方案对谁都不适用,真的。

让我们回到工作者(DEALER 或 REQ)连接到代理(ROUTER)的场景。代理必须知道工作者何时准备就绪,并维护一个工作者列表,以便每次都可以选择最近最少使用的工作者。

实际上,解决方案非常简单:工作者在启动时以及完成每个任务后发送一个“ready”消息。代理逐一读取这些消息。每次读取消息时,都是来自上一次使用的工作者。由于我们使用的是 ROUTER 套接字,我们得到了一个身份,然后可以使用这个身份将任务发送回工作者。

这可以看作是对请求-应答的一种变体,因为任务是随应答一起发送的,而任务的任何响应则作为一个新的请求发送。下面的代码示例应该会使其更清晰。

ROUTER 代理和 REQ 工作者 #

以下是一个使用 ROUTER 代理与一组 REQ 工作者通信的负载均衡模式示例:

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Scala | Tcl | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q | Racket | Rust | OCaml

示例运行五秒钟,然后每个工作者打印他们处理了多少任务。如果路由工作正常,我们期望任务会被公平地分配。

Completed: 20 tasks
Completed: 18 tasks
Completed: 21 tasks
Completed: 23 tasks
Completed: 19 tasks
Completed: 21 tasks
Completed: 17 tasks
Completed: 17 tasks
Completed: 25 tasks
Completed: 19 tasks

在这个示例中与工作者对话,我们必须创建一个对 REQ 友好的信封,它由一个身份和一个空的信封分隔帧组成。

图 31 - REQ 的路由信封

ROUTER 代理和 DEALER 工作者 #

任何可以使用 REQ 的地方,你都可以使用 DEALER。有两处具体的区别

  • REQ 套接字在发送任何数据帧之前总是发送一个空的定界帧;DEALER 则不会。
  • REQ 套接字在收到回复之前只会发送一条消息;DEALER 则是完全异步的。

同步与异步行为对我们的示例没有影响,因为我们执行的是严格的请求-回复模式。当我们在第 4 章 - 可靠请求-回复模式中讨论从故障中恢复时,它会更具相关性。

现在让我们看看完全相同的示例,但将 REQ 套接字替换为 DEALER 套接字

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Scala | Tcl | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q | Racket | Rust | OCaml

代码几乎完全相同,区别在于工作者使用 DEALER 套接字,并在数据帧之前读写那个空的帧。这是当我想保持与 REQ 工作者兼容时使用的方法。

然而,请记住那个空的定界帧的原因:它是为了允许多跳扩展请求在 REP 套接字处终止,REP 套接字使用该定界符来分割回复信封,以便将数据帧交给其应用程序。

如果我们从不需要将消息传递给 REP 套接字,我们可以简单地在两端都丢弃空的定界帧,这会使事情变得更简单。这通常是我用于纯 DEALER 到 ROUTER 协议的设计。

一个负载均衡消息代理 #

上面的示例只完成了一半。它可以管理一组带有模拟请求和回复的工作者,但无法与客户端通信。如果我们添加第二个接受客户端请求的前端 ROUTER 套接字,并将我们的示例变成一个可以将消息从前端切换到后端的代理,我们就能得到一个有用且可重用的小型负载均衡消息代理。

图 32 - 负载均衡代理

这个代理执行以下操作

  • 接受来自一组客户端的连接。
  • 接受来自一组工作者的连接。
  • 接受来自客户端的请求,并将这些请求保存在一个队列中。
  • 使用负载均衡模式将这些请求发送给工作者。
  • 接收工作者返回的回复。
  • 将这些回复发送回原始请求客户端。

代理的代码相当长,但值得理解

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Scala | Tcl | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q | Racket | Rust | OCaml

这个程序困难的部分在于 (a) 每个套接字读写的信封,以及 (b) 负载均衡算法。我们将依次讲解这些部分,从消息信封格式开始。

让我们逐步了解从客户端到工作者再返回的完整请求-回复链。在这段代码中,我们设置了客户端和工作者套接字的身份,以便更容易跟踪消息帧。实际上,我们会允许 ROUTER 套接字为连接创建身份。假设客户端的身份是“CLIENT”,工作者的身份是“WORKER”。客户端应用程序发送一个包含“Hello”的单帧。

图 33 - 客户端发送的消息

由于 REQ 套接字添加了空的定界帧,并且 ROUTER 套接字添加了其连接身份,代理从前端 ROUTER 套接字读取客户端地址、空的定界帧和数据部分。

图 34 - 前端接收到的消息

代理将此消息发送给工作者,并在消息前面加上选定工作者的地址,外加一个额外的空部分,以使另一端的 REQ 套接字工作正常。

图 35 - 发送到后端的 संदेश

这个复杂的信封堆栈首先被后端 ROUTER 套接字处理,它会移除第一个帧。然后,工作者中的 REQ 套接字移除空的部分,并将剩余部分提供给工作者应用程序。

图 36 - 发送到工作者的消息

工作者必须保存信封(即直到并包括空消息帧的所有部分),然后才能对数据部分进行所需的操作。请注意,REP 套接字会自动完成此操作,但我们正在使用 REQ-ROUTER 模式,以便获得适当的负载均衡。

在返回路径上,消息与进来时相同,即后端套接字给代理一个包含五部分的消息,代理发送给前端套接字一个包含三部分的消息,而客户端收到一个包含一部分的消息。

现在让我们看一下负载均衡算法。它要求客户端和工作者都使用 REQ 套接字,并且工作者能够正确地存储并重播他们收到的消息的信封。算法如下:

  • 创建一个 pollset,它始终轮询后端,并且仅当有一个或多个工作者可用时才轮询前端。

  • 使用无限超时进行轮询活动。

  • 如果后端有活动,我们要么收到“就绪”消息,要么收到给客户端的回复。在任何一种情况下,我们都将工作者地址(第一部分)存储在我们的工作者队列中;如果其余部分是客户端回复,我们就通过前端将其发送回给该客户端。

  • 如果前端有活动,我们就接收客户端请求,取出下一个工作者(即上次使用的那个),并将请求发送到后端。这意味着发送工作者地址、空的部分,然后是客户端请求的三部分。

现在你应该明白,你可以根据工作者在初始“就绪”消息中提供的信息来重用和扩展负载均衡算法。例如,工作者启动后可以进行性能自检,然后告诉代理它们的速度。代理 then 可以选择最快可用的工作者,而不是最旧的。

ZeroMQ 的高层 API #

我们将把请求-回复模式放在一边,开启另一个领域,即 ZeroMQ API 本身。这次绕道是有原因的:随着我们编写更复杂的示例,低级的 ZeroMQ API 开始显得越来越笨拙。看看我们的负载均衡代理中工作者线程的核心代码:


while (true) {
    //  Get one address frame and empty delimiter
    char *address = s_recv (worker);
    char *empty = s_recv (worker);
    assert (*empty == 0);
    free (empty);

    //  Get request, send reply
    char *request = s_recv (worker);
    printf ("Worker: %s\n", request);
    free (request);

    s_sendmore (worker, address);
    s_sendmore (worker, "");
    s_send     (worker, "OK");
    free (address);
}

那段代码甚至不可重用,因为它在信封中只能处理一个回复地址,而且它已经对 ZeroMQ API 进行了一些封装。如果我们使用libzmq简单的消息 API,我们就不得不写出这样的代码:


while (true) {
    //  Get one address frame and empty delimiter
    char address [255];
    int address_size = zmq_recv (worker, address, 255, 0);
    if (address_size == -1)
        break;

    char empty [1];
    int empty_size = zmq_recv (worker, empty, 1, 0);
    assert (empty_size <= 0);
    if (empty_size == -1)
        break;

    //  Get request, send reply
    char request [256];
    int request_size = zmq_recv (worker, request, 255, 0);
    if (request_size == -1)
        return NULL;
    request [request_size] = 0;
    printf ("Worker: %s\n", request);

    zmq_send (worker, address, address_size, ZMQ_SNDMORE);
    zmq_send (worker, empty, 0, ZMQ_SNDMORE);
    zmq_send (worker, "OK", 2, 0);
}

当代码太长而无法快速编写时,它也太长而难以理解。到目前为止,我一直坚持使用原生 API,因为作为 ZeroMQ 用户,我们需要对它有深入的了解。但当它妨碍我们时,我们必须将其视为一个需要解决的问题。

当然,我们不能仅仅改变 ZeroMQ API,它是一个有文档记载的公共契约,成千上万的人同意并依赖它。相反,我们基于目前的经验,特别是从编写更复杂的请求-回复模式中获得的经验,在其之上构建一个更高层的 API。

我们想要一个 API,它能让我们一次性接收和发送整个消息,包括包含任意数量回复地址的回复信封。一个能让我们用最少的代码行实现我们想要的功能的 API。

构建一个好的消息 API 是相当困难的。我们存在术语上的问题:ZeroMQ 使用“消息”来描述多部分消息和单个消息帧。我们存在期望上的问题:有时将消息内容视为可打印的字符串数据是很自然的,有时则是二进制大块。而且我们还面临技术挑战,特别是如果想避免过多地复制数据。

构建一个好的 API 的挑战影响所有语言,尽管我的特定用例是 C 语言。无论你使用何种语言,都请思考如何为你的语言绑定做贡献,使其与我将要描述的 C 语言绑定一样好(或更好)。

高层 API 的特性 #

我的解决方案是使用三个相当自然且显而易见的概念:字符串(已经是我们现有s_sends_recv)辅助函数、(一个消息帧)和消息(包含一个或多个帧的列表)。这是使用这些概念重写的工人代码 API:


while (true) {
    zmsg_t *msg = zmsg_recv (worker);
    zframe_reset (zmsg_last (msg), "OK", 2);
    zmsg_send (&msg, worker);
}

减少读写复杂消息所需的代码量非常好:结果易于阅读和理解。让我们继续将这个过程应用于使用 ZeroMQ 的其他方面。以下是我根据目前使用 ZeroMQ 的经验,希望在高层 API 中包含的功能清单:

  • 套接字的自动处理。我发现手动关闭套接字以及在某些(但非全部)情况下必须明确定义 linger 超时很麻烦。如果能在我关闭上下文时自动关闭套接字,那就太好了。

  • 可移植的线程管理。每个非琐碎的 ZeroMQ 应用程序都使用线程,但 POSIX 线程不可移植。因此,一个体面的高层 API 应该在可移植层下隐藏这一点。

  • 父线程到子线程的管道通信。这是一个反复出现的问题:如何在父子线程之间发送信号。我们的 API 应该提供一个 ZeroMQ 消息管道(使用 PAIR 套接字和inproc自动完成)。

  • 可移植的时钟。即使获取毫秒级精度的时间,或休眠几毫秒,都不可移植。实际的 ZeroMQ 应用程序需要可移植的时钟,所以我们的 API 应该提供它们。

  • 一个用于替换 zmq_poll() 的反应器。轮询循环很简单,但很笨拙。写很多这样的代码,我们最终会一遍又一遍地做同样的工作:计算定时器,并在套接字就绪时调用代码。一个带有套接字读取器和定时器的简单反应器可以节省大量重复工作。

  • Ctrl-C 的正确处理。我们已经看到了如何捕获中断。如果所有应用程序都能做到这一点,那就太好了。

CZMQ 高层 API #

将这个愿望清单变为现实,对于 C 语言来说就是 CZMQ,这是一个 ZeroMQ 的 C 语言绑定。这个高层绑定实际上是从早期版本的示例中发展出来的。它结合了更优雅的 ZeroMQ 使用语义以及一些可移植层,还有(对 C 语言很重要,但对其他语言次之)哈希和列表等容器。CZMQ 还使用了一种优雅的对象模型,可以写出非常漂亮的代码。

这是使用更高层 API (C 语言示例中使用 CZMQ) 重写的负载均衡代理

C | C++ | Delphi | Haxe | Java | Lua | PHP | Python | Scala | Ada | Basic | C# | CL | Erlang | Elixir | F# | Felix | Go | Haskell | Julia | Node.js | Objective-C | ooc | Perl | Q | Racket | Ruby | Rust | Tcl | OCaml

CZMQ 提供的一项功能是干净的中断处理。这意味着 Ctrl-C 将导致任何阻塞的 ZeroMQ 调用退出,返回代码为 -1,并将 errno 设置为EINTR。在这种情况下,高层接收方法将返回 NULL。因此,你可以像这样干净地退出循环:


while (true) {
    zstr_send (client, "Hello");
    char *reply = zstr_recv (client);
    if (!reply)
        break;              //  Interrupted
    printf ("Client: %s\n", reply);
    free (reply);
    sleep (1);
}

或者,如果你正在调用 zmq_poll(),检查返回值


if (zmq_poll (items, 2, 1000 * 1000) == -1)
    break;              //  Interrupted

上面的示例仍然使用 zmq_poll()。那么反应器怎么样呢?CZMQ 的zloop反应器很简单但功能强大。它允许你

  • 在任何套接字上设置读取器,即当套接字有输入时调用的代码。
  • 取消套接字上的读取器。
  • 设置一个在特定时间间隔内触发一次或多次的定时器。
  • 取消定时器。

zloop当然在内部使用 zmq_poll()。它每次添加或移除读取器时都会重建其 poll set,并计算轮询超时时间以匹配下一个定时器。然后,它会为每个需要关注的套接字和定时器调用相应的读取器和定时器处理函数。

当我们使用反应器模式时,我们的代码会发生颠倒。主要逻辑看起来像这样:


zloop_t *reactor = zloop_new ();
zloop_reader (reactor, self->backend, s_handle_backend, self);
zloop_start (reactor);
zloop_destroy (&reactor);

实际的消息处理位于专门的函数或方法内部。你可能不喜欢这种风格——这取决于个人喜好。它有助于混合处理定时器和套接字活动。在本文的其余部分,我们将在更简单的情况下使用 zmq_poll(),而在更复杂的示例中则使用zloop

这是再次重写的负载均衡代理,这次使用zloop:

C | Haxe | Java | Python | Ada | Basic | C++ | C# | CL | Delphi | Erlang | Elixir | F# | Felix | Go | Haskell | Julia | Lua | Node.js | Objective-C | ooc | Perl | PHP | Q | Racket | Ruby | Rust | Scala | Tcl | OCaml

当你发送 Ctrl-C 时,让应用程序正确关闭可能会很棘手。如果你使用zctx类,它会自动设置信号处理,但你的代码仍然需要配合。如果zmq_poll返回 -1,或者如果任何zstr_recv, zframe_recv,或zmsg_recv方法返回 NULL,你必须中断任何循环。如果你有嵌套循环,将外部循环设置为依赖于!zctx_interrupted.

可能会很有用。如果你使用子线程,它们不会接收到中断信号。要告知它们关闭,你可以选择

  • 如果它们共享同一个上下文,销毁该上下文,在这种情况下,它们正在等待的任何阻塞调用将以 ETERM 结束。
  • 如果它们使用自己的上下文,发送关闭消息给它们。为此你需要一些套接字管道。

异步客户端/服务器模式 #

在 ROUTER 到 DEALER 的示例中,我们看到了一个 1 对 N 的用例,其中一个服务器异步地与多个工作者通信。我们可以将这种情况颠倒过来,得到一个非常有用的 N 对 1 架构,其中各种客户端与单个服务器通信,并且是异步进行的。

图 37 - 异步客户端/服务器

工作原理如下:

  • 客户端连接到服务器并发送请求。
  • 对于每个请求,服务器发送 0 个或多个回复。
  • 客户端可以发送多个请求,而无需等待回复。
  • 服务器可以发送多个回复,而无需等待新的请求。

以下是展示其工作原理的代码

C | C++ | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | PHP | Python | Ruby | Scala | Tcl | 贡献翻译

该示例在一个进程中运行,通过多个线程模拟真实的多进程架构。运行示例时,您会看到三个客户端(每个客户端都有一个随机 ID),打印出它们从服务器获得的回复。仔细观察,您会发现每个客户端任务在每个请求中获得 0 个或更多回复。

关于此代码的一些评论

  • 客户端每秒发送一个请求,并收到零个或多个回复。为了使用以下方法实现这一点 zmq_poll(),我们不能简单地使用 1 秒的超时时间进行轮询,否则我们将只在收到最后一个回复一秒后才发送新的请求。因此,我们以高频率(每秒轮询 100 次,每次间隔 1/100 秒)进行轮询,这大致是准确的。

  • 服务器使用一个工作线程池,每个线程同步处理一个请求。它使用内部队列将这些线程连接到其前端套接字。它使用一个 zmq_proxy()调用。

图 38 - 异步服务器详情

请注意,我们在客户端和服务器之间执行 DEALER 到 ROUTER 对话,但在服务器主线程和工作线程之间,我们执行 DEALER 到 DEALER。如果工作线程是严格同步的,我们将使用 REP。然而,因为我们想发送多个回复,我们需要一个异步套接字。我们想路由回复,它们总是发送到向我们发送请求的单个服务器线程。

让我们考虑一下路由信封。客户端发送一个包含单个帧的消息。服务器线程接收一个双帧消息(原始消息前缀为客户端身份)。我们将这两个帧发送给工作线程,工作线程将其视为正常的回复信封,并以双帧消息的形式返回给我们。然后,我们使用第一个帧作为身份,将第二个帧路由回客户端作为回复。

它看起来像这样

     client          server       frontend       worker
   [ DEALER ]<---->[ ROUTER <----> DEALER <----> DEALER ]
             1 part         2 parts       2 parts

现在来说套接字:我们可以使用负载均衡的 ROUTER 到 DEALER 模式与工作线程通信,但这会增加额外的工作。在这种情况下,DEALER 到 DEALER 模式可能没问题:权衡是每个请求的延迟较低,但工作分布不平衡的风险较高。在这种情况下,简单性更重要。

当你构建维护与客户端有状态对话的服务器时,你会遇到一个经典问题。如果服务器为每个客户端保留一些状态,并且客户端不断地连接和断开,最终服务器将耗尽资源。即使是同一个客户端不断连接,如果你使用默认身份,每个连接都看起来像一个新的连接。

在上面的示例中,我们通过仅保留非常短时间(工作线程处理请求所需的时间)的状态,然后丢弃该状态来作弊。但这在许多情况下并不实用。为了在有状态的异步服务器中正确管理客户端状态,你必须

  • 从客户端到服务器进行心跳。在我们的示例中,我们每秒发送一个请求,这可以可靠地用作心跳。

  • 使用客户端身份(无论是生成的还是显式的)作为键来存储状态。

  • 检测到停止的心跳。如果在例如两秒内没有来自客户端的请求,服务器可以检测到这一点并销毁其为该客户端保留的任何状态。

工作示例:中介间路由 #

让我们回顾一下到目前为止所看到的一切,并将其扩展到一个真实的应用程序。我们将分几个迭代步骤逐步构建。我们最好的客户紧急来电,请求设计一个大型云计算设施。他设想的云跨越多个数据中心,每个数据中心都是客户端和工作线程的集群,并且作为一个整体协同工作。因为我们足够聪明,知道实践总是胜过理论,所以我们建议使用 ZeroMQ 进行一个工作模拟。我们的客户,急于在他自己的老板改变主意之前确定预算,并且在 Twitter 上读到关于 ZeroMQ 的好评,欣然同意。

确定细节 #

喝了几杯意式浓缩咖啡后,我们想立即开始编写代码,但一个细微的声音告诉我们,在为一个完全错误的问题提供耸人听闻的解决方案之前,先了解更多细节。我们问:“云在做什么样的工作?”

客户解释道

  • 工作线程运行在各种硬件上,但它们都能处理任何任务。每个集群有数百个工作线程,总共有多达十几个集群。

  • 客户端为工作线程创建任务。每个任务都是一个独立的工作单元,客户端只希望尽快找到一个可用的工作线程并将其发送给它。将会有很多客户端,它们会任意地出现和消失。

  • 真正的难点在于能够随时添加和移除集群。一个集群可以立即离开或加入云,并带走其所有工作线程和客户端。

  • 如果客户端自己的集群中没有工作线程,它们的任务将发送给云中其他可用的工作线程。

  • 客户端一次发送一个任务,等待回复。如果它们在 X 秒内没有收到回复,它们会再次发送该任务。这不是我们需要关注的问题;客户端 API 已经处理了。

  • 工作线程一次处理一个任务;它们非常简单。如果它们崩溃,会由启动它们的脚本重新启动。

所以我们再次确认以确保我们正确理解了这一点

  • 我们问:“集群之间会有某种超赞的网络互连,对吧?” 客户说:“是的,当然,我们又不是傻瓜。”

  • 我们问:“我们谈论的规模是怎样的?” 客户回答说:“每个集群最多有一千个客户端,每个客户端每秒最多进行十次请求。请求很小,回复也很小,每个不超过 1K 字节。”

所以我们做了一个简单的计算,发现这在普通 TCP 上工作得很好。2,500 个客户端 x 10 次/秒 x 1,000 字节 x 2 方向 = 50MB/秒 或 400Mb/秒,对于 1Gb 网络来说不是问题。

这是一个直接的问题,不需要特殊的硬件或协议,只需要一些巧妙的路由算法和仔细的设计。我们首先设计一个集群(一个数据中心),然后弄清楚如何将集群连接在一起。

单个集群的架构 #

工作线程和客户端是同步的。我们想使用负载均衡模式将任务路由给工作线程。工作线程都是相同的;我们的设施没有不同服务的概念。工作线程是匿名的;客户端从不直接寻址它们。我们在此不尝试提供保证交付、重试等功能。

出于我们已经探讨过的原因,客户端和工作线程不会直接相互通信。这使得动态添加或移除节点变得不可能。因此,我们的基本模型由我们之前看到的请求-回复消息中介构成。

图 39 - 集群架构

扩展到多个集群 #

现在我们将其扩展到多个集群。每个集群都有一组客户端和工作线程,以及一个将它们连接在一起的中介。

图 40 - 多个集群

问题是:我们如何让每个集群的客户端与另一个集群的工作线程通信?有几种可能性,每种都有优缺点

  • 客户端可以直接连接到两个中介。优点是我们无需修改中介或工作线程。但客户端会变得更复杂,并了解整体拓扑结构。例如,如果我们要添加第三个或第四个集群,所有客户端都会受到影响。实际上,我们必须将路由和故障转移逻辑移到客户端中,这并不好。

  • 工作线程可以直接连接到两个中介。但是 REQ 工作线程做不到,它们只能回复一个中介。我们可以使用 REP,但 REP 不提供像负载均衡那样的可定制的中介到工作线程的路由,只有内置的负载均衡。这是一个失败;如果我们要将工作分配给空闲的工作线程,我们恰恰需要负载均衡。一个解决方案是为工作节点使用 ROUTER 套接字。我们将此标记为“想法 #1”。

  • 中介可以相互连接。这看起来最简洁,因为它创建的额外连接最少。我们无法动态添加集群,但这可能超出了范围。现在客户端和工作线程仍然不了解真实的网络拓扑,并且中介在有空闲容量时相互告知。我们将此标记为“想法 #2”。

让我们探讨想法 #1。在这种模型中,工作线程连接到两个中介,并接受来自其中任何一个的任务。

图 41 - 想法 1:交叉连接的工作线程

这看起来可行。然而,它没有提供我们想要的东西,即客户端尽可能获得本地工作线程,只有在比等待更好的情况下才获得远程工作线程。此外,工作线程会向两个中介发送“就绪”信号,并可能同时获得两个任务,而其他工作线程仍然空闲。看起来这个设计失败了,因为我们再次将路由逻辑放在了边缘。

那么,就是想法 #2 了。我们连接中介,而不触碰客户端或工作线程,它们仍然是我们习惯的 REQ。

图 42 - 想法 2:中介相互通信

这种设计很有吸引力,因为问题在一个地方解决,对外界不可见。基本上,中介会相互开启秘密通道,像骆驼商人一样低语:“嘿,我有一些空闲容量。如果你客户端太多,叫我一声,我们就成交。”

实际上,这只是一种更复杂的路由算法:中介彼此成为分包商。即使在我们编写实际代码之前,这种设计还有其他值得喜欢的地方

  • 它将常见情况(客户端和工作线程在同一集群)视为默认情况,并为异常情况(在集群之间分派任务)做额外工作。

  • 它允许我们对不同类型的工作使用不同的消息流。这意味着我们可以以不同方式处理它们,例如使用不同类型的网络连接。

  • 感觉它可以平滑地扩展。连接三个或更多中介并不会变得过于复杂。如果发现这是个问题,可以通过添加一个超级中介轻松解决。

现在我们将创建一个工作示例。我们将把整个集群打包到一个进程中。这显然不现实,但这使得模拟变得简单,并且该模拟可以精确地扩展到真实的进程。这就是 ZeroMQ 的魅力所在——你可以在微观层面进行设计,然后将其扩展到宏观层面。线程变成进程,然后变成盒子,模式和逻辑保持不变。我们的每个“集群”进程都包含客户端线程、工作线程和一个中介线程。

我们现在已经很清楚基本模型了

  • REQ 客户端 (REQ) 线程创建工作负载并将其传递给中介 (ROUTER)。
  • REQ 工作线程 (REQ) 线程处理工作负载并将结果返回给中介 (ROUTER)。
  • 中介使用负载均衡模式对工作负载进行排队和分发。

联邦式 vs 对等式 #

有几种可能的方式可以连接中介。我们希望能够告诉其他中介“我们有容量”,然后接收多个任务。我们还需要能够告诉其他中介“停止,我们满了”。这不需要是完美的;有时我们可能会接受不能立即处理的任务,然后我们会尽快处理它们。

最简单的互连方式是联邦式 (federation),其中中介彼此模拟客户端和工作线程。我们会通过将我们的前端连接到另一个中介的后端套接字来实现这一点。请注意,将套接字绑定到一个端点并将其连接到其他端点是合法的。

图 43 - 联邦式模型中的交叉连接中介

这将使两个中介都拥有简单的逻辑和相对不错的机制:当没有工作线程时,告诉另一个中介“就绪”,并接受来自它的一个任务。问题在于,它对于这个问题来说也太简单了。一个联邦式中介一次只能处理一个任务。如果中介模拟的是一个锁步客户端和工作线程,那么它按定义也将是锁步的,而且如果它有很多可用工作线程,它们也不会被使用。我们的中介需要以完全异步的方式连接。

联邦式模型非常适合其他类型的路由,尤其是面向服务的架构 (SOA),它通过服务名称和邻近性进行路由,而不是负载均衡或轮询。所以不要认为它没用,它只是不适合所有用例。

除了联邦式,让我们看看对等式 (peering) 方法,其中中介明确地知道彼此,并通过特权通道通信。让我们分解一下,假设我们要连接 N 个中介。每个中介都有 (N - 1) 个对等体,并且所有中介都使用完全相同的代码和逻辑。中介之间有两种不同的信息流

  • 每个中介需要随时告诉其对等体有多少可用工作线程。这可以是相当简单的信息——仅仅是一个定期更新的数量。为此,显而易见的(也是正确的)套接字模式是 pub-sub。因此,每个中介都会打开一个 PUB 套接字并发布状态信息,并且每个中介也会打开一个 SUB 套接字并将其连接到其他所有中介的 PUB 套接字,以从其对等体获取状态信息。

  • 每个中介需要一种方式将任务委托给对等体并异步获取回复。我们将使用 ROUTER 套接字来实现这一点;没有其他组合可行。每个中介都有两个这样的套接字:一个用于接收任务,一个用于委托任务。如果我们不使用两个套接字,每次读取时要判断是请求还是回复会更麻烦。这意味着需要在消息信封中添加更多信息。

中介与其本地客户端和工作线程之间也存在信息流。

命名仪式 #

三个流 x 每个流两个套接字 = 我们必须在中介中管理的六个套接字。选择好名字对于在我们的头脑中保持多套接字操作的合理一致性至关重要。套接字做某事,它们所做的事情应该构成其名称的基础。这是为了能够在几周后的寒冷星期一早上喝咖啡之前阅读代码,并且不会感到任何痛苦。

让我们为这些套接字举行一个萨满教的命名仪式。这三个流是

  • 中介与其客户端和工作线程之间的本地请求-回复流。
  • 中介与其对等中介之间的云端请求-回复流。
  • 中介与其对等中介之间的状态流。

找到长度相同且有意义的名称意味着我们的代码将很好地对齐。这不是一件大事,但注意细节会有帮助。对于每个流,中介有两个套接字,我们可以正交地称之为前端 (frontend)后端 (backend)。我们经常使用这些名称。前端接收信息或任务。后端将这些信息或任务发送给其他对等体。概念上的流向是从前到后(回复的方向则从后到前)。

因此,在本教程编写的所有代码中,我们将使用这些套接字名称

  • 本地流使用 localfelocalbe
  • 云端流使用 cloudfecloudbe
  • 状态流使用 statefestatebe

对于我们的传输方式,并且因为我们在一个盒子中模拟整个过程,我们将使用ipc用于所有通信。这具有像tcp一样工作的优点(即,它是一种非连接传输,与inproc不同),而且我们不需要 IP 地址或 DNS 名称,这在这里会很麻烦。相反,我们将使用ipc名为 something-localsomething-cloud,以及 something-state的端点,其中 something 是我们模拟集群的名称。

您可能认为这只是为一些名字做了很多工作。为什么不称它们为 s1, s2, s3, s4 等等?答案是,如果你的大脑不是一个完美的机器,你在阅读代码时需要很多帮助,我们会看到这些名字确实有帮助。记住“三个流,两个方向”比记住“六个不同的套接字”更容易。

图 44 - 中介套接字布局

请注意,我们将每个中介中的 cloudbe 连接到其他所有中介中的 cloudfe,同样,我们将每个中介中的 statebe 连接到其他所有中介中的 statefe。

状态流原型实现 #

因为每个套接字流都有其自己针对粗心者的陷阱,我们将逐个在实际代码中测试它们,而不是试图一次性将所有内容放入代码中。当我们对每个流都满意后,就可以将它们组合成一个完整的程序。我们将从状态流开始。

图 45 - 状态流

以下是此代码的工作原理

C | C++ | Delphi | Go | Haskell | Haxe | Java | Lua | Node.js | PHP | Python | Racket | Ruby | Scala | Tcl | 贡献翻译

关于此代码的注意事项

  • 每个中介都有一个身份,我们用来构建ipc端点名称。一个真正的中介需要使用 TCP 和更复杂的配置方案。我们将在本书后面讨论这些方案,但目前,使用生成的ipc名称使我们可以忽略获取 TCP/IP 地址或名称的问题。

  • 我们使用一个 zmq_poll()循环作为程序的核心。它处理传入消息并发送状态消息。我们只有在没有收到任何传入消息并且等待了一秒钟后才发送状态消息。如果我们每次收到消息时都发送状态消息,就会造成消息风暴。

  • 我们使用由发送者地址和数据组成的两部分 pub-sub 消息。请注意,为了向发布者发送任务,我们需要知道发布者的地址,唯一的方法是将此地址显式作为消息的一部分发送。

  • 我们不在订阅者上设置身份,因为如果设置了,连接到正在运行的中介时会获取到过时的状态信息。

  • 我们不在发布者上设置 HWM (High Water Mark),但如果使用 ZeroMQ v2.x,那会是一个明智的主意。

我们可以构建这个小程序并运行三次来模拟三个集群。我们将它们称为 DC1、DC2 和 DC3(名称是任意的)。我们在三个独立的窗口中运行这三个命令

peering1 DC1 DC2 DC3  #  Start DC1 and connect to DC2 and DC3
peering1 DC2 DC1 DC3  #  Start DC2 and connect to DC1 and DC3
peering1 DC3 DC1 DC2  #  Start DC3 and connect to DC1 and DC2

您会看到每个集群报告其对等体的状态,几秒钟后,它们都会愉快地每秒打印一个随机数。尝试一下,并确认这三个中介都能匹配并同步到每秒的状态更新。

在实际生活中,我们不会定期发送状态消息,而是在状态发生变化时发送,例如,当工作线程变得可用或不可用时。这看起来流量很大,但状态消息很小,而且我们已经确定集群间的连接速度非常快。

如果我们要以精确的间隔发送状态消息,我们会创建一个子线程并在该线程中打开statebe套接字。然后我们会从主线程向该子线程发送不规则的状态更新,并允许子线程将它们合并成规则的传出消息。这比我们在这里需要的更多工作。

本地流和云端流原型实现 #

现在让我们通过本地和云端套接字原型实现任务流。这段代码从客户端拉取请求,然后随机分发给本地工作线程和云端对等体。

图 46 - 任务流

在我们深入代码之前(代码正变得有点复杂),让我们先勾勒出核心路由逻辑,并将其分解成一个简单但健壮的设计。

我们需要两个队列,一个用于接收来自本地客户端的请求,一个用于接收来自云端客户端的请求。一个选择是从本地和云端前端拉取消息,然后将它们推送到各自的队列中。但这有点没意义,因为 ZeroMQ 套接字本身就是队列。所以让我们使用 ZeroMQ 套接字缓冲区作为队列。

这是我们在负载均衡中介中使用的技术,效果很好。我们只在有地方发送请求时才从两个前端读取。我们可以随时从后端读取,因为它们提供要路由回去的回复。只要后端不与我们通信,就没有必要查看前端。

所以我们的主循环变成了

  • 轮询后端以检查活动。当我们收到消息时,可能是来自工作者的“ready”消息,也可能是回复。如果是回复,则通过本地或云前端路由回去。

  • 如果一个工作者回复了,它就变得可用,因此我们将其排队并计数。

  • 当有可用工作者时,从任一前端获取请求(如果有的话),并路由到本地工作者,或者随机路由到云对等体。

随机将任务发送给对等broker而不是工作者,这模拟了集群中的工作分布。这很笨,但对于这个阶段来说没关系。

我们使用broker身份来路由broker之间的消息。在这个简单原型中,每个broker都有一个我们在命令行上提供的名称。只要这些名称不与用于客户端节点的 ZeroMQ 生成的 UUID 重叠,我们就能判断是应该将回复路由回客户端还是路由到broker。

这是代码中实现的方式。有趣的部分从注释“Interesting part”附近开始。

C | C++ | Delphi | Go | Haskell | Haxe | Java | Lua | PHP | Python | Ruby | Scala | Tcl | Ada | Basic | C# | CL | Erlang | Elixir | F# | Felix | Julia | Node.js | Objective-C | ooc | Perl | Q | Racket | Rust | OCaml

例如,通过在两个窗口中启动两个broker实例来运行此程序。

peering2 me you
peering2 you me

关于此代码的一些评论

  • 至少在 C 代码中,使用 zmsg 类会使开发变得容易得多,并且代码更短。显然这是一个有效的抽象。如果你在 C 中构建 ZeroMQ 应用程序,应该使用 CZMQ。

  • 由于我们没有从对等体获取任何状态信息,我们天真地认为它们正在运行。代码会在你启动所有broker后提示你确认。在实际情况中,我们不会向没有告知我们其存在的broker发送任何消息。

你可以通过观察代码永远运行来证明它是有效的。如果有任何错误路由的消息,客户端最终会阻塞,并且broker会停止打印跟踪信息。你可以通过杀死其中任何一个broker来证明这一点。另一个broker会尝试将请求发送到云端,其客户端会逐个阻塞,等待回复。

整合起来 #

让我们将这一切整合到一个包中。和之前一样,我们将整个集群作为单个进程运行。我们将把前面的两个示例合并成一个正常工作的设计,让你能够模拟任意数量的集群。

这段代码的大小与前两个原型加起来差不多,共 270 行代码。对于一个包含客户端、工作者和云工作负载分布的集群模拟来说,这相当不错。代码如下

C | C++ | Delphi | Go | Haskell | Haxe | Java | Lua | PHP | Python | Ruby | Tcl | Ada | Basic | C# | CL | Erlang | Elixir | F# | Felix | Julia | Node.js | Objective-C | ooc | Perl | Q | Racket | Rust | Scala | OCaml

这是一个非简单的程序,花费了大约一天时间才使其工作。主要内容如下

  • 客户端线程检测并报告失败的请求。它们通过轮询等待响应来实现这一点,如果一段时间(10秒)后没有收到响应,则打印错误消息。

  • 客户端线程不直接打印,而是将消息发送到一个监控 socket (PUSH),主循环收集 (PULL) 并打印出来。这是我们第一次看到使用 ZeroMQ socket 进行监控和日志记录的案例;这是一个重要的用例,我们稍后会再讨论。

  • 客户端模拟不同的负载,以便在随机时刻使集群达到 100% 利用率,从而将任务转移到云端。客户端和工作者的数量,以及客户端和工作者线程中的延迟控制着这一点。欢迎调整这些参数,看看是否能创建一个更真实的模拟。

  • 主循环使用两个 pollset。实际上,它可以使用三个:信息、后端和前端。如同早期的原型一样,如果后端没有容量,接收前端消息就没有意义。

这些是开发此程序期间出现的一些问题

  • 客户端会冻结,因为请求或回复在某个地方丢失了。回想一下,ROUTER socket 会丢弃无法路由的消息。这里的第一个策略是修改客户端线程以检测和报告此类问题。其次,我放了zmsg_dump()在主循环中的每次接收后和每次发送前调用,直到问题的根源清晰为止。

  • 主循环错误地从多个就绪 socket 中读取。这导致第一条消息丢失。我通过仅从第一个就绪 socket 读取来修复了这个问题。

  • zmsg类未能正确地将 UUID 编码为 C 字符串。这导致包含零字节的 UUID 被损坏。我通过修改zmsg将 UUID 编码为可打印的十六进制字符串来修复了这个问题。

这个模拟没有检测到云对等体的消失。如果你启动了几个对等体并停止了其中一个,并且它正在向其他对等体广播容量信息,即使它已消失,其他对等体仍会继续向其发送工作。你可以尝试一下,你会发现客户端会抱怨请求丢失。解决方案有两个方面:首先,只保留短时间的容量信息,这样如果一个对等体消失了,其容量会很快设置为零。其次,为请求-回复链增加可靠性。我们将在下一章探讨可靠性。