Improved divide and conquer

The simple ØMQ parallel processing application that we implemented in the previous posts has a few minor flaws. The most noticeable is that the workers hang when the stream of messages terminates.

Now that we have seen how to poll on sockets, we can use this feature to solve this issue.

We want the sink to send a terminator when it detects that the job has been completed. We can't use the existing socket connection between workers and sink for that, because it is a push/pull connection, which is one-directional, and here it goes from the workers to the sink. Since we want to send a message in the opposite direction, we create another socket connection that implements the publisher/subscriber pattern. The sink publishes a kill message to all its subscribers, the workers.

Publisher

We change the code for the sink, creating a publisher socket that is going to send the terminating messages:
zmq::context_t context(1);

// ...

zmq::socket_t terminator(context, ZMQ_PUB);
terminator.bind("tcp://*:5559");

Then, after all the expected messages have been received and processed from the workers, an empty "killing" message is sent to the subscribers:
for(int task_nbr = 0; task_nbr < 100; task_nbr++)
{
   // ...
}

// ...

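zmq::message_t kill(0); // zero-length message: the content does not matter
terminator.send(kill);
// give ØMQ time to deliver the message before the sink shuts down
boost::this_thread::sleep(boost::posix_time::seconds(1));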

Subscribers

A bit more work is required in the workers.

Firstly, we create a subscription socket that connects to the publisher socket in the sink:
zmq::socket_t controller(context, ZMQ_SUB);
controller.connect("tcp://localhost:5559");
controller.setsockopt(ZMQ_SUBSCRIBE, "", 0);
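Notice that we have to specify a filter even though, as in this case, we want it to be empty: a subscriber socket receives nothing until at least one filter is set, and an empty filter matches every message.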
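ØMQ filters subscriptions by prefix: a message is delivered to a subscriber when its body begins with one of the subscriber's filters. The sketch below models only that rule in plain C++17, to show why the empty filter lets the zero-length kill message through. `matches_subscription` is a hypothetical helper written for this illustration, not part of the ØMQ API.

```cpp
#include <string_view>

// Hypothetical helper (not part of the ØMQ API): models the prefix rule
// used to decide whether a message passes a subscription filter.
constexpr bool matches_subscription(std::string_view filter,
                                    std::string_view message)
{
   // A message passes when its leading bytes equal the filter.
   // substr() clamps the length, so a message shorter than the filter fails.
   return message.substr(0, filter.size()) == filter;
}

// The empty filter accepts every message, including an empty one.
static_assert(matches_subscription("", ""), "empty filter, empty message");
static_assert(matches_subscription("", "anything"), "empty filter, any message");
// A non-empty filter would reject a zero-length kill message.
static_assert(!matches_subscription("KILL", ""), "prefix longer than message");
```

If the workers subscribed with a non-empty filter instead, the sink would have to put a matching prefix at the start of its kill message.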

Secondly, since the worker code already has another socket that has to be checked for incoming messages (the one we called receiver, a pull socket connected to the push socket defined in the ventilator), we need to create an array of pollitem_t, so that we can actually poll on both of them:
zmq::pollitem_t items [] =
{
   { receiver, 0, ZMQ_POLLIN, 0 },
   { controller, 0, ZMQ_POLLIN, 0 }
};
Thirdly, we change the while loop to poll on both sockets:
while(true)
{
   zmq::message_t msgIn;
   zmq::poll(items, 2); // 1

   if(items[0].revents & ZMQ_POLLIN) // 2
   {
      if(receiver.recv(&msgIn))
      {
         // ...
      }
      // ...
   }
   if(items[1].revents & ZMQ_POLLIN) // 3
   {
      std::cout << " Kill!" << std::endl;
      break;
   }
}
1. First change in the loop: we poll on the sockets, blocking indefinitely until a new message arrives (the underlying zmq_poll() function is called with its third parameter, timeout, set to -1).
2. Once poll() returns, signalling that a message is available, we check which socket we should work with. The first one in the items array is the receiver, so in this branch we recv() its message, executing the same code we used previously.
3. Then we check the controller. A message on the controller can mean only one thing: we have to shut down. So we don't even read the message, we just break out of the loop.
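One consequence of checking the receiver before the controller is that, when both sockets are readable after the same poll() call, the worker still handles the pending task before it stops. The sketch below models just that decision in plain C++, without any sockets. `kPollIn` stands in for ZMQ_POLLIN (which zmq.h defines as 1), and `LoopStep` and `dispatch` are hypothetical names introduced for this illustration.

```cpp
// Stand-in for ZMQ_POLLIN, which zmq.h defines as 1.
constexpr short kPollIn = 1;

// Hypothetical summary of one pass through the worker loop.
struct LoopStep
{
   bool process_task; // read and handle a task from the receiver
   bool stop;         // break out of the loop after this pass
};

// Mirrors the two independent if-blocks of the worker loop: the receiver
// is checked first, then the controller, and both may fire in one pass.
constexpr LoopStep dispatch(short receiver_revents, short controller_revents)
{
   return LoopStep{ (receiver_revents & kPollIn) != 0,
                    (controller_revents & kPollIn) != 0 };
}

// Only a task is pending: handle it and keep looping.
static_assert(dispatch(kPollIn, 0).process_task, "task is handled");
static_assert(!dispatch(kPollIn, 0).stop, "loop continues");
// Task and kill arrive together: handle the task, then stop.
static_assert(dispatch(kPollIn, kPollIn).process_task, "task is handled first");
static_assert(dispatch(kPollIn, kPollIn).stop, "then the loop ends");
```

Testing the flags with a bitwise AND rather than with equality matters because revents is a bit mask and may carry other flags besides the one for incoming data.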

In the official Z-Guide you will find more details on the solution design, and the original C code on which I have based this C++ rewrite.
