zmq pub/sub使用详解_zmq pub sub-程序员宅基地

技术标签: pub/sub  并发发布  详解  zmq  

关于zmq的基本简介,请参考ZeroMQ基础入门

pub/sub模式介绍

发布/订阅模式,全称为Publish/Subscribe,支持多个发布者/多订阅者,使用在消息单向传输的应用场景,消息总是从发布者发送到订阅者。

一般的使用流程为:

pub/sub

pub端:

  • 创建context
  • 创建socket,设置ZMQ_PUB模式
  • bind端口
  • 循环发布消息send

socket特性:

特性
兼容的对端套接字 ZMQ_SUB
方向性 单向
发送/接收模式 仅发送
进入路由策略
流出路由策略 扇出(呈扇形发出)
ZMQ_HWM 选项行为 丢弃

sub端:

  • 创建context
  • 创建socket,设置ZMQ_SUB模式
  • connect到pub端
  • setsockopt设置ZMQ_SUBSCRIBE订阅的消息
  • 循环接收recv
特性
兼容的对端套接字 ZMQ_PUB
方向性 单向
发送/接收模式 仅接收
进入路由策略 平等排队
流出路由策略
ZMQ_HWM 选项行为 丢弃
注意事项

  • pub端socket不能使用recv函数,同样,sub端不能使用send函数
  • 当pub端由于到达了高水位而使ZMQ_PUB套接字进入静默模式的时候,所有发送到这个有问题的订阅者的消息都会被丢弃,直到静默模式终止
  • pub端socket的zmq_send()函数永远不会阻塞
  • sub端刚创建socket后是无法订阅到任何消息的,必须使用setsockopt设置订阅的消息后才能接收到
  • sub端是根据参数前缀进行过滤的。订阅的内容以pub端发出的内容从头开始匹配为过滤条件,完全匹配订阅内容的消息会被sub接收,如订阅内容为"a"时,所有以a开头的消息都会接收
  • 当订阅内容为"",长度为0时为订阅所有内容,因为所有消息都匹配成功
  • 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息
  • 如果采用tcp的连接方式,sub很慢时,消息将会堆积在pub,可以通过设置ZMQ_HWM选项来保护发布者,对于重要的消息,也可以写入硬盘等待发送
  • pub和sub谁bind谁connect并无严格要求(虽本质并无区别),但仍建议pub使用bind,sub使用connect,在实际测试中,使用sub绑定而pub connect时,sub端无法接收到消息
  • 一个显著的问题是,“slow joiner”可能导致发布者的第一笔消息总是丢失,下文会进一步说明该问题
  • 一个订阅者(subcriber)可以链接超过一个发布者(publisher)。数据到达后将交叉存取(公平队列),以保证发布者之间的数据不会淹没
  • 从ZeroMQ v3.x版本开始,使用tcp://或者ipc://协议连接时会在发布者进行消息过滤,使用epgm://协议仍在订阅者过滤;在ZeroMQ v2.x,所有消息过滤都发生在订阅者
关于slow joiner问题

一般描述为,即使是先启动订阅者,再启动发布者,订阅者也有可能会丢失前一部分数据。你无法得知SUB是何时开始接收消息的。

这是因为订阅者向发布者建立连接也是耗费时间的,虽然时间极短,但不为0。这个时间内发布者发布的内容将没有订阅者能够接收。

几种处理方式:

  • 不处理

对于可以容忍数据丢失的应用来说,不必理会丢失的数据。比如接收天气信息的应用,你可能只关注最新的天气信息。

  • 使用延时发布策略

因为数据丢失的原因是发布者在没有稳定接收者的情况下仍然发送了数据,所以可以让发布者等待一段时间再发送数据。

最简单的方式是sleep一段时间。

缺点是:

  1. 不知道sleep多久合适
  2. 即使sleep也不能保证sub者一定就准备好了
  3. 在程序中加sleep不是一种优雅的设计
  • 通知发布者模式

发布者可以在确保订阅者已经启动成功的情况下再发送数据,只需要订阅者在准备好后通知发布者。

这个不难实现,订阅者在准备好后,首先使用req/rep模式向发布者发送一个特定的请求,发布者接收请求并应答,然后再发布真正的数据。

这种方式增加了简单一步操作,但保证了数据的完整。

多线程发送问题

注意:context是线程安全的,但socket非线程安全。在多个线程中使用同一个socket会导致程序崩溃(不提倡使用锁,它会导致竞争并减慢性能,不符合zmq的设计理念)。

  1. 使用proxy

一种比较常见的场景是,发布者使用多个线程来发布不同的数据,所有的数据通过一个endpoint发送。

这与动态发现问题类似,对于一个应用场景,可能会随时增加发布者或者订阅者,构建一个合适的系统可以减小编码和出错的机会。

在这里插入图片描述

如图所示,使用一个proxy可以轻松解决这个问题。

  • 增加了中间层。中间层是静态的,它不会经常变动
  • pub可以随意增减,它们都connect到xsub
  • sub也可以随意增减,它们都connect到xpub
  • xsub收到的所有数据会通过proxy转发到xpub,从而发布到各sub

对于这种情况,可以在多个发布线程中分别创建socket,connect到xsub,从而避免多线程共用socket。

这种情况下,当发布线程较多时,会导致socket堆积,最终导致系统文件描述符过多而失败。可以使用线程池,每个线程使用自己的socket。

还有一种使用代理的情况是:

在这里插入图片描述

这里proxy类似于一个桥,连接了两个不同的网络。也可以作为协议网关,用于连接两个使用不同协议的网络。

结合天气服务,实现一个Proxy的例程如下:

//
//  Weather proxy device C++
//
// Olivier Chamoux <[email protected]>
//

#include "zhelpers.hpp"

int main (int argc, char *argv[])
{
    
    zmq::context_t context(1);

    //  This is where the weather server sits
    zmq::socket_t frontend(context, ZMQ_SUB);
    frontend.connect("tcp://192.168.55.210:5556");

    //  This is our public endpoint for subscribers
    zmq::socket_t backend (context, ZMQ_PUB);
    backend.bind("tcp://10.1.1.0:8100");

    //  Subscribe on everything
    frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);

    //  Shunt messages out to our own subscribers
    while (1) {
    
        while (1) {
    
            zmq::message_t message;
            int more;
            size_t more_size = sizeof (more);

            //  Process all parts of the message
            frontend.recv(&message);
            frontend.getsockopt( ZMQ_RCVMORE, &more, &more_size);
            backend.send(message, more? ZMQ_SNDMORE: 0);
            if (!more)
                break;      //  Last message part
        }
    }
    return 0;
}
  1. 使用push/pull

使用push/pull建立一个发布端模型,pull接收到所有数据通过proxy转到pub,再发送出去。

这种方式是使用xpub/xsub的变体。因为实际使用中,在单进程内使用sub绑定,pub连接的方式无法接收到数据,而push/pull是一个可行的替代。

启动代理:

// 代码片断
void StartPubProxy(string& port)
{
    
    try
    {
    
        // 面向client的socket,多线程发来所有数据
        zmq::socket_t frontend(m_ctx, ZMQ_PULL);
        frontend.bind("inproc://#0");

        // 面向services的socket,提供对外端口,并实际发布数据
        zmq::socket_t backend(m_ctx, ZMQ_PUB);
        string strBind = "tcp://*:" + port;
        backend.bind(strBind);

        // 创建proxy
        zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
    }
    catch(const std::exception& e)
    {
    
        std::cerr <<"zmq启动转发代理失败:" << e.what() << '\n';
    }
}

原发布线程向"inproc://#0"push数据即可。

  1. 使用boost asio单线程处理数据

在有些情况下,加锁的多线程未必比无锁单线程更快。

对于多线程发布模型来说,可以把它们要发送的数据通过strand.post到io_service的单线程队列里,由工作线程异步处理。

这种方式简单方便,关于使用asio创建线程的使用可以参考:boost::asio::io_service创建线程池简单实例

保护发布者

前方讲述,当sub端处理较慢时,pub端在到达高水位线后会丢弃数据。对于重要的应用,由于不能对sub端的性能作出任何假设,所以需要一定的策略来保证。

ZMQ_HWM

ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量),ZMQ_RCVHWM:对进入socket的消息设置高水位。

ZMQ_SNDHWM属性将会设置socket参数指定的socket对外发送的消息的高水位。高水位是一个硬限制,它会限制每一个与此socket相连的在内存中排队的未处理的消息数目的最大值。

0值代表着没有限制,默认值为1000,就在bind/connect之前设置该属性。如果设置为无限,可能会导致发布者崩溃。

如果已经到达了规定的限制,socket就需要进入一种异常的状态,表现形式因socket类型而异。socket会进行适当的调节,比如阻塞或者丢弃已发送的消息。

总是给套接字设置一个基于期望的订阅方数量的最大值,你打算用于队列的内存的数量,和一个消息平均大小的高水位线。例如,如果你希望有5000个订阅方,有1G的内存可有,消息平均200字节,那么一个安全的高水位线应该是(1000000000/200/5000)=1000.


参考资料

?MQ - The Guide
Unable to receive messages when binding subscriber socket
What is a simple example of a working XSUB / XPUB proxy in zeromq
How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?
Proxy with inproc frontend
ZMQ模式详解——发布/订阅模式
ZeroMQ基础入门

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/guotianqing/article/details/101429319

智能推荐

使用nginx解决浏览器跨域问题_nginx不停的xhr-程序员宅基地

文章浏览阅读1k次。通过使用ajax方法跨域请求是浏览器所不允许的,浏览器出于安全考虑是禁止的。警告信息如下:不过jQuery对跨域问题也有解决方案,使用jsonp的方式解决,方法如下:$.ajax({ async:false, url: 'http://www.mysite.com/demo.do', // 跨域URL ty..._nginx不停的xhr

在 Oracle 中配置 extproc 以访问 ST_Geometry-程序员宅基地

文章浏览阅读2k次。关于在 Oracle 中配置 extproc 以访问 ST_Geometry,也就是我们所说的 使用空间SQL 的方法,官方文档链接如下。http://desktop.arcgis.com/zh-cn/arcmap/latest/manage-data/gdbs-in-oracle/configure-oracle-extproc.htm其实简单总结一下,主要就分为以下几个步骤。..._extproc

Linux C++ gbk转为utf-8_linux c++ gbk->utf8-程序员宅基地

文章浏览阅读1.5w次。linux下没有上面的两个函数,需要使用函数 mbstowcs和wcstombsmbstowcs将多字节编码转换为宽字节编码wcstombs将宽字节编码转换为多字节编码这两个函数,转换过程中受到系统编码类型的影响,需要通过设置来设定转换前和转换后的编码类型。通过函数setlocale进行系统编码的设置。linux下输入命名locale -a查看系统支持的编码_linux c++ gbk->utf8

IMP-00009: 导出文件异常结束-程序员宅基地

文章浏览阅读750次。今天准备从生产库向测试库进行数据导入,结果在imp导入的时候遇到“ IMP-00009:导出文件异常结束” 错误,google一下,发现可能有如下原因导致imp的数据太大,没有写buffer和commit两个数据库字符集不同从低版本exp的dmp文件,向高版本imp导出的dmp文件出错传输dmp文件时,文件损坏解决办法:imp时指定..._imp-00009导出文件异常结束

python程序员需要深入掌握的技能_Python用数据说明程序员需要掌握的技能-程序员宅基地

文章浏览阅读143次。当下是一个大数据的时代,各个行业都离不开数据的支持。因此,网络爬虫就应运而生。网络爬虫当下最为火热的是Python,Python开发爬虫相对简单,而且功能库相当完善,力压众多开发语言。本次教程我们爬取前程无忧的招聘信息来分析Python程序员需要掌握那些编程技术。首先在谷歌浏览器打开前程无忧的首页,按F12打开浏览器的开发者工具。浏览器开发者工具是用于捕捉网站的请求信息,通过分析请求信息可以了解请..._初级python程序员能力要求

Spring @Service生成bean名称的规则(当类的名字是以两个或以上的大写字母开头的话,bean的名字会与类名保持一致)_@service beanname-程序员宅基地

文章浏览阅读7.6k次,点赞2次,收藏6次。@Service标注的bean,类名:ABDemoService查看源码后发现,原来是经过一个特殊处理:当类的名字是以两个或以上的大写字母开头的话,bean的名字会与类名保持一致public class AnnotationBeanNameGenerator implements BeanNameGenerator { private static final String C..._@service beanname

随便推点

二叉树的各种创建方法_二叉树的建立-程序员宅基地

文章浏览阅读6.9w次,点赞73次,收藏463次。1.前序创建#include&lt;stdio.h&gt;#include&lt;string.h&gt;#include&lt;stdlib.h&gt;#include&lt;malloc.h&gt;#include&lt;iostream&gt;#include&lt;stack&gt;#include&lt;queue&gt;using namespace std;typed_二叉树的建立

解决asp.net导出excel时中文文件名乱码_asp.net utf8 导出中文字符乱码-程序员宅基地

文章浏览阅读7.1k次。在Asp.net上使用Excel导出功能,如果文件名出现中文,便会以乱码视之。 解决方法: fileName = HttpUtility.UrlEncode(fileName, System.Text.Encoding.UTF8);_asp.net utf8 导出中文字符乱码

笔记-编译原理-实验一-词法分析器设计_对pl/0作以下修改扩充。增加单词-程序员宅基地

文章浏览阅读2.1k次,点赞4次,收藏23次。第一次实验 词法分析实验报告设计思想词法分析的主要任务是根据文法的词汇表以及对应约定的编码进行一定的识别,找出文件中所有的合法的单词,并给出一定的信息作为最后的结果,用于后续语法分析程序的使用;本实验针对 PL/0 语言 的文法、词汇表编写一个词法分析程序,对于每个单词根据词汇表输出: (单词种类, 单词的值) 二元对。词汇表:种别编码单词符号助记符0beginb..._对pl/0作以下修改扩充。增加单词

android adb shell 权限,android adb shell权限被拒绝-程序员宅基地

文章浏览阅读773次。我在使用adb.exe时遇到了麻烦.我想使用与bash相同的adb.exe shell提示符,所以我决定更改默认的bash二进制文件(当然二进制文件是交叉编译的,一切都很完美)更改bash二进制文件遵循以下顺序> adb remount> adb push bash / system / bin /> adb shell> cd / system / bin> chm..._adb shell mv 权限

投影仪-相机标定_相机-投影仪标定-程序员宅基地

文章浏览阅读6.8k次,点赞12次,收藏125次。1. 单目相机标定引言相机标定已经研究多年,标定的算法可以分为基于摄影测量的标定和自标定。其中,应用最为广泛的还是张正友标定法。这是一种简单灵活、高鲁棒性、低成本的相机标定算法。仅需要一台相机和一块平面标定板构建相机标定系统,在标定过程中,相机拍摄多个角度下(至少两个角度,推荐10~20个角度)的标定板图像(相机和标定板都可以移动),即可对相机的内外参数进行标定。下面介绍张氏标定法(以下也这么称呼)的原理。原理相机模型和单应矩阵相机标定,就是对相机的内外参数进行计算的过程,从而得到物体到图像的投影_相机-投影仪标定

Wayland架构、渲染、硬件支持-程序员宅基地

文章浏览阅读2.2k次。文章目录Wayland 架构Wayland 渲染Wayland的 硬件支持简 述: 翻译一篇关于和 wayland 有关的技术文章, 其英文标题为Wayland Architecture .Wayland 架构若是想要更好的理解 Wayland 架构及其与 X (X11 or X Window System) 结构;一种很好的方法是将事件从输入设备就开始跟踪, 查看期间所有的屏幕上出现的变化。这就是我们现在对 X 的理解。 内核是从一个输入设备中获取一个事件,并通过 evdev 输入_wayland

推荐文章

热门文章

相关标签