Message Queue

The Boost.Interprocess message queue lets threads, even ones living in different processes, put messages on and get messages from a shared, named queue.

Each message has a priority, a length, and the associated data.

Messages can be put on and taken from the queue in three different ways: blocking, try (non-blocking), and timed.

This message queue just copies raw bytes, so it cannot directly handle objects of non-trivial classes: it can be used to exchange int items, but not std::string objects. To overcome this limitation we could use the Boost Serialization library.
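For plain text there is a lighter alternative to a full serialization library: copy the characters of the string into a fixed-size buffer and send those bytes. The two helpers below are hypothetical (packString and unpackString are not part of Boost) and use only the standard library; a string longer than the buffer is silently truncated, which a real program would probably want to report.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <string>

// Copies at most 'capacity' characters of 'text' into 'buffer' and returns
// the number of bytes to pass to send() as the message length.
std::size_t packString(const std::string& text, char* buffer, std::size_t capacity)
{
  const std::size_t n = std::min(text.size(), capacity);
  std::memcpy(buffer, text.data(), n);
  return n;
}

// Rebuilds a std::string from the buffer filled by receive() and the
// received size it reported. No null terminator is needed.
std::string unpackString(const char* buffer, std::size_t receivedSize)
{
  return std::string(buffer, receivedSize);
}
```

The sender would pass the returned length to send(), and the receiver would pass the size reported by receive() to unpackString().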

When constructing a message queue we must specify how the object should be obtained (create and/or open) and its name; when creating it, we also give the maximum number of messages and the maximum message size. When we are done with it, we should explicitly remove the message queue by calling message_queue::remove().

In the simple example below a producer process creates a message queue of C strings (in the sense of arrays of char) and sends a few messages. The consumer process reads the messages and outputs them to the console.

In main() we choose whether to run the producer or the consumer according to the number of arguments passed to the executable:
if(argc == 2)
  ip11a(argv[1]);
else
  ip11b();
If we try to run the consumer before the producer, we get an interprocess_exception, because the queue it tries to open does not exist yet.

Here is the code:
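#include <algorithm> // std::copy
#include <cstdio>    // sprintf
#include <cstring>   // strlen
#include <iostream>
#include <iterator>  // std::ostream_iterator
#include <string>
#include <vector>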

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/scoped_ptr.hpp>

using namespace boost::interprocess;

namespace
{
  const char* MQ_NAME = "MessageQueue";
  const int MQ_MSG_NR = 10;
  const int MSG_NR = MQ_MSG_NR * 2; // 1

  class QueueManager
  {
  private:
    bool drop_; // 2
    boost::scoped_ptr<message_queue> mq_; // 3

    void remove() { message_queue::remove(MQ_NAME); }
  public:
    enum { MSG_SIZE = 80 };

    // ctor for producer
    QueueManager(int maxNr) : drop_(false)
    {
      remove();
      mq_.reset(new message_queue(create_only, MQ_NAME, maxNr, MSG_SIZE));
    }

    // ctor for consumer
    QueueManager() : drop_(true), mq_(new message_queue(open_only, MQ_NAME)) {}

    ~QueueManager() { if(drop_) remove(); }

    void send(const char* id, int i)
    {
      char buffer[MSG_SIZE];
      sprintf(buffer, "%s_%d", id, i);

      mq_->send(buffer, MSG_SIZE, 0);
    }

    std::string receive()
    {
      char buffer[MSG_SIZE];

      unsigned int priority;
      std::size_t recvd_size;
      mq_->receive(buffer, MSG_SIZE, recvd_size, priority);

      return std::string(buffer);
    }

    static bool checkIdLen(const char* id)
    {
      if(strlen(id) > QueueManager::MSG_SIZE - 5)
      {
        std::cout << "The specified id [" << id << "] is too long" << std::endl;
        return false;
      }
      return true;
    }
  };
}

void ip11a(const char* id)
{
  std::cout << "Starting producer ..." << std::endl;
  if(QueueManager::checkIdLen(id) == false)
    return;

  try
  {
    QueueManager qm(MQ_MSG_NR);

    std::cout << "Sending messages: ";
    for(int i = 0; i < MSG_NR; ++i)
    {
      qm.send(id, i);
      std::cout << i << ' ';
    }
    std::cout << std::endl;
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::cout << "done" << std::endl;
}

void ip11b()
{
  std::cout << "Starting consumer ..." << std::endl;

  std::vector<std::string> vec;
  vec.reserve(MSG_NR);

  try
  {
    QueueManager qm;

    for(int i = 0; i < MSG_NR; ++i)
    {
      vec.push_back(qm.receive());
      std::cout << '.';
    }
    std::cout << std::endl;
  }
  catch(interprocess_exception &ex)
  {
    std::cout << ex.what() << std::endl;
    return;
  }

  std::copy(vec.begin(), vec.end(), std::ostream_iterator<std::string>(std::cout, " "));
  std::cout << std::endl;
}
  1. Just for the thrill of it, we send twice as many messages as the queue can hold. Since send() blocks while the queue is full, the producer waits on the eleventh message until a consumer makes room.
  2. The QueueManager member drop_ is set to true when we want the destructor to remove the message queue. Only the consumer does so; the producer leaves the queue in place so that its messages can still be read.
  3. In the constructor for the producer we can initialize the message queue object only after removing any leftover message queue with the same name. But a message_queue can't be created empty and filled in later with the required information, so we use a pointer or, better, a smart pointer to stay on the safe side.
The code is based on an example provided by the Boost Library Documentation.
