
LRU Queue Device - using the device

After seeing what the LRU queue broker clients and workers do, it is finally time to have a look at the device itself.

Recalling what we said in the first post dedicated to the LRU queue broker, I have designed the device as a class that is used in this way:
QueueDevice device;

device.start(nClients, nWorkers);
device.poll();
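For instance, a minimal main() could wrap this usage as follows; the try/catch on zmq::error_t and the actual number of clients and workers are just my choices for illustration:
#include <iostream>
#include <zmq.hpp>

int main()
{
    try
    {
        QueueDevice device;
        device.start(4, 2); // say, four clients served by two workers
        device.poll();
    }
    catch(const zmq::error_t& e)
    {
        std::cout << "Unexpected ZeroMQ error: " << e.what() << std::endl;
    }
}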
There are a couple of constants we should be aware of:
const char* SK_ADDR_BACKEND = "inproc://backend";
const char* SK_ADDR_FRONTEND = "inproc://frontend";
They represent the addresses used by the backend and the frontend. Notice that the inproc protocol is specified, since we are developing a multithreaded ZeroMQ application; a short reminder of what this implies follows.
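It is worth recalling how the inproc transport works: all the sockets involved must be created on the same zmq::context_t, and bind() must be called on an endpoint before any connect() to it. This is why the device keeps the context as a data member and passes a reference to it to each worker and client thread. A minimal sketch, where skWorker is a hypothetical worker-side socket:
// inproc endpoints live within a single process: every socket must share
// the same context, and bind() must precede connect() on the endpoint.
zmq::socket_t skWorker(context, ZMQ_REQ); // same context_ owned by the device
skWorker.connect(SK_ADDR_BACKEND); // reaches the ROUTER socket bound by the device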

The private class data members are:
zmq::context_t context_; // ZeroMQ context, shared by all the threads
zmq::socket_t backend_; // ROUTER socket towards the workers
zmq::socket_t frontend_; // ROUTER socket towards the clients
std::queue<std::string> qWorkers_; // ids of the currently available workers
boost::thread_group thWorkers_; // worker threads
boost::thread_group thClients_; // client threads
Constructor, destructor, and start() method are small fish:
QueueDevice() : context_(1), // 1
    backend_(context_, ZMQ_ROUTER), frontend_(context_, ZMQ_ROUTER) // 2
{
    backend_.bind(SK_ADDR_BACKEND); // 3
    frontend_.bind(SK_ADDR_FRONTEND);
}

~QueueDevice() // 4
{
    thWorkers_.join_all();
    thClients_.join_all();
}

void start(int nClients, int nWorkers) // 5
{
    for(int i = 0; i < nWorkers; ++i)
        thWorkers_.create_thread(std::bind(worker, std::ref(context_))); // 6

    for(int i = 0; i < nClients; ++i)
    {
        int root = 42 + i * 10;
        thClients_.create_thread(std::bind(client, std::ref(context_), root)); // 7
    }
    }
}
1. As required, we initialize the 0MQ context.
2. Our device is based on two ROUTER ZeroMQ sockets, connecting it to the backend (the workers) and the frontend (the clients).
3. We bind our sockets to the expected addresses, as specified above.
4. The class dtor takes care of joining on all the threads we created for workers and clients.
5. The user specifies how many clients and workers should be created.
6. A new thread is created for each worker; each thread runs the function worker(), which takes as a parameter a reference to the ZeroMQ context stored as a private data member.
7. The client thread creation is similar to the worker one, with the minor difference that one more parameter is passed: an integer used to distinguish the different clients and make testing more interesting. The sketch below shows the signatures these calls imply.
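For reference, this is a sketch of the free function signatures implied by the std::bind() calls above; the actual worker() and client() implementations were discussed in the previous posts:
// Signatures implied by the std::bind() calls in start(); the bodies
// were shown in the previous posts of this series.
void worker(zmq::context_t& context); // expected to connect to SK_ADDR_BACKEND
void client(zmq::context_t& context, int root); // expected to connect to SK_ADDR_FRONTEND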

More interesting is the poll() function:
void poll()
{
    zmq_pollitem_t items [] = // 1
    {
        { backend_,  0, ZMQ_POLLIN, 0 },
        { frontend_, 0, ZMQ_POLLIN, 0 }
    };

    while(zmq_poll(items, qWorkers_.empty() ? 1 : 2, 1000000) > 0) // 2
    {
        if(items[0].revents & ZMQ_POLLIN) // 3
        {
            receivingOnBackend();
            items[0].revents = 0; // 4
        }
        if(items[1].revents & ZMQ_POLLIN)
        {
            receivingOnFrontend();
            items[1].revents = 0;
        }
    }

    while(!qWorkers_.empty()) // 5
    {
        std::string id = qWorkers_.front(); // 6
        qWorkers_.pop();

        dump(id, "Terminating worker");

        zmq::message_t zmAddress(id.length());
        memcpy(zmAddress.data(), id.c_str(), id.length());

        backend_.send(zmAddress, ZMQ_SNDMORE); // 7
        zmq::message_t zmEmpty;
        backend_.send(zmEmpty, ZMQ_SNDMORE); // 8
        backend_.send(zmEmpty); // 9
    }
}
1. To poll on sockets we need to define an array of zmq_pollitem_t items, where we specify which socket to poll, which kind of polling to perform (here ZMQ_POLLIN, polling for incoming messages), and we set the "revents" flag to zero. This last flag is the one zmq_poll() sets to report that something happened on that socket.
2. Interesting line. We are polling on the array of pollitems, but if there is no element in the qWorkers_ queue, meaning no worker is available, we don't even poll on the frontend. The idea is that if we have no worker available to do the job required by a client, we wouldn't know what to do with that request. The last parameter, one million, is the number of microseconds zmq_poll() hangs waiting for something to be received, that is, one second. If at least one socket has something to be received, we enter the loop; otherwise the job is considered done. That would do for this toy application, but obviously it is not a reliable assumption for stable code.
3. We check each pollitem to see which one has something pending to be received. A private function is called to manage each case, receivingOnBackend() and receivingOnFrontend(); we'll see them in the next post, since this one is getting too long.
4. Finally the flag is reset, making it ready for the next iteration.
5. We get here when the looping is considered complete. The clients shut down by themselves, but we have to take care of the workers ourselves: we send a terminator to each worker still in the queue.
6. Get the least recently used worker from the queue, and pop it.
7. The worker id we popped from the queue is used as the address frame for the message we are sending through the backend socket.
8. Send an empty frame as separator, as required by the ZeroMQ transmission protocol.
9. And finally we send an empty message as a terminator to the worker socket. The fragment below sketches how a worker could detect it.
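The worker implementation belongs to a previous post, but to see how this terminator is consumed on the other side, here is a hypothetical fragment of the worker receive loop, assuming the worker uses a REQ socket as in the previous posts. The REQ socket strips the address and the empty separator on receive, so the terminator arrives as a message with an empty payload:
// Hypothetical fragment of the worker loop: skWorker is the worker REQ socket.
// A normal request starts with a non-empty client address frame; an empty
// payload means the device is asking this worker to terminate.
zmq::message_t zmPayload;
skWorker.recv(&zmPayload);
if(zmPayload.size() == 0)
    return; // terminator received, shut the worker down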

The full C++ source code for this example is available on github.
