Pages

Synchronized subscriber

We are about to write a C++ client for a synchronized ZeroMQ PUB/SUB application. More details in the cited post, and I guess it is a good idea to read before this one. Moreover, you should probably have a look at the post on the server, before going on reading here.

First thing, the client sets up its ZeroMQ context, and create a SUB socket connected to the server PUB, with an empty filter:
zmq::context_t context(1);

zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5561");
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);

Then it ensures the server is available, checking if it is emitting an hello message:
{
std::cout << "Waiting for server" << std::endl;
zmq::message_t hello;
subscriber.recv(&hello);
std::cout << "Received hello message from server" << std::endl;
}

The server is up, so we connect to its reply socket to let it know another client is ready:
{
zmq::socket_t sync(context, ZMQ_REQ); // 1
sync.connect("tcp://localhost:5562");

zmq::message_t message(MSG_MESSAGE); // 2
sync.send(message);
sync.recv(&message); // 3
}

1. The request socket is used just here, so we create it in a local scope, letting it disappear at the end of it.
2. Send a synchronization request.
3. wait for synchronization reply.

The client main loop:
std::cout << "ready to receive messages" << std::endl;
int nMsg = 0; // 1
for(bool done = false; !done;) // 2
{
zmq::message_t message;
subscriber.recv(&message); // 3

switch(message.size()) // 4
{
case MSG_MESSAGE: // 5
++nMsg;
break;
case MSG_HELLO: // 6
std::cout << "Server is still waiting for other SUB" << std::endl;
break;
case MSG_END: // 7
default:
done = true;
break;
}
}

std::cout << nMsg << " messages received" << std::endl;

1. Counter for the messages actually received, we expect no loss in the communication.
2. We loop until we'll detect the terminator message. As explained talking about the server, we are using the hackish convention of sending the information just in the message size, and not in its actual body - not a very clean approach, I admit it, but it makes the code more concise.
3. Hang waiting for the next message from the server.
4. Check what the server has sent, as said in (2), we just have to check the message size to get it.
5. We have got a new proper message. The counter is increased to keep track of it.
6. When not all the expected clients are connected to the server, the already connected clients get hello messages that have no use here.
7. The message is a terminator (or something unexpected caught by the default label)

If you wonder why I wrote this post, the answer is buried in the Z-Guide.

No comments:

Post a Comment