Coordinating threads with pair sockets

In ZeroMQ, threads coordinate by exchanging messages. Typically two threads are connected through pair sockets and send messages to each other to exchange information. As an example, we are about to write a multithreaded application designed in this way:
- The main thread creates a secondary thread, does some preparation job, waits for the secondary thread to signal back, and finally completes.
- The secondary thread (step 2) creates another thread that performs the real first step of the job (step 1). When step 1 is done, it signals step 2, which then does its own work and signals main.
Let's see a way to implement the first step:
void step1(zmq::context_t& context) // 1
{
    print("step 1");
    boost::this_thread::sleep(boost::posix_time::milliseconds(50)); // 2

    zmq::socket_t xmitter(context, ZMQ_PAIR); // 3
    xmitter.connect("inproc://step2");

    boost::this_thread::sleep(boost::posix_time::milliseconds(50)); // 4
    print("tell to step2 we're ready");

    zmq::message_t message(0);
    xmitter.send(message); // 5

    print("step1 done");
}
1. Remember that the ZeroMQ context is the only object that can be shared among different threads in the same process. In fact, the threads must use the same context if they want to connect to each other over inproc.
2. Simulation of a real job.
3. Pair socket used to signal when this thread is done. Notice that it connects to step 2 using the inproc protocol.
4. Some other things to do.
5. Send an empty message to signal that we are done.
The second step is a bit more complicated, since we have to manage two connections, one to step 1, the other to the main thread:
void step2(zmq::context_t& context)
{
    print("step2");

    {
        print("step2A");

        zmq::socket_t receiver(context, ZMQ_PAIR); // 1
        receiver.bind("inproc://step2");

        print("creating thread for step1");
        boost::thread t1(std::bind(step1, std::ref(context))); // 2

        print("doing something");
        boost::this_thread::sleep(boost::posix_time::milliseconds(150)); // 3

        zmq::message_t message;
        receiver.recv(&message); // 4

        print("signal received from step1");
        t1.join();
    }

    {
        print("step2B");

        zmq::socket_t xmitter(context, ZMQ_PAIR); // 5
        xmitter.connect("inproc://main");

        print("doing something else");
        boost::this_thread::sleep(boost::posix_time::milliseconds(150));

        print("signal back to main");
        zmq::message_t message(0);
        xmitter.send(message); // 6
    }

    print("step2 done");
}
1. This pair socket establishes the connection to step 1. It is bound before the step 1 thread is created, so the endpoint exists by the time step 1 connects to it.
2. Step 1 is executed in another thread.
3. Some useful job.
4. Block on the socket, waiting for the message from step 1 that lets us continue.
5. A second pair socket, used to send a message from this thread to main.
6. The job assigned to step 2 has been completed, so we signal main.
And this is the main code:
print("entering main function");
zmq::context_t context(1); // 1

print("bind inproc socket before starting step2");
zmq::socket_t receiver(context, ZMQ_PAIR); // 2
receiver.bind("inproc://main");

print("creating thread for step 2");
boost::thread t2(std::bind(step2, std::ref(context))); // 3

print("doing some preparation job");
boost::this_thread::sleep(boost::posix_time::milliseconds(200));

print("wait for step 2 to be completed");
zmq::message_t message;
receiver.recv(&message); // 4

t2.join();
print("job done");
1. This context object is the only one created for this process, and it is shared among all the threads.
2. We use a pair socket so that we can get a signal from step 2 when it completes.
3. Create a thread that runs the function step2().
4. The main thread blocks here, waiting for a message on the socket.
Until now I have left out the code for the logging function print(). It is defined in an anonymous namespace, which makes it local to the current source file, and it uses a mutex defined in the same namespace:
boost::mutex mio; // 1

void print(const char* message)
{
    boost::lock_guard<boost::mutex> lock(mio); // 2

    std::cout << boost::this_thread::get_id() << ": " << message << std::endl;
}
1. Since all the threads compete for a shared resource, in this case the standard output console, we need a mutex.
2. Lock the mutex, so that messages written by different threads do not get mixed up.
You can find the C code on which this C++ example is based in the Z-Guide.
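The print() helper shown above relies on Boost. As a minimal sketch, assuming a C++11 compiler, the same idea can be written with the standard library only. The extra stream parameter and the log_from_threads() helper are not part of the original code; they are hypothetical additions that make it possible to capture the output and check that lines from different threads are never interleaved.

```cpp
#include <iostream>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

namespace {

std::mutex mio; // guards every write made through print()

// Same idea as the Boost-based print(), with the output stream passed in
// (an assumption made here so that the output can be captured).
void print(const char* message, std::ostream& out = std::cout)
{
    std::lock_guard<std::mutex> lock(mio); // unlocked when lock leaves scope
    out << std::this_thread::get_id() << ": " << message << std::endl;
}

// Hypothetical helper: several threads log to one stream, which is then
// split back into lines so that each line can be inspected.
std::vector<std::string> log_from_threads(int thread_count, int lines_per_thread)
{
    std::ostringstream out;
    std::vector<std::thread> workers;
    for (int i = 0; i < thread_count; ++i) {
        workers.emplace_back([&out, lines_per_thread] {
            for (int j = 0; j < lines_per_thread; ++j)
                print("hello", out);
        });
    }
    for (auto& worker : workers)
        worker.join();

    std::vector<std::string> lines;
    std::istringstream in(out.str());
    for (std::string line; std::getline(in, line); )
        lines.push_back(line);
    return lines;
}

} // namespace
```

Calling log_from_threads(4, 50) should give back 200 lines, each one ending with ": hello". Without the lock_guard, the pieces written by different threads could end up mixed within a single line.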

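To check the order of the handshake without having ZeroMQ installed, here is a rough stand-in for the three threads described in this post. It does not use ZeroMQ at all: std::promise and std::future take the place of the pair sockets, purely to show who waits for whom. The EventLog type, the event strings, and run_handshake() are hypothetical names introduced for this sketch.

```cpp
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical event log shared by the three threads.
struct EventLog {
    void add(const std::string& event)
    {
        std::lock_guard<std::mutex> lock(guard);
        events.push_back(event);
    }
    std::mutex guard;
    std::vector<std::string> events;
};

void step1(EventLog& log, std::promise<void> ready)
{
    log.add("step1 work");
    ready.set_value(); // plays the role of xmitter.send() in step 1
}

void step2(EventLog& log, std::promise<void> done)
{
    std::promise<void> step1_ready;
    std::future<void> from_step1 = step1_ready.get_future();
    std::thread t1(step1, std::ref(log), std::move(step1_ready));

    from_step1.wait(); // plays the role of receiver.recv() in step 2
    log.add("step2 got signal from step1");
    t1.join();

    log.add("step2 work");
    done.set_value(); // signal back to main
}

std::vector<std::string> run_handshake()
{
    EventLog log;
    std::promise<void> step2_done;
    std::future<void> from_step2 = step2_done.get_future();
    std::thread t2(step2, std::ref(log), std::move(step2_done));

    from_step2.wait(); // main blocks here until step 2 reports back
    log.add("main got signal from step2");
    t2.join();
    return log.events;
}
```

Each thread logs only after the signal it waits for has arrived, so the events come out in the order step 1, step 2, main. The ZeroMQ version gets the same ordering from the blocking recv() calls.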