2. Socket(套接字)和模式

第 2 章 - Socket 和模式 #

第 1 章 - 基础中,我们初步了解了 ZeroMQ,并学习了一些主要的 ZeroMQ 模式的基本示例:请求-响应、发布-订阅和流水线。在本章中,我们将深入实践,开始学习如何在实际程序中使用这些工具。

我们将涵盖:

  • 如何创建和使用 ZeroMQ Socket。
  • 如何在 Socket 上发送和接收消息。
  • 如何围绕 ZeroMQ 的异步 I/O 模型构建应用程序。
  • 如何在单个线程中处理多个 Socket。
  • 如何正确处理致命错误和非致命错误。
  • 如何处理中断信号,例如 Ctrl-C。
  • 如何干净地关闭 ZeroMQ 应用程序。
  • 如何检查 ZeroMQ 应用程序是否存在内存泄漏。
  • 如何发送和接收多部分消息。
  • 如何在网络上转发消息。
  • 如何构建一个简单的消息队列代理。
  • 如何使用 ZeroMQ 编写多线程应用程序。
  • 如何使用 ZeroMQ 进行线程间信号传递。
  • 如何使用 ZeroMQ 协调节点网络。
  • 如何创建和使用发布-订阅的消息信封。
  • 使用高水位标记(HWM)来防止内存溢出。

Socket API #

坦白地说,ZeroMQ 在某种程度上对你使用了“障眼法”,对此我们并不道歉。这是为了你好,而且这比伤到你更让我们自己难受。ZeroMQ 提供了一个熟悉的基于 Socket 的 API,为了隐藏大量的消息处理引擎,我们付出了巨大的努力。然而,结果将慢慢地纠正你关于如何设计和编写分布式软件的世界观。

Socket 是事实上的网络编程标准 API,同时也能防止你的眼球掉到脸颊上。让 ZeroMQ 对开发者特别有吸引力的一点是,它使用了 Socket 和消息,而不是其他一些任意的概念集。感谢 Martin Sustrik 实现了这一点。它将“面向消息的中间件”——一个能让整个房间陷入僵局的短语——变成了“超辣 Socket!”,这让我们对披萨产生了奇怪的渴望,并渴望了解更多。

就像一道最喜欢的菜一样,ZeroMQ Socket 很容易消化。Socket 的生命分为四个部分,就像 BSD Socket 一样:

请注意,Socket 始终是 void 指针,而消息(我们很快就会讲到)是结构体。因此,在 C 语言中,你直接传递 Socket,但在所有处理消息的函数中,你都传递消息的地址,例如 `zmq_msg_send()``zmq_msg_send()` `zmq_msg_recv()`和 `zmq_msg_recv()`。作为记忆辅助,记住“在 ZeroMQ 中,你的所有 Socket 都属于我们”,但消息是你代码中实际拥有的东西。

创建、销毁和配置 Socket 的方式和你期望的任何对象一样。但请记住,ZeroMQ 是一个异步的、弹性的结构。这对我们将 Socket 连接到网络拓扑以及之后如何使用它们产生了一些影响。

将 Socket 连接到网络拓扑中 #

要在两个节点之间建立连接,你使用 `zmq_bind()``zmq_bind()` `zmq_connect()`在一个节点中,并使用 `zmq_bind()``zmq_connect()` `zmq_connect()`在另一个节点中。通常的经验法则是,执行 `zmq_bind()` 的是“服务器”,它监听在一个众所周知的网络地址上,而执行 `zmq_connect()` 的是“客户端”,其网络地址未知或任意。因此,我们说“将一个 Socket 绑定到一个端点”以及“将一个 Socket 连接到一个端点”,端点就是那个众所周知的网络地址。

ZeroMQ 连接与经典的 TCP 连接有些不同。主要显著差异在于:

  • 它们通过任意传输方式(inprocipctcppgmepgm)。参见`zmq_inproc()`, `zmq_ipc()`, `zmq_tcp()`, `zmq_pgm()``zmq_epgm()`)。参见 zmq_inproc(), zmq_ipc(), zmq_tcp(), zmq_pgm(),以及 zmq_epgm().

  • 一个 Socket 可以有多个出站和入站连接。

  • 没有`zmq_accept`() 方法。当 Socket 绑定到一个端点后,它会自动开始接受连接。

  • 网络连接本身在后台进行,如果网络连接中断(例如,对等方消失后又出现),ZeroMQ 会自动重新连接。

  • 你的应用程序代码不能直接操作这些连接;它们被封装在 Socket 下。

许多架构遵循某种客户端/服务器模型,其中服务器是最静态的组件,而客户端是最动态的组件,即它们出现和消失最频繁。有时会遇到寻址问题:服务器对客户端可见,但反之则不一定。因此,大多数情况下,哪个节点应该执行 `zmq_bind()``zmq_bind()` `zmq_connect()`(服务器),哪个节点应该执行 `zmq_connect()`(客户端)是显而易见的。这也取决于你使用的 Socket 类型,对于不寻常的网络架构可能有一些例外。我们稍后会查看 Socket 类型。

现在,想象一下我们在服务器启动之前启动客户端。在传统网络中,我们会得到一个大大的红色失败标志。但 ZeroMQ 允许我们任意启动和停止各部分。一旦客户端节点执行了 `zmq_connect()``zmq_connect()` `zmq_bind()`,连接就存在了,该节点可以开始向 Socket 写入消息。在某个阶段(希望在消息排队过多以至于开始被丢弃或客户端阻塞之前),服务器就会启动,执行一次 `zmq_bind()`,然后 ZeroMQ 开始传递消息。

服务器节点可以绑定到多个端点(即协议和地址的组合),并且可以使用单个 Socket 来完成。这意味着它将接受跨不同传输方式的连接


zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "inproc://somename");

对于大多数传输方式,你不能两次绑定到同一个端点,这与 UDP 等不同。然而,`zmq_ipc()``inproc`

传输方式允许一个进程绑定到已被第一个进程使用的端点。这是为了让进程在崩溃后能够恢复。尽管 ZeroMQ 试图对哪一端绑定、哪一端连接保持中立,但它们之间存在差异。我们稍后会更详细地了解这些差异。总而言之,你通常应该将“服务器”视为拓扑中静态的部分,绑定到或多或少固定的端点,而将“客户端”视为动态的部分,它们会不断出现和消失并连接到这些端点。然后,围绕这个模型设计你的应用程序。这样,“just work”(正常运行)的机会就会大大增加。

Socket 有类型。Socket 类型定义了 Socket 的语义、消息的入站和出站路由策略、队列等。你可以将某些类型的 Socket 连接在一起,例如,发布者 Socket 和订阅者 Socket。Socket 在“消息模式”中协同工作。我们稍后会更详细地讨论这一点。

正是这种以不同方式连接 Socket 的能力赋予了 ZeroMQ 作为消息队列系统的基本能力。在此之上还有一些层,例如代理,我们稍后会讲到。但本质上,使用 ZeroMQ,你就像拼搭儿童积木一样将各部分连接起来,从而定义你的网络架构。

发送和接收消息 #

要发送和接收消息,你使用 `zmq_msg_send()``zmq_msg_send()` `zmq_msg_recv()``zmq_send()`

和 `zmq_recv()` 方法。这些名称是惯用的,但 ZeroMQ 的 I/O 模型与经典的 TCP 模型足够不同,你需要花时间来理解它。

图 9 - TCP Socket 是一对一的

  • 让我们看看在处理数据时,TCP Socket 和 ZeroMQ Socket 之间的主要区别:

  • ZeroMQ Socket 传输的是消息,类似于 UDP,而不是像 TCP 那样的字节流。ZeroMQ 消息是指定长度的二进制数据。我们很快就会讲到消息;它们的设计经过性能优化,因此有点复杂。

  • ZeroMQ Socket 在后台线程中执行 I/O 操作。这意味着无论你的应用程序正在忙碌什么,消息都会到达本地输入队列并从本地输出队列发送出去。

根据 Socket 类型,ZeroMQ Socket 内置了 1-对-N 的路由行为。 The`zmq_send()` The方法实际上并没有将消息发送到 Socket 连接。它将消息放入队列,以便 I/O 线程可以异步发送。除了某些异常情况外,它不会阻塞。因此,当 `zmq_send()` 返回到你的应用程序时,消息不一定已经发送出去。

单播传输 #

ZeroMQ 提供了一组单播传输(inprocipctcp)和多播传输(epgm, pgm)。多播是一种高级技术,我们稍后会讲到。除非你确定你的扇出比率使得 1 对 N 的单播变得不可能,否则不要轻易使用它。`zmq_inproc()`, `zmq_ipc()`,以及`zmq_tcp()`)和组播传输 (epgm, pgm)。组播是一种高级技术,我们稍后会介绍。除非你知道你的扇出率使得 1 对 N 的单播变得不可能,否则甚至不要开始使用它。

对于大多数常见情况,请使用 tcp,这是一种断开连接的 TCP 传输方式。它具有弹性、可移植性,并且对于大多数情况来说足够快。我们称之为断开连接,因为 ZeroMQ 的`zmq_tcp()``tcp`

传输方式不需要在连接之前端点就存在。客户端和服务器可以随时连接和绑定,可以离开和回来,这一切对应用程序来说都是透明的。进程间传输方式`zmq_ipc()``ipc``zmq_tcp()`是断开连接的,类似于`zmq_ipc()``tcp`

。它有一个限制:它尚不支持 Windows。按照约定,我们使用扩展名为“.ipc”的端点名称,以避免与其他文件名潜在冲突。在 UNIX 系统上,如果你使用`zmq_tcp()``ipc``zmq_ipc()`端点,你需要使用适当的权限创建它们,否则在不同用户 ID 下运行的进程之间可能无法共享。你还必须确保所有进程都能访问文件,例如通过在相同工作目录中运行。线程间传输方式 inproc 是一种连接型信号传输。它比`zmq_tcp()``zmq_msg_send()``zmq_ipc()``tcp`

`ipc`

快得多。与

`tcp`

相比,这种传输方式有一个特定的限制:服务器必须在任何客户端发出连接之前进行绑定。此问题已在 ZeroMQ v4.0 及更高版本中修复。

ZeroMQ 不是中立载体 #

ZeroMQ 的新手经常会问(这也是我曾经问过自己的问题):“如何在 ZeroMQ 中编写一个 XYZ 服务器?”例如,“如何在 ZeroMQ 中编写一个 HTTP 服务器?”言下之意是,如果我们使用普通的 Socket 传输 HTTP 请求和响应,那么我们也应该可以使用 ZeroMQ Socket 来做同样的事情,而且会更快更好。过去的答案是“这不是它的工作方式”。ZeroMQ 不是中立载体:它对所使用的传输协议强加了帧格式。这种帧格式与现有的协议不兼容,因为这些协议倾向于使用自己的帧格式。例如,比较一个 HTTP 请求和一个 ZeroMQ 请求,两者都基于 TCP/IP。图 10 - 网络上的 HTTP

HTTP 请求使用 CR-LF 作为最简单的帧分隔符,而 ZeroMQ 使用长度指定的帧。因此,你可以使用 ZeroMQ 编写一个类似 HTTP 的协议,例如使用请求-响应 Socket 模式。但它不会是真正的 HTTP。

图 11 - 网络上的 ZeroMQ 然而,自 v3.3 版本以来,ZeroMQ 有一个 Socket 选项叫做`ZMQ_ROUTER_RAW`


int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);

,它允许你在不使用 ZeroMQ 帧的情况下读取和写入数据。你可以使用它来读取和写入真正的 HTTP 请求和响应。Hardeep Singh 贡献了这一修改,以便他可以从 ZeroMQ 应用程序连接到 Telnet 服务器。在撰写本文时,这仍然有些实验性,但这表明 ZeroMQ 如何不断发展以解决新问题。也许下一个补丁将由你贡献。

I/O 线程 #

我们说过 ZeroMQ 在后台线程中执行 I/O 操作。对于除了最极端的应用程序之外的所有应用来说,一个 I/O 线程(用于所有 Socket)就足够了。当你创建一个新的上下文时,它会启动一个 I/O 线程。一般的经验法则是,每秒输入或输出每吉字节数据,允许一个 I/O 线程。要增加 I/O 线程的数量,请在创建任何 Socket 之前使用

`zmq_ctx_set()`

调用。

我们已经看到,一个 Socket 可以同时处理几十甚至几千个连接。这对你编写应用程序的方式产生了根本性的影响。传统的网络应用程序通常为每个远程连接分配一个进程或线程,该进程或线程处理一个 Socket。ZeroMQ 允许你将这种整个结构压缩到一个单一进程中,然后根据需要进行拆分以实现扩展。

如果你只使用 ZeroMQ 进行线程间通信(即,不执行外部 Socket I/O 的多线程应用程序),你可以将 I/O 线程数设置为零。但这并不是一个显著的优化,更多是一种奇特用法。

消息模式 #

  • 在 ZeroMQ Socket API 的朴实包装下,隐藏着消息模式的世界。如果你有企业消息传递背景,或熟悉 UDP,这些模式你会感到隐约熟悉。但对于大多数 ZeroMQ 新手来说,它们是一个惊喜。我们太习惯于 TCP 范式,其中一个 Socket 与另一个节点是一对一映射的。

  • 让我们简要回顾一下 ZeroMQ 为你做了什么。它快速有效地将数据块(消息)传递给节点。你可以将节点映射到线程、进程或物理节点。无论实际传输方式是什么(如进程内、进程间、TCP 或多播),ZeroMQ 都为你的应用程序提供单一的 Socket API。它会自动重新连接来来去去的对等方。它根据需要对发送方和接收方进行消息排队。它限制这些队列以防止进程内存耗尽。它处理 Socket 错误。它在后台线程中执行所有 I/O。它使用无锁技术在节点之间进行通信,因此永远不会有锁、等待、信号量或死锁。

  • 但除此之外,它根据称为模式的精确规则来路由和排队消息。正是这些模式提供了 ZeroMQ 的智能。它们封装了我们关于数据和工作分配最佳方式的宝贵经验。ZeroMQ 的模式是硬编码的,但未来的版本可能会允许用户自定义模式。

  • ZeroMQ 模式由类型匹配的 Socket 对实现。换句话说,要理解 ZeroMQ 模式,你需要理解 Socket 类型以及它们如何协同工作。大多数情况下,这只需要学习;在这个层面,很少有显而易见的东西。

内置的核心 ZeroMQ 模式有: `zmq_socket()`请求-响应(Request-reply),它将一组客户端连接到一组服务。这是一种远程过程调用和任务分发模式。

  • 发布-订阅(Pub-sub),它将一组发布者连接到一组订阅者。这是一种数据分发模式。
  • 流水线(Pipeline),它以扇出/扇入模式连接节点,该模式可以有多个步骤和循环。这是一种并行任务分发和收集模式。
  • 独占对(Exclusive pair),它独占地连接两个 Socket。这是一种用于连接同一进程中两个线程的模式,不要与“普通”的 Socket 对混淆。
  • 我们在第 1 章 - 基础中介绍了前三种模式,并在本章稍后会看到独占对模式。关于这些模式,
  • `zmq_socket(3)`
  • 手册页写得相当清楚——值得反复阅读,直到你理解。以下是连接-绑定对有效的 Socket 组合(任何一侧都可以绑定):
  • PUB 和 SUB
  • REQ 和 REP
  • REQ 和 ROUTER(注意,REQ 会插入一个额外的空帧)

DEALER 和 REP(注意,REP 会假定存在一个空帧)

DEALER 和 ROUTER

DEALER 和 DEALER

ROUTER 和 ROUTER

PUSH 和 PULL

PAIR 和 PAIR

根据 Socket 类型,ZeroMQ Socket 内置了 1-对-N 的路由行为。你还会看到 XPUB 和 XSUB Socket 的引用,我们稍后会讲到它们(它们类似于 PUB 和 SUB 的原始版本)。任何其他组合都会产生未文档化且不可靠的结果,未来的 ZeroMQ 版本如果尝试这些组合可能会返回错误。当然,你可以并且会通过代码桥接其他 Socket 类型,即从一种 Socket 类型读取并写入另一种。高级消息模式 # The`zmq_msg_send()` 这四种核心模式内置于 ZeroMQ 中。它们是 ZeroMQ API 的一部分,在核心 C++ 库中实现,并且保证在所有优秀零售店都有售。在此之上,我们添加了高级消息模式。我们在 ZeroMQ 的基础上构建这些高级模式,并使用我们应用程序所用的语言来实现它们。它们不是核心库的一部分,不包含在 ZeroMQ 包中,并作为 ZeroMQ 社区的一部分独立存在。例如,我们在第 4 章 - 可靠请求-响应模式中探讨的 Majordomo 模式,就位于 ZeroMQ 组织下的 GitHub Majordomo 项目中。 这四种核心模式内置于 ZeroMQ 中。它们是 ZeroMQ API 的一部分,在核心 C++ 库中实现,并且保证在所有优秀零售店都有售。本书的目标之一是为你提供一套这样的高级模式,既有小的(如何理智地处理消息)也有大的(如何构建一个可靠的发布-订阅架构)。

`zmq_msg_get()`

`zmq_msg_set()`消息操作`zmq_msg_copy()`

`zmq_msg_send()` libzmq `zmq_msg_more()`要释放(而非销毁)消息,你调用

`zmq_msg_close()`

。这会减少一个引用计数,最终 ZeroMQ 将销毁消息。

要访问消息内容,你使用

`zmq_msg_data()`

。要获取消息包含的数据大小,使用

  • `zmq_msg_size()`
  • 不要使用 zmq_msg_init()zmq_msg_init_data(),除非你仔细阅读了手册页并确切知道为什么需要它们。消息操作当你将消息传递给
  • `zmq_msg_send()`
  • 后,ØMQ 会清除消息,即将大小设为零。你不能两次发送同一个消息,发送后也不能访问消息数据。

如果你使用

  • `zmq_send()`

  • `zmq_recv()`

  • ,这些规则不适用,因为你传递的是字节数组,而不是消息结构体。

  • 如果你想多次发送同一条消息,并且消息很大,创建一个新的消息,使用 `zmq_msg_init_data()``zmq_msg_init()`

初始化,然后使用 初始化消息`zmq_msg_copy()`

创建第一个消息的副本。这不会复制数据,而是复制一个引用。然后你可以发送消息两次(或更多次,如果你创建更多副本)并且消息只会在最后一个副本被发送或关闭后才会被最终销毁。

ZeroMQ 还支持多部分消息,它允许你将帧列表作为一个单一的网络消息发送或接收。这在实际应用程序中被广泛使用,我们将在本章稍后以及第 3 章 - 高级请求-响应模式中介绍这一点。

帧(在 ZeroMQ 参考手册页面中也称为“消息部分”)是 ZeroMQ 消息的基本网络格式。帧是指定长度的数据块。长度可以从零开始。如果你做过 TCP 编程,你会明白为什么帧是回答“现在我应该从这个网络 Socket 读取多少数据?”这一问题的有用方案。

  1. 有一个网络层协议叫做 ZMTP,它定义了 ZeroMQ 如何在 TCP 连接上读取和写入帧。如果你对它的工作原理感兴趣,规范文档很短。
  2. 最初,ZeroMQ 消息就像 UDP 一样只有一个帧。后来我们扩展了这一点,增加了多部分消息,它就是一系列将“更多”位设置为一的帧,后面跟着一个将该位设置为零的帧。ZeroMQ API 允许你写入带有“更多”标志的消息,当你读取消息时,它允许你检查是否还有“更多”部分。
  3. 因此,在低层 ZeroMQ API 和参考手册中,关于消息和帧有一些模糊之处。所以这里有一个有用的词汇表:

一条消息可以包含一个或多个部分。

这些部分也称为“帧”。 每个部分都是一个`zmq_msg_t` 每个部分都是一个对象。

在低层 API 中,你分别发送和接收每个部分。

msreader:Haskell 语言实现的多个 Socket 读取器

示例 msreader 缺少 Haskell 实现:贡献翻译

msreader:Haxe 语言实现的多个 Socket 读取器

示例 msreader 缺少 Haxe 实现:贡献翻译 每个部分都是一个:

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Java | Lua | Node.js | Objective-C | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | Ada | Basic | C# | F# | Haxe | Julia | ooc | Q | Racket | OCaml

item 结构包含这四个成员

typedef struct {
    void *socket;       //  ZeroMQ socket to poll on
    int fd;             //  OR, native file handle to poll on
    short events;       //  Events to poll on
    short revents;      //  Events returned after poll
} zmq_pollitem_t;

分段消息 #

ZeroMQ 允许我们将消息由多个帧组成,形成“分段消息”。实际应用中广泛使用分段消息,既可以用于用地址信息封装消息,也可以用于简单的序列化。我们稍后会讨论回复信封。

现在我们将学习如何在任何需要不检查消息内容而转发消息的应用(例如代理)中盲目且安全地读写分段消息。

当你处理分段消息时,每个部分都是一个zmq_msgitem。例如,如果你发送一个包含五个部分的消息,你必须构造、发送并销毁五个zmq_msgitem。你可以提前完成这些操作(并将这些zmq_msgitem 存储在数组或其他结构中),或者在发送时一个接一个地完成。

以下是发送分段消息中帧的方法(我们将每个帧放入一个消息对象中)


zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
...
zmq_msg_send (&message, socket, 0);

以下是接收和处理消息中所有部分(无论是单部分还是多部分)的方法


while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, socket, 0);
    //  Process the message frame
    ...
    zmq_msg_close (&message);
    if (!zmq_msg_more (&message))
        break;      //  Last message frame
}

关于分段消息的一些须知事项

  • 当你发送分段消息时,只有发送最后一部分时,第一部分(以及所有后续部分)才会真正发送到网络上。
  • 如果你正在使用 每个部分都是一个,当你接收到消息的第一部分时,其余部分也已经到达了。
  • 你将接收到消息的所有部分,或者一个都没有。
  • 消息的每个部分都是一个独立的zmq_msgitem。
  • 无论你是否检查 more 属性,你都将接收到消息的所有部分。
  • 发送时,ZeroMQ 在内存中排队消息帧,直到接收到最后一个,然后一次性发送所有帧。
  • 除了关闭套接字外,无法取消已部分发送的消息。

中介与代理 #

ZeroMQ 旨在实现去中心化智能,但这并不意味着你的网络中间是空的。它充满了消息感知基础设施,而且我们经常使用 ZeroMQ 来构建这些基础设施。ZeroMQ 的管道可以从微小的管道到成熟的面向服务的经纪人。消息传递行业称之为中介,意味着中间的部分处理两端。在 ZeroMQ 中,根据上下文,我们称之为代理、队列、转发器、设备或经纪人。

这种模式在现实世界中极为普遍,这也是为什么我们的社会和经济中充满了中介,他们的主要功能就是降低大型网络的复杂性和扩展成本。现实世界中的中介通常被称为批发商、分销商、经理等等。

动态发现问题 #

在设计大型分布式架构时遇到的问题之一是发现。也就是说,各个部分如何相互知晓?如果各个部分动态地出现和消失,这个问题就尤为困难,因此我们称之为“动态发现问题”。

动态发现有几种解决方案。最简单的方法是通过硬编码(或配置)网络架构来完全避免它,从而手动完成发现。也就是说,当你添加一个新部分时,你需要重新配置网络以使其知晓。

图 12 - 小型发布/订阅网络

在实践中,这会导致架构变得越来越脆弱和笨重。假设你有一个发布者和一百个订阅者。你通过在每个订阅者中配置发布者端点来将每个订阅者连接到发布者。这很容易。订阅者是动态的;发布者是静态的。现在假设你添加了更多的发布者。突然之间,事情就不那么容易了。如果你继续将每个订阅者连接到每个发布者,避免动态发现的成本就会越来越高。

图 13 - 带代理的发布/订阅网络

这个问题有很多解决方案,但最简单的答案是添加一个中介;也就是说,在网络中设置一个所有其他节点都连接到的静态点。在经典消息传递中,这是消息经纪人的工作。ZeroMQ 本身不带消息经纪人,但它允许我们很容易地构建中介。

你可能会想,如果所有网络最终都变得足够大而需要中介,为什么我们不直接为所有应用设置一个消息经纪人呢?对于初学者来说,这是一个合理的折衷方案。只要始终使用星形拓扑,忽略性能,通常就能正常工作。然而,消息经纪人是很贪婪的东西;作为中心中介,它们变得过于复杂、状态过多,最终成为一个问题。

最好将中介视为简单的无状态消息开关。一个好的类比是 HTTP 代理;它在那里,但没有任何特殊作用。添加一个发布/订阅代理解决了我们示例中的动态发现问题。我们将代理设置在网络的“中间”。代理打开一个 XSUB 套接字和一个 XPUB 套接字,并将它们分别绑定到众所周知的 IP 地址和端口。然后,所有其他进程都连接到代理,而不是相互连接。添加更多订阅者或发布者就变得微不足道了。

图 14 - 扩展的发布/订阅

我们需要 XPUB 和 XSUB 套接字,因为 ZeroMQ 会将订阅从订阅者转发到发布者。XSUB 和 XPUB 与 SUB 和 PUB 完全相同,只是它们将订阅作为特殊消息暴露出来。代理必须将这些订阅消息从订阅者端转发到发布者端,方法是从 XPUB 套接字读取并将它们写入 XSUB 套接字。这是 XSUB 和 XPUB 的主要用例。

共享队列 (DEALER 和 ROUTER 套接字) #

在 Hello World 客户端/服务器应用中,我们有一个客户端与一个服务通信。然而,在实际案例中,我们通常需要允许多个服务以及多个客户端。这使得我们可以扩展服务的能力(许多线程或进程或节点而不仅仅是一个)。唯一的限制是服务必须是无状态的,所有状态都包含在请求中或在某些共享存储中,例如数据库。

图 15 - 请求分发

有两种方法将多个客户端连接到多个服务器。蛮力方法是将每个客户端套接字连接到多个服务端点。一个客户端套接字可以连接到多个服务套接字,然后 REQ 套接字将在这些服务之间分发请求。假设你将一个客户端套接字连接到三个服务端点:A、B 和 C。客户端发出请求 R1、R2、R3、R4。R1 和 R4 发送到服务 A,R2 发送到 B,R3 发送到服务 C。

这种设计允许你廉价地添加更多客户端。你也可以添加更多服务。每个客户端会将其请求分发到这些服务。但是每个客户端都必须知道服务拓扑。如果你有 100 个客户端,然后决定再添加三个服务,你需要重新配置并重启 100 个客户端,以便客户端了解这三个新服务。

这显然不是我们在凌晨 3 点,当我们的超级计算集群资源耗尽,急需添加数百个新服务节点时想做的事情。太多静态部分就像液态混凝土:知识是分散的,静态部分越多,改变拓扑所需的努力就越大。我们想要的是客户端和服务之间有一个集中了所有拓扑知识的东西。理想情况下,我们应该能够在任何时候添加和移除服务或客户端,而不影响拓扑的任何其他部分。

因此,我们将编写一个小的消息队列经纪人,为我们提供这种灵活性。该经纪人绑定到两个端点,一个面向客户端的前端和一个面向服务的后端。然后它使用 每个部分都是一个来监视这两个套接字的活动,当有活动时,它就会在两个套接字之间传递消息。它实际上并不显式管理任何队列——ZeroMQ 在每个套接字上自动完成。

当你使用 REQ 与 REP 通信时,你会得到一个严格同步的请求-回复对话。客户端发送请求。服务读取请求并发送回复。客户端然后读取回复。如果客户端或服务尝试做任何其他事情(例如,连续发送两个请求而不等待响应),它们将收到错误。

但我们的经纪人必须是非阻塞的。显然,我们可以使用 每个部分都是一个来等待任一套接字上的活动,但我们不能使用 REP 和 REQ。

图 16 - 扩展的请求-回复

幸运的是,有两种套接字叫做 DEALER 和 ROUTER,它们允许你进行非阻塞的请求-响应。你将在第 3 章 - 高级请求-回复模式中看到 DEALER 和 ROUTER 套接字如何让你构建各种异步请求-回复流程。现在,我们只看看 DEALER 和 ROUTER 如何让我们通过一个中介(也就是我们的小经纪人)扩展 REQ-REP。

在这个简单的扩展请求-回复模式中,REQ 与 ROUTER 通信,DEALER 与 REP 通信。在 DEALER 和 ROUTER 之间,我们必须有代码(像我们的经纪人)将消息从一个套接字取出并推送到另一个套接字。

请求-回复经纪人绑定到两个端点,一个供客户端连接(前端套接字),一个供工作节点连接(后端套接字)。为了测试这个经纪人,你需要修改你的工作节点,使其连接到后端套接字。以下是展示我意思的客户端代码

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Racket | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q

以下是工作节点代码

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Racket | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q

以下是经纪人代码,它能正确处理分段消息

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q | Racket
图 17 - 请求-回复经纪人

使用请求-回复经纪人可以使你的客户端/服务器架构更容易扩展,因为客户端看不到工作节点,工作节点也看不到客户端。唯一的静态节点是中间的经纪人。

ZeroMQ 的内置代理函数 #

结果发现,上一节中的rrbroker的核心循环非常有用且可重用。它使我们能够轻松构建发布-订阅转发器、共享队列以及其他小型中介。ZeroMQ 将此封装在一个方法中,即 zmq_proxy():


zmq_proxy (frontend, backend, capture);

这两个(或三个套接字,如果我们想捕获数据)必须正确连接、绑定和配置。当我们调用zmq_proxy方法时,就相当于启动了rrbroker的主循环。让我们重写请求-回复经纪人代码来调用zmq_proxy,并将其重新命名为一个听起来很贵的“消息队列”(有人为比这功能少得多的代码要过房子)。

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Q | Ruby | Rust | Tcl | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Racket | Scala | OCaml

如果你像大多数 ZeroMQ 用户一样,在这个阶段你的脑海里可能开始思考:“如果我把随机的套接字类型插到代理中会发生什么?” 简短的回答是:试试看,然后弄清楚发生了什么。实际上,你通常会坚持使用 ROUTER/DEALER、XSUB/XPUB 或 PULL/PUSH。

传输桥接 #

ZeroMQ 用户经常问的一个问题是:“我如何将我的 ZeroMQ 网络与技术 X 连接起来?” 其中 X 是某种其他网络或消息传递技术。

图 18 - 发布/订阅转发代理

简单的答案是构建一个桥接器。桥接器是一个小型应用,在一个套接字上使用一种协议,并在另一个套接字上转换为/从第二种协议。你可以称之为协议解释器。ZeroMQ 中常见的桥接问题是连接两种传输或网络。

举个例子,我们将编写一个小型代理,它位于发布者和一组订阅者之间,连接两个网络。前端套接字(SUB)面向内部网络,天气服务器位于其中,后端套接字(PUB)面向外部网络上的订阅者。它在前端套接字上订阅天气服务,并在后端套接字上重新发布其数据。

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q | Racket | OCaml

它看起来与早期的代理示例非常相似,但关键部分在于前端和后端套接字位于两个不同的网络上。我们可以使用这种模型,例如连接一个多播网络 (`zmq_pgm()`传输)到一个`zmq_tcp()`发布者。

错误处理和 ETERM #

ZeroMQ 的错误处理哲学是“快速失败”(fail-fast) 与“弹性”(resilience) 的结合。我们认为,进程应对内部错误尽可能脆弱,而对外部攻击和错误尽可能健壮。打个比方,活细胞如果检测到单个内部错误就会自毁,但它会尽一切可能抵御外部攻击。

断言(Assertions)遍布 ZeroMQ 代码中,对健壮的代码至关重要;它们只需位于“细胞壁”的正确一侧。而且应该存在这样一道墙。如果无法确定故障是内部的还是外部的,那就是需要修复的设计缺陷。在 C/C++ 中,断言会立即终止应用程序并报错。在其他语言中,你可能会得到异常或中止。

当 ZeroMQ 检测到外部故障时,会向调用代码返回一个错误。在少数罕见情况下,如果没有明显的错误恢复策略,它会默默地丢弃消息。

到目前为止,我们在大多数 C 示例中都没有看到错误处理。实际代码应该对每个 ZeroMQ 调用都进行错误处理。如果你使用的语言绑定不是 C,绑定可能会为你处理错误。在 C 中,你确实需要自己处理。有一些简单的规则,从 POSIX 约定开始

  • 创建对象的方法如果失败则返回 NULL。
  • 处理数据的方法可能返回处理的字节数,或在错误或失败时返回 -1。
  • 其他方法在成功时返回 0,在错误或失败时返回 -1。
  • 错误码由errno`ipc` 或通过 zmq_errno() 提供。.
  • 用于日志记录的描述性错误文本由 zmq_strerror().

提供。例如


void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc == -1) {
    printf ("E: bind failed: %s\n", strerror (errno));
    return -1;
}

有两个主要异常情况应作为非致命错误处理

  • 当你的代码使用ZMQ_DONTWAIT选项接收消息,且没有待处理数据时,ZeroMQ 会返回 -1 并将错误码errno设置为EAGAIN.

  • 当一个线程调用 zmq_ctx_destroy()时,如果其他线程仍在执行阻塞操作,则该 zmq_ctx_destroy()调用会关闭上下文,所有阻塞调用会以 -1 退出,并且错误码errno设置为ETERM.

在 C/C++ 中,在优化代码中可以完全移除断言,所以不要犯将整个 ZeroMQ 调用包裹在assert()中。这看起来很整洁;然后优化器会移除所有的断言和你希望进行的调用,你的应用程序就会以令人印象深刻的方式崩溃。

图 19 - 带有终止信号的并行管线

让我们看看如何干净地关闭一个进程。我们将以上一节的并行管线示例为例。如果我们已经在后台启动了大量工作进程,那么在批处理完成后,我们现在希望终止它们。我们可以通过向工作进程发送一个终止消息来实现。最好的发送位置是接收端 (sink),因为它确实知道批处理何时完成。

如何将接收端连接到工作进程?PUSH/PULL 套接字是单向的。我们可以切换到另一种套接字类型,或者混合使用多个套接字流。让我们尝试后一种方法:使用发布-订阅 (pub-sub) 模型向工作进程发送终止消息

  • 接收端在新的端点上创建一个 PUB 套接字。
  • 工作进程将其输入套接字连接到此端点。
  • 当接收端检测到批处理结束时,它会向其 PUB 套接字发送一个终止消息。
  • 当工作进程检测到此终止消息时,它会退出。

接收端中不需要太多新代码


void *controller = zmq_socket (context, ZMQ_PUB);
zmq_bind (controller, "tcp://*:5559");
...
//  Send kill signal to workers
s_send (controller, "KILL");

这是工作进程,它管理两个套接字(一个接收任务的 PULL 套接字,一个接收控制命令的 SUB 套接字),使用我们之前看到的 每个部分都是一个技术

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Objective-C | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | Felix | Julia | ooc | Q | Racket

这是修改后的接收端应用程序。当它完成结果收集后,它会向所有工作进程广播一个终止消息

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Objective-C | Perl | PHP | Python | Ruby | Scala | Tcl | OCaml | Ada | Basic | C# | F# | Felix | Julia | ooc | Q | Racket | Rust

处理中断信号 #

实际应用程序需要在通过 Ctrl-C 或其他信号(例如SIGTERM)中断时干净地关闭。默认情况下,这些信号只会杀死进程,这意味着消息不会被刷新,文件不会干净地关闭等等。

以下是我们如何在各种语言中处理信号的示例

C | C++ | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Rust | Scala | Ada | Basic | C# | CL | F# | Felix | Julia | Objective-C | ooc | Q | Racket | Tcl | OCaml

该程序提供了s_catch_signals(),它会捕获 Ctrl-C (SIGINT) 和SIGTERMSIGTERM。当任一信号到达时,该s_catch_signals()处理程序会设置全局变量s_interrupted。有了你的信号处理程序,你的应用程序就不会自动终止。相反,你有机会进行清理并优雅地退出。你现在必须显式检查中断并正确处理它。通过在你的主代码的开头调用它来设置信号处理:s_catch_signals()(从interrupt.c) 拷贝这个函数。中断会按如下方式影响 ZeroMQ 调用

  • 如果你的代码在阻塞调用(发送消息、接收消息或轮询)中阻塞,那么当信号到达时,该调用将返回EINTR.
  • s_recv()这样的封装函数如果被中断会返回 NULL。

因此,请检查EINTREINTR 返回码,NULL 返回值,以及/或全局变量 s_interrupted。s_interrupted.

这是一个典型的代码片段

s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break;          //  Ctrl-C used
}
zmq_close (client);

如果你调用了s_catch_signals()s_catch_signals() 并且不检查中断情况,那么你的应用程序将对 Ctrl-C 和SIGTERMSIGTERM 免疫,这可能有用,但通常不是你想要的。

检测内存泄漏 #

任何长时间运行的应用程序都必须正确管理内存,否则最终会耗尽所有可用内存并崩溃。如果你使用的语言可以自动处理内存管理,恭喜你。如果你使用 C 或 C++ 或任何其他需要自行负责内存管理的语言编程,这里有一个使用 valgrind 的简短教程,valgrind 除了其他功能外,还会报告你的程序存在的任何内存泄漏。

  • 要安装 valgrind,例如在 Ubuntu 或 Debian 上,请执行以下命令
sudo apt-get install valgrind
  • 默认情况下,ZeroMQ 会导致 valgrind 产生大量警告。为了消除这些警告,创建一个名为vg.supp的文件,其中包含以下内容
{
   <socketcall_sendto>
   Memcheck:Param
   socketcall.sendto(msg)
   fun:send
   ...
}
{
   <socketcall_sendto>
   Memcheck:Param
   socketcall.send(msg)
   fun:send
   ...
}
  • 修改你的应用程序,使其在接收到 Ctrl-C 后干净地退出。对于会自行退出的应用程序来说,这不需要,但对于长时间运行的应用程序来说,这是必不可少的,否则 valgrind 会报告所有当前分配的内存。

  • 使用-DDEBUG编译你的应用程序,如果它不是你的默认设置。这能确保 valgrind 可以准确地告诉你内存泄漏发生在哪里。

  • 最后,像这样运行 valgrind

valgrind --tool=memcheck --leak-check=full --suppressions=vg.supp someprog

在修复了它报告的所有错误后,你应该看到令人愉快的消息

==30536== ERROR SUMMARY: 0 errors from 0 contexts...

使用 ZeroMQ 进行多线程编程 #

ZeroMQ 也许是编写多线程 (MT) 应用程序有史以来最好的方式。虽然如果你习惯了传统的套接字,ZeroMQ 套接字需要一些调整,但 ZeroMQ 的多线程编程将把你所知的关于编写 MT 应用程序的一切知识,扔进花园里的堆肥,倒上汽油,然后点燃。很少有书值得焚烧,但大多数关于并发编程的书都值得。

为了编写绝对完美的多线程程序(我是字面意思),我们不需要互斥锁 (mutexes)、锁 (locks) 或任何其他形式的线程间通信,只需通过 ZeroMQ 套接字发送消息即可。

我所说的“完美多线程程序”,是指易于编写和理解的代码,它在任何编程语言和任何操作系统上都采用相同的设计方法,并且可以在任意数量的 CPU 上扩展,具有零等待状态且没有收益递减点。

如果你花了数年时间学习各种技巧,使用锁、信号量和临界区 (critical sections) 来让你的多线程代码工作起来,更别提快速运行了,那么当你意识到这一切都是徒劳时,你会感到恶心。如果说我们从 30 多年并发编程中学到了什么教训,那就是:不要共享状态。这就像两个醉汉试图共享一杯啤酒一样。他们是否是好朋友并不重要。迟早他们会打起来。你加到桌上的醉汉越多,他们为争啤酒而打得越凶。绝大多数悲惨的多线程应用程序看起来都像醉汉打架。

当你编写经典的共享状态多线程代码时,需要应对的各种奇怪问题列表如果不是直接转化为压力和风险的话,会很有趣,因为看起来能工作的代码会在压力下突然失败。一家在有缺陷代码方面拥有世界顶尖经验的大公司发布了其“多线程代码中 11 个可能出现的问题”列表,其中包括遗忘同步、不正确的粒度、读写撕裂 (read and write tearing)、无锁重排 (lock-free reordering)、锁队列 (lock convoys)、两步舞 (two-step dance) 和优先级反转 (priority inversion)。

是的,我们数了七个问题,不是十一个。但这并不是重点。重点是,你真的希望运行电网或股票市场的代码在一个繁忙的周四下午三点开始出现两步锁队列 (two-step lock convoys) 吗?谁在意这些术语到底是什么意思?这不是让我们爱上编程的原因,我们不是为了用越来越复杂的技巧来对抗越来越复杂的副作用。

一些被广泛使用的模型,尽管是整个行业的基础,但却存在根本性缺陷,共享状态并发就是其中之一。想要无限扩展的代码会像互联网一样,通过发送消息并且除了对有缺陷的编程模型共同抱有蔑视之外,什么都不共享。

你应该遵循一些规则来编写使用 ZeroMQ 的愉快的(即无故障的)多线程代码

  • 将数据私有地隔离在各自的线程内,绝不要在多个线程中共享数据。唯一的例外是 ZeroMQ 上下文 (contexts),它们是线程安全的。

  • 远离经典的并发机制,如互斥锁 (mutexes)、临界区 (critical sections)、信号量 (semaphores) 等。这些在 ZeroMQ 应用程序中是一种反模式 (anti-pattern)。

  • 在进程启动时创建一个 ZeroMQ 上下文,并将其传递给你希望通过`zmq_inproc()`套接字连接的所有线程。

  • 使用 附加线程 在应用程序内部创建结构,并使用通过 inproc 传输的 PAIR 套接字将它们连接到其父线程。模式是:父线程绑定套接字,然后创建子线程连接其套接字。`zmq_inproc()`

  • 使用 分离线程 来模拟独立的任务,它们有自己的上下文。通过`zmq_tcp()`连接它们。稍后,你可以在不显著修改代码的情况下将它们移到独立的进程中。

  • 所有线程间的交互都通过 ZeroMQ 消息发生,你可以或多或少正式地定义这些消息。

  • 不要在线程之间共享 ZeroMQ 套接字。ZeroMQ 套接字不是线程安全的。技术上可以将套接字从一个线程迁移到另一个线程,但这需要技巧。唯一勉强说得通的在线程间共享套接字的地方是在需要对套接字进行垃圾收集等“魔术”操作的语言绑定中。

例如,如果你需要在应用程序中启动多个代理,你会希望每个代理都在自己的线程中运行。很容易犯的错误是在一个线程中创建代理的前端和后端套接字,然后将这些套接字传递给另一个线程中的代理。这最初可能看起来能工作,但在实际使用中会随机失败。记住:不要在创建套接字的线程之外使用或关闭套接字。

如果你遵循这些规则,你可以相当容易地构建优雅的多线程应用程序,并随后根据需要将线程拆分为独立的进程。应用程序逻辑可以位于线程、进程或节点中:取决于你的规模需求。

ZeroMQ使用原生操作系统线程而不是虚拟的“绿色”线程。其优势在于你不需要学习任何新的线程API,并且ZeroMQ线程可以干净地映射到你的操作系统。你可以使用标准工具(例如英特尔的ThreadChecker)来查看你的应用程序正在做什么。缺点是原生线程API并非总是可移植的,并且如果你有大量线程(数千个),一些操作系统会面临压力。

让我们看看这在实践中是如何工作的。我们将把旧的Hello World服务器变成功能更强大的东西。原始服务器运行在单个线程中。如果每个请求的工作量较低,那没问题:一个ØMQ线程可以在一个CPU核心上全速运行,没有等待,完成大量工作。但真实的服务器需要对每个请求做非平凡的工作。当10,000个客户端同时请求服务器时,单个核心可能不够。因此,一个真实的服务器将启动多个工作线程。然后它会尽快接受请求,并将这些请求分发给其工作线程。工作线程处理工作,最终发送回复。

当然,你可以使用代理broker和外部工作进程来完成所有这些工作,但启动一个占用十六个核心的进程通常比启动十六个各占用一个核心的进程更容易。此外,将工作者作为线程运行将减少一次网络跳跃、延迟和网络流量。

Hello World服务的MT版本基本上将broker和工作者合并到一个进程中。

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Perl | PHP | Python | Q | Ruby | Rust | Scala | Ada | Basic | C# | F# | Felix | Julia | Node.js | Objective-C | ooc | Racket | Tcl | OCaml
图 20 - 多线程服务器

现在你应该能认出所有代码了。工作原理如下:

  • 服务器启动一组工作线程。每个工作线程创建一个REP套接字,然后在此套接字上处理请求。工作线程就像单线程服务器一样。唯一的区别在于传输方式(`zmq_inproc()`inproc`zmq_tcp()`而不是

  • tcp`zmq_tcp()`).

  • ),以及bind-connect的方向。服务器创建一个ROUTER套接字与客户端通信,并将此套接字绑定到其外部接口(通过`zmq_inproc()`).

  • tcp

)。服务器创建一个DEALER套接字与工作者通信,并将此套接字绑定到其内部接口(通过inproc)。服务器启动一个连接这两个套接字的代理。代理公平地从所有客户端拉取传入请求,并将这些请求分发给工作者。同时也将回复路由回其来源。请注意,在大多数编程语言中,创建线程是不可移植的。POSIX库是pthreads,但在Windows上你必须使用不同的API。在我们的示例中,pthread_create

调用会启动一个运行worker_routine.

函数的新线程。我们将在第 3 章 - 高级请求-回复模式中看到如何将其封装在一个可移植的API中。

这里的“工作”只是暂停一秒钟。我们可以在工作者中做任何事情,包括与其他节点通信。这是MT服务器在ØMQ套接字和节点方面的样子。注意请求-回复链是如何的:

REQ-ROUTER-queue-DEALER-REP`zmq_inproc()`线程间信号(PAIR套接字)#

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Perl | PHP | Python | Q | Ruby | Rust | Scala | Ada | Basic | C# | F# | Felix | Julia | Node.js | Objective-C | ooc | Racket | Tcl | OCaml
mtrelay:Tcl中的多线程接力

Tcl中缺少示例 mtrelay贡献翻译

  1. mtrelay:OCaml中的多线程接力`zmq_inproc()`OCaml中缺少示例 mtrelay贡献翻译
  2. 图 21 - 接力赛这是使用ZeroMQ进行多线程编程的经典模式:两个线程通过
  3. inproc这是使用ZeroMQ进行多线程编程的经典模式:进行通信,使用共享上下文。

父线程创建一个套接字,将其绑定到一个`zmq_inproc()`inproc:@<*>@`zmq_ipc()``ipc``zmq_tcp()`端点,*然后*启动子线程,并将上下文传递给它。

子线程创建第二个套接字,连接到该

  • inproc

  • 端点,*然后*向父线程发送信号表示已准备就绪。

  • 请注意,使用此模式的多线程代码无法扩展到进程。如果你使用

inproc

和套接字对,你正在构建一个紧密绑定的应用程序,也就是说,你的线程在结构上相互依赖。当低延迟确实至关重要时才这样做。另一种设计模式是松散绑定的应用程序,其中线程有自己的上下文,并通过

tcp

进行通信。你可以轻松地将松散绑定的线程拆分为独立进程。

这是我们第一次展示使用PAIR套接字的示例。为什么使用PAIR?其他套接字组合似乎也能工作,但它们都有可能干扰信号传递的副作用:

你可以使用PUSH作为发送方,PULL作为接收方。这看起来很简单并且会工作,但请记住,PUSH会将消息分发给所有可用的接收方。如果你不小心启动了两个接收方(例如,你已经有一个正在运行,然后启动第二个),你就会“丢失”一半的信号。PAIR的优点在于拒绝多个连接;这对连接是独占的

  • 你可以使用DEALER作为发送方,ROUTER作为接收方。然而,ROUTER会将你的消息包装在一个“信封”中,这意味着你零大小的信号会变成一个多部分消息。如果你不在乎数据,并将任何内容视为有效信号,并且如果你不从套接字多次读取,那没关系。但是,如果你决定发送真实数据,你会突然发现ROUTER给你提供了“错误”的消息。DEALER也会分发传出消息,带来与PUSH相同的风险。

  • 你可以使用PUB作为发送方,SUB作为接收方。这会正确地按你发送的方式传递你的消息,并且PUB不像PUSH或DEALER那样进行分发。然而,你需要为订阅者配置一个空订阅,这很麻烦。

  • 基于这些原因,PAIR是线程对之间进行协调的最佳选择。

节点协调#

Q中缺少示例 syncpub贡献翻译

syncpub:Racket中的同步发布者

Q中缺少示例 syncpub贡献翻译

syncsub:Q中的同步订阅者

echo "Starting subscribers..."
for ((a=0; a<10; a++)); do
    syncsub &
done
echo "Starting publisher..."
syncpub

Q中缺少示例 syncsub贡献翻译

Starting subscribers...
Starting publisher...
Received 1000000 updates
Received 1000000 updates
...
Received 1000000 updates
Received 1000000 updates

syncsub:Racket中的同步订阅者`zmq_inproc()`Racket中缺少示例 syncsub贡献翻译

syncsub:Ruby中的同步订阅者

  • syncsub:Rust中的同步订阅者
  • syncsub:Scala中的同步订阅者
  • syncsub:Tcl中的同步订阅者

syncsub:OCaml中的同步订阅者

OCaml中缺少示例 syncsub贡献翻译

这个Bash shell脚本将启动十个订阅者,然后启动发布者:

Which gives us this satisfying output 初始化消息这会给我们带来令人满意的输出:我们不能假设REQ/REP对话完成时SUB连接也已经完成。如果你使用的不是inproc `zmq_msg_send()`传输方式,无法保证出站连接会按任何顺序完成。因此,此示例在订阅和发送REQ/REP同步消息之间强制暂停了一秒。一个更健壮的模型可能是:发布者打开PUB套接字并开始发送“Hello”消息(非数据)。


void my_free (void *data, void *hint) {
    free (data);
}
//  Send message from buffer, which we allocate and ZeroMQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_msg_send (&message, socket, 0);

订阅者连接SUB套接字,当收到Hello消息时,通过一对REQ/REP套接字告知发布者已准备就绪。 `zmq_msg_init_data()`当发布者收到所有必要的确认后,便开始发送真实数据。你还会看到 XPUB 和 XSUB Socket 的引用,我们稍后会讲到它们(它们类似于 PUB 和 SUB 的原始版本)。任何其他组合都会产生未文档化且不可靠的结果,未来的 ZeroMQ 版本如果尝试这些组合可能会返回错误。当然,你可以并且会通过代码桥接其他 Socket 类型,即从一种 Socket 类型读取并写入另一种。零拷贝#

ZeroMQ的消息API允许你直接从应用程序缓冲区发送和接收消息,而无需复制数据。我们称之为零拷贝,在某些应用程序中可以提高性能。

你应该考虑在发送大量内存块(数千字节)、高频率发送的特定情况下使用零拷贝。对于短消息或较低的消息速率,使用零拷贝会使你的代码更混乱、更复杂,而没有明显的收益。像所有优化一样,只在你确定它有帮助时才使用,并且在使用前后进行测量

要实现零拷贝,你需要使用

zmq_msg_init_data

来创建一个引用已用

malloc()

或其他分配器分配的数据块的消息,然后将该消息传递给

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q | Racket | OCaml

psenvpub:Objective-C中的Pub-Sub信封发布者

C | C++ | CL | Delphi | Erlang | Elixir | Go | Haskell | Haxe | Java | Lua | Node.js | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | Ada | Basic | C# | F# | Felix | Julia | Objective-C | ooc | Q | Racket | OCaml

Node.js中缺少示例 psenvsub贡献翻译

[B] We would like to see this
[B] We would like to see this
[B] We would like to see this
...

psenvsub:Objective-C中的Pub-Sub信封订阅者

Objective-C中缺少示例 psenvsub贡献翻译

psenvsub:ooc中的Pub-Sub信封订阅者

ooc中缺少示例 psenvsub贡献翻译

psenvsub:Perl中的Pub-Sub信封订阅者

psenvsub:PHP中的Pub-Sub信封订阅者

psenvsub:Python中的Pub-Sub信封订阅者

psenvsub:Q中的Pub-Sub信封订阅者

Q中缺少示例 psenvsub贡献翻译

psenvsub:Racket中的Pub-Sub信封订阅者

Racket中缺少示例 psenvsub贡献翻译`zmq_inproc()`psenvsub:Ruby中的Pub-Sub信封订阅者

最后,高水位标记(HWMs)并非精确;虽然默认情况下你最多可获得1,000条消息,但实际缓冲区大小可能要低得多(低至一半),这是由于其方式导致的你还会看到 XPUB 和 XSUB Socket 的引用,我们稍后会讲到它们(它们类似于 PUB 和 SUB 的原始版本)。任何其他组合都会产生未文档化且不可靠的结果,未来的 ZeroMQ 版本如果尝试这些组合可能会返回错误。当然,你可以并且会通过代码桥接其他 Socket 类型,即从一种 Socket 类型读取并写入另一种。实现其队列。

消息丢失问题排查 #

在使用 ZeroMQ 构建应用程序时,你会不止一次遇到这个问题:丢失你预期会收到的消息。我们整理了一张图表,逐步说明了导致此问题的最常见原因。

图 25 - 消息丢失问题排查

图表总结如下:

  • 在 SUB socket 上,设置订阅 `zmq_setsockopt()`使用ZMQ_SUBSCRIBE,否则你将收不到消息。由于你是按前缀订阅消息,如果你订阅 ""(空订阅),你将收到所有消息。

  • 如果你*在* PUB socket 开始发送数据*之后*才启动 SUB socket(即与 PUB socket 建立连接),你将会丢失连接建立之前已发布的所有消息。如果这是一个问题,请调整你的架构,确保 SUB socket 先启动,然后 PUB socket 再开始发布。

  • 即使你同步启动 SUB 和 PUB socket,你仍可能丢失消息。这是因为内部队列直到连接真正建立后才会创建。如果你能切换绑定/连接方向,让 SUB socket 进行绑定,而 PUB socket 进行连接,你可能会发现它更符合你的预期。

  • 如果你使用 REP 和 REQ socket,并且不遵守同步的 send/recv/send/recv 顺序,ZeroMQ 将报告错误,而你可能会忽略这些错误。这样一来,就会看起来像是你丢失了消息。如果你使用 REQ 或 REP,请严格遵守 send/recv 顺序,并且在实际代码中始终检查 ZeroMQ 调用是否出错。

  • 如果你使用 PUSH socket,你会发现第一个连接上的 PULL socket 会获取到不公平比例的消息。消息的准确轮换分配只会在所有 PULL socket 都成功连接后才会发生,这可能需要几毫秒时间。作为 PUSH/PULL 的替代方案,对于较低的数据速率,可以考虑使用 ROUTER/DEALER 和负载均衡模式。

  • 如果你在线程间共享 socket,请勿这样做。这会导致随机的异常行为和崩溃。

  • 如果你正在使用`zmq_inproc()`,请确保两个 socket 都在同一个上下文(context)中。否则连接方实际上会失败。另外,先绑定,然后连接。`zmq_inproc()`不像以下传输那样是非连接型的:`zmq_tcp()`.

  • (此处应为传输类型)如果你使用 ROUTER socket,很容易因意外丢失消息,例如发送格式错误的身份帧(或忘记发送身份帧)。通常情况下,设置ZMQ_ROUTER_MANDATORY

  • 选项是一个好主意,但也请务必检查每次 send 调用的返回值。