Post on ASIO strand

IMHO, the ASIO strand example in the official Boost tutorial is a bit too complex. Instead of focusing on the matter at hand, it also requires some knowledge of the ASIO deadline_timer, which makes sense in the tutorial's logic but, I'd say, makes things blurred.

So I have written a minimal example that I hope is more intuitive as a first introduction to this concept.

We have a class designed to be used in a multithreaded context. It has, as a data member, a resource that is meant to be shared, and a couple of functions that modify that shared value and could be called from different threads.

Usually we rely on mutexes and locks to synchronize access to the shared resource. ASIO provides the strand class as a way to serialize the execution of the work posted to it, making explicit synchronization unnecessary. Be aware, though, that this holds only for the functions posted to the same strand.
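For comparison, here is a minimal sketch of the "usual" approach in plain standard C++11, without ASIO. LockedPrinter is a hypothetical class and is not part of the example. Every access to the shared counter, and to the output stream, happens under the same std::mutex. The stream is passed in so that, when testing, it can be a std::ostringstream.

```cpp
#include <cassert>
#include <mutex>
#include <ostream>
#include <sstream>
#include <thread>

// Hypothetical mutex-based counterpart of Printer (not code from the post):
// the shared counter and the output stream are only touched under mtx_.
class LockedPrinter
{
public:
    LockedPrinter(std::ostream& out, int count) : out_(out), count_(count) {}

    // Logs and decrements the counter while it is positive.
    // Returns how many decrements this call performed.
    int run(const char* msg)
    {
        int steps = 0;
        for(;;)
        {
            std::lock_guard<std::mutex> lock(mtx_);
            if(count_ <= 0)
                return steps;
            out_ << std::this_thread::get_id() << ' ' << msg << ' ' << count_ << '\n';
            --count_;
            ++steps;
        }
    }

    int count()
    {
        std::lock_guard<std::mutex> lock(mtx_);
        return count_;
    }

private:
    std::ostream& out_;
    std::mutex mtx_;
    int count_;
};
```

This works, but the locking is spread over every function that touches count_. The strand approach below moves that responsibility out of the class code.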

We want to write a piece of code like this:
namespace ba = boost::asio;

// ...

ba::io_service aios;
Printer p(aios, 10); // 1
boost::thread t([&aios] { aios.run(); }); // 2
aios.run(); // 3
t.join(); // 4
1. See below for the Printer class definition. In short, it posts the execution of a couple of its functions on ASIO, and both of them act on the same shared resource.
2. We run a worker thread on the ASIO I/O service.
3. The main thread runs on ASIO too.
4. We wait for the worker to complete, then end the execution.

So we have two threads running on ASIO. Let's now look at the private section of the Printer class:
class Printer
{
private:
    ba::strand strand_; // 1
    int count_; // 2

    void print(const char* msg) // 3
    {
        std::cout << boost::this_thread::get_id() << ' ' << msg << ' ' << count_ << std::endl;
    }
    
    void print1() // 4
    {
        if(count_ > 0)
        {
            print("print one");
            --count_;
            strand_.post(std::bind(&Printer::print1, this));
        }
    }

// ...
};
1. We are going to operate on a Boost Asio strand object.
2. This is our shared resource, a simple integer.
3. A utility function that dumps the current thread ID, a user message, and the shared resource to the standard output console.
4. A client of (3). If the shared resource count_ is greater than zero, it calls (3), then decreases count_ and posts a new execution of this function through the strand. There is another private function, print2(), that is exactly like print1() except that it logs a different message.

Since we are in a multithreaded context, these functions should look suspicious. No mutex or lock? No protection on access to count_? And, since cout is an implicitly shared resource, we also risk garbled output.

Well, none of this is an issue, since we are using a strand.

Now let's see the Printer ctor:
Printer(ba::io_service& aios, int count) : strand_(aios), count_(count) // 1
{
    strand_.post(std::bind(&Printer::print1, this)); // 2
    strand_.post(std::bind(&Printer::print2, this));
}
1. Pay attention to how the private ASIO strand object is constructed from the I/O service.
2. We prepare the first runs by posting the execution of the private functions on the strand.

All the work posted on the strand is executed sequentially, meaning that a new job starts only after the previous one has completed. There is no overlap and no concurrency, so there is no need for locking. Since we have two threads available, ASIO chooses which one to use for each job, and we have no guarantee about which thread executes what.
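To make the serialization guarantee concrete, here is a toy model of a strand in standard C++11. MiniStrand is a hypothetical class and is not Boost.Asio's implementation. Tasks run one at a time, in FIFO order, on whichever thread calls run(). run() returns when no task is pending, much like io_service::run() does when it runs out of work. A task may post() a follow-up, as print1() does.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical, simplified model of a strand: never two tasks at the same
// time, FIFO order, executed by whichever thread is calling run().
class MiniStrand
{
public:
    void post(std::function<void()> task)
    {
        std::lock_guard<std::mutex> lock(mtx_);
        tasks_.push_back(std::move(task));
        cv_.notify_one();
    }

    // Called by each worker thread; returns when nothing is pending or running.
    void run()
    {
        std::unique_lock<std::mutex> lock(mtx_);
        for(;;)
        {
            cv_.wait(lock, [this] { return !busy_; }); // another task is running
            if(tasks_.empty())
                return;
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop_front();
            busy_ = true;
            lock.unlock();
            task(); // runs without the lock, so it may call post()
            lock.lock();
            busy_ = false;
            cv_.notify_all();
        }
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool busy_ = false;
};
```

If two threads call run() while a couple of self-reposting tasks decrement a plain int, we get the Printer scenario with no lock in the task code. This sketch does not handle exceptions thrown by a task.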

We don't have the troubles associated with multithreading, but we don't get some of its advantages either. On a multicore or multiprocessor machine, a strand never runs more than one of its handlers at a time, so it won't use all the available processing power for its job.

The full C++ source code for this example is on github.

Fibonacci with ASIO

The Fibonacci function is commonly used for writing test code, because it is conceptually simple without being completely uninteresting. In this blog I have already used it a couple of times: once to show how to implement a C++11 lambda function, then as a means to show where standard C++11 multithreading could come in handy.

Now I use Fibonacci to show a more realistic example of a multithreaded ASIO application.

In the previous post we saw how to explicitly run and stop the ASIO I/O service. Here we rely instead on the fact that run() returns by itself when the I/O service has no more work to do. The trick is to post all the tasks to the service before starting the worker threads. When the workers run out of jobs, run() returns and the threads end.

The worker function is still the same simple call to run() on the I/O service, with some logging added for testing purposes:
void worker(ba::io_service& aios)
{
    dump("start worker");
    aios.run();
    dump("end worker");
}
Here ba is a namespace alias for boost::asio.

Our main thread is going to post a few calls to this function to ASIO, as jobs:
void calculate(unsigned int input)
{
    dump("input", input);
    unsigned int result = fibonacci(input);
    dump("output", result);
}
Finally, it calls this recursive function, a trivial implementation of Fibonacci:
unsigned int fibonacci(unsigned int n)
{
    if(n < 2)
        return n;
    return fibonacci(n - 1) + fibonacci(n - 2);
}
And here is the interesting part, where the main thread prepares the ASIO I/O service, spawns a few worker threads, and waits till the job is done:
ba::io_service aios; // 1

dump("starting up");
aios.post(std::bind(calculate, 35)); // 2
aios.post(std::bind(calculate, 30));
aios.post(std::bind(calculate, 20));
aios.post(std::bind(calculate, 10));

boost::thread_group threads;
for(int i = 0; i < 2; ++i) // 3
    threads.create_thread(std::bind(worker, std::ref(aios)));

dump("ready to join");
threads.join_all(); // 4
dump("job done");
1. No fake work is associated with the ASIO I/O service created here.
2. A few jobs are posted to the service.
3. Two threads are created, and each runs the worker function seen above. Each thread calls run() on the service, which makes it available to process a pending job. ASIO assigns a task to each thread. When a thread finishes a job, ASIO hands it the next available one, and so on until there is nothing left to do.
4. The main thread waits here until all the worker threads are done. At that point ASIO has assigned all the pending tasks to them, and they have completed each run.
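If you want to play with the same pattern without Boost, here is a rough standard C++11 analogue. JobQueue is a hypothetical stand-in for the io_service. Jobs are posted before the workers start. Each worker pulls jobs until the queue is empty and then returns, as run() does when it has no more work. Unlike ASIO, this sketch does not cover a job that posts new jobs after some workers have already left.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

unsigned int fibonacci(unsigned int n)
{
    return n < 2 ? n : fibonacci(n - 1) + fibonacci(n - 2);
}

// Hypothetical stand-in for io_service::post() and run(): jobs are queued
// first, then each worker pulls jobs until the queue is empty and returns.
class JobQueue
{
public:
    void post(std::function<void()> job)
    {
        std::lock_guard<std::mutex> lock(mtx_);
        jobs_.push(std::move(job));
    }

    void run()
    {
        for(;;)
        {
            std::function<void()> job;
            {
                std::lock_guard<std::mutex> lock(mtx_);
                if(jobs_.empty())
                    return; // out of work: this worker ends
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job(); // executed outside the lock, so jobs run in parallel
        }
    }

private:
    std::mutex mtx_;
    std::queue<std::function<void()>> jobs_;
};
```

Jobs run in parallel here, so any results they share must be protected, for instance with a mutex around a std::map of results.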

The full C++ source code for this example is on github.

Run and stop an ASIO I/O service

This example doesn't do anything useful, but it should clarify how you could use Boost ASIO to control the execution of a multithreaded application.

Our main thread will create a bunch of worker threads, do some work, and finally terminate gracefully.

ASIO is used to synchronize the work. The main thread creates the I/O service and passes it to the workers by reference. Remember that it is a non-copyable object. It wouldn't make sense to pass an ASIO I/O service by value, and if you try, you get a compiler error.

The worker threads are running a dummy function that just dumps a couple of messages, one before, and one after calling run() on the I/O service.

The main thread provides a dummy I/O service work object, so the run() called by the workers makes them hang on it. At this point the main thread is the only active one. It does whatever its job is, then calls stop() on the ASIO I/O service, and that makes run() return on each worker.

The last duty of the main thread is to join all the threads it spawned, and then it can happily terminate.
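The blocking behavior described above can be modeled in standard C++11 with a condition variable. MiniService is a hypothetical sketch, not how Boost.Asio is implemented, and it executes no handlers at all. addWork() plays the role of the io_service::work object. run() blocks while some work is outstanding and the service has not been stopped, and stop() wakes every thread parked in run().

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model of the run()/stop() handshake only (no handlers):
// run() blocks while work is outstanding and stop() has not been called.
class MiniService
{
public:
    void addWork()
    {
        std::lock_guard<std::mutex> lock(mtx_);
        ++work_;
    }

    void removeWork()
    {
        std::lock_guard<std::mutex> lock(mtx_);
        --work_;
        cv_.notify_all();
    }

    void run()
    {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this] { return stopped_ || work_ == 0; });
    }

    void stop()
    {
        std::lock_guard<std::mutex> lock(mtx_);
        stopped_ = true;
        cv_.notify_all(); // every thread parked in run() returns
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    int work_ = 0;
    bool stopped_ = false;
};
```

Without the call to addWork(), run() would return immediately, which is the behavior the Fibonacci example relies on.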

The code run by the workers is nothing more than this:
void worker(boost::asio::io_service& aios, int id) // 1
{
    dump(id, "first step"); // 2
    aios.run(); // 3
    dump(id, "last step");
}
1. The ASIO I/O service object is passed by reference, so we are working on the same object as the caller.
2. We are in a multithreaded environment, and we are accessing a shared resource: the standard output console. To avoid unpleasant mix-ups we have to guard access to it with a mutex, and this is what the dump() function does.
3. Let's run the service.
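The post doesn't show dump(), so here is one possible sketch. It assumes a single global mutex is enough. I have made the stream a parameter so that a test can capture the output; the original presumably writes to std::cout.

```cpp
#include <cassert>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical dump(): one mutex guards the shared stream, so lines written
// by different threads are never interleaved.
std::mutex dumpMutex;

void dump(std::ostream& out, int id, const std::string& msg)
{
    std::lock_guard<std::mutex> lock(dumpMutex);
    out << id << ' ' << msg << '\n';
}
```

In real code you would probably wrap the mutex and the stream in a small class rather than use a global, but a global keeps the sketch short.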

Here is the code executed by the main thread:
boost::asio::io_service aios; // 1
boost::asio::io_service::work work(aios); // 2

boost::thread_group threads; // 3
for(int i = 0; i < 6; ++i )
    threads.create_thread(std::bind(worker, std::ref(aios), i)); // 4

dump("Main thread has spawned a bunch of threads");
boost::this_thread::sleep(boost::posix_time::seconds(1)); // 5
aios.stop(); // 6
dump("Main thread asked ASIO io service to stop");
threads.join_all(); // 7
dump("Worker threads joined");
1. The ASIO I/O service object is created.
2. A dummy ASIO work object is put on the service.
3. We want to manage a group of threads, and the Boost class thread_group has been designed exactly for that.
4. The worker function has to be adapted to suit the create_thread() request, which is why I used std::bind(). Besides, I need to make sure that the ASIO I/O service object is passed by reference, and that is the reason for using std::ref().
5. Using this framework just to let the main thread take a one-second nap is overkill, but I guess you see the point.
6. The main thread is ready to terminate, so it asks ASIO to stop its work.
7. After making sure all the threads have joined, we can terminate the application.

The full C++ source code for this example is freely available on github.

LRU Queue Broker version two

With the extended zmq socket I introduced in the previous post, writing a ZeroMQ application like the LRU queue device described earlier gets easier.

OK, the comparison is not 100% fair. When rewriting the example I did not just use my brand new zmq::Socket; I also changed the application logic a bit. The client no longer sends an integer that the worker uses to do some job (actually, sleeping). It just sends a string, its ID, which is echoed back. It wouldn't be difficult to reintroduce the original behavior in this version, but I found the example more readable this way. Besides, this version almost completely lacks error handling. The previous version was not much better in this respect, but you don't want to keep such a wishful attitude in your production code.

[Later on, I changed my Socket class to manage multipart messages differently. This affected this code; see my post where I say more about the broker changes. Still, the general structure of the code shown here is the same.]

That said, here is how the client function looks now. Compare it with the original LRU device client:
void client(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skClient(context, ZMQ_REQ, id); // 1
    skClient.connect(SK_ADDR_FRONTEND);
    dump(id, "client is up");

    skClient.send(id); // 2
    dump(id, "client request sent");

    std::string reply = skClient.recvAsString(); // 3
    dump(reply, "received by client");
}
1. Create a ØMQ request socket with the specified ID.
2. Send a std::string on the socket.
3. Receive a std::string on the socket.

The code is slimmer, terser, easier to write, read, and modify.

If you liked the impact of zmq::Socket on the client, you are going to love what it does to the LRU device worker:
void worker(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skWorker(context, ZMQ_REQ, id);
    skWorker.connect(SK_ADDR_BACKEND);

    zmq::Frames frames(2); // 1
    while(true)
    {
        skWorker.send(frames); // 2

        frames = skWorker.blockingRecv(2, false); // 3
        if(frames.size() != 2)
        {
            dump(id, "terminating");
            return;
        }
        dump(frames[0], "client id");
        dump(frames[1], "payload");
    }
}
1. Remember that Frames is a std::vector of std::string's. Here we create it with two empty elements.
2. The first time, we send a multipart message containing just empty frames, as the device expects. After that, we send the received multipart message back to the caller.
3. The call returns either a dummy Frames (for terminating) or a Frames with the client ID and the payload. If your C++ compiler is recent enough to implement C++11 move semantics, this assignment is as cheap as moving a few pointers around.

The device's job of receiving on the back- and frontend gets cleaner too:
void receivingOnBackend()
{
    zmq::Frames input = backend_.blockingRecv(3, false); // 1

    dump(input[0], "registering worker");
    qWorkers_.push(input[0]);

    if(input.size() == 3)
    {
        zmq::Frames output(input.begin() + 1, input.end()); // 2
        frontend_.send(output);
    }
}

void receivingOnFrontend()
{
    zmq::Frames input = frontend_.blockingRecv(2); // 3

    dump(input[0], "client id received on frontend");
    dump(input[1], "payload received on frontend");

    std::string id = qWorkers_.front();
    qWorkers_.pop();
    dump(id, "selected worker on frontend");

    zmq::Frames output;
    output.reserve(3); // 4
    output.push_back(id);
    output.insert(output.end(), input.begin(), input.end());

    backend_.send(output);
    dump(id, "message sent to worker");
}
1. The backend socket can receive two different kinds of messages: just the worker ID, signalling that the worker is ready, or the worker ID followed by the client ID and the actual message payload.
2. If the backend sent a "real" message, the device simply discards the worker ID (the first element of the input Frames) and sends the resulting multipart message to the frontend.
3. A message on the frontend is always made of two frames: a client ID and the payload.
4. The device prepends a worker ID and then forwards the multipart message, as received from the client, to the worker.
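Stripped of the socket calls, the device's job is just moving frames around an LRU queue. Here is a sketch of that logic on plain standard containers. toWorker() and fromWorker() are hypothetical names, and Frames is redefined locally with the same meaning as zmq::Frames. The sketch is simplified: fromWorker() always forwards whatever follows the worker id, while receivingOnBackend() forwards only three-frame messages.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

typedef std::vector<std::string> Frames; // same meaning as zmq::Frames

// Frontend to backend: prepend the least recently used worker id
// to the [client id, payload] frames received from a client.
Frames toWorker(std::queue<std::string>& workers, const Frames& input)
{
    assert(!workers.empty());
    Frames output;
    output.reserve(input.size() + 1);
    output.push_back(workers.front());
    workers.pop();
    output.insert(output.end(), input.begin(), input.end());
    return output;
}

// Backend to frontend: the first frame is the worker id, which goes back
// into the queue; whatever follows it is meant for the client.
Frames fromWorker(std::queue<std::string>& workers, const Frames& input)
{
    assert(!input.empty());
    workers.push(input[0]);
    return Frames(input.begin() + 1, input.end());
}
```

A worker's first "ready" message yields an empty result from fromWorker(), so there is nothing to forward.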

The full C++ code for this example is on github. I developed it for ZeroMQ 2.2.0 on MSVC 2010. Notice that the head version on github differs from the code reported here. Managing the separators in multipart messages is no longer a Socket responsibility. The user code handles them, which in this case means the code shown here.

Extending zmq::socket_t

It took me some time to work my way through the ØMQ LRU queue broker example. When I think how complex it could be to implement the same functionality with a different framework, I won't complain much. Still, one could wonder whether there is a way to simplify the most tedious and error-prone parts of the coding. The 0MQ guys have an answer to this: czmq, a high-level C binding built on top of the basic, low-level C API. You can find it on the czmq page of the official ZeroMQ site.

The czmq API is nice and good, but I prefer to stick to the lightweight C++ interface. It is not optimal either, but it integrates better with my C++ code.

So what I have done is extend the C++ zmq::socket_t class to provide multipart send and receive. It is not a big thing, but its impact on the resulting code is nice, as I'll show you in a future post: the code becomes easier to write and maintain. Notice that I lazily wrote it only for 0MQ 2.x and for Windows with MSVC 2010. I would consider it more a proof of concept than a stable piece of code.

You can see the full C++ source code for my zmq::Socket class on github (a new version of zmq::Socket is now available; follow the link for details). Here I have jotted down a few notes on what I think are the more interesting points:
// ...

namespace zmq
{
    typedef std::vector<std::string> Frames; // 1

    class Socket : public socket_t // 2
    {
    public:
        // ...

        bool send(const Frames& frames) // 3
        {
            if(!frames.size()) // 3a
                throw error_t();

            for(unsigned int i = 0; i < frames.size() - 1; ++i) // 3b
            {
                if(!send(frames[i], ZMQ_SNDMORE))
                    return false;
                if(!sendSeparator())
                    return false;
            }

            return send(frames.back()); // 3c
        }

        Frames blockingRecv(int n, bool checked =true) // 4
        {
            Frames frames;
            frames.reserve(n);

            int currentFrame = 1;
            do {
                zmq::message_t message;
                if(!socket_t::recv(&message, 0)) // 4a
                    throw error_t();

                if(!(currentFrame++ % 2)) // 4b
                {
                    if(message.size())
                        throw error_t();
                }
                else
                {
                    const char* base = static_cast<const char*>(message.data());
                    frames.push_back(std::string(base, base + message.size())); // 4c
                }
            } while(sockopt_rcvmore()); // 4d

            if(checked && frames.size() != n)
                throw error_t();

            return frames;
        }
        
        // ...

    private:
        // ...

        bool sockopt_rcvmore() // 5
        {
            int64_t rcvmore;
            size_t type_size = sizeof(int64_t);
            getsockopt(ZMQ_RCVMORE, &rcvmore, &type_size);
            return rcvmore ? true : false;
        }
    };
}
1. Instead of sending each frame of a multipart message as a separate message, I provide alternative methods that manage multiple frames in a single call. Frames is just an alias for a vector of strings representing a multipart message, stripped of the zero-length separators.
2. My Socket class IS-A zmq::socket_t, so it derives publicly from it.
3. Send for multipart messages. Remember that the vector of frames should not include any separator, that is, the empty frames between one "real" frame and the next.
3a. At least one frame is expected; otherwise an exception is thrown.
3b. All the frames but the last one are managed in the same way. Each of them is followed by a separator, and both are sent in "send more" mode. The actual job is done by a couple of methods that prepare the zmq::message_t and then send it through the underlying socket. See the full code for details, or follow the link for details on its new version.
3c. The last frame is different: no separator follows it, and it is sent as a "single/last" frame.
4. Receive method for multipart messages. For the current example I only needed a blocking receive. A non-blocking implementation would need more planning, so I postponed its design. The function takes two parameters: the number of expected frames, and a flag that specifies what to do if the actual number of frames is different. If we are sure about our request, we set checked to true, and a mismatch throws an exception. Otherwise the passed number of frames is just a performance hint, used to reserve space in the vector.
4a. If anything goes wrong, this function throws an exception.
4b. I don't want any separators in the frame vector, so I skip them after checking that they are zero-sized. A non-empty separator means something weird has happened, so an exception is thrown.
4c. Each "real" frame is converted to a std::string and pushed into the container returned to the user.
4d. We keep looping until the received message is the last one in a multipart series, or a standalone message. See (5) for how the check is done. A do-while loop is the natural choice: we have to loop at least once, and the check comes after the action, receiving a frame in this case.
5. Check the "receive more" socket option to see whether another frame follows the last received message. Notice that this code was designed for ZeroMQ version 2.x, where the option is a 64-bit integer. For ZeroMQ version 3 we would use a plain int instead.
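The framing convention behind send() and blockingRecv() can be checked without a socket. This sketch models the sequence of wire frames as a std::vector<std::string>. withSeparators() and stripSeparators() are hypothetical helpers, and standard exceptions replace zmq::error_t.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

typedef std::vector<std::string> Frames;

// Wire layout produced by Socket::send(): every "real" frame except the last
// one is followed by an empty separator frame.
std::vector<std::string> withSeparators(const Frames& frames)
{
    if(frames.empty())
        throw std::invalid_argument("at least one frame is expected");
    std::vector<std::string> parts;
    for(std::size_t i = 0; i + 1 < frames.size(); ++i)
    {
        parts.push_back(frames[i]);
        parts.push_back(std::string()); // separator
    }
    parts.push_back(frames.back());
    return parts;
}

// Inverse operation, mirroring blockingRecv(): every second part must be an
// empty separator and is dropped; n is enforced only when checked is true.
Frames stripSeparators(const std::vector<std::string>& parts, std::size_t n, bool checked = true)
{
    Frames frames;
    frames.reserve(n);
    for(std::size_t i = 0; i < parts.size(); ++i)
    {
        if(i % 2) // 0-based odd index: a separator
        {
            if(!parts[i].empty())
                throw std::runtime_error("non-empty separator");
        }
        else
            frames.push_back(parts[i]);
    }
    if(checked && frames.size() != n)
        throw std::runtime_error("unexpected number of frames");
    return frames;
}
```

Calling stripSeparators() with checked set to false mimics the "performance hint" mode of blockingRecv().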

LRU Queue Device - receiving on the routers

The last big chunk left in the development of our sample LRU queue broker is the couple of private methods that take care of receiving messages on its two router sockets. If you have no clue what I am talking about: this post is part of a series dedicated to a ZeroMQ device. The device gets input messages from a few clients and uses a bunch of workers to compute results, which are sent back to the original clients. The worker is chosen by a Least Recently Used algorithm, and the identities of the available workers are stored in a queue. That should at least explain the name of the post. A better introduction is given in a previous post.

The two functions I am about to talk about here are used by the poll() function that is described in the post where the class public interface is discussed.

The private section of this class also defines a bunch of data members:
zmq::context_t context_;
zmq::socket_t backend_;
zmq::socket_t frontend_;
std::queue<std::string> qWorkers_;
boost::thread_group thWorkers_;
boost::thread_group thClients_;
They represent the ZeroMQ context, the two ROUTER sockets, the queue that keeps the worker ids currently available, and the worker and client threads.

Let's see what we do when a message is pending on the backend socket:
void receivingOnBackend()
{
    zmq::message_t zmWorkerId; // 1
    backend_.recv(&zmWorkerId);

    zmq::message_t zmDummy;
    backend_.recv(&zmDummy);

    zmq::message_t zmClientId; // 2
    backend_.recv(&zmClientId);

    backend_.recv(&zmDummy);

    zmq::message_t zmPayload; // 3
    backend_.recv(&zmPayload);

    const char* base = static_cast<const char*>(zmWorkerId.data());
    std::string id(base, base + zmWorkerId.size());
    dump(id, "registering worker");
    qWorkers_.push(id); // 4

    if(zmClientId.size()) // 5
    {
        frontend_.send(zmClientId, ZMQ_SNDMORE); // 6
        frontend_.send(zmDummy, ZMQ_SNDMORE);
        frontend_.send(zmPayload);
    }
}
1. Messages are multipart. The first frame is the worker id. We need to store it so that we can later use it to send a message to that worker.
2. Then we have an empty frame, conceptually a separator between "real" frames. After it comes a frame that represents the id of the client that originally sent a message to be processed by the worker.
3. Another separator, and finally the actual payload: the result of the job done by the worker, which we want to pass back to the client.
4. The first thing to do now is to add the worker id to our queue, which marks the worker as available for a new task.
5. Actually, there is a special case. The first time a worker sends a message to the device, it is just to signal that it is up, so no real client id (and no payload either) is passed. This check on the size of the client id frame detects that case: a zero size means there is nothing more to do.
6. Otherwise, we forward the message to the frontend as received from the worker, stripping off the envelope that contains the worker address.

And this is the frontend socket message management:
void receivingOnFrontend()
{
    zmq::message_t zmIdCli;
    frontend_.recv(&zmIdCli); // 1
    dump("Receiving on frontend from", zmIdCli);

    zmq::message_t zmDummy;
    frontend_.recv(&zmDummy);

    zmq::message_t zmPayload;
    frontend_.recv(&zmPayload); // 2

    std::string idWork = qWorkers_.front(); // 3
    qWorkers_.pop();
    dump(idWork, "selected worker");

    zmq::message_t zmIdWork(idWork.size());
    memcpy(zmIdWork.data(), idWork.c_str(), idWork.size());
    backend_.send(zmIdWork, ZMQ_SNDMORE); // 4
    backend_.send(zmDummy, ZMQ_SNDMORE);
    backend_.send(zmIdCli, ZMQ_SNDMORE);
    backend_.send(zmDummy, ZMQ_SNDMORE);
    backend_.send(zmPayload);

    dump(idWork, "message sent to worker");
}
1. The first frame in the multipart message contains the client id.
2. After a separator frame, we have the payload.
3. We have to send that message to a worker, so we get the first worker id from the queue, which represents the least recently used worker. Then we pop it from the queue, since that worker is no longer available.
4. The first frame in the multipart message that we send through the backend socket is the address of the worker that should receive it. Then we send the client id, and finally the payload. Remember to always send an empty frame between one "real" frame and the next.
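The envelope handling above can be isolated from ZeroMQ. In this sketch each frame is a std::string, and a Parts vector stands for the sequence of frames read from, or written to, a ROUTER socket. onBackend() and onFrontend() are hypothetical names, not functions from the original code.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

typedef std::vector<std::string> Parts; // hypothetical: one string per frame

// Models receivingOnBackend(): parts are [worker id, "", client id, "", payload].
// Returns true, filling toClient, only when there is something to forward.
bool onBackend(const Parts& parts, std::queue<std::string>& workers, Parts& toClient)
{
    assert(parts.size() == 5);
    workers.push(parts[0]); // the worker is available again
    if(parts[2].empty()) // first "ready" message from this worker
        return false;
    toClient.assign(parts.begin() + 2, parts.end()); // [client id, "", payload]
    return true;
}

// Models receivingOnFrontend(): parts are [client id, "", payload].
// The least recently used worker is popped and its address prepended.
Parts onFrontend(const Parts& parts, std::queue<std::string>& workers)
{
    assert(parts.size() == 3 && !workers.empty());
    Parts toWorker;
    toWorker.push_back(workers.front());
    workers.pop();
    toWorker.push_back(std::string()); // separator after the worker address
    toWorker.insert(toWorker.end(), parts.begin(), parts.end());
    return toWorker; // [worker id, "", client id, "", payload]
}
```

Because each function works on plain vectors, you can exercise the routing rules, including the LRU order, without starting any thread or socket.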

LRU Queue Device - using the device

After seeing what LRU queue broker clients and workers do, now it is finally time to have a look at the device itself.

As we said in the first post dedicated to the LRU queue broker, I have designed the device as a class that is used in this way:
QueueDevice device;

device.start(nClients, nWorkers);
device.poll();
There are a couple of constants we should be aware of:
const char* SK_ADDR_BACKEND = "inproc://backend";
const char* SK_ADDR_FRONTEND = "inproc://frontend";
They represent the addresses used by the backend and the frontend. Notice that they specify the inproc protocol, since we are developing a multithreaded ZeroMQ application.

The private class data members are:
zmq::context_t context_;
zmq::socket_t backend_;
zmq::socket_t frontend_;
std::queue<std::string> qWorkers_;
boost::thread_group thWorkers_;
boost::thread_group thClients_;
Constructor, destructor, and start() method are small fish:
QueueDevice() : context_(1), // 1
    backend_(context_, ZMQ_ROUTER), frontend_(context_, ZMQ_ROUTER) // 2
{
    backend_.bind(SK_ADDR_BACKEND); // 3
    frontend_.bind(SK_ADDR_FRONTEND);
}

~QueueDevice() // 4
{
    thWorkers_.join_all();
    thClients_.join_all();
}

void start(int nClients, int nWorkers) // 5
{
    for(int i = 0; i < nWorkers; ++i)
        thWorkers_.create_thread(std::bind(worker, std::ref(context_))); // 6

    for(int i = 0; i < nClients; ++i)
    {
        int root = 42 + i * 10;
        thClients_.create_thread(std::bind(client, std::ref(context_), root)); // 7
    }
}
1. As required, we initialize the 0MQ context.
2. Our device is based on two ZeroMQ ROUTER sockets, which connect it to the backend (the workers) and to the frontend (the clients).
3. We bind our sockets to the expected addresses, as specified above.
4. The class dtor takes care of joining all the threads we created for workers and clients.
5. The user specifies how many clients and workers should be created.
6. A new thread is created for each worker. It runs the function worker(), which takes as a parameter a reference to the ZeroMQ context stored as a private data member.
7. Client thread creation is similar to worker thread creation, with one minor difference: one more parameter is passed. It is an integer used to distinguish the different clients and to make testing more interesting.

More interesting is the poll() function:
void poll()
{
    zmq_pollitem_t items [] = // 1
    {
        { backend_,  0, ZMQ_POLLIN, 0 },
        { frontend_, 0, ZMQ_POLLIN, 0 }
    };

    while(zmq_poll(items, qWorkers_.empty() ? 1 : 2, 1000000) > 0) // 2
    {
        if(items[0].revents & ZMQ_POLLIN) // 3
        {
            receivingOnBackend();
            items[0].revents = 0; // 4
        }
        if(items[1].revents & ZMQ_POLLIN)
        {
            receivingOnFrontend();
            items[1].revents = 0;
        }
    }

    while(!qWorkers_.empty()) // 5
    {
        std::string id = qWorkers_.front(); // 6
        qWorkers_.pop();

        dump(id, "Terminating worker");

        zmq::message_t zmAddress(id.length());
        memcpy(zmAddress.data(), id.c_str(), id.length());

        backend_.send(zmAddress, ZMQ_SNDMORE); // 7
        zmq::message_t zmEmpty;
        backend_.send(zmEmpty, ZMQ_SNDMORE); // 8
        backend_.send(zmEmpty); // 9
    }
}
1. To poll on sockets we need to define an array of zmq_pollitem_t items. In each item we specify which socket to poll and which kind of polling to perform (here ZMQ_POLLIN, polling for incoming messages), and we set the "revents" field to zero. This last field reports whether something happened on that socket.
2. Interesting line. We poll on the array of pollitems, but if the qWorkers queue is empty, meaning no worker is available, we don't even poll on the frontend. The rationale is that without an available worker we wouldn't know what to do with a client request. The last parameter, one million, is the number of microseconds (in ZeroMQ 2.x) that poll waits for something to be received, that is, one second. If at least one socket has something to be received, we enter the loop; otherwise the job is considered done. That will do for this toy application, but obviously it is not a reliable assumption for stable code.
3. We check each pollitem to see which one has something pending to be received. A private function manages each case: receivingOnBackend() and receivingOnFrontend(). We'll see them in the next post, since this one is getting too long.
4. Finally the field is reset, making it ready for the next iteration.
5. We get here when the loop is over. The clients should shut down by themselves, but we have to take care of the workers ourselves, so we send a terminator to each worker in the queue.
6. Get the least recently used worker in the queue, and pop it.
7. Use the worker id we popped from the queue as the address of the message we are sending through the backend socket.
8. Send an empty frame as a separator, as required by the ZeroMQ envelope format.
9. And finally send a terminator, an empty frame, to the worker.
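The shutdown loop reduces to a small function over the worker queue. This hypothetical helper models each multipart message as a vector of strings. It shows which messages would be sent and in which order, starting with the least recently used worker.

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

typedef std::vector<std::string> Message; // hypothetical: one string per frame

// Models the loop at the end of poll(): every worker still in the queue gets
// [worker id, "", ""], that is the address, the separator, and an empty frame
// that the worker reads as an empty client id, meaning "terminate".
std::vector<Message> terminators(std::queue<std::string>& workers)
{
    std::vector<Message> messages;
    while(!workers.empty())
    {
        messages.push_back(Message{ workers.front(), std::string(), std::string() });
        workers.pop();
    }
    return messages;
}
```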

The full C++ source code for this example is available on github.

LRU Queue Device - worker

Before taking care of the LRU queue broker itself, and after seeing what its client does, it is time to look at the other side of the device: the worker threads.

Our device is designed to run a user-defined number of workers. Each worker is associated with a thread, and each of them runs this function:
const char* SK_ADDR_BACKEND = "inproc://backend"; // 1

// ...

void worker(zmq::context_t& context) // 2
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    zmq::socket_t skWorker(context, ZMQ_REQ); // 4
    zmq_setsockopt(skWorker, ZMQ_IDENTITY, id.c_str(), id.length());
    skWorker.connect(SK_ADDR_BACKEND);

    std::string receiver;
    int payload = 0;
    while(true)
    {
        zmq::message_t zmReceiver(receiver.length()); // 5
        memcpy(zmReceiver.data(), receiver.c_str(), receiver.length());
        skWorker.send(zmReceiver, ZMQ_SNDMORE); // 5a

        zmq::message_t zmDummy;
        skWorker.send(zmDummy, ZMQ_SNDMORE); // 5b

        zmq::message_t zmOutput(sizeof(int));
        memcpy(zmOutput.data(), &payload, sizeof(int));
        skWorker.send(zmOutput); // 5c

        zmq::message_t zmClientId;
        skWorker.recv(&zmClientId); // 6
        dump(id, zmClientId);

        if(!zmClientId.size()) // 7
        {
            dump(id, "terminating");
            return;
        }
        const char* base = static_cast<const char*>(zmClientId.data());
        receiver = std::string(base, base + zmClientId.size()); // 8

        skWorker.recv(&zmDummy); // 9

        zmq::message_t zmPayload;
        skWorker.recv(&zmPayload); // 10
        if(zmPayload.size() != sizeof(int)) // 11
        {
            dump(id, "bad payload detected");
            return;
        }

        memcpy(&payload, zmPayload.data(), sizeof(int)); // 12
        dump(id, payload);

        boost::this_thread::sleep(boost::posix_time::millisec(payload));
    }
}
1. We are developing a multithread ZeroMQ application, where the sockets are connected over the inproc protocol. The name I chose for the backend connection between the device and the workers is "backend".
2. As we should remember, the 0MQ context is the only 0MQ object that can be safely shared among threads, and it must be shared when, as in this case, the threads exchange data over inproc sockets.
3. Usually we don't bother specifying the socket identity and let ØMQ do it. Here it is set explicitly to make the application easier to test, which is why I used the Boost thread id.
4. Up to this point, the worker acts just like the client. Both of them have REQ sockets, connected inproc to a ROUTER socket in the device. The difference is in what happens next. The worker sends a dummy message to let the device know it is up and running, then it waits for a reply, does some job on it, sends an answer back and waits for a new job, until it gets a terminating message.
5. It takes eight lines to send a request to the device, and the first time, as said in (4), it is just a dummy. The trouble is that we are sending a multipart message, and there is no easy way to do it other than taking care of all the gory details.
As the first part (5a) we send a character string representing the address of the client that asked for this job. The first time there is no associated client, we are just signalling to the device that we are ready, so we send an empty frame.
The second part (5b) is a zero-sized frame, which acts as the delimiter between the client address and the body of the message.
The third and last part (5c) is the payload, representing the result of the job performed by the worker. In the first iteration this value is meaningless.
6. Now the worker blocks, waiting to receive a reply on the socket. As the name of the variable suggests, what we expect to get is the client id, which we'll use to send back an answer in (5a).
7. If no client id is specified, we interpret the message as a termination request, so we stop looping.
8. Otherwise, we convert the raw byte array into a proper C++ string, and we store it in the local variable that will be used in (5a).
9. The next frame is the empty delimiter; we just ignore it.
10. Finally we receive the payload, the real stuff we should work with.
11. This sample application is designed so that only ints are expected here; anything else is considered a catastrophic failure (production code shouldn't behave like this, as you surely know).
12. The integer in the last frame of the message is converted to an int, and used by the worker to, well, determine how long it should sleep. The value itself, without any change, is going to be used in (5c) to be sent back to the client.
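To visualize the frames involved, here is a socket-free model of the envelope, a sketch under my own assumptions: a multipart message is represented as a std::vector of std::string frames, and the helper names (Frames, intFrame(), workerRequest(), seenByBackendRouter()) are hypothetical, not part of ZeroMQ. It relies on the standard REQ/ROUTER behavior: the REQ socket prepends an empty delimiter frame, and the receiving ROUTER prepends the identity of the peer the message came from.

```cpp
#include <cstring>
#include <string>
#include <vector>

// Hypothetical model: a multipart message as a list of frames.
using Frames = std::vector<std::string>;

// Packs an int into a frame, as the worker does with memcpy in (5c).
std::string intFrame(int value)
{
    std::string frame(sizeof(int), '\0');
    std::memcpy(&frame[0], &value, sizeof(int));
    return frame;
}

// The three frames the worker passes to send() in (5a), (5b) and (5c).
Frames workerRequest(const std::string& clientId, int payload)
{
    return Frames{ clientId, std::string(), intFrame(payload) };
}

// What the device's backend ROUTER reads for the same request:
// the REQ socket prepends an empty delimiter, then the ROUTER
// prepends the identity of the worker the message came from.
Frames seenByBackendRouter(const std::string& workerId, const Frames& sent)
{
    Frames received{ workerId, std::string() };
    received.insert(received.end(), sent.begin(), sent.end());
    return received;
}
```

So the backend of the device reads five frames per worker request: worker id, delimiter, client id (empty for the first "ready" request), delimiter, payload.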

The full C++ source code for this example is on github.


LRU Queue Device - client

If you have read the previous post, it should be more or less clear how the LRU queue broker we are going to create should work. Here we focus on the clients used by the device.

When we are ready, we call the public start() function, which creates a thread for each client, running this function:
const char* SK_ADDR_FRONTEND = "inproc://frontend"; // 1

// ...

void client(zmq::context_t& context, int value) // 2
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 3
    zmq::socket_t skClient(context, ZMQ_REQ); // 4
    zmq_setsockopt(skClient, ZMQ_IDENTITY, id.c_str(), id.length());
    skClient.connect(SK_ADDR_FRONTEND);

    dump(id, "client is up"); // 5

    zmq::message_t zmPayload(sizeof(int)); // 6
    memcpy(zmPayload.data(), &value, sizeof(int));
    skClient.send(zmPayload);

    dump(id, "client request sent");

    zmq::message_t zmReply;
    skClient.recv(&zmReply); // 7

    if(zmReply.size() == sizeof(int)) // 8
    {
        int reply = *((int*)zmReply.data());
        dump(id, reply);
    }
    else // unexpected
        dump(id, zmReply);
}
1. This is a multithread ZeroMQ application, and the sockets are connected over the inproc protocol. The name I chose for the frontend connection between the device and the clients is "frontend".
2. Threads in a 0MQ application communicate through sockets created on the same shared 0MQ context object. The second parameter passed to the function is just a value that differs for each client, so that we can keep track of what is going on while testing.
3. Here we could have let ØMQ choose the socket identity, but having a known socket id for each client is useful for testing. The most natural choice seemed to be the Boost thread id. Since there is no direct way to get that id as a string, we have to convert it explicitly using boost::lexical_cast.
4. Each client has its own REQ socket, for which we set the identity as described in (3), and which we connect to the frontend ROUTER socket defined in our device.
5. We are in a multithread environment, so it is not safe to use a shared resource, such as std::cout, without protecting it with a mutex. That's why I wrote a few dump() functions that do the locking and give some basic feedback to the user.
6. The client simply sends the value it gets as input parameter to the device that created it, through the socket that connects them. Pretty meaningless, I agree, but it should do as an example.
7. This is a REQ socket: we have sent our request, and now we block indefinitely waiting for a reply.
8. I have removed almost all error checking from the application, to keep it as readable as possible. But I couldn't help adding a few minimal checks like this one. We are expecting an integer as a reply (actually, it should be just an echo of the integer that we sent), so I check that the received message has the expected size. If so, I extract the int from the message and print it to the console as such. Otherwise I print the reply for debugging purposes, as if it were a character string.
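The encoding used on both sides of the socket, copying an int into the message buffer and checking the size before reading it back, can be isolated in two small functions. This is a sketch with hypothetical names (encodeInt(), decodeInt()) that uses a std::string as byte buffer instead of a zmq::message_t; reading the value back with memcpy, rather than through an int* cast, also sidesteps any alignment concern about the message buffer.

```cpp
#include <cstring>
#include <string>

// Hypothetical helper: the raw bytes of an int, as copied in point (6).
std::string encodeInt(int value)
{
    std::string bytes(sizeof(int), '\0');
    std::memcpy(&bytes[0], &value, sizeof(int));
    return bytes;
}

// Hypothetical helper: the size check of point (8). Returns false, leaving
// value untouched, if the buffer does not hold exactly one int.
bool decodeInt(const std::string& bytes, int& value)
{
    if (bytes.size() != sizeof(int))
        return false;
    std::memcpy(&value, bytes.data(), sizeof(int));
    return true;
}
```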

The C++ source code for this example is on github.


LRU Queue Device - general view

Implementing the Least Recently Used pattern for ZeroMQ was a longish but not complicated job. Here things get tougher, since we want to create a device that manages an exchange of messages from a bunch of clients to a bunch of workers, again using the LRU pattern.

This example is based on the Request-Reply Message Broker described in the ZGuide. I have reworked it a bit, both to adapt it to the Windows environment (no ipc protocol supported there) and to make it clearer, at least from my point of view. I used 0MQ 2.2.0, its standard light-weight C++ wrapper, and Visual C++ 2010 as development environment.

As they put it in the ZGuide, "Reading and writing multipart messages using the native ØMQ API is like eating a bowl of hot noodle soup, with fried chicken and extra vegetables, using a toothpick". Since that is what we are actually going to do, don't be surprised if at times it looks overwhelmingly complex and messy. Be patient, work your way through the example one step at a time, and in the end you will find out that it is not so bad, after all.

The idea is that we want to have a class, QueueDevice, that is going to be used in this way:
void lruQueue(int nWorkers, int nClients)
{
    QueueDevice device;

    device.start(nClients, nWorkers); // 1
    device.poll(); // 2
}
1. We specify the number of clients and workers that use our device when we start it.
2. Then we poll on the device till some condition is reached.

The QueueDevice class is going to have a bunch of private data members:
class QueueDevice
{
    // ...
private:
    zmq::context_t context_; // 1
    zmq::socket_t backend_; // 2
    zmq::socket_t frontend_;
    std::queue<std::string> qWorkers_; // 3
    boost::thread_group thWorkers_; // 4
    boost::thread_group thClients_;

    // ...
};
1. First of all, a ZeroMQ context.
2. Two sockets, both ROUTER as we'll see in the ctor: one manages the backend, that is, the connection to the workers; the other the frontend, meaning the clients.
3. I am going to use a queue to temporarily store in the device which workers are available to the clients. We want to implement an LRU pattern, and a queue, where the worker that has been waiting longest is always at the front, is just the right data structure.
4. We are going to spawn some threads for the workers and others for the clients. We could have used just one thread_group, but I guess having two of them is more readable.
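The role of qWorkers_ can be sketched on its own. This is a hypothetical helper class (LruWorkers is my name, not something from the post) showing the LRU bookkeeping with std::queue: a worker id goes to the back when the worker says it is ready, and each client request is handed to the id at the front, that is, the worker that has been idle longest.

```cpp
#include <queue>
#include <stdexcept>
#include <string>

// Hypothetical sketch of the bookkeeping behind qWorkers_.
class LruWorkers
{
    std::queue<std::string> ready_; // front: worker idle for the longest time
public:
    // A worker reported it is ready: it goes to the back of the line.
    void workerReady(const std::string& id) { ready_.push(id); }

    bool hasWorker() const { return !ready_.empty(); }

    // The next client request goes to the least recently used worker.
    std::string nextWorker()
    {
        if (ready_.empty())
            throw std::logic_error("no worker available");
        std::string id = ready_.front();
        ready_.pop();
        return id;
    }
};
```

In the ZGuide version of this broker the frontend is polled only while at least one worker is queued, which is the kind of check hasWorker() would serve.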

There are going to be a few public methods in the QueueDevice class:
  • A constructor to initialize the sockets.
  • A destructor to join all the threads.
  • A start() method to actually start the worker and client threads.
  • A poll() method to, well, poll on the backend and frontend sockets.
The QueueDevice start() method is going to need access to a worker() and a client() function, representing the jobs executed on the backend and frontend sides of the device respectively.

Finally, we'll have a bunch of dumping functions, used by almost all of the many threads that make up our application, to print some feedback for the user on the standard console. As usual, since std::cout is a shared resource, we'll need a mutex to avoid unpleasant mix-ups.

That should be all. In the next posts I'll go through the details. The full C++ source code for this example is available on github.


LRU Pattern - putting all together

Following the design explained in the ZGuide, I have written a C++ port of a simple application that implements the Least Recently Used messaging pattern, using the light-weight default ZeroMQ C++ wrapper, for Windows with MSVC 2010, linking to ØMQ 2.2.0, currently the latest stable ZeroMQ version.

You can find the complete C++ code for this example on github, and in the previous two posts some comments on the client side and on the worker.

There is still a tiny bit of code I should talk about, the dumpMessage() functions:
boost::mutex mio; // 1

void dumpMessage(const std::string& id, int value) // 2
{
    boost::lock_guard<boost::mutex> lock(mio); // 3
    std::cout << id << ' ' << value << std::endl;
}

void dumpMessage(const std::string& id, const char* msg)
{
    boost::lock_guard<boost::mutex> lock(mio);
    std::cout << id << ' ' << msg << std::endl;
}
1. In the rest of the code, all the synchronization among threads is done by 0MQ message exchanges, so we don't need mutexes and locks. But here we have to deal with many threads competing on the same shared resource, namely the standard output console. Ruling its access with a mutex is the most natural solution, I reckon.
2. I provide two overloads for the printing function, one for printing the socket/thread id plus its int payload, and one for id plus a message from the code to the user.
3. If another thread is already executing the next line, holding the lock, we patiently wait here for our turn. The lock is released by the lock_guard destructor when the function returns.
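For reference, the same pair of functions can be written with the C++11 standard library, where std::mutex and std::lock_guard play the roles of their Boost counterparts. In this sketch the output stream is a parameter (an assumption of mine, so that the output can be checked with a std::ostringstream), while the original always writes to std::cout.

```cpp
#include <mutex>
#include <ostream>
#include <sstream> // std::ostringstream, handy to check the output
#include <string>

std::mutex mio; // guards the shared output stream

void dumpMessage(std::ostream& os, const std::string& id, int value)
{
    std::lock_guard<std::mutex> lock(mio); // unlocked when lock goes out of scope
    os << id << ' ' << value << std::endl;
}

void dumpMessage(std::ostream& os, const std::string& id, const char* msg)
{
    std::lock_guard<std::mutex> lock(mio);
    os << id << ' ' << msg << std::endl;
}
```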

All the printing in the application is done through this couple of functions, with a notable exception in the client main function:
void lru(int nWorkers)
{
    // ...
    boost::thread_group threads;
    // ...
    threads.join_all();
    std::cout << "Number of processed messages: " << processed << std::endl;
}
But at that point we have already executed a "join all" on all the worker threads spawned by the client. We are back in the condition where only one thread is running for this process. So we don't have to worry about competing for that shared resource.

The design of this example should be clear. We enter the lru() function, create as many worker threads as required by the caller (in real code it would be worth checking that number), exchange data between the client and the workers, retrieve a result from all this work, and terminate the run.

From the ZeroMQ point of view, the interesting part is the REQ-ROUTER pattern. Each worker has a REQ socket, which it uses to tell the client socket (a ROUTER) when it is ready for another run. When the router has no more jobs to hand out, it replies with a terminator to each worker asking for new work, until none of them is left.
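How jobs end up distributed can be checked with a single-threaded model of the exchange, a sketch under simplifying assumptions: no sockets, and every job taking the same time. simulateLru() is a hypothetical name; worker w stands for the REQ socket with index w, and the returned vector holds what terminateWorker() would collect from each of them.

```cpp
#include <queue>
#include <vector>

// Hypothetical single-threaded model of lru(): each job goes to the worker
// at the head of the ready queue, which rejoins the tail once done.
std::vector<int> simulateLru(int nWorkers, int nJobs)
{
    if (nWorkers <= 0)
        return std::vector<int>();

    std::vector<int> processed(nWorkers, 0);
    std::queue<int> ready;
    for (int w = 0; w < nWorkers; ++w)
        ready.push(w); // each worker's first request means "I'm ready"

    for (int job = 0; job < nJobs; ++job)
    {
        int w = ready.front(); // least recently used worker
        ready.pop();
        ++processed[w];
        ready.push(w);         // its next request puts it back in line
    }
    return processed; // what each worker reports when terminated
}
```

With equal durations the LRU queue behaves like round robin. In the real application the random sleep times make the counts uneven, while their sum is still the number of jobs sent.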


LRU Pattern - worker

Second step of the LRU pattern example implementation. After the client, now it is time to write the worker. The ZeroMQ version I used is the brand new 2.2.0, the chosen implementation language is C++, in the MSVC 2010 flavor, for the Windows operating system. The complete C++ source code is on github.

This is a multithread application, the client creates a thread for each worker, and each of them executes this function:
const char* SOCK_ADDR = "inproc://workers";

// ...

void worker(zmq::context_t& context)
{
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 1
    zmq::socket_t worker(context, ZMQ_REQ); // 2
    zmq_setsockopt(worker, ZMQ_IDENTITY, id.c_str(), id.length());
    worker.connect(SOCK_ADDR);

    int processed = 0; // 3
    while(true)
    {
        zmq::message_t msg(&processed, sizeof(int), NULL);
        worker.send(msg); // 4
        zmq::message_t payload;
        if(worker.recv(&payload) == false) // 5
        {
            dumpMessage(id, "error receiving message");
            return;
        }
        if(payload.size() != sizeof(int)) // 6
        {
            dumpMessage(id, "terminating");
            return;
        }

        int value = *(int*)payload.data(); // 7
        dumpMessage(id, value);

        boost::this_thread::sleep(boost::posix_time::millisec(value)); // 8
        ++processed;
    }
}
1. As id for the REQ socket I used the thread id. To convert a Boost thread id to a string we need to go through a lexical cast.
2. The REQ socket is created on the context passed by the main thread: all the threads in a 0MQ multithread application that want to be connected should share the same context.
3. Keeping track of the messages processed in this thread, so that we can pass back this information to the client.
4. Sending a message through the socket. We pass a single frame to send(), but the ROUTER on the other side receives a three-frame multipart message: the identity we set for this socket (prepended by the ROUTER), an empty delimiter frame (prepended by the REQ socket), and the number of messages processed so far.
5. Receiving the reply to our request. The envelope is handled for us: the ROUTER uses the identity frame to route the reply, and the REQ socket strips the empty delimiter, so we only have to take care of the actual payload.
6. The error handling is very poor in this example. We expect an int as message, anything with a different size is considered equivalent to a null sized message, that I conventionally consider as a terminator.
7. Extract the int contained in the message ...
8. ... and use it to do some job, in this case just sleeping for a while.

All the synchronization among threads is done through messages sent on ZeroMQ sockets. The exceptions are the dumpMessage() functions, which access a resource (std::cout) shared among all threads. We'll see how to deal with this in the next post.


LRU Pattern - client

Let's implement the Least Recently Used Routing pattern for ZeroMQ 2.2.0 (just released) in C++ on Windows with MSVC. In the ZGuide you will find the rationale behind this code, and the C implementation that I used as guideline for this post. Besides porting it to C++, I have adapted it to Windows (there is no IPC support on this platform) and made some minor changes that, I guess, make this example even more interesting.

It is a multithreaded application, where the client has a ROUTER socket to which a bunch of REQ sockets connect, one for each worker thread. Each REQ sends a message to the ROUTER when it is ready to process a new one; the ROUTER receives the REQ identity as the first frame of that message, and uses it to reply to that specific REQ socket, which it now knows to be available. Here I talk about the client part of the application, but you can already have a look at the entire source code on github.

The main function gets as parameter the number of workers that we want to run, creates a router socket and the worker threads, each of them with its own REQ socket, on the same context, lets the router send a few messages to the workers, and then terminates them, getting as a side effect the number of messages processed by each thread:
void lru(int nWorkers)
{
    zmq::context_t context(1);
    MyRouter router(context); // 1

    boost::thread_group threads; // 2
    for(int i = 0; i < nWorkers; ++i)
        threads.create_thread(std::bind(worker, std::ref(context))); // 3

    for(int i = 0; i < nWorkers * 10; ++i)
        router.sendMessage(); // 4

    int processed = 0;
    for(int i = 0; i < nWorkers; ++i)
        processed += router.terminateWorker(); // 5

    threads.join_all(); // 6
    std::cout << "Number of processed messages: " << processed << std::endl;
}
1. See below for the MyRouter class, for the moment think of it as a wrapper to a ØMQ ROUTER socket.
2. The Boost thread_group makes it easy to manage a group of threads, like the one we want to have here.
3. Each thread runs the worker() function, which we'll talk about in the next post, and gets a reference to the 0MQ context.
4. Send an average of ten messages to each 0MQ REQ socket.
5. Send a terminator to each worker; terminateWorker() returns the number of messages processed by the current worker.
6. Join all the threads, give a feedback to the user and terminate.

The MyRouter class uses a const and a class:
const char* SOCK_ADDR = "inproc://workers"; // 1

class RandomTimeGenerator // 2
{
private:
    boost::random::mt19937 generator_;
    boost::random::uniform_int_distribution<> random_;
public:
    RandomTimeGenerator(int low, int high) : random_(low, high) {}
    int getValue() { return random_(generator_); }
};

class MyRouter
{
private:
    zmq::socket_t client_;
    RandomTimeGenerator rtg_;

public:
    MyRouter(zmq::context_t& context) : client_(context, ZMQ_ROUTER), rtg_(1, 1000) // 3
    {
        client_.bind(SOCK_ADDR);
    }

    void sendMessage() // 4
    {
        zmq::message_t zmAddress;
        client_.recv(&zmAddress);

        zmq::message_t zmDummy1;
        client_.recv(&zmDummy1);
        zmq::message_t zmDummy2;
        client_.recv(&zmDummy2);

        client_.send(zmAddress, ZMQ_SNDMORE);

        zmq::message_t zmEmpty; // 5
        client_.send(zmEmpty, ZMQ_SNDMORE);

        int value = rtg_.getValue();
        zmq::message_t zmPayload(sizeof(int));
        memcpy(zmPayload.data(), &value, sizeof(int));
        client_.send(zmPayload);
    }

    int terminateWorker() // 6
    {
        zmq::message_t zmAddress;
        client_.recv(&zmAddress);
        zmq::message_t zmDummy;
        client_.recv(&zmDummy);

        zmq::message_t zmPayload;
        client_.recv(&zmPayload);
        std::string id((char*)zmAddress.data(), (char*)zmAddress.data() + zmAddress.size());
        int acknowledged = *(int*)zmPayload.data();
        dumpMessage(id, acknowledged); // 7

        client_.send(zmAddress, ZMQ_SNDMORE); // 8

        zmq::message_t zmEmpty;
        client_.send(zmEmpty, ZMQ_SNDMORE);
        client_.send(zmEmpty);

        return acknowledged;
    }
};
1. The address used by the sockets for the inproc connections.
2. Class that generates a random int, used to simulate a task of random length executed by the worker. It uses a Boost Mersenne Twister as generator and a uniform integer distribution.
3. The ctor expects as input the ZeroMQ context that should be used to create the 0MQ ROUTER socket. Besides, the random generator is initialized so that it generates numbers in the closed interval [1, 1000]. The client socket is bound to the inproc address specified above.
4. This is the core of this class. First we wait for a worker ZeroMQ REQ socket to state that it is available. We care only about the first frame it sent us, which contains the address of the REQ socket. The other two parts are discarded, while the first is sent back as the first frame of our reply, so that ZeroMQ can route it to the original sender.
5. The second frame of the reply is empty, and in the third one we place an int as returned by the random generator. Notice the ZMQ_SNDMORE flag on the first two frames, which lets ZeroMQ know that the three sends are parts of a single message.
6. The last message received from each 0MQ REQ is managed differently from all the previous ones. The first frame is sent back as in (4), but we also use the third frame, the actual payload of this multipart message, from which we extract an integer representing the number of messages processed by that worker.
7. This function just prints the passed parameters on the standard console. But we are in a multithread context, and std::cout is a shared resource, so we should be careful.
8. We send a last message to the current worker, with an empty payload that the worker interprets as a terminator.
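Since C++11 the same generator can be written without Boost: <random> provides std::mt19937 and std::uniform_int_distribution in the same roles. This is a sketch of the equivalent class, keeping the post's names. Note that a default-constructed engine always starts from the same seed, so the sequence of sleep times repeats at every run.

```cpp
#include <random>

// Standard C++11 counterpart of RandomTimeGenerator.
class RandomTimeGenerator
{
private:
    std::mt19937 generator_;                    // default seed: same sequence every run
    std::uniform_int_distribution<int> random_; // closed range [low, high]
public:
    RandomTimeGenerator(int low, int high) : random_(low, high) {}
    int getValue() { return random_(generator_); }
};
```

If different timings are wanted at each run, the engine could be seeded in the ctor, for instance with generator_(std::random_device()()).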


uBLAS matrix_column proxy

When I wrote a function that calculates the matrix L1 norm, the resulting code would have been more readable if I had based it on the vector L1 norm function defined previously. But to do that, we need a class that acts as a proxy for the underlying matrix, exposing one of its columns. uBLAS implements matrix_column just for cases like this.

Let's refactor my::norm_1() to use matrix_column:
// ...
#include <boost/numeric/ublas/matrix.hpp>
#include <boost/numeric/ublas/matrix_proxy.hpp> // 1
namespace ublas = boost::numeric::ublas;
namespace my
{
    double norm_1(const ublas::matrix<double>& m)
    {
        double norm = 0.0;
        for(unsigned int j = 0; j < m.size2(); ++j)
        {
            ublas::matrix_column<const ublas::matrix<double> > mc(m, j); // 2
            double current = my::norm_1(mc); // 3
            if(current > norm)
                norm = current;
        }
        return norm;
    }
}
1. Here are the matrix proxy definitions, remember to include this header file.
2. Declaring a matrix_column based on a matrix of doubles, passing to it the actual matrix and the index of the column we are interested in.
3. A matrix_column can be used wherever a uBLAS vector expression is expected. Here it is passed to our vector norm_1(), which builds a temporary ublas::vector<double> from it.
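The idea behind matrix_column, a lightweight view that refers to the matrix instead of copying it, can be shown without uBLAS. In this sketch Matrix, ColumnView and norm1() are hypothetical names, and the matrix is a bare-bones row-major container; it only illustrates the proxy technique, it is not how uBLAS implements it.

```cpp
#include <cmath>
#include <cstddef>
#include <vector>

// Bare-bones row-major matrix, for this sketch only.
struct Matrix
{
    std::size_t rows, cols;
    std::vector<double> data; // element (i, j) is data[i * cols + j]
    double operator()(std::size_t i, std::size_t j) const { return data[i * cols + j]; }
};

// Read-only proxy on one column: it stores a reference and an index,
// copies nothing, and looks like a vector to its users.
class ColumnView
{
    const Matrix& m_;
    std::size_t j_;
public:
    ColumnView(const Matrix& m, std::size_t j) : m_(m), j_(j) {}
    std::size_t size() const { return m_.rows; }
    double operator[](std::size_t i) const { return m_(i, j_); }
};

// Vector L1 norm, written once for anything with size() and operator[].
template <class V>
double norm1(const V& v)
{
    double sum = 0.0;
    for (std::size_t i = 0; i < v.size(); ++i)
        sum += std::abs(v[i]);
    return sum;
}

// Matrix L1 norm: the largest vector L1 norm among the column proxies.
double norm1(const Matrix& m)
{
    double norm = 0.0;
    for (std::size_t j = 0; j < m.cols; ++j)
    {
        double current = norm1(ColumnView(m, j));
        if (current > norm)
            norm = current;
    }
    return norm;
}
```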


Matrix and Vector L1 Norm

The L1 norm is defined for both vectors and matrices. We can easily write a C++ function to calculate it, but when possible it is better to use a more stable and generic implementation, such as the one provided by the Boost Numeric uBLAS library.

L1 Norm for vectors

The L1 vector norm is defined as the sum of the absolute values of the vector elements. This is a simple implementation of the L1 norm for vectors of doubles:
#include <cmath> // std::abs overloads for floating point
#include <numeric> // std::accumulate

namespace my
{
    double norm_1(const ublas::vector<double>& v)
    {
        auto adder = [](double partial, double value) { return partial + std::abs(value); }; // 1
        return std::accumulate(v.begin(), v.end(), 0.0, adder); // 2
    }
}
1. The natural implementation of this function is a one-liner; I have extracted the lambda from the next line only for readability. It is a simple adder: the current partial result is passed in, increased by the absolute value of the current element, and returned.
2. The STL accumulate() sums all the elements using our custom adder (1). Note that the initial value must explicitly be a double (0.0, not 0), since accumulate() deduces from it both its return type and the type of its running total.
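The same technique works on a plain std::vector<double>. This sketch (norm1() and norm1IntInit() are hypothetical names) also shows the pitfall of point (2): with an int initial value, accumulate() stores each partial sum in an int, so the fractional parts are lost.

```cpp
#include <cmath>
#include <numeric>
#include <vector>

// Same technique as my::norm_1(), on a std::vector<double>.
double norm1(const std::vector<double>& v)
{
    auto adder = [](double partial, double value) { return partial + std::abs(value); };
    return std::accumulate(v.begin(), v.end(), 0.0, adder); // running total is a double
}

// Deliberately wrong: the int initial value makes accumulate() keep
// its running total in an int, truncating every partial sum.
double norm1IntInit(const std::vector<double>& v)
{
    auto adder = [](double partial, double value) { return partial + std::abs(value); };
    return std::accumulate(v.begin(), v.end(), 0, adder);
}
```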

L1 Norm for matrices

The L1 matrix norm is defined as the largest of the L1 vector norms of its columns. Here is a simple implementation:
namespace my
{
    double norm_1(const ublas::matrix<double>& m)
    {
        double norm = 0.0;
        for(unsigned int j = 0; j < m.size2(); ++j)
        {
            double current = 0.0; // 1
            for(unsigned int i = 0; i < m.size1(); ++i)
                current += std::abs(m(i,j)); // std::abs from <cmath>, overloaded for double
            if(current > norm) // 2
                norm = current;
        }
        return norm;
    }
}
1. Calculate the L1 norm for a column in "current".
2. If the current candidate is bigger than the previously calculated one, it is stored as the new L1 norm.
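Without uBLAS, the same loop can run on a std::vector of rows. A sketch, assuming all rows have the same length; matrixNorm1() is a hypothetical name.

```cpp
#include <cmath>
#include <cstddef>
#include <vector>

// Matrix L1 norm on a std::vector of rows, all of the same length.
double matrixNorm1(const std::vector<std::vector<double> >& m)
{
    double norm = 0.0;
    std::size_t cols = m.empty() ? 0 : m[0].size();
    for (std::size_t j = 0; j < cols; ++j)
    {
        double current = 0.0;               // L1 norm of column j
        for (std::size_t i = 0; i < m.size(); ++i)
            current += std::abs(m[i][j]);
        if (current > norm)                 // keep the largest so far
            norm = current;
    }
    return norm;
}
```

With the 3x3 matrix {{0, 1, -2}, {1, -2, 3}, {-2, 3, -4}} the column sums are 3, 6 and 9, so matrixNorm1() returns 9.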

Examples

This piece of sample code shows how to calculate the L1 norm on a few vectors and matrices, using both my simple functions and the uBLAS versions:
// ...
namespace ublas = boost::numeric::ublas;

ublas::vector<double> v1(6);
ublas::vector<double> v2(6);
for(int i = 0; i < 6; ++i)
{
    v1[i] = i * (i%2 ? 1 : -1);
    v2[5 - i] = i;
}
ublas::vector<double> v3 = v1 * -1;

ublas::matrix<double> m1(3,3), m2(3,3);
for(int i = 0; i < 3; ++i)
{
    for(int j = 0; j < 3; ++j)
    {
        m1(i, j) = (i + j) * ((j+i)%2 ? 1 : -1);
        m2(2 - i, 2 - j) = i + j;
    }
}
ublas::matrix<double> m3 = m1 * -1;

std::cout << v1 << v2 << v3 << std::endl;
std::cout << m1 << std::endl << m2 << std::endl << m3 << std::endl;
std::cout << "Norm 1: " << ublas::norm_1(v1) << ' ' << my::norm_1(v1) << std::endl;
std::cout << "        " << ublas::norm_1(v2) << ' ' << my::norm_1(v2) << std::endl;
std::cout << "        " << ublas::norm_1(v3) << ' ' << my::norm_1(v3) << std::endl;
std::cout << "Norm 1: " << ublas::norm_1(m1) << ' ' << my::norm_1(m1) << std::endl;
std::cout << "        " << ublas::norm_1(m2) << ' ' << my::norm_1(m2) << std::endl;
std::cout << "        " << ublas::norm_1(m3) << ' ' << my::norm_1(m3) << std::endl;
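Working out the numbers by hand gives the values the program should print, with the uBLAS and my:: results equal on each line. The vectors built above are v1 = (0, 1, -2, 3, -4, 5), v2 = (5, 4, 3, 2, 1, 0) and v3 = -v1, while m2(i, j) = 4 - i - j and m3 = -m1:

```latex
\begin{aligned}
\|v_1\|_1 &= 0 + 1 + 2 + 3 + 4 + 5 = 15, \qquad \|v_2\|_1 = \|v_3\|_1 = 15 \\
m_1 &= \begin{pmatrix} 0 & 1 & -2 \\ 1 & -2 & 3 \\ -2 & 3 & -4 \end{pmatrix}, \qquad
\|m_1\|_1 = \max(3,\ 6,\ 9) = 9 \\
m_2 &= \begin{pmatrix} 4 & 3 & 2 \\ 3 & 2 & 1 \\ 2 & 1 & 0 \end{pmatrix}, \qquad
\|m_2\|_1 = \max(9,\ 6,\ 3) = 9 \\
\|m_3\|_1 &= \|-m_1\|_1 = 9
\end{aligned}
```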
