Pages

PUB-SUB coordination by REQ-REP: the publisher

We can't use ZeroMQ PAIR sockets to synchronize processes. As seen in the previous posts, they are explicitly designed to work in multithreaded scope. For a ØMQ multiprocess application we use instead the REQ-REP pattern. Here I am going to take care of the publisher, next post is dedicated to the subscriber.

The code is a porting in C - ØMQ 3.1 of the similar example I have written in C++ for version 2.1. The original source is the Z-Guide, currently still referring to 2.1.

Before start publishing, the publisher waits that all the requested subscribers are connected:
void* context = zmq_init(1);
void* skPub = zmq_socket(context, ZMQ_PUB); // 1
zmq_bind(skPub, "tcp://*:5561");

void* skSync = zmq_socket(context, ZMQ_REP); // 2
zmq_bind(skSync, "tcp://*:5562");

std::cout << "Waiting for " << SUBS << " subscribers." << std::endl;
int subscribers = 0;
while(subscribers < SUBS) // 3
{
    ping(skPub); // 4
    if(handshake(skSync)) // 5
        ++subscribers;
}

publishing(skPub, MSG_NBR); // 6
1. PUB socket, used to publish the messages to the subscribers.
2. REP socket, used by the subscribers to signal that they are ready to receive messages.
3. SUBS is the number of expected subscribers. The publisher won't start sending the real messages till all of them are connected.
4. A "ping" message is sent on the PUB socket to show the subscribers that the publisher is up and waiting for them.
5. The handshake() function checks for subscribers synchronization on the REP socket.
6. All the subscribers are there, the publisher can start publishing.

The ping() function is pretty simple, a dummy message, here a minus one integer, is sent on the PUB socket:
const int PING_FLAG = -1;

void ping(void* skPub)
{
    zmq_send(skPub, &PING_FLAG, sizeof(int), 0);
}
More interesting the polling on the REP socket:
bool handshake(void* skSync)
{
    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 1

    if(zmq_recv(skSync, NULL, 0, ZMQ_DONTWAIT) == 0) // 2
    {
        zmq_send(skSync, NULL, 0, 0); // 3
        std::cout << " handshake!" << std::endl;
        return true;
    }

    std::cout << '.';
    return false;
}
1. Sleep one second, to give time to the subscribers to connect.
2. Check on the socket for an empty message.
3. Acknowledge the subscriber that the publisher has seen it.

The publishing itself is nothing fancy, a bunch of integers are sent on the PUB socket, followed by an empty message as terminator:
void publishing(void* skPub, const int MSG_NBR)
{
    std::cout << "sending messages" << std::endl;
    for(int i = 1; i <= MSG_NBR; ++i)
        zmq_send(skPub, &i, sizeof(int), 0);

    std::cout << "sending terminator" << std::endl;
    zmq_send(skPub, NULL, 0, 0);
}

No comments:

Post a Comment