Paranoid Pirate Heartbeating - worker

The worker described here is the companion to the ZeroMQ router server described in the previous post. Another post gives an overview of this simple heartbeat example. The full C++ source code is available on github.

The controller part of the app spawns a new thread for the worker, running this function:
void worker(char id, int lifespan) // 1
{
    const int PATIENCE = 3;
    HeartbeatWorker worker(id); // 2

    int patience = 0;
    int cycle = 0;
    int sent = 0;
    int iteration = 1;

    while(true)
    {
        uint8_t control = worker.recv(); // 3
        if(control == PHB_NOTHING)
        {
            if(cycle++ == PATIENCE)
            {
                if(patience++ == PATIENCE)
                    break; // 4

                int factor = static_cast<int>(std::pow(2.0, patience));
                boost::this_thread::sleep(bp::millisec(factor * BASE_INTERVAL)); // 5

                worker.reset();
                cycle = 0;
            }

            if(iteration++ == lifespan) // 6
                break;
        }
        else
            cycle = patience = 0; // 7
        worker.heartbeat(); // 8
    }
    worker.shutdown(); // 9
}
1. The worker is identified by a single character, used to generate a unique id for each socket created here. The point is that each time the worker loses contact with the server, the existing socket is closed and replaced by a new one. Any new socket should have a different id, so that the server won't get confused. As a second parameter we pass the worker its lifespan.
2. A HeartbeatWorker class is used to make the code more readable; it is shown below.
3. HeartbeatWorker::recv() combines a poll() and a recv() call on the worker socket. If no message is pending, the PHB_NOTHING value is returned.
4. If no heartbeat is received from the server for a while, the worker shuts itself down.
5. But before committing suicide, the worker tries to reconnect to the server, closing its socket, waiting for a (growing) while, and then opening a new socket, with a new identity but pointing to the same endpoint.
6. When we reach the limit imposed by the caller, even if the server is still alive, we terminate the worker.
7. If there was actually something on the socket, we reset the cycle counters.
8. In any case, send a heartbeat to the server.
9. Gracefully terminate the worker.

This is my HeartbeatWorker implementation:
class HeartbeatWorker
{
private:
    zmq::context_t context_;
    std::unique_ptr<zmq::Socket> sk_; // 1
    std::string id_; // 2
    char idRoot_;
    bp::ptime heartbeat_; // 3
    zmq::pollitem_t items_[1];

public:
    HeartbeatWorker(char id) : context_(1), idRoot_(id),
        heartbeat_(bp::microsec_clock::local_time() +  bp::millisec(BASE_INTERVAL))
    {
        reset();

        items_[0].fd = 0;
        items_[0].events = ZMQ_POLLIN;
        items_[0].revents = 0;
    }

    void reset()
    {
        id_ += idRoot_; // 4
        sk_.reset(new zmq::Socket(context_, ZMQ_DEALER, id_)); // 5
        sk_->setLinger(0); // 6
        sk_->connect(SKA_BACKEND_CLI);
        items_[0].socket = *sk_.get(); // 7

        sk_->send(PHB_READY); // 8
    }

    void heartbeat() // 9
    {
        if(bp::microsec_clock::local_time() > heartbeat_)
        {
            heartbeat_ = bp::microsec_clock::local_time() + bp::millisec(BASE_INTERVAL);
            sk_->send(PHB_HEARTBEAT);
        }
    }

    void shutdown()
    {
        sk_->send(PHB_DOWN);
    }

    uint8_t recv() // 10
    {
        if(zmq::poll(items_, 1, BASE_INTERVAL * 1000) < 1)
            return PHB_NOTHING;

        uint8_t res = PHB_NOTHING;
        if(items_[0].revents & ZMQ_POLLIN)
            res = sk_->recvAsByte();
        items_[0].revents = 0;
        
        return res;
    }
};
1. Any time we lose the connection to the server, we kill the socket and create a new one. For this reason, I use a (smart) pointer instead of an object on the stack.
2. Socket id. It is based on the idRoot_ character (defined in the next line).
3. Expected time for sending a heartbeat to the server. The bp namespace is a synonym for boost::posix_time.
4. The socket id is changed any time a reset occurs.
5. unique_ptr::reset() takes care of deleting the previous socket (if any), so that it is closed before the new one takes over.
6. We don't want to wait for pending messages to be delivered when we close the socket. By default lingering is indefinite, which is no good here: we could have sent messages to a server that is not there anymore, and we want to close the socket (to create a new one) even if those messages are never delivered.
7. A tricky line. We store a pointer to the socket, but the zmq::pollitem_t structure expects a socket object for its member, so dereferencing is required.
8. Sends a "ready" message to the server.
9. The heartbeat message is sent to the server only if its time has arrived.
10. HeartbeatWorker::recv() combines zmq::poll() and an actual receive on the socket. We expect a single-part message containing a single byte.


Paranoid Pirate Heartbeating - server

As described in the previous post, I have developed a cut-to-the-bone ZeroMQ C++ router-dealer application, exchanging heartbeats only, that should help in understanding the basics of the process.

The reason to write such an example is that, as they say in the ZGuide, "as for the Paranoid Pirate queue, the heartbeating is quite tricky to get right". I hope that such a stripped down example could help to clarify the concept.

In its simplest configuration, this heartbeat example is supposed to run as a monoprocess, multithread application. The main thread spawns two new threads, one for the server and one for a worker, and they are left alone to interact till their natural termination.

We could even run just the server alone (or, as we'll see in the next post, the worker). It won't get any ping from a worker, and after a while it will shut down.

Here is the server function; it makes use of a custom class, HeartbeatServer, that I comment below:
void server(int lifespan) // 1
{
    HeartbeatServer server(lifespan); // 2

    while(server.isAlive()) // 3
    {
        zmq::Frames input = server.recv(); // 4
        if(!input.empty())
        {
            uint8_t control = (input.size() != 2 || input[1].empty()) ? PHB_UNEXPECTED : input[1].at(0);

            switch(control)
            {
            case PHB_READY:
                server.pushWorker(input[0]); // 5
                break;
            case PHB_HEARTBEAT:
                server.refreshWorker(input[0]); // 6
                break;
            case PHB_DOWN:
                server.dropWorker(input[0]); // 7
                break;
            default: // 8
                break;
            }
        }
        server.heartbeat(); // 9
    }
}
1. The server function expects a parameter stating how long it should theoretically run; passing INT_MAX (as defined in limits.h) means that we'd like to run the server (almost) forever.
2. Create a HeartbeatServer; see below for details.
3. HeartbeatServer::isAlive() returns false when the server is ready to shutdown.
4. HeartbeatServer::recv() combines a poll and a receive on the server socket. If a (multipart) message is pending on the socket, it is received and returned as a deque of strings (if you wonder why, have a look at this post). We expect a two-part message, with the worker id as first element and the payload (a single byte) in second position. Anything else can be happily discarded.
5. If the payload is a "ready" message, the worker id is stored in the server.
6. We received a "heartbeat" from the worker, so we refresh its status. Remember that the server should consider a worker active only if it sends at least a heartbeat once in a while; after a longish silence from its side, we should consider it lost in space.
7. If the worker ends its life gracefully, it sends a message to notify the server, so that the server can remove it on the spot.
8. We received something weird. Let the user know.
9. If there is a worker pending on the server, send a heartbeat to it.

And this is the server class:
class HeartbeatServer
{
private:
    zmq::context_t context_;
    zmq::Socket backend_;
    zmq::pollitem_t items_[1];

    std::string wid_; // 1
    bp::ptime expiry_; // 2

    int lifespan_; // 3
    int busy_; // 4
    bp::ptime heartbeat_; // 5
    enum { INTERVAL = 1000, BUSY = 5 };
public:
    HeartbeatServer(int lifespan = INT_MAX) : context_(1), backend_(context_, ZMQ_ROUTER), lifespan_(lifespan),
        busy_(BUSY), heartbeat_(bp::microsec_clock::local_time() + bp::millisec(INTERVAL))
    {
        backend_.bind(SKA_BACKEND_SRV);
        backend_.setLinger(0); // 6

        items_[0].socket = backend_; // 7
        items_[0].fd = 0;
        items_[0].events = ZMQ_POLLIN;
        items_[0].revents = 0;
    }

    bool isAlive() // 8
    {
        return busy_ > 0 && lifespan_ > 0;
    }

    zmq::Frames recv() // 9
    {
        --lifespan_;

        zmq::Frames res;
        if(zmq::poll(items_, 1, INTERVAL * 1000) > 0)
        {
            busy_ = BUSY;

            if(items_[0].revents & ZMQ_POLLIN)
                res = backend_.blockingRecv();
            items_[0].revents = 0;

            return res;
        }
        else
            --busy_;

        return res; // no message
    }

    void heartbeat() // 10
    {
        if(wid_.empty())
            return;

        // if it's time, send heartbeat
        if(bp::microsec_clock::local_time() > heartbeat_)
        {
            std::string control(&PHB_HEARTBEAT, &PHB_HEARTBEAT + 1);
            backend_.send(wid_, control);
            heartbeat_ = bp::microsec_clock::local_time() + bp::millisec(BASE_INTERVAL);
        }

        // if the worker is expired, remove its id 
        if(expiry_ < bp::microsec_clock::local_time())
            wid_ = "";
    }

    void pushWorker(const std::string& id) // 11
    {
        wid_ = id;
    }

    void refreshWorker(const std::string& id)
    {
        if(wid_ == id)
            expiry_ = bp::microsec_clock::local_time() + bp::millisec(CHECK_FACTOR * BASE_INTERVAL);
    }

    void dropWorker(const std::string& id)
    {
        if(wid_ == id)
            wid_ = "";
    }
};
1. The worker id pending on the server. In this minimal implementation we can have only one worker. If no worker is available, the string is empty.
2. Keep track of when the worker identified by (1) is going to expire. The namespace bp is a synonym for boost::posix_time.
3. Expected server lifespan.
4. If the server stays idle for a while, this counter goes down to zero, signaling that the server should shut down.
5. When the server should send the next heartbeat.
6. The sockets in this app have a troubled life: it often happens that one sends a message to a peer that is already gone. Besides, the worker has a custom identity, so it is marked as persistent. To avoid the server socket hanging on termination, it is better to specify that we don't want it to linger. This function calls zmq_setsockopt() with the ZMQ_LINGER option on the underlying socket.
7. Initialize the array of zmq::pollitem_t to poll on the socket.
8. To be considered alive, the server should both be in the time frame defined by the user and busy.
9. Each time recv() is called, the server life expectancy decreases. If there is actually anything on the socket, the busy counter is restored to its maximum value, otherwise it decreases too.
10. A heartbeat is sent to the pending worker only if a worker is actually there and it is actually time to send it. After sending it (if required), the worker expiration time is checked, to see if it is time to remove it from the server.
11. In this trivial implementation, pushing, refreshing, dropping a worker on the server is very easy. We have just one (or none) of them, so we just have to ensure nothing unexpected (bad worker id) happens.

The full C++ code for the application is on github.


Paranoid Pirate Heartbeating

Measuring a paranoid pirate's heartbeat is not for the faint of heart. I guess that is true in any case, but here I am talking about the ZeroMQ Paranoid Pirate Protocol.

As they state in the ZGuide, talking about their implementation of that protocol based on czmq, the high-level C wrapper for 0MQ, "Heartbeating is simple once it works, but quite difficult to invent." For this reason I thought it was interesting to see first how heartbeating works on its own, and then to integrate it in a paranoid pirate implementation.

I wrote the code for ZeroMQ version 2.2, developed on Windows + MSVC2010, using the standard 0MQ C++ wrapper with my zmq::Socket class that adds some features to the original zmq::socket_t. See previous posts, like this one, for details.

Since I am interested only in heartbeating, I have designed the application to be as simple as I could make it, to the point of looking quite meaningless.

We have a server (without clients) with one or no associated worker. The server has a ROUTER socket, and the worker a DEALER one. They exchange only status information, no real payload.

When a worker signals to the server that it is alive, the server stores its id, and from then on sends the worker a heartbeat to signal that it, too, is still alive. The worker does the same, sending the server a heartbeat until it shuts down.

If the server stops getting heartbeats from the worker, it simply removes its id, and doesn't care about it anymore.

The worker is more caring. It checks for the server heartbeat a few more times, and only when all hope is lost does it consider the server gone; then it waits for a while, and creates a new socket, trying to establish a new connection.

Even with this minimal requirement, the resulting code is not immediate. For this reason I split the discussion into a few posts. After this introduction you can read about the router server and the dealer worker in the next two posts.

The main thread in this heartbeat testing application runs a couple of test cases:
boost::thread_group threads;
threads.create_thread(std::bind(server, INT_MAX)); // 1
threads.create_thread(std::bind(worker, 'A', 6)); // 2
threads.join_all();
1. The server() function expects an int parameter representing how many iterations we want to run. Here I specify the largest int available, as defined in limits.h, meaning that I want to run it (almost) forever.
2. The worker() needs to know which character to use as seed for the worker id; the second parameter is the number of heartbeats that the worker is going to send to the server before shutting down.

In this test case the server should stay up till the worker sends all its heartbeats, then wait idle a bit more before shutting down. The worker id should be "A".

Second test case:
boost::thread_group threads;
threads.create_thread(std::bind(server, 3)); // 1
threads.create_thread(std::bind(worker, 'B', 7)); // 2

boost::this_thread::sleep(boost::posix_time::seconds(10)); // 3
threads.create_thread(std::bind(server, INT_MAX)); // 4
threads.join_all();
1. This server is going to be short lived.
2. Same worker as before.
3. Ensure enough time passes before restarting the server.
4. An almost-forever server is started.

Here we expect the worker to notice the server going offline, then reconnect when it restarts, and complete its job. The worker id should change from "B" to "BB".

The full C++ code for the complete example is on github.


Sending and receiving a single byte

After the major change of using deque as container for multipart messages, I have done a minor change in my zmq::Socket class, that extends the standard ZeroMQ C++ wrapper provided with version 2 of the package.

I was reading the Paranoid Pirate Protocol when I saw that it expects the ready and heartbeat commands to be sent as single bytes. So I decided to add a couple of methods in my class to explicitly manage this case. It is not a strict necessity, but they surely make the code more readable.

Here are the two new methods:
bool send(uint8_t value, int flags =0) // 1
{
    zmq::message_t msg(sizeof(uint8_t));
    memcpy(msg.data(), &value, sizeof(uint8_t));
    return socket_t::send(msg, flags);
}

uint8_t recvAsByte(int flags =0)
{
    zmq::message_t message;
    if(!socket_t::recv(&message, flags))
        throw error_t();

    return *(static_cast<uint8_t*>(message.data()));
}
uint8_t is defined in stdint.h as unsigned char, assuming that in your environment a byte has 8 bits, as almost always happens. Using this typedef should make it clearer that we are interested in a small number and not in a character.

I have pushed on github the new file version.


ZeroMQ Multipart message as deque

I originally designed my zmq::Socket class extending the zmq::socket_t class, as defined in the standard C++ wrapper included in the ØMQ 2.x distribution, to provide a more intuitive multipart message management, basing it on a std::vector container.

This is fine for building up multipart messages on receive, but it gets clumsy when we want to modify an existing multipart message, typically to put an envelope around it, or to strip that envelope out.

For this kind of manipulation I think it is much better to use std::deque.

The previous changes to this class, up to the latest introduction of a couple of methods to send and receive ints, were not dramatic ones. But this redesign breaks the original interface: a deque does not provide a reserve() method as the vector does, and I often used it in my code when creating a multipart message. Besides, I have also changed the blockingRecv() signature, since I found out that delegating to that method the check on the actual number of frames in a multipart message didn't add much value to the resulting code; on the contrary, it made it a bit less intuitive.

For this reason, I left on github the old include file, and I created a new one, zmq2a.h, where the deque version of multipart messages is defined and used.

Again, here is the change applied:

- zmq::Frames, the type used for multipart messages, is now a typedef for a deque of strings.
- blockingRecv() now takes no input parameter; it still throws a zmq::error_t exception, but only on failure while receiving a frame of the message.


Improved LRU with basic reliable queuing

We can apply the just-seen Lazy Pirate Pattern (LPP) to the LRU Queue Broker. In this way we make it more reliable, impacting only the client side of the application.

This approach is called in the ZGuide simple pirate pattern.

What we had is a queue of least recently used workers, managed by a ROUTER-to-ROUTER 0MQ broker: it gets requests from clients and sends them to workers (according to their availability); the workers do the job and send the result back, through their REQ socket, to the broker, which forwards it to the original client. This is now matched to the previously seen LPP client.

We have already seen almost all the code; the only issue is putting it together. The full example C++ code is on github; here are just the startup lines:
boost::thread_group threads; // 1
threads.create_thread(std::bind(client, 2500,  3)); //2
threads.create_thread(lruQueue); // 3
threads.create_thread(worker);

boost::this_thread::sleep(boost::posix_time::seconds(20)); // 4
dumpId("---");
threads.create_thread(worker);
threads.create_thread(std::bind(client, 2500,  3));

threads.join_all();
1. To simplify the testing, I have put all the components in the same process, each one running in its own thread. This is not realistic, but it is not an issue to refactor the example to run as a multiprocess application.
2. The function client() is the one we have seen in the LPP, see previous post for details.
3. The lruQueue() and worker() components come from the LRU example, the worker has been modified to perform restlessly.
4. Actually, the sample worker is designed to simulate a crash after a few seconds, and the client shuts down if it doesn't get any feedback. On the other side, the broker is designed to run forever (though it is not protected against interrupts). Here we wait a while, to be sure that the worker crashes and the client shuts itself down. Then we create another worker and client, and we can check how the system springs back to work.

The code shown in the ZGuide is written for the high-level czmq C binding. This port could be interesting if you want to see an example of how to do the same in C++. I have used the official C++ wrapper provided for ZeroMQ 2.x, adapted with my extension to zmq::socket_t, called zmq::Socket, which provides better support for multipart and int messages.
