1. 基础

第 1 章 - 基础 #

修复世界 #

如何解释 ZeroMQ?我们中的一些人会先说它所做的所有美妙的事情。它是打了兴奋剂的套接字。它就像带路由的邮箱。它速度飞快!其他人试图分享他们的顿悟时刻,那种突然领悟、豁然开朗的范式转变时刻。事情变得更简单了。复杂性消失了。它能打开你的思维。还有人试图通过比较来解释。它更小巧、更简单,但看起来仍然很熟悉。就我个人而言,我喜欢回顾我们为何要创建 ZeroMQ,因为这很可能就是你,读者,今天所处的位置。

编程是伪装成艺术的科学,因为我们大多数人不理解软件的物理学,即使教也很少教。软件的物理学不是算法、数据结构、语言和抽象。这些只是我们制造、使用、丢弃的工具。软件真正的物理学是人的物理学——具体来说,是我们在复杂性方面的局限性,以及我们渴望通过合作来分块解决大型问题。这就是编程的科学:制造人们可以理解和轻松使用的构建块,这样人们就会一起努力解决那些最大的问题。

我们生活在一个互联世界,现代软件必须在这个世界中航行。所以未来大型解决方案的构建块是相互连接且大规模并行的。代码不再仅仅是“强壮而沉默”的。代码必须与代码对话。代码必须是健谈的、善于交际的、连接良好的。代码必须像人类大脑一样运行,数万亿个独立的神经元相互发送消息,这是一个没有中心控制、没有单点故障的大规模并行网络,却能解决极其困难的问题。未来的代码看起来像人类大脑绝非偶然,因为每个网络的终端,在某种程度上,都是人类大脑。

如果你做过任何线程、协议或网络相关的工作,你会意识到这几乎是不可能的。这只是一个梦想。当你开始处理现实生活中的情况时,即使是连接几台程序通过几个套接字也是相当糟糕的。数万亿?成本将无法想象。连接计算机如此困难,以至于提供这类软件和服务的行业价值达数十亿美元。

所以我们生活在一个布线远超我们使用能力的时代。我们在 20 世纪 80 年代经历了一场软件危机,当时 Fred Brooks 等顶尖软件工程师认为不存在“银弹”能够“承诺在生产力、可靠性或简单性方面带来哪怕一个数量级的提升”。

Brooks 错过了自由和开源软件,这解决了那场危机,使我们能够高效地共享知识。今天我们面临另一场软件危机,但这危机我们很少谈论。只有最大、最富有的公司才能负担得起创建互联应用。确实有云,但它是专有的。我们的数据和知识正在从我们的个人电脑中消失,进入我们无法访问、无法与之竞争的云端。谁拥有我们的社交网络?这就像大型机-PC 革命的反向进行。

我们可以把政治哲学留给另一本书。重点是,虽然互联网提供了大规模互联代码的潜力,但现实是这对我们大多数人来说遥不可及,因此许多有趣的大问题(在健康、教育、经济、交通等方面)仍未解决,因为没有办法连接代码,也就没有办法连接那些可以共同解决这些问题的大脑。

已经有许多尝试来解决互联代码的挑战。有成千上万的 IETF 规范,每种都解决了部分难题。对于应用开发者来说,HTTP 也许是足够简单且行之有效的唯一解决方案,但它无疑通过鼓励开发者和架构师以大型服务器和瘦弱、愚蠢的客户端来思考问题,从而让问题变得更糟。

因此,今天人们仍然使用原始的 UDP 和 TCP、专有协议、HTTP 和 Websockets 来连接应用程序。这仍然是痛苦的、缓慢的、难以扩展的,并且本质上是中心化的。分布式 P2P 架构大多用于娱乐,而非工作。有多少应用程序使用 Skype 或 Bittorrent 来交换数据?

这又把我们带回了编程的科学。要修复世界,我们需要做两件事。第一,解决“如何将任何代码连接到任何代码,无论在哪里”这个普遍问题。第二,将其封装成人们可以理解和轻松使用的最简单的构建块。

这听起来简单得可笑。也许它就是这样。这就是整个重点所在。

初始假设 #

我们假设你至少使用 ZeroMQ 3.2 版本。我们假设你使用 Linux 或类似的系统。我们假设你大致能读懂 C 代码,因为这是示例的默认语言。我们假设当我们写像 PUSH 或 SUBSCRIBE 这样的常量时,你可以想象它们实际上叫做ZMQ_PUSHZMQ_SUBSCRIBE如果编程语言需要的话。

获取示例 #

示例代码存放在公共的GitHub 仓库中。获取所有示例最简单的方法是克隆这个仓库

git clone --depth=1 https://github.com/imatix/zguide.git

接下来,浏览 examples 子目录。你会按语言找到示例。如果你使用的语言缺少示例,我们鼓励你提交翻译。感谢许多人的辛勤工作,这使得本书内容如此实用。所有示例均遵循 MIT/X11 许可。

有求必应 #

那么,让我们从代码开始。当然,我们从一个 Hello World 示例开始。我们将创建一个客户端和一个服务器。客户端发送“Hello”给服务器,服务器回复“World”。以下是用 C 语言编写的服务器代码,它在端口 5555 上打开一个 ZeroMQ 套接字,读取请求,并对每个请求回复“World”。

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Haxe | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Q | Racket | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | ooc
图 2 - 请求-回复

REQ-REP 套接字对是同步的。客户端执行 zmq_send()然后执行 zmq_recv(),循环进行(或者如果只需要一次就执行一次)。执行任何其他顺序(例如,连续发送两条消息)将导致从sendrecv调用返回 -1。同样,服务方执行 zmq_recv()然后执行 zmq_send(),按此顺序,根据需要重复执行。

ZeroMQ 使用 C 作为其参考语言,这也是我们示例中主要使用的语言。如果你在线阅读本文,示例下方的链接会带你查看其他编程语言的翻译。我们来比较一下同样的服务器代码在 C++ 中是怎样的。

hwserver: C++ 语言实现的 Hello World 服务器
//
//  Hello World server in C++
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>

#define sleep(n)	Sleep(n)
#endif

int main () {
    //  Prepare our context and socket
    zmq::context_t context (2);
    zmq::socket_t socket (context, zmq::socket_type::rep);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        //  Wait for next request from client
        socket.recv (request, zmq::recv_flags::none);
        std::cout << "Received Hello" << std::endl;

        //  Do some 'work'
        sleep(1);

        //  Send reply back to client
        zmq::message_t reply (5);
        memcpy (reply.data (), "World", 5);
        socket.send (reply, zmq::send_flags::none);
    }
    return 0;
}

你可以看到 ZeroMQ 的 API 在 C 和 C++ 中是相似的。在像 PHP 或 Java 这样的语言中,我们可以隐藏更多细节,代码变得更容易阅读。

hwserver: PHP 语言实现的 Hello World 服务器
<?php
/*
 *  Hello World server
 *  Binds REP socket to tcp://*:5555
 *  Expects "Hello" from client, replies with "World"
 * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
 */

$context = new ZMQContext(1);

//  Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    //  Wait for next request from client
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    //  Do some 'work'
    sleep (1);

    //  Send reply back to client
    $responder->send("World");
}

hwserver: Java 语言实现的 Hello World 服务器
package guide;

//
//  Hello World server in Java
//  Binds REP socket to tcp://*:5555
//  Expects "Hello" from client, replies with "World"
//

import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class hwserver
{
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            // Socket to talk to clients
            ZMQ.Socket socket = context.createSocket(SocketType.REP);
            socket.bind("tcp://*:5555");

            while (!Thread.currentThread().isInterrupted()) {
                byte[] reply = socket.recv(0);
                System.out.println(
                    "Received " + ": [" + new String(reply, ZMQ.CHARSET) + "]"
                );

                Thread.sleep(1000); //  Do some 'work'

                String response = "world";
                socket.send(response.getBytes(ZMQ.CHARSET), 0);
            }
        }
    }
}

其他语言实现的服务器

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Haxe | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Q | Racket | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | ooc

以下是客户端代码

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Haxe | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Q | Racket | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | ooc

现在看来这太简单了,不太现实,但正如我们之前了解到的,ZeroMQ 套接字拥有超能力。你可以同时向这个服务器发起数千个客户端连接,它仍然会愉快而快速地工作。为了好玩,尝试先启动客户端,然后启动服务器,看看一切如何仍然正常工作,然后思考一下这意味着什么。

让我们简要解释一下这两个程序实际上在做什么。它们创建一个 ZeroMQ 上下文进行工作,以及一个套接字。别担心这些词是什么意思。你会慢慢理解的。服务器将其 REP(回复)套接字绑定到端口 5555。服务器在循环中等待请求,并对每个请求回复。客户端发送一个请求并从服务器读取回复。

如果你终止服务器(Ctrl-C)并重新启动它,客户端将无法正常恢复。从崩溃的进程中恢复并非如此简单。构建一个可靠的请求-回复流程非常复杂,我们将在第 4 章 - 可靠请求-回复模式中才会介绍它。

幕后发生了很多事情,但对我们程序员来说,重要的是代码有多么简洁明了,以及即使在高负载下它也不常崩溃。这就是请求-回复模式,可能是使用 ZeroMQ 最简单的方式。它对应于 RPC 和经典的客户端/服务器模型。

关于字符串的一个小注 #

ZeroMQ 除了数据的字节大小之外,对你发送的数据一无所知。这意味着你有责任安全地格式化数据,以便应用程序能够读取回来。对于对象和复杂数据类型,这是 Protocol Buffers 等专门库的工作。但即使对于字符串,你也需要小心处理。

在 C 和一些其他语言中,字符串以 null 字节终止。我们可以发送一个像“HELLO”这样的字符串,带上那个额外的 null 字节

zmq_send (requester, "Hello", 6, 0);

然而,如果你从其他语言发送字符串,它可能不包含那个 null 字节。例如,当我们在 Python 中发送同样的字符串时,我们这样做

socket.send ("Hello")

然后在线路上发送的是一个长度(对于短字符串是一个字节)和字符串内容的单个字符。

图 3 - 一个 ZeroMQ 字符串

如果你在 C 程序中读取这个,你会得到一个看起来像字符串的东西,如果幸运地那五个字节后面紧跟着一个无意中出现的 null,它可能偶然会表现得像一个字符串,但它不是一个真正的字符串。当你的客户端和服务器对字符串格式不一致时,你会得到奇怪的结果。

当你在 C 中从 ZeroMQ 接收字符串数据时,你不能相信它已被安全地终止。每一次接收字符串,你都应该分配一个新缓冲区,预留一个额外字节的空间,复制字符串,并用一个 null 字符正确终止它。

所以我们确立一个规则:ZeroMQ 字符串是由长度指定的,并且在线路上发送时不包含末尾的 null 字符。在最简单的情况下(我们将在示例中这样做),一个 ZeroMQ 字符串正好映射到一个 ZeroMQ 消息帧,它看起来就像上图所示——一个长度加上一些字节。

在 C 中,我们需要这样做来接收一个 ZeroMQ 字符串并将其作为有效的 C 字符串提供给应用程序:

//  Receive ZeroMQ string from socket and convert into C string
//  Chops string at 255 chars, if it's longer
static char *
s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    if (size > 255)
        size = 255;
    buffer [size] = '\0';
    /* use strndup(buffer, sizeof(buffer)-1) in *nix */
    return strdup (buffer);
}

这形成了一个方便的辅助函数,本着创建可以有效重用的东西的精神,我们来写一个类似的s_send函数,该函数以正确的 ZeroMQ 格式发送字符串,并将其打包到一个我们可以重用的头文件中。

结果是zhelpers.h,它让我们可以用 C 编写更简洁、更短的 ZeroMQ 应用程序。这是一个相当长的源代码,只有 C 开发者会觉得有趣,所以你可以自行阅读

关于命名约定 #

前缀s_zhelpers.h和本指南接下来的示例中使用,表示静态方法或变量。

版本报告 #

ZeroMQ 确实有多个版本,而且通常情况下,如果你遇到问题,很可能是在更高版本中已修复的问题。所以知道你实际链接的 ZeroMQ 版本确切是什么是一个有用的技巧。

这里有一个小程序可以做到这一点

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Q | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | Haxe | ooc | Racket

发出消息 #

第二个经典模式是单向数据分发,其中服务器将更新推送到一组客户端。我们来看一个推送包含邮政编码、温度和相对湿度天气更新的示例。我们将生成随机值,就像真实的气象站一样。

这是服务器代码。我们将使用端口 5556 用于此应用程序

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Haxe | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Racket | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | ooc | Q

这个更新流没有开始也没有结束,它就像一个永不停止的广播。

这是客户端应用程序,它监听更新流,并抓取与特定邮政编码相关的所有信息,默认为纽约市,因为那是开始任何冒险的好地方。

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Haxe | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Racket | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | ooc | Q
图 4 - 发布-订阅

请注意,当你使用 SUB 套接字时,你必须使用以下方法设置订阅 zmq_setsockopt()和 SUBSCRIBE,就像这段代码中所示。如果你没有设置任何订阅,你就不会收到任何消息。这是初学者常犯的错误。订阅者可以设置多个订阅,它们会累加。也就是说,如果一个更新匹配了任意一个订阅,订阅者就会收到它。订阅者也可以取消特定的订阅。订阅通常是一个可打印的字符串,但并非总是如此。参见 zmq_setsockopt()了解其工作原理。

PUB-SUB 套接字对是异步的。客户端执行 zmq_recv(),循环进行(或者如果只需要一次就执行一次)。试图向 SUB 套接字发送消息会导致错误。同样,服务方执行 zmq_send(),根据需要重复执行,但绝不能执行 zmq_recv()在 PUB 套接字上。

理论上,对于 ZeroMQ 套接字,哪一端连接哪一端绑定并不重要。然而,实际上存在未记录的差异,我稍后会讲到。目前,请将 PUB 绑定并将 SUB 连接,除非你的网络设计使得这不可能。

关于 PUB-SUB 套接字,还有一件重要的事情需要知道:你无法精确知道订阅者何时开始接收消息。即使你启动一个订阅者,等待一段时间,然后启动发布者,订阅者总是会错过发布者发送的最初几条消息。这是因为当订阅者连接发布者时(这个过程需要很短但非零的时间),发布者可能已经开始发送消息了。

这个“慢加入者”问题困扰了足够多的人,而且经常发生,所以我们将详细解释它。请记住 ZeroMQ 是异步 I/O,即在后台进行。假设你有两个节点按以下顺序操作:

  • 订阅者连接到一个端点并接收和计数消息。
  • 发布者绑定到一个端点并立即发送 1,000 条消息。

那么订阅者很可能什么也收不到。你会眨眨眼,检查是否设置了正确的过滤器并再次尝试,但订阅者仍然收不到任何东西。

建立 TCP 连接涉及来回握手,根据你的网络和对等节点之间的跳数,这需要几毫秒。在这段时间里,ZeroMQ 可以发送许多消息。假设建立连接需要 5 毫秒,而同一条链路每秒可以处理 100 万条消息。在订阅者连接发布者的这 5 毫秒内,发布者只需要 1 毫秒就可以发送出那 1 千条消息。

第 2 章 - 套接字与模式中,我们将解释如何同步发布者和订阅者,这样在你确定订阅者真正连接并准备好之前,你不会开始发布数据。有一种简单而愚蠢的方法可以延迟发布者,那就是睡眠。但在真实的应用程序中不要这样做,因为它极其脆弱、不优雅且缓慢。你可以使用睡眠来证明自己正在发生的事情,然后等待第 2 章 - 套接字与模式来了解如何正确地做这件事。

同步的替代方法是简单地假设发布的 数据流是无限的,没有开始也没有结束。还假设订阅者不关心它启动之前发生了什么。这就是我们构建天气客户端示例的方式。

因此,客户端订阅其选择的邮政编码,并为该邮政编码收集 100 条更新。如果邮政编码是随机分布的,这意味着服务器发送了大约一千万条更新。你可以先启动客户端,然后启动服务器,客户端将继续工作。你可以随意停止和重新启动服务器,客户端也会继续工作。当客户端收集了 100 条更新后,它会计算平均值,打印出来,然后退出。

关于发布-订阅(pub-sub)模式的几点说明

  • 订阅者可以通过多次调用 connect 来连接多个发布者。然后数据会到达并交错排列(“公平队列”),这样任何单个发布者都不会压垮其他发布者。

  • 如果发布者没有连接的订阅者,那么它会简单地丢弃所有消息。

  • 如果你使用 TCP,并且订阅者速度较慢,消息将在发布者端排队。我们稍后将介绍如何使用“高水位标记”来保护发布者免受此影响。

  • 从 ZeroMQ v3.x 开始,当使用连接协议(tcp:@<*>@*ipc:@<>@)时,过滤发生在发布者端。使用epgm:@@协议,过滤发生在订阅者端。在 ZeroMQ v2.x 中,所有过滤都发生在订阅者端。

这是我的笔记本电脑(一台 2011 年的 Intel i5,不错但没什么特别的)接收和过滤 1000 万条消息所需的时间

$ time wuclient
Collecting updates from weather server...
Average temperature for zipcode '10001 ' was 28F

real    0m4.470s
user    0m0.000s
sys     0m0.008s

分而治之 #

图 5 - 并行管道

作为最后一个例子(你肯定已经厌倦了那些有趣的代码,想要深入探讨关于比较抽象规范的语文学讨论了),让我们来做一点点超级计算。然后再喝咖啡。我们的超级计算应用程序是一个相当典型的并行处理模型。我们有

  • 一个产生可以并行执行任务的通风器(ventilator)
  • 一组处理任务的工作者(workers)
  • 一个汇集(sink)从工作进程返回结果的收集器

实际上,工作者运行在超高速的机器上,也许使用 GPU(图形处理单元)来完成繁重的计算。这里是通风器。它生成 100 个任务,每个任务是一条消息,告诉工作者睡眠若干毫秒

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Haxe | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | ooc | Q | Racket

这里是工作者应用程序。它接收一条消息,然后睡眠若干秒,接着发出信号表示完成

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Haxe | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | ooc | Q | Racket

这里是汇集(sink)应用程序。它收集 100 个任务,然后计算整体处理花费了多长时间,这样我们就可以确认如果有一个以上的工作者,它们确实是在并行运行的

C | C++ | CL | Delphi | Erlang | Elixir | Felix | Go | Haskell | Haxe | Java | Julia | Lua | Node.js | Objective-C | Perl | PHP | Python | Ruby | Rust | Scala | Tcl | OCaml | Ada | Basic | C# | F# | ooc | Q | Racket

一个批次的平均成本是 5 秒。当我们启动 1、2 或 4 个工作者时,我们从汇集器得到如下结果:

  • 1 个工作者:总耗时:5034 毫秒。
  • 2 个工作者:总耗时:2421 毫秒。
  • 4 个工作者:总耗时:1018 毫秒。

让我们更详细地看看这段代码的一些方面

  • 工作者向上游连接到通风器,向下游连接到汇集器。这意味着你可以随意添加工作者。如果工作者绑定到它们的端点,你将需要 (a) 更多的端点,并且 (b) 每次添加工作者时都要修改通风器和/或汇集器。我们说通风器和汇集器是我们架构中稳定的部分,而工作者是其中动态的部分。

  • 我们必须同步批次的开始,确保所有工作者都已启动并运行。这是 ZeroMQ 中一个相当常见的陷阱,并且没有简单的解决方案。zmq_connectzmq_connect方法需要一定的时间。因此,当一组工作者连接到通风器时,第一个成功连接的工作者会在其他工作者连接的这段短时间内获得大量的消息。如果你没有以某种方式同步批次的开始,系统根本不会并行运行。尝试移除通风器中的等待,看看会发生什么。

  • 通风器的 PUSH 套接字(假设所有工作者都在批次开始发送之前连接好)将任务均匀地分发给工作者。这被称为负载均衡,我们将在后面更详细地讨论它。

  • 汇集器的 PULL 套接字均匀地收集来自工作者的结果。这被称为公平排队

图 6 - 公平排队

管道模式也表现出“慢加入者”综合征,这导致了 PUSH 套接字不能正确进行负载均衡的指责。如果你正在使用 PUSH 和 PULL,并且你的某个工作者接收到的消息远远多于其他工作者,那是因为那个 PULL 套接字比其他套接字连接得更快,并在其他套接字设法连接之前抓取了大量消息。如果你想要适当的负载均衡,你可能需要查看第三章 - 高级请求-回复模式 中的负载均衡模式。

ZeroMQ 编程 #

看过一些示例后,你一定迫不及待地想在某些应用程序中使用 ZeroMQ 了。在你开始之前,深呼吸,放松一下,并思考一些基本的建议,这将为你省去很多压力和困惑。

  • 一步一步学习 ZeroMQ。它只有一个简单的 API,但却隐藏着无限的可能性。慢慢来,掌握每一个可能性。

  • 编写优雅的代码。丑陋的代码隐藏问题,让别人难以帮助你。你可能习惯了没有意义的变量名,但阅读你代码的人不会。使用实际的单词作为名称,它们能说明变量的用途,而不是说“我太粗心了,没告诉你这个变量是干什么的”。使用一致的缩进和清晰的布局。编写优雅的代码,你的世界会更舒适。

  • 边写边测试。当你的程序无法工作时,你应该知道是哪五行代码出了问题。当你使用 ZeroMQ 神奇功能时尤其如此,它在第一次尝试时就是不会工作的。

  • 当你发现事情没有按预期进行时,将你的代码分解成小块,分别测试每个部分,看看哪个有问题。ZeroMQ 让你能够编写本质上是模块化的代码;利用这一点来发挥你的优势。

  • 根据需要创建抽象(类、方法等等)。如果你复制代码太多,你也会复制代码中的错误。

正确使用 Context #

ZeroMQ 应用程序总是先创建context,然后使用它来创建套接字。在 C 语言中,它是zmq_ctx_new() zmq_ctx_new()调用。你应该在你的进程中只创建和使用一个 context。从技术上讲,context 是单个进程中所有套接字的容器,并充当inproc进程内套接字的传输层,这是连接同一进程中线程的最快方式。如果在运行时一个进程有两个 contexts,它们就像是独立的 ZeroMQ 实例。如果你明确想要这样,那没问题,否则请记住:在进程启动时调用一次zmq_ctx_new()

调用 zmq_ctx_new()在进程启动时调用一次,并且 zmq_ctx_destroy()在进程结束时调用一次。

如果你正在使用fork()fork()系统调用,请在zmq_ctx_new()fork() 之后,并在子进程代码的开头调用。一般来说,你希望在子进程中做有趣的事情(ZeroMQ),而在父进程中做无聊的进程管理。 zmq_ctx_new() fork 之后以及子进程代码的开头调用。通常,你会在子进程中做一些有趣(ZeroMQ相关)的事情,而在父进程中处理无聊的进程管理。

优雅地退出 #

优秀的程序员和优秀的杀手有同样的座右铭:完成工作后总是要清理现场。当你在像 Python 这样的语言中使用 ZeroMQ 时,内存会自动为你释放。但当使用 C 语言时,你必须在使用完对象后小心地释放它们,否则会导致内存泄漏、应用程序不稳定以及通常不好的结果。

内存泄漏是一回事,但 ZeroMQ 对如何退出应用程序非常挑剔。原因很技术也很令人头疼,但结果是如果你留下任何套接字没有关闭,zmq_ctx_destroy() zmq_ctx_destroy()函数将永远挂起。即使你关闭了所有套接字,zmq_ctx_destroy() zmq_ctx_destroy()默认情况下也会永远等待,如果存在未决的连接或发送,除非你在关闭这些套接字之前将它们的 LINGER 选项设置为零。

我们需要关心的 ZeroMQ 对象是消息、套接字和 context。幸运的是,这非常简单,至少在简单的程序中是这样:

  • 使用zmq_send() zmq_send()zmq_recv() zmq_recv()当可以时,因为这可以避免使用 zmq_msg_t 对象。

  • 如果你确实使用了zmq_msg_recv() zmq_msg_recv(),请在使用完收到的消息后立即释放它,通过调用zmq_msg_close() zmq_msg_close().

  • 如果你频繁地打开和关闭大量套接字,这很可能表明你需要重新设计你的应用程序。在某些情况下,套接字句柄直到你销毁 context 时才会被释放。

  • 当你退出程序时,关闭你的套接字,然后调用zmq_ctx_destroy() zmq_ctx_destroy()。这会销毁 context。

至少在 C 语言开发中是这样。在具有自动对象销毁的语言中,当你离开作用域时,套接字和 context 会被销毁。如果你使用异常,你必须在类似于“finally”的代码块中进行清理,这与其他任何资源一样。

如果你正在进行多线程工作,事情会变得更加复杂。我们将在下一章讨论多线程,但是由于你们中有些人尽管收到了警告,仍会尝试在安全掌握之前就去实践,下面是关于如何在多线程 ZeroMQ 应用程序中优雅退出的快速简要指南。

首先,不要尝试从多个线程使用同一个套接字。请不要解释你为什么认为这样做会很有趣,只是请不要这样做。其次,你需要关闭每个有正在进行请求的套接字。正确的方法是设置一个较低的 LINGER 值(1 秒),然后关闭套接字。如果你的语言绑定在销毁 context 时没有自动为你这样做,我建议你提交一个补丁。

最后,销毁 context。这会导致附属线程(即,共享同一个 context 的线程)中任何阻塞的接收、轮询或发送操作返回错误。捕获该错误,然后设置 linger,并关闭线程中的套接字,然后退出。不要销毁同一个 context 两次。zmq_ctx_destroyzmq_ctx_destroy在主线程中将会阻塞,直到它知道的所有套接字都安全关闭为止。

瞧!这足够复杂和令人头疼,任何称职的语言绑定作者都会自动处理这些,让套接字关闭的“舞蹈”变得不必要。

为什么我们需要 ZeroMQ #

现在你已经看到了 ZeroMQ 的实际应用,让我们回到“为什么”的问题。

如今许多应用程序由跨越某种网络(局域网或互联网)的组件组成。因此,许多应用程序开发人员最终都会做一些消息传递的工作。有些开发人员使用消息队列产品,但大多数时候他们会自己使用 TCP 或 UDP 来实现。这些协议本身不难使用,但是从 A 发送几个字节到 B 与以任何可靠方式进行消息传递之间存在巨大的差异。

让我们看看当我们开始使用原始 TCP 连接各个部分时所面临的典型问题。任何可复用的消息层都需要解决所有或大部分这些问题:

  • 我们如何处理 I/O?我们的应用程序是阻塞的,还是在后台处理 I/O?这是一个关键的设计决策。阻塞 I/O 创建的架构扩展性不好。但后台 I/O 可能非常难以正确处理。

  • 我们如何处理动态组件,例如,暂时消失的部分?我们是否正式将组件分为“客户端”和“服务器”,并强制规定服务器不能消失?那么,如果我们要连接服务器到服务器呢?我们是否每隔几秒尝试重新连接?

  • 我们如何在网络上传输消息?我们如何封装数据,使其易于写入和读取,避免缓冲区溢出,对小消息高效,同时又能满足传输戴派对帽跳舞猫的超大视频的需求?

  • 我们如何处理无法立即发送的消息?特别是,如果我们正在等待某个组件重新上线?我们是丢弃消息,将其放入数据库,还是放入内存队列?

  • 我们将消息队列存储在哪里?如果从队列中读取的组件非常慢,导致队列积压,会发生什么?那我们的策略是什么?

  • 我们如何处理丢失的消息?我们是等待新的数据,请求重发,还是构建某种可靠性层来确保消息不会丢失?如果那个层本身崩溃了怎么办?

  • 如果我们需要使用不同的网络传输方式怎么办?比如,使用多播而不是 TCP 单播?或者使用 IPv6?我们需要重写应用程序吗,还是传输方式在某个层中被抽象化了?

  • 我们如何路由消息?我们可以将同一条消息发送给多个对等方吗?我们可以将回复发送回原始请求者吗?

  • 我们如何为另一种语言编写 API?我们是重新实现一个网络协议,还是封装一个库?如果是前者,我们如何保证高效稳定的协议栈?如果是后者,我们如何保证互操作性?

  • 我们如何表示数据,使其可以在不同架构之间读取?我们是否强制对数据类型使用特定的编码?这在多大程度上是消息系统的责任,而不是更高层的责任?

  • 我们如何处理网络错误?我们是等待并重试,默默忽略它们,还是中止?

以一个典型的开源项目,如 Hadoop Zookeeper 为例,阅读其中的 C API 代码src/c/src/zookeeper.c src/c/src/zookeeper.c。当我阅读这段代码时(2013 年 1 月),它是 4200 行的谜团,其中包含一个未文档化的客户端/服务器网络通信协议。我认为它很高效,因为它使用了pollpoll而不是selectselect。但实际上,Zookeeper 应该使用通用的消息层和一个明确文档化的网络协议。团队一遍又一遍地重复制造这个特定的轮子是令人难以置信的浪费。

但是如何创建一个可复用的消息层呢?为什么当这么多项目需要这项技术时,人们仍然通过在代码中操作 TCP 套接字来做这件事,一遍又一遍地解决那长串列表中的问题?

事实证明,构建可复用的消息系统非常困难,这就是为什么很少有 FOSS 项目尝试这样做,也是为什么商业消息产品复杂、昂贵、不够灵活且脆弱的原因。2006 年,iMatix 设计了 AMQP,这或许是首次为 FOSS 开发者提供了可复用的消息系统方案。AMQP 比许多其他设计表现得更好,但仍然相对复杂、昂贵且脆弱。学习使用它需要数周时间,创建在复杂情况下不会崩溃的稳定架构需要数月时间。

图 7 - 消息系统的起点

大多数消息项目,比如 AMQP,试图以可复用的方式解决这一长串问题,它们通过发明一个新概念——“消息代理(broker)”——来实现,消息代理负责寻址、路由和排队。这会产生一个客户端/服务器协议,或者在某个未文档化协议之上构建一套 API,让应用程序能够与这个消息代理通信。消息代理在降低大型网络复杂性方面非常出色。但是将基于消息代理的消息传递添加到像 Zookeeper 这样的产品中只会使其更糟,而不是更好。这意味着添加一个额外的庞大组件,并引入新的单点故障。消息代理很快就会成为瓶颈和一个需要管理的新风险。如果软件支持,我们可以添加第二个、第三个和第四个消息代理,并建立一些故障转移方案。人们确实这样做。但这会产生更多的活动部件、更多的复杂性以及更多可能发生故障的地方。

而且以消息代理为中心的设置需要自己的运维团队。你确实需要日夜监控消息代理,并在它们行为异常时“用棍子敲打它们”(比喻)。你需要机器,需要备份机器,还需要有人来管理这些机器。这只适用于由多个团队耗费数年时间构建的、具有许多活动部件的大型应用程序。

图 8 - 消息系统的演变

因此,中小型应用程序开发人员陷入了困境。要么他们避免网络编程,构建无法扩展的单体应用程序。要么他们投入网络编程,构建脆弱、复杂、难以维护的应用程序。要么他们选择某个消息产品,最终得到依赖于昂贵、易于损坏的技术的可扩展应用程序。一直以来都没有真正好的选择,这也许就是为什么消息传递在很大程度上停留在上个世纪,并激起强烈情感的原因:用户感到负面,而销售支持和许可证的人则感到欢欣鼓舞。

我们需要的是能够完成消息传递工作的东西,但其方式要如此简单和廉价,以至于几乎零成本地在任何应用程序中运行。它应该是一个你只需链接的库,没有任何其他依赖。没有额外的活动部件,因此没有额外的风险。它应该能在任何操作系统上运行,并与任何编程语言协同工作。

这就是 ZeroMQ:一个高效、可嵌入的库,它解决了应用程序在网络上实现良好弹性所需的大部分问题,而且成本不高。

具体来说

  • 它在后台线程中异步处理 I/O。这些后台线程使用无锁数据结构与应用程序线程通信,因此并发的 ZeroMQ 应用程序不需要锁、信号量或其他等待状态。

  • 组件可以动态加入和离开,ZeroMQ 会自动重新连接。这意味着你可以按任何顺序启动组件。你可以创建“面向服务的架构”(SOA),其中服务可以随时加入和离开网络。

  • 它在需要时自动对消息进行排队。它会智能地执行此操作,在将消息排队之前,尽可能地将消息推送到接近接收方的位置。

  • 它有处理队列过满的方法(称为“高水位标记”)。当队列满时,ZeroMQ 会根据你正在进行的消息类型(所谓的“模式”)自动阻塞发送者,或丢弃消息。

  • 它允许你的应用程序通过任意传输方式相互通信:TCP、多播、进程内、进程间。你无需修改代码即可使用不同的传输方式。

  • 它使用取决于消息模式的不同策略,安全地处理慢速/阻塞的读者。

  • 它允许你使用各种模式(例如请求-回复和发布-订阅)来路由消息。这些模式是你创建网络拓扑结构的方式。

  • 它允许你通过一次调用创建代理,来对消息进行排队、转发或捕获。代理可以降低网络的互连复杂性。

  • 它使用简单的网络帧将完整的消息按发送时的样子交付。如果你发送一个 10k 的消息,你将收到一个 10k 的消息。

  • 它不对消息强制要求任何格式。消息可以是零到数千兆字节大小的二进制数据块。当你想要表示数据时,你可以选择在其之上使用其他产品,例如 msgpack、Google 的 Protocol Buffers 等等。

  • 它会智能地处理网络错误,在合理的场景下自动重试。

  • 它减少了你的碳足迹。用更少的 CPU 完成更多工作意味着你的机器功耗更低,并且你可以更长时间地使用旧机器。阿尔·戈尔会喜欢 ZeroMQ 的。

实际上,ZeroMQ 的作用远不止于此。它对你开发网络应用程序的方式产生了颠覆性的影响。表面上看,它是一个受套接字启发的 API,你可以在其上进行zmq_send zmq_recv()zmq_recv() zmq_send()zmq_recv。但是消息处理很快就成为中心循环,你的应用程序很快就会分解成一系列消息处理任务。这既优雅又自然。而且它可以扩展:这些任务中的每一个都映射到一个节点,并且这些节点可以通过任意传输方式相互通信。同一进程中的两个节点(节点是一个线程),同一台机器上的两个节点(节点是一个进程),或者同一网络上的两个节点(节点是一台机器)——都是一样的,无需更改应用程序代码。

套接字的可扩展性 #

让我们看看 ZeroMQ 的可扩展性实际表现。这是一个 shell 脚本,它启动天气服务器,然后并行启动一堆客户端

wuserver &
wuclient 12345 &
wuclient 23456 &
wuclient 34567 &
wuclient 45678 &
wuclient 56789 &

当客户端运行时,我们使用top顶部命令查看活跃进程,我们会看到类似这样的输出(在一台 4 核机器上):

PID  USER  PR  NI  VIRT  RES  SHR S %CPU %MEM   TIME+  COMMAND
7136  ph   20   0 1040m 959m 1156 R  157 12.0 16:25.47 wuserver
7966  ph   20   0 98608 1804 1372 S   33  0.0  0:03.94 wuclient
7963  ph   20   0 33116 1748 1372 S   14  0.0  0:00.76 wuclient
7965  ph   20   0 33116 1784 1372 S    6  0.0  0:00.47 wuclient
7964  ph   20   0 33116 1788 1372 S    5  0.0  0:00.25 wuclient
7967  ph   20   0 33072 1740 1372 S    5  0.0  0:00.35 wuclient

让我们稍微思考一下这里发生了什么。天气服务器只有一个套接字,但它却同时向五个客户端并行发送数据。我们可以拥有成千上万个并发客户端。服务器应用程序看不到它们,也不会直接与它们通信。因此,ZeroMQ 套接字就像一个小型服务器,默默地接受客户端请求,并以网络能处理的最快速度将数据推送给它们。而且它是一个多线程服务器,榨取了你 CPU 更大的性能。

从 ZeroMQ v2.2 升级到 ZeroMQ v3.2 #

兼容性变更 #

这些变更不会直接影响现有的应用程序代码

  • 发布-订阅过滤现在发生在发布者端,而不是订阅者端。这显著提高了许多发布-订阅用例的性能。你可以安全地混合使用 v3.2 和 v2.1/v2.2 的发布者和订阅者。

  • ZeroMQ v3.2 有许多新的 API 方法(zmq_disconnect() zmq_disconnect(), zmq_unbind(), zmq_monitor(), zmq_ctx_set()等)

不兼容变更 #

这些是对应用程序和语言绑定的主要影响领域

  • 更改的发送/接收方法zmq_send() zmq_send()zmq_recv() zmq_recv()zmq_recv() 有了不同、更简单的接口,旧的功能现在由zmq_msg_send() 提供。症状:编译错误。解决方案:修改你的代码。zmq_recv() zmq_msg_recv(). 症状:编译错误。解决方案:修复你的代码。

  • zmq_send()zmq_recv() 在成功时返回正值,错误时返回 -1。在 v2.x 中,它们成功时总是返回零。症状:实际工作正常时却出现错误。解决方案:严格测试返回码是否等于 -1,而不是非零。

  • zmq_poll()现在等待的是毫秒,而不是微秒。症状:应用程序停止响应(实际上响应速度慢了 1000 倍)。解决方案:在所有ZMQ_POLL_MSEC宏定义(如下),在所有zmq_poll下面定义的宏,在所有zmq_poll调用中使用。

  • ZMQ_NOBLOCK现在称为ZMQ_DONTWAIT。症状:在ZMQ_NOBLOCK宏上出现编译失败。解决方案:使用 ZMQ_DONTWAITZMQ_NOBLOCK宏。

  • ZMQ_HWM套接字选项现在被拆分为ZMQ_SNDHWMZMQ_RCVHWMzmq_recv()ZMQ_RCVHWM宏上出现编译失败。解决方案:使用 ZMQ_DONTWAITZMQ_HWM宏。

  • 大多数但不是所有zmq_getsockopt() 选项现在是整数值。症状:在zmq_setsockopt选项现在是整数值。症状:在运行时返回错误zmq_getsockoptzmq_recv()调用时返回运行时错误。.

  • ZMQ_SWAP选项已被移除。症状:在ZMQ_SWAPZMQ_SWAP上出现编译失败。解决方案:重新设计任何使用此功能的代码。

建议的 Shim 宏 #

对于希望在 v2.x 和 v3.2 上都能运行的应用程序,例如语言绑定,我们的建议是尽可能地模拟 v3.2。以下是 C 宏定义,它们有助于你的 C/C++ 代码兼容这两个版本(取自 CZMQ


#ifndef ZMQ_DONTWAIT
#   define ZMQ_DONTWAIT     ZMQ_NOBLOCK
#endif
#if ZMQ_VERSION_MAJOR == 2
#   define zmq_msg_send(msg,sock,opt) zmq_send (sock, msg, opt)
#   define zmq_msg_recv(msg,sock,opt) zmq_recv (sock, msg, opt)
#   define zmq_ctx_destroy(context) zmq_term(context)
#   define ZMQ_POLL_MSEC    1000        //  zmq_poll is usec
#   define ZMQ_SNDHWM ZMQ_HWM
#   define ZMQ_RCVHWM ZMQ_HWM
#elif ZMQ_VERSION_MAJOR == 3
#   define ZMQ_POLL_MSEC    1           //  zmq_poll is msec
#endif

警告:不稳定的范例! #

传统的网络编程建立在一般的假设之上:一个套接字与一个连接、一个对等方通信。存在多播协议,但这属于特殊情况。当我们假设“一个套接字 = 一个连接”时,我们就会以特定的方式扩展我们的架构。我们创建逻辑线程,每个线程处理一个套接字、一个对等方。我们将智能和状态置于这些线程中。

在 ZeroMQ 的世界里,套接字是通往快速后台通信引擎的门户,这些引擎会自动为你管理整套连接。你无法看到、操作、打开、关闭这些连接,也无法将状态附加到它们。无论你使用阻塞发送或接收,还是轮询,你所能与之通信的只有套接字,而不是它为你管理的连接。这些连接是私有的、不可见的,这是 ZeroMQ 可扩展性的关键所在。

这是因为你的代码通过与套接字通信,就可以处理跨越各种网络协议的任意数量的连接,而无需修改代码。ZeroMQ 中的消息模式比应用程序代码中的消息模式扩展成本更低。

因此,一般的假设不再适用。当你阅读代码示例时,你的大脑会尝试将其映射到你已知的事物上。你会读到“套接字”,然后想“啊,这代表了与另一个节点的连接”。这是错误的。你会读到“线程”,你的大脑会再次想“啊,一个线程代表了与另一个节点的连接”,你的大脑又错了。

如果你是第一次阅读本指南,请意识到,在你真正编写 ZeroMQ 代码一两天(也许三四天)之前,你可能会感到困惑,特别是 ZeroMQ 让事情变得如此简单,你可能会试图将那个普遍假设强加给 ZeroMQ,而它不会奏效。然后你就会体验到顿悟和信任的时刻,那个 zap-pow-kaboom 般令人茅塞顿开的范式转变时刻。