
A second ROUTER-DEALER example

In the previous post we saw a ZeroMQ ROUTER-DEALER application where the ROUTERs act as servers. We can easily modify it so that the DEALERs act as servers instead.

Now the routers are clients, so each of them connects to all the available DEALER sockets and then waits on its socket to get its share of messages. The messages are stored in a temporary vector and then sent back to the dealers. Before the dealers start sending, the routers should already be connected to them. A simple way to achieve this is to let the dealer threads sleep for a while before they start to send.

The routers and dealers are started in this way:
boost::thread_group threads;
for(int i = 0; i < nRouters; ++i) // 1
    threads.create_thread(std::bind(router2dealer, nDealers));
for(int i = 0; i < nDealers; ++i) // 2
    threads.create_thread(std::bind(dealer4router, nRouters, i));
threads.join_all();
1. There should be at least one router; the upper limit is determined by your system. Each client thread runs an instance of router2dealer().
2. I use the same definition for the socket addresses as in the previous example, so we can have just one or two dealers. Each server thread runs dealer4router().
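If you prefer not to depend on boost::thread_group, the same startup can be sketched with the standard library only. This is a minimal sketch, assuming C++11 std::thread; runAll() is a name I made up for illustration, and the two callables are placeholders for router2dealer() and dealer4router(), so that the snippet does not need ZeroMQ to compile.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical replacement for the boost::thread_group code above.
// routerFn and dealerFn stand in for router2dealer() and dealer4router().
void runAll(int nRouters, int nDealers,
            const std::function<void(int)>& routerFn,
            const std::function<void(int, int)>& dealerFn)
{
    assert(nRouters > 0 && nDealers > 0);

    std::vector<std::thread> threads;
    threads.reserve(nRouters + nDealers);

    // Client threads: each router is told how many dealers to connect to.
    for (int i = 0; i < nRouters; ++i)
        threads.emplace_back(routerFn, nDealers);

    // Server threads: each dealer gets the number of routers and its index.
    for (int i = 0; i < nDealers; ++i)
        threads.emplace_back(dealerFn, nRouters, i);

    // Same role as threads.join_all() in the Boost version.
    for (std::thread& t : threads)
        t.join();
}
```

A call such as runAll(nRouters, nDealers, router2dealer, dealer4router) would then play the role of the snippet above.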

This is the function for each client thread:
void router2dealer(int n) // 1
{
    dumpId("ROUTER startup");
    zmq::context_t context(1);
    
    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id()); // 2
    zmq::Socket skRouter(context, ZMQ_ROUTER, id);
    for(int i =0; i < n; ++i)
    {
        skRouter.connect(SKA_CLI[i]); // 3
        dumpId("ROUTER CLI on", SKA_CLI[i]);
    }

    std::vector<zmq::Frames> messages;
    messages.reserve(n); // 4

    for(int i =0; i < n; ++i)
    {
        zmq::Frames message = skRouter.blockingRecv(2);
        dumpId(message[0].c_str(), message[1].c_str());
        messages.push_back(message);
    }
    dumpId("all received");

    std::for_each(messages.begin(), messages.end(), [&skRouter](zmq::Frames& frames) // 5
    {
        skRouter.send(frames);
    });
}
1. Each client connects to all the n servers and receives n messages, one from each server.
2. It is handy to specify the socket id for testing purposes; the thread id looks like a good choice to me.
3. Each client connects to all the servers.
4. We already know how many messages we are going to receive, one for each server, and we store them in the messages STL vector.
5. A router can act as an asynchronous client, as we see here. First we receive all the expected messages, then we send them back, each to its original sender.
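The claim that each router ends up with exactly one message from each dealer relies on the DEALER socket distributing its outgoing messages round-robin among the connected peers. The following is only a simulation with plain containers, no ZeroMQ involved: Received and simulateRoundRobin() are hypothetical names, and I assume that all the routers are connected before the first send and that every dealer starts its cycle from the first router. The real socket does not promise any particular starting peer; what matters here is the count.

```cpp
#include <cassert>
#include <string>
#include <vector>

// One message as a router would see it: who sent it, and the payload.
struct Received {
    int dealer;            // index of the sending dealer
    std::string payload;   // single-frame payload built by the dealer
};

// Simulates nDealers dealers, each sending nRouters messages and cycling
// through its connected routers in round-robin order.
// Returns, for every router, the messages it ends up holding.
// Assumption: every router is already connected when the sending starts.
std::vector<std::vector<Received>> simulateRoundRobin(int nRouters, int nDealers)
{
    assert(nRouters > 0 && nDealers > 0);

    std::vector<std::vector<Received>> inbox(nRouters);
    for (int index = 0; index < nDealers; ++index) {
        for (int i = 0; i < nRouters; ++i) {
            // Same payload rule as in dealer4router(): i+1 copies of a letter.
            std::string out(i + 1, static_cast<char>('k' + i + index));
            // Message number i goes to peer number i modulo the peer count.
            inbox[i % nRouters].push_back({index, out});
        }
    }
    return inbox;
}
```

With three routers and two dealers, every inbox holds two messages, one per dealer, which is why router2dealer() can loop exactly n times on its blocking receive.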

Each server thread runs on this function:
void dealer4router(int n, int index) // 1
{
    dumpId("DEALER startup");
    zmq::context_t context(1);

    std::string id = boost::lexical_cast<std::string>(boost::this_thread::get_id());
    zmq::Socket skDealer(context, ZMQ_DEALER, id);
    skDealer.bind(SKA_SRV[index]);

    boost::this_thread::sleep(boost::posix_time::seconds(1)); // 2
    for(int i =0; i < n; ++i)
    {
        std::string out(i+1, 'k' + i + index);
        skDealer.send(out);
    }
    dumpId("all sent");

    for(int i =0; i < n; ++i)
    {
        std::string in = skDealer.recvAsString(); // 3
        dumpId(in.c_str());
    }
}
1. Each dealer knows the number of clients, so that it can send one message to each of them, and its own index, so that it can bind to the right socket address.
2. For such a simple example, waiting a while before starting to send messages is enough to ensure that all the expected clients are up.
3. The dealer servers act asynchronously too: first they send all the messages to the clients, then they receive the feedback. Notice that the dealer sends a single-frame message; the receiving ROUTER socket prepends the dealer id to it, turning it into a two-frame message.
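To make the frame handling in the last note more concrete, here is a small model of what happens to a message on the router side. It is a sketch with plain strings and vectors, not ZeroMQ code: Frames is just a stand-in for the zmq::Frames type used above, while Routed, onRouterReceive() and onRouterSend() are names invented for this illustration.

```cpp
#include <cassert>
#include <string>
#include <vector>

using Frames = std::vector<std::string>;   // stand-in for zmq::Frames

// Receiving side of a ROUTER: the identity of the sending peer is
// prepended, so a single-frame payload shows up as two frames.
Frames onRouterReceive(const std::string& dealerId, const std::string& payload)
{
    return Frames{dealerId, payload};
}

// Result of sending through a ROUTER in this model.
struct Routed {
    std::string destination;   // peer selected by the first frame
    Frames delivered;          // frames that actually reach that peer
};

// Sending side of a ROUTER: the first frame is removed and used as the
// address of the destination peer; the remaining frames are delivered.
Routed onRouterSend(const Frames& message)
{
    assert(!message.empty());
    return Routed{message.front(),
                  Frames(message.begin() + 1, message.end())};
}
```

This is why router2dealer() can send back each stored message unchanged: the first frame it received is exactly the address the ROUTER needs, and the dealer gets back only the payload it originally sent.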
