Multithreading with ZeroMQ

If you are used to classical multithreading, you are going to be surprised by the approach taken by ZeroMQ. Mutexes and locks are not normally used, and communication among threads is usually performed through ØMQ sockets.

A multithreaded ZeroMQ application is designed with a few rules in mind:

- Each process has its own ZeroMQ context, which is the only object that should be shared among threads. Nothing else, ZeroMQ sockets included, should be shared.
- The threads in a process are connected by inproc sockets.
- A thread could have its own ZeroMQ context, but in that case it can't be connected to other threads in the same process through an inproc socket.

In this post I use the C++ interface to ØMQ 2.1. If you are using version 3.1, you may be interested in another post, where I also implement a graceful way to shut down the worker threads.

A well-designed ZeroMQ application can easily be switched from a multiprocess to a multithreaded design. For instance, let's have a look at the ZeroMQ broker example we have just seen.

The client doesn't change at all. It is going to have a REQ socket like this:
zmq::socket_t socket(context, ZMQ_REQ);
socket.connect("tcp://localhost:5559");
Through it, the client sends a message and receives the reply generated by the server.

The changes are all in the server. Originally it was built as a bunch of processes connected to the broker; now we rewrite it as a single multithreaded process. The broker no longer runs in its own process, but is part of the server itself.

This is a possible C++ implementation for the server main routine:
boost::thread_group threads; // 1
try
{
   zmq::context_t context(1);
   zmq::socket_t clients(context, ZMQ_ROUTER);
   clients.bind("tcp://*:5559");

   zmq::socket_t workers(context, ZMQ_DEALER);
   workers.bind("inproc://workers"); // 2

   for(int i = 0; i < threadNbr; ++i) // 3
      threads.create_thread(std::bind(&doWork, std::ref(context))); // 4

   zmq::device(ZMQ_QUEUE, clients, workers); // 5
}
catch(const zmq::error_t& ze)
{
   std::cout << "Exception: " << ze.what() << std::endl;
}
threads.join_all(); // 6
1. To make the application more portable, we can use the Boost Thread library. Here we create a thread group that will contain all the service threads.
2. This is the major change in the code. The DEALER socket no longer expects connections from other processes, but from inproc sockets in other threads of the same process.
3. The variable threadNbr holds the number of concurrent service threads we want to run, set earlier in the code. This value could be passed as an input argument, or read from a configuration file.
4. We create a new thread, specifying the code it has to run, and a parameter that is passed by reference to the actual function, doWork(). The parameter is the ZeroMQ context that, as we said, is the only object we expect to be shared among different threads.
5. As before, we use the ZeroMQ queue device to do the dirty work.
6. Currently this line is never reached, since the device is expected to run forever. But a more refined implementation should in any case take care of cleaning up.
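As for point 3, this is one way threadNbr could be read from the command line. It is just a sketch: the helper name workerCount, the fallback argument, and the upper limit of 1024 are my own assumptions, not something required by ZeroMQ.

```cpp
#include <cstdlib>

// Hypothetical helper: take the worker count from the first command line
// argument, or return the fallback if it is missing or not a sane number.
int workerCount(int argc, const char* const argv[], int fallback)
{
    if (argc > 1)
    {
        char* end = nullptr;
        long value = std::strtol(argv[1], &end, 10);

        // accept only a fully parsed, positive, reasonably small number
        // (the 1024 cap is an arbitrary choice for this sketch)
        if (end != argv[1] && *end == '\0' && value > 0 && value <= 1024)
            return static_cast<int>(value);
    }
    return fallback;
}
```

In main() it could be used as int threadNbr = workerCount(argc, argv, 4); so that running the server with no argument falls back to four workers.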

The doWork() function is very close to the code executed by each worker process in the previous version. There are two main differences: here we don't create a new context but use the one passed as a parameter, relying on the fact that it is thread-safe; and the reply socket connects to the broker through inproc:
void doWork(zmq::context_t& context)
{
   try
   {
      zmq::socket_t socket(context, ZMQ_REP); // 1
      socket.connect("inproc://workers"); // 2
      while(true)
      {
         zmq::message_t request;
         socket.recv(&request);

         std::string data(static_cast<char*>(request.data()), request.size());
         std::cout << "Received: " << data << std::endl;

         boost::this_thread::sleep(boost::posix_time::seconds(1)); // 3

         zmq::message_t reply(data.length());
         memcpy(reply.data(), data.c_str(), data.length());
         socket.send(reply);
      }
   }
   catch(const zmq::error_t& ze)
   {
      std::cout << "Exception: " << ze.what() << std::endl;
   }
}
1. The socket is created using the process context.
2. Since the socket shares its context with the DEALER socket bound in the main routine, it can connect to it through inproc.
3. Using the Boost sleep function makes the code easier to port to different platforms.
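A detail worth spelling out in doWork() is the conversion between the message payload and a std::string. A ZeroMQ message is a block of bytes with a size, not a null-terminated C string, so the size must always be passed explicitly. The two helpers below are hypothetical (they are not part of the ZeroMQ API) and work on plain pointers, so they could be fed with request.data() and request.size(), or with reply.data().

```cpp
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical helper: build a string from a raw payload of known size.
// The payload is not assumed to be null-terminated.
std::string payloadToString(const void* data, std::size_t size)
{
    return std::string(static_cast<const char*>(data), size);
}

// Hypothetical helper: copy the string bytes (without terminator) into a
// destination buffer that the caller has sized to at least text.size().
void stringToPayload(const std::string& text, void* dest)
{
    if (!text.empty())
        std::memcpy(dest, text.data(), text.size());
}
```

With these, the body of the loop would reduce to something like payloadToString(request.data(), request.size()) on the way in and stringToPayload(data, reply.data()) on the way out.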

You can find the C code on which this C++ example is based in the Z-Guide.
