
Monitoring broker state with PUB-SUB sockets

This is the first step in describing a fairly complex ZeroMQ sample application, based on the example named Interbroker Routing in the ZGuide. We want to create a few interconnected brokers, so that they can exchange jobs among them. To do that, each broker should be able to signal to its peers when it has workers available for external jobs. We are going to do that using PUB-SUB socket connections.

Each broker has a PUB socket that publishes its status to all the other brokers, and a SUB socket that receives the same kind of information from them.

My prototype implementation differs from the original ZGuide code in a few ways.

- It is written for Windows+MSVC, so I couldn't use the ipc protocol, which is not available on that platform. I have used tcp instead.
- I ported it from the czmq binding to the standard 0MQ 2.x lightweight C++ one, improved by a zmq::Socket class that provides support for multipart messages, a feature missing in the original zmq::socket_t (see the sketch right after this list).
- The implementation language is C++, so in the code you will see some STL and Boost stuff, and even some C++11 goodies, such as lambda functions.
- The brokers all run in the same process. That wouldn't make much sense in a real-life project, but it makes testing faster. In any case, it should be quite easy to refactor the code to let each broker run in its own process. A minor glitch caused by this approach is that I had to take care of concurrency when printing to standard output. That is the reason why all the uses of std::cout in the code are protected by mutex/lock.
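
To give an idea of what that multipart support looks like, here is a minimal sketch of how it could be layered on the plain 0MQ 2.x C++ binding. This is just a simplified assumption of mine: the actual zmq::Socket class is on github, it works with a zmq::Frames type (a vector of strings), and it provides the checked blockingRecv() used below.

#include <cstring>
#include <stdexcept>
#include <string>
#include <vector>
#include <zmq.hpp>

typedef std::vector<std::string> Frames; // same idea as zmq::Frames

// send each frame, flagging all but the last one with ZMQ_SNDMORE
void sendMultipart(zmq::socket_t& socket, const Frames& frames)
{
    for(std::size_t i = 0; i < frames.size(); ++i)
    {
        zmq::message_t msg(frames[i].size());
        std::memcpy(msg.data(), frames[i].data(), frames[i].size());
        socket.send(msg, i + 1 < frames.size() ? ZMQ_SNDMORE : 0);
    }
}

// receive frames as long as ZMQ_RCVMORE is set, then check their number
Frames recvMultipart(zmq::socket_t& socket, std::size_t expected)
{
    Frames frames;
    int64_t more = 0; // in 0MQ 2.x ZMQ_RCVMORE is an int64_t option
    std::size_t moreSize = sizeof(more);
    do
    {
        zmq::message_t msg;
        socket.recv(&msg); // blocking receive of a single frame
        frames.push_back(std::string(static_cast<char*>(msg.data()), msg.size()));
        socket.getsockopt(ZMQ_RCVMORE, &more, &moreSize);
    } while(more != 0);
    if(frames.size() != expected) // checked, as zmq::Socket::blockingRecv() is
        throw std::runtime_error("unexpected number of frames");
    return frames;
}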

As said above, in this first step each broker doesn't do much: it just uses the PUB-SUB sockets to share its state with its peers.

These are the addresses used for the broker state sockets, both for client and server mode:
const char* SKA_BROKER_STATE_CLI[] =
    {"tcp://localhost:5180", "tcp://localhost:5181", "tcp://localhost:5182"};
const char* SKA_BROKER_STATE_SRV[] =
    {"tcp://*:5180", "tcp://*:5181", "tcp://*:5182"};
Each broker prototype runs the function described below on its own thread, passing as broker an element of the second array above, and as peers the other two elements of the first array. So, for instance, the first broker has "tcp://*:5180" as its server address, and "tcp://localhost:5181" and "tcp://localhost:5182" as its peers (a minimal launcher sketch is at the end of this post):
void stateFlow(const char* broker, const std::vector<std::string>& peers)
{
    zmq::context_t context(1); // 1

    zmq::Socket stateBackend(context, ZMQ_PUB); // 2
    stateBackend.bind(broker);

    zmq::Socket stateFrontend(context, ZMQ_SUB); // 3
    stateFrontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    std::for_each(peers.begin(), peers.end(), [&stateFrontend](const std::string& peer)
    {
        stateFrontend.connect(peer.c_str());
    });

    std::string tick("."); // 4
    zmq_pollitem_t items [] = { { stateFrontend, 0, ZMQ_POLLIN, 0 } }; // 5
    for(int i = 0; i < 10; ++i) // 6
    {
        if(zmq_poll(items, 1, 250 * 1000) < 0) // 7
            break;

        if(items[0].revents & ZMQ_POLLIN) // 8
        {
            zmq::Frames frames = stateFrontend.blockingRecv(2);
            dumpId(frames[0].c_str(), frames[1].c_str()); // 9
            items[0].revents = 0; // cleanup
        }
        else // 10
        {
            dumpId("sending on", broker);
            zmq::Frames frames;
            frames.reserve(2);
            frames.push_back(broker);
            frames.push_back(tick);
            stateBackend.send(frames);

            tick += '.';
            boost::this_thread::sleep(boost::posix_time::millisec(333)); // 11
        }
    }
}
1. Each broker has its own ZeroMQ context.
2. The stateBackend socket is a PUB that makes available the state of this broker to all the subscribing ones.
3. The stateFrontend socket is a SUB that gets all the messages from the publishers it subscribes to (as you can see in the next line, no filter is set).
4. Currently we are not really interested in the information the broker is sending, so for the time being a simple increasing sequence of dots will do.
5. We poll on the frontend, waiting for messages coming from the peers.
6. I don't want to loop forever; a few iterations are enough to check the system.
7. Poll for a quarter of a second on the SUB socket; in case of error, the loop is interrupted. This is the only, and very rough, error handling in this piece of code, but hey, it is just a prototype.
8. There is actually something to be read. I am expecting a two-part message, so I use the checked zmq::Socket::blockingRecv(), which throws an exception (not caught here, meaning a risk of brutal termination) if something unexpected happens.
9. Give some feedback to the user. dumpId() is just a wrapper for safely printing to standard output, adding the thread ID as a bonus (see the sketch right after these notes).
10. This broker sends its status only when no message has been received from its peers.
11. As for (10), this line makes sense only because there is no real logic in here. The idea is that I don't want the same broker to always send messages while the others just receive, so I put the sender to sleep, ensuring it won't be the one sending again on the next iteration.
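
Since dumpId() is used all over the code, here is a minimal sketch of what it could look like. It is an assumption of mine, and the actual helper on github may differ, but the point stands: a mutex shared by all the broker threads serializes access to std::cout.

#include <iostream>
#include <boost/thread.hpp>

boost::mutex console; // shared by all the broker threads

// print the thread ID plus one or two strings, serializing access to cout
void dumpId(const char* first, const char* second = "")
{
    boost::mutex::scoped_lock lock(console);
    std::cout << boost::this_thread::get_id() << ' '
        << first << ' ' << second << std::endl;
}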

The full C++ source code for this example is on github. The Socket class is there too.
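
And to complete the picture, this is a minimal sketch of how the three brokers could be started, each one on its own thread. The runBrokers() name and its structure are just my assumption of how a launcher can look; check the github code for the real thing.

#include <string>
#include <vector>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

// start the three brokers, giving each one its server address
// and the client addresses of the other two as peers
void runBrokers()
{
    boost::thread_group threads;
    for(int i = 0; i < 3; ++i)
    {
        std::vector<std::string> peers;
        for(int j = 0; j < 3; ++j)
            if(j != i)
                peers.push_back(SKA_BROKER_STATE_CLI[j]);
        // boost::bind copies peers, so each thread works on its own vector
        threads.create_thread(boost::bind(stateFlow, SKA_BROKER_STATE_SRV[i], peers));
    }
    threads.join_all(); // wait for all the brokers to complete their loops
}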
