
PUSH-PULL with ZeroMQ 3.1: push ventilator

Now it is time to refresh the divide and conquer example that I originally wrote in C++ for ZeroMQ 2.x (based on the Z Guide example, written in C, and currently still referring to 2.x). Here it is rewritten to use the raw C ØMQ 3.1 interface.

The first step is creating a ventilator, the process that in a divide and conquer pattern splits the (usually huge) original task into many (relatively tiny) tasks to be executed by the workers.

The ventilator pushes its messages to the connected pulling workers, each message going to just one of them. Since the ventilator pushes all of them very fast, it is a good idea to start all the workers before giving the ventilator the green light to run; otherwise the workers that connect first would get most of the jobs.
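To get a feeling for why the start order matters, here is a toy model, not ZeroMQ code. It assumes that a PUSH socket deals its messages in turn (round-robin) among the workers connected at that moment, and it ignores buffering and timing details. The helper dealRoundRobin() is a hypothetical name used only for this sketch:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model: deal the tasks one at a time, in turn, to the workers that are
// connected when the sending starts.
// Returns the total cost (msec) assigned to each worker.
std::vector<int> dealRoundRobin(const std::vector<int>& costs, std::size_t workers)
{
    assert(workers > 0);
    std::vector<int> load(workers, 0);
    for(std::size_t i = 0; i < costs.size(); ++i)
        load[i % workers] += costs[i];
    return load;
}
```

With 100 jobs of 10 msec each, a single connected worker ends up with the whole 1000 msec, while four connected workers get 250 msec each. That is the difference between starting the ventilator too early and waiting for the green light.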

The greenLight() function could be very simple:
#include <iostream>
#include <string>

void greenLight()
{
    std::cout << "Press Enter when all the workers are ready";
    std::string input;
    std::getline(std::cin, input);
}
The tasks to be executed are meaningless. Each worker gets an int representing the number of milliseconds it has to sleep (simulating a job of variable length). To add a bit of interest to this step, I used the Boost random generator, feeding a commonly used Mersenne twister to a uniform distribution:
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int_distribution.hpp>

class VentiRand
{
private:
    boost::random::mt19937 generator_;
    boost::random::uniform_int_distribution<> random_;
public:
    VentiRand(int low, int hi) : random_(low, hi) {}

    int getValue() { return random_(generator_); }
};
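Notice that VentiRand default-constructs its generator, so every run should produce the same sequence of workloads. If Boost is not at hand, a similar class can be sketched with the C++11 <random> header. The name StdVentiRand and the seed parameter are my additions for illustration; they are not part of the original example:

```cpp
#include <cassert>
#include <random>

// Sketch of a standard-library counterpart to VentiRand (C++11 <random>).
// The optional seed is an illustrative addition: leave it out to get the
// same repeatable sequence on every run, pass a value to change it.
class StdVentiRand
{
private:
    std::mt19937 generator_;
    std::uniform_int_distribution<int> random_;
public:
    StdVentiRand(int low, int hi, unsigned seed = std::mt19937::default_seed)
        : generator_(seed), random_(low, hi)
    {
        assert(low <= hi);
    }

    // Next workload, uniformly distributed in [low, hi].
    int getValue() { return random_(generator_); }
};
```

A repeatable sequence is handy when comparing runs. To get different workloads each time, the seed could come from std::random_device, as in StdVentiRand vr(1, 100, std::random_device{}());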
This is the ventilator code:
void* context = zmq_init(1);
void* socket = zmq_socket(context, ZMQ_PUSH); // 1
zmq_bind(socket, "tcp://*:5557"); // 2

greenLight();
zmq_send(socket, NULL, 0, 0); // 3

VentiRand vr(1, 100);
int total = 0; // 4
for(int i = 0; i < 100; ++i)
{
    int workload = vr.getValue(); // 5
    total += workload;

    std::cout << workload << '.';
    if(zmq_send(socket, &workload, sizeof(int), 0) == -1) // 6
        std::cout << '!';
}

std::cout << "Total expected cost: " << total << " msec" << std::endl;
boost::this_thread::sleep(boost::posix_time::seconds(1));

zmq_close(socket);
zmq_term(context);
1. This socket pushes the messages, distributing them among the connected pullers.
2. The ventilator is the server in the PUSH-PULL pattern.
3. Let's send a first empty message, to signal that the ventilator is about to send the real messages.
4. Total expected cost in msec for all the jobs generated.
5. The random workload is in the range from 1 to 100 msec, as specified by the VentiRand ctor.
6. In ZeroMQ 3.1, zmq_send() returns the number of bytes sent. Minus one means something went wrong. Here the error handling is limited to dumping an exclamation mark to the output console. Notice also that zmq_send() happily accepts the address of an int as a message. It is our responsibility to ensure that sender and receiver agree on what those bytes mean.
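As a sketch of what that responsibility could look like on the receiving side, here is a small size check. It assumes that both ends run on machines with the same int size and byte order, since the ventilator sends the raw bytes of the int. The helper decodeWorkload() is hypothetical, and it works on a plain buffer, so it is independent of how the message was received:

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical helper: copy a received buffer into an int only when its size
// matches what the ventilator sends (sizeof(int) raw bytes).
// Returns false for any other size, such as the empty message sent before
// the real ones, and leaves *workload untouched in that case.
bool decodeWorkload(const void* data, std::size_t size, int* workload)
{
    assert(workload != nullptr);
    if(data == nullptr || size != sizeof(int))
        return false;
    std::memcpy(workload, data, sizeof(int));
    return true;
}
```

A check like this lets the receiver tell the empty starting message apart from a real job, and reject anything that is not exactly one int long.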

The code shouldn't look complicated if you have already written some 0MQ code. The problem is that you can't see it at work on its own: you need to implement the worker first. Let's see it in the next post.
