Pages

REQ-REP with ZeroMQ 3.1: server

To see what has changed from ØMQ 2.x to 3.1, I have rewritten my old simple request-reply example for the new library. And I found a few little surprises.

This time I won't use the C++ wrapper provided by ZMQ, but the naked C interface. The resulting code is more verbose, and the use of void pointers is alarming, for someone used to higher level programming, but we have a better chance to see the process in more details.

This server is designed to synchronously waiting for a client to connect, printing whatever it receives (assuming it a string of readable characters) and send back to the client an acknowledgment message. When the client sends an empty message, or when any error happens, the looping on receiving is interrupted and the server closes.

As we will see, sending a message is pretty straightforward, while receiving is a bit more complex. So I created a receive() function, called by the main loop.
void* context = zmq_init(1); // 1
void* socket = zmq_socket(context, ZMQ_REP); // 2
zmq_bind(socket, "tcp://*:50013"); // 3
while(true)
{
    if(!receive(socket)) // 4
        break;

    boost::this_thread::sleep(boost::posix_time::millisec(250)); // 5
    std::cout << "Sending acknowledgment to the client" << std::endl;
    char* buffer = "Ack";
    zmq_send(socket, buffer, strlen(buffer), 0); // 6
}

zmq_close(socket);
zmq_term(context);
1. Nothing has changed here. Before using any 0MQ functionality we have to initialize the a 0MQ context. The parameter passed is the ØMQ thread pool size, a value of zero make sense only for inproc uses.
2. This is the server, the guy who sits waiting to reply to the client, so here we need a socket type ZMQ_REP.
3. The binding specifies the protocol, tcp, a star to say that we accept connections from anywhere, and the port associated to this service.
4. See below, if the receive on the socket fails, or if the client explicitly asks the server to shutdown sending to it an empty message, receive() return false, and so the loop is interrupted.
5. Let's slow down the execution - I used Boost sleep() to keep the code as portable as possible. If you don't have the Boost libraries on your development environment you could remove this line or, better, add it now.
6. First surprise. The zmq_send() syntax has changed. Now it doesn't expect anymore a pointer to a zmq_msg_t structure, but a raw pointer to void and the number of bytes we want to send starting from that address. The first parameter is still a pointer to the current socket, and the last one flags for more option on sending. For the moment no special option is used, so a 0 suffices.

Let's see now how to receive:
const int MSG_SIZE = 64;

// ...

bool receive(void* socket)
{
    char buffer[MSG_SIZE]; // 1
    int size = zmq_recv(socket, buffer, MSG_SIZE, 0); // 2
    if(size < 0) // 3
    {
        std::cout << "Error code " << errno <<  ". Terminating." << std::endl;
        return false;
    }
    if(!size) // 4
    {
        std::cout << "Empty message received. Terminating." << std::endl;
        return false;
    }
    std::cout << "Received: ";
    std::for_each(buffer, buffer + size, [](char c){std::cout << c;}); // 5
    std::cout << std::endl;
    return true;
}
1. In ZeroMQ 3.1 we could use a plain char buffer to manage our messages. The nuisance of this solution is that we have to explicitly allocate an array with a known size.
2. Same as zmq_send(), we have to pass the pointer to the memory where it should store the message and the max size allowed. What if the incoming message is bigger? It is truncated. The returned value is the size of the message, as received. So it could happen that it is bigger than the actual size of the buffer, and this let us know that the message has been truncated.
3. If the returned value is less than zero an error occurred. The code for the error is stored in the errno global variable.
4. As designed for this simple example application, an empty message means a request from the client to shut down the server.
5. I thought it was kind of cool to dump the data received using a combination of STL for_each() and a lambda function, even though this is not the major point of the post.

Once you get how the server works, the client looks easy. But I am out of time, we'll see it in the next post.

No comments:

Post a Comment