PUB-SUB with ZeroMQ 3.1: server

Publish-subscribe is the second messaging pattern directly supported by ØMQ that I am going to demonstrate with a simple example. For a short description of the PUB-SUB pattern as implemented by ZeroMQ, you could jump to a previous post of mine: it was written for version 2.1, but at such a high level that it is not affected by the 0MQ changes in version 3.1. Or you could read the Z-Guide, which currently still refers to 2.1 but is much more complete (and fun) than my posts.

Let's start with the server. I have rewritten the 2.1 ZMQ PUB code, this time using the bare C interface instead of the native C++ wrapper, and making a few tiny changes, just for the fun of it.

The publisher is going to send a hundred messages in this format:
[0,1]:n:i
That is: 0 or 1, a colon, a number, a colon, and the message index, in the range 0..99. After that, it sends an empty message as an end-of-transmission signal.
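To make the format concrete, here is a minimal sketch that builds one such message and splits it back into its three fields. The names makeMessage, parseMessage and Fields are hypothetical helpers introduced only for this illustration, and the number in the middle is assumed to be i*42, as in the server code of this post.

```cpp
#include <sstream>
#include <string>

// Hypothetical helper: builds the i-th message as "parity:number:index".
// The middle number is assumed to be i * 42, as in the server loop.
std::string makeMessage(int i)
{
    std::ostringstream oss;
    oss << i % 2 << ':' << i * 42 << ':' << i;
    return oss.str();
}

// Hypothetical record holding the three fields of a message.
struct Fields
{
    int parity; // 0 or 1
    int value;  // the number in the middle
    int index;  // message index, 0..99
};

// Hypothetical helper: splits a message into its fields.
// Returns false when the text does not match the format;
// the empty terminator message fails here too.
bool parseMessage(const std::string& s, Fields& out)
{
    std::istringstream iss(s);
    char c1 = 0;
    char c2 = 0;
    if (!(iss >> out.parity >> c1 >> out.value >> c2 >> out.index))
        return false;
    return c1 == ':' && c2 == ':' && (out.parity == 0 || out.parity == 1);
}
```

A client could call parseMessage() on each received string and treat a failure on an empty string as the end of the transmission.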

If you have already seen the ZeroMQ reply server for the REQ-REP 0MQ pattern, you should find it straightforward:
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_PUB); // 1
zmq_bind(socket, "tcp://*:50014");

readyToSend(); // 2
std::stringstream ss;
for(int i = 0; i < 100; ++i)
{
    ss.str("");
    ss << i%2 << ':' << i*42 << ':' << i; // 3

    std::string s = ss.str();
    std::cout << "Sending " << s << std::endl; // 4
    zmq_send(socket, s.c_str(), s.length(), 0); // 5
}

std::cout << "Sending an empty message, as terminator" << std::endl;
zmq_send(socket, NULL, 0, 0);

zmq_close(socket);
zmq_term(context);
1. We tell ZeroMQ that this socket is used as a publisher in the PUB-SUB pattern.
2. A tiny procedure that gives us time to start the clients before the server sends all its messages; a PUB socket simply drops the messages for which no subscriber is connected yet. An alternative would be to slow down the server by calling sleep().
3. Build the message as defined above. The values are quite meaningless, but who cares, it is just a silly example.
4. Some feedback.
5. And finally we send the message. Remember that ØMQ works with raw byte arrays, so we explicitly pass the buffer and its size.
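The alternative mentioned in note 2, slowing down the server instead of waiting for the user, could be sketched as below. This is only an illustration under a few assumptions: publishSlowly is a hypothetical name, the pause uses the C++11 std::this_thread::sleep_for instead of a platform sleep(), and the actual sending is delegated to a callback so that the sketch does not depend on ZeroMQ.

```cpp
#include <chrono>
#include <functional>
#include <sstream>
#include <string>
#include <thread>

// Hypothetical helper: generates `count` messages in the format used above,
// passes each one to `send`, and pauses after every message so that late
// subscribers have a chance to connect. An empty message closes the stream.
// Returns the number of messages passed to `send`, terminator included.
int publishSlowly(int count, std::chrono::milliseconds pause,
                  const std::function<void(const std::string&)>& send)
{
    std::stringstream ss;
    for (int i = 0; i < count; ++i)
    {
        ss.str("");
        ss << i % 2 << ':' << i * 42 << ':' << i;
        send(ss.str());
        std::this_thread::sleep_for(pause);
    }

    send(std::string()); // empty message, as terminator
    return count + 1;
}
```

In the server, the callback would wrap the zmq_send() call shown above, passing s.c_str() and s.length() for each string. The pause only makes it less likely that a client misses messages; it does not guarantee anything.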

The function that holds the server until the clients are ready could be something as simple as this:
void readyToSend()
{
    std::cout << "Enter when ready" << std::endl;
    std::string input;
    std::getline(std::cin, input);
}
The next step is to write the client. We'll see it in a minute.
