Router to router among peers

We have seen a couple of ways to make the ZeroMQ router-to-router pattern work in a client/server setting: with predefined client ids, or with a predetermined server id.

But we can't use either approach as it is in a peer-to-peer context. Every element is both client and server at the same time, so all the identities have to be defined in advance.

This example is going to be very simple: we want to create a peer-to-peer network where each element sends a hello message to all its peers, then receives all the messages sent to it, and finally terminates.

Firstly, let's define a few constants:
const char* SKA_NODE_SRV[] = {"tcp://*:5380", "tcp://*:5381", "tcp://*:5382" }; // 1
const char* SKA_NODE_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381", "tcp://localhost:5382" };

const char* NODE_IDS[] = { "N0", "N1", "N2" }; // 2
enum NodeId { N0 = 0, N1, N2, N_NODES = N2 + 1 };
1. Each node has a pair of ROUTER sockets. The server socket binds to one of the addresses in the SKA_NODE_SRV array; the client socket connects to the peers' servers through the addresses in SKA_NODE_CLI.
2. We can use the same id for both routers in the same node, since there is no risk of a clash: the two sockets never connect to each other.
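
To make the pairing between ids and addresses explicit, here is a minimal sketch of a helper that lists, for a given node, the peers its client router has to connect to. Peer and peersOf() are hypothetical names that are not part of the original code, and the constants are repeated only so that the fragment compiles on its own:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Constants repeated from the listing above.
const char* SKA_NODE_CLI[] = {"tcp://localhost:5380", "tcp://localhost:5381", "tcp://localhost:5382" };
const char* NODE_IDS[] = { "N0", "N1", "N2" };
enum NodeId { N0 = 0, N1, N2, N_NODES = N2 + 1 };

// Hypothetical helper type: one entry per remote peer.
struct Peer
{
    std::string id;       // identity to use as routing frame when sending
    std::string endpoint; // address the client router connects to
};

// Returns every node except 'self', in id order.
std::vector<Peer> peersOf(NodeId self)
{
    assert(self >= N0 && self < N_NODES);

    std::vector<Peer> peers;
    for(int i = N0; i < N_NODES; ++i)
    {
        if(i == self)
            continue; // a node does not connect to itself

        peers.push_back(Peer{ NODE_IDS[i], SKA_NODE_CLI[i] });
    }
    return peers;
}
```

For instance, peersOf(N1) yields N0 at tcp://localhost:5380 and N2 at tcp://localhost:5382, which are the connections and the destination ids that node N1 needs.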

In this example all the nodes run in the same process. This is not realistic, but it makes testing easier, and it shouldn't be an issue for you to redesign this code to run as a 0MQ multiprocess application. In any case, the main thread creates one thread for each node:
boost::thread_group threads;
for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    threads.create_thread(std::bind(node, id));
threads.join_all();
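
If Boost is not at hand, the same start-and-join step could be sketched with std::thread from C++11. This is just an alternative under that assumption, not the code used in this post: runNodes() is a hypothetical name, and its worker parameter stands in for the node() function shown below.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

enum NodeId { N0 = 0, N1, N2, N_NODES = N2 + 1 };

// Starts one thread per node, each running worker(id), then waits for all of them.
void runNodes(const std::function<void(NodeId)>& worker)
{
    assert(worker); // an empty std::function can't be called

    std::vector<std::thread> threads;
    for(int i = N0; i < N_NODES; ++i)
        threads.emplace_back(worker, static_cast<NodeId>(i));

    for(std::thread& t : threads)
        t.join(); // same role as thread_group::join_all()
}
```

Calling runNodes(node) from main() would then replace the four Boost lines above. On some platforms the program has to be linked with the thread library (for instance, the -pthread option of g++).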

This is the code run by each peer:
void node(NodeId nid)
{
    zmq::context_t context(1);

    dumpId(NODE_IDS[nid], SKA_NODE_SRV[nid]);
    zmq::Socket skServer(context, ZMQ_ROUTER, NODE_IDS[nid]); // 1
    skServer.bind(SKA_NODE_SRV[nid]);

    dumpId("client router", NODE_IDS[nid]);
    zmq::Socket skClient(context, ZMQ_ROUTER, NODE_IDS[nid]); // 2
    for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    {
        if(id == nid)
            continue;

        skClient.connect(SKA_NODE_CLI[id]); // 3
        dumpId("client connected to", SKA_NODE_CLI[id]);
    }

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 4

    for(NodeId id = N0; id < N_NODES; id = static_cast<NodeId>(id+1))
    {
        if(id == nid)
            continue;

        dumpId("sending a message to", NODE_IDS[id]);
        skClient.send(NODE_IDS[id], "hey"); // 5
    }

    zmq_pollitem_t servers[] = { { skServer, 0, ZMQ_POLLIN, 0 } };
    while(zmq_poll(servers, 1, 1000 * 1000) > 0) // 6
    {
        zmq::Frames frames;
        if(servers[0].revents & ZMQ_POLLIN)
        {
            dumpId("Receiving from peers");

            frames = skServer.blockingRecv(2); // 7
            dumpId(frames[0].c_str(), frames[1].c_str());

            servers[0].revents = 0; // 8
        }
    }
    dumpId(NODE_IDS[nid], "done");
}
1. A ZeroMQ server ROUTER socket with the specified id.
2. A ZeroMQ client ROUTER socket with the same id.
3. Set a connection to all the other nodes.
4. Give all the nodes time to be up before starting to send messages around.
5. Send a message to each peer through the client router socket.
6. Poll on the server socket. If no message becomes available in its input queue for a second, it is assumed that there is nothing more to do. The timeout of 1000 * 1000 is one second in ZeroMQ 2.x, where zmq_poll() expects microseconds; from version 3 on the unit is milliseconds.
7. Receive a two-frame message. The first frame is the sender id, the second frame is the payload.
8. Reset the flag for the next iteration.
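
Notes 5 and 7 show the two sides of the same envelope: the sender puts the destination id in the first frame, and the receiver finds the sender id there. The sending ROUTER socket consumes the first frame to pick the connection, and the receiving ROUTER socket prepends the identity of the peer the message came from. Here is a toy model of just that frame swap. It is plain C++, not ZeroMQ code, and ToyRouterNet is a made-up name:

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <string>
#include <vector>

typedef std::vector<std::string> Frames;

// Toy stand-in for the network: one inbox per node identity.
class ToyRouterNet
{
public:
    void addNode(const std::string& id) { inboxes_[id]; }

    // 'out' is [destination id, payload], as passed to the client router.
    // Returns false if the destination is unknown (a real ROUTER socket
    // would silently drop such a message by default).
    bool send(const std::string& from, const Frames& out)
    {
        assert(out.size() == 2);

        auto it = inboxes_.find(out[0]);
        if(it == inboxes_.end())
            return false;

        // The destination frame is consumed, the sender id takes its place.
        it->second.push_back(Frames{ from, out[1] });
        return true;
    }

    // Takes the oldest pending message for 'id'; empty Frames if there is none.
    Frames recv(const std::string& id)
    {
        auto it = inboxes_.find(id);
        if(it == inboxes_.end() || it->second.empty())
            return Frames();

        Frames in = it->second.front();
        it->second.pop_front();
        return in;
    }

private:
    std::map<std::string, std::deque<Frames>> inboxes_;
};
```

In this model, after net.send("N0", Frames{ "N1", "hey" }) a call to net.recv("N1") gives back the frames N0 and hey, the same pair that the server router of node N1 dumps in the example.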

The full C++ code for this example is on GitHub.
