Synchronizing with a condition_variable

It is quite common in a multithreaded application for one thread to have to wait for another thread to complete some task. The usual way to accomplish this in C++11 (or Boost, if your compiler does not implement it yet) is to use a condition_variable. For instance, we have previously seen an example of a threadsafe implementation of a queue that uses a condition_variable to communicate a status change between threads.

Here we'll see an even simpler example.

A class is designed to be used in a multithreaded environment. A member function sets an int member variable a number of times, as specified by the caller. Before resetting that value, it waits until another thread has used the previously set value. To keep track of the status we use an auxiliary boolean variable.

Let's see a first implementation that makes no use of condition_variable:
class Conditional : boost::noncopyable
{
private:
  int value_; // 1
  bool produced_; // 2
  boost::mutex m_;

public:
  Conditional() : value_(-1), produced_(false) {}

  void produce(unsigned int count)
  {
    for(int i = count; i >= 0; --i)
    {
      boost::unique_lock<boost::mutex> l(m_); // 3
      while(produced_) // 4
      {
        std::cout << "Producer waits" << std::endl;

        l.unlock(); // 5
        boost::this_thread::sleep(boost::posix_time::millisec(100));
        l.lock();
      }
      std::cout << "Producer sets value to " << i << std::endl; // 6
      value_ = i;
      produced_ = true;
    }
  }

  void consume()
  {
    int current; // copy of value_ taken while the lock is held
    do {
      boost::unique_lock<boost::mutex> l(m_);
      while(!produced_) // wait for producer
      {
        std::cout << "Consumer waits" << std::endl;

        l.unlock();
        boost::this_thread::sleep(boost::posix_time::millisec(100));
        l.lock();
      }
      current = value_; // the producer may overwrite value_ once we unlock
      std::cout << "Consumer now is in control: " << current << std::endl;
      produced_ = false;
    } while(current); // 7
  }
};
1. Variable shared among threads.
2. Flag to keep track of whether the current value is ready to be consumed.
3. We enter the critical section.
4. Wait for the consumer to use the previously set value.
5. Give the other thread a chance to do its job.
6. When we reach this line, we are ready to set the value and the flag that marks it as ready to be consumed.
7. The consumer is similar to the producer; the main difference is that it loops until it reads an invalid value (that is, zero).

Here is the code for testing this:
Conditional c;

boost::thread t1(&Conditional::consume, &c);
boost::thread t2(&Conditional::produce, &c, 10);

t1.join();
t2.join();
As we can see, this solution works. But it is not elegant: the main issue is that we have to specify "by hand" how long our threads sleep while waiting for the other thread to do its job. Using a condition_variable makes our code cleaner and simpler. First we add a private member variable:
boost::condition_variable c_;
Then we rewrite consume and produce in this way:
void produce(unsigned int count)
{
  for(int i = count; i >= 0; --i)
  {
    boost::unique_lock<boost::mutex> l(m_);
    c_.wait(l, [this](){ return !produced_; } ); // 1
    std::cout << "Producer sets value to " << i << std::endl;
    value_ = i;
    produced_ = true;
    c_.notify_one(); // 2
  }
}

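void consume() // 3
{
  int current; // copy of value_ taken while the lock is held
  do {
    boost::unique_lock<boost::mutex> l(m_);
    c_.wait(l, [this](){ return produced_; } );

    current = value_; // the producer may overwrite value_ once we unlock
    std::cout << "Consumer now is in control: " << current << std::endl;
    produced_ = false;
    c_.notify_one();
  } while(current);
}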
1. wait() always works on the unique_lock. We can pass it the lock alone and loop on the condition ourselves (see the other example for details) or, as here, also pass a predicate that returns a boolean, the condition we are waiting for. Here the predicate is a lambda function that captures this, so that we can access the produced_ flag.
2. Another thread that is waiting on the condition is notified that the status has changed.
3. As before, consume() is very close to produce().

We can use the same test code seen above on our new version, and we should appreciate how much faster it is now that we have removed the tentative sleeps.
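
For readers whose compiler already supports C++11 threading, here is a minimal self-contained sketch of the same hand-off written against the standard library instead of Boost. The class name Handoff, the helper runHandoff() and the vector of consumed values (kept only so that the result can be checked) are assumptions of this sketch, not part of the original code.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical std:: counterpart of the Conditional class above.
class Handoff
{
  int value_ = -1;
  bool produced_ = false;
  std::mutex m_;
  std::condition_variable c_;

public:
  Handoff() = default;
  Handoff(const Handoff&) = delete;
  Handoff& operator=(const Handoff&) = delete;

  // Publishes count, count - 1, ..., 0, one value at a time.
  void produce(int count)
  {
    for(int i = count; i >= 0; --i)
    {
      std::unique_lock<std::mutex> l(m_);
      c_.wait(l, [this]{ return !produced_; }); // previous value consumed?
      value_ = i;
      produced_ = true;
      c_.notify_one();
    }
  }

  // Collects values until the terminating zero and returns them.
  std::vector<int> consume()
  {
    std::vector<int> seen;
    int current;
    do {
      std::unique_lock<std::mutex> l(m_);
      c_.wait(l, [this]{ return produced_; });
      current = value_; // copy while the lock is held
      seen.push_back(current);
      produced_ = false;
      c_.notify_one();
    } while(current != 0);
    return seen;
  }
};

// Runs one producer against one consumer and returns what was consumed.
std::vector<int> runHandoff(int count)
{
  Handoff h;
  std::vector<int> seen;
  std::thread consumer([&]{ seen = h.consume(); });
  std::thread producer([&]{ h.produce(count); });
  consumer.join();
  producer.join();
  return seen;
}
```

Calling runHandoff(3) should hand over the values 3, 2, 1, 0 in that order, since each value has to be consumed before the next one is set.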


Reader-Writer mutex

Say that in our C++ application we have to protect some data that is read at high frequency from multiple threads and seldom written. We could use a normal mutex-lock pattern, but that would slow down all the reading threads for no reason. What we want is to let any number of readers access the resource at the same time, and to create an exclusive critical section only when the data changes.

We can use the Boost Thread library shared_mutex and shared_lock for this purpose.

Instead of a plain mutex we define a shared_mutex to control the access to the resource; the readers acquire a shared_lock on it before reading the data. When a writer wants to modify the data, it acquires a lock_guard on the shared_mutex, which gives it exclusive control of the resource.

Let's see the code:
class ReadWrite
{
private:
  int value_; // 1
  boost::shared_mutex mx_; // 2

public:
  ReadWrite() : value_(42) {}

  void read()
  {
    boost::shared_lock<boost::shared_mutex> lx(mx_); // 3
    std::cout << boost::this_thread::get_id() << " read: " << value_ << std::endl; // 4
    boost::this_thread::sleep(boost::posix_time::millisec(100)); // 5
  }

  void increase()
  {
    boost::lock_guard<boost::shared_mutex> lx(mx_); // 6
    std::cout << boost::this_thread::get_id() << " increasing " << value_++ << std::endl; // 7
    boost::this_thread::sleep(boost::posix_time::millisec(100));
    std::cout << boost::this_thread::get_id() << " new value: " << value_ << std::endl;
  }

  void reading() // 8
  {
    for(int i = 0; i < 15; ++i)
      read();
  }
};

1. This is just a silly example, so an integer suffices as the underlying resource.
2. And this is the shared_mutex we'll use to access the resource.
3. Before actually reading the data we must acquire a shared_lock on the shared_mutex. As many threads as we like can hold it at once, as long as they are all reading.
4. Some feedback to see what is actually going on. Notice that this code is buggy: no mutex protects the standard output console, so we are almost sure to get mixed-up output. I deliberately avoided such a mutex to keep the code to the point, but it should be easy for you to fix the problem.
5. Some sleeping to make the effect of the lock more visible.
6. Here we need to access the data to change it, so we create a lock_guard on the shared_mutex. This lock patiently waits its turn if other threads own the mutex, but once it gets it, it is the only one that can access the data - as we expect from a lock_guard - until it goes out of scope.
7. Notice that here the data changes.
8. Just a utility function, to make the testing easier.

And talking about testing:
TEST(ReadWrite, One)
{
  ReadWrite rw;

  boost::thread t1(&ReadWrite::reading, &rw); // 1
  boost::thread t2(&ReadWrite::reading, &rw);

  for(int i = 0; i < 4; ++i)
  {
    boost::this_thread::sleep(boost::posix_time::millisec(300));
    rw.increase(); // 2
  }

  t1.join();
  t2.join();
}

1. Two threads run on the reading() function.
2. Every now and then the main thread butts in and changes the underlying data.
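
The same reader-writer idea can be sketched with the standard library alone. This is only an illustration under stated assumptions: it uses std::shared_timed_mutex from C++14 (C++17 also offers std::shared_mutex), it drops the console output so that no extra mutex is needed, and the names SharedCounter and runSharedCounter() are made up for the example.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

// Hypothetical standard library counterpart of ReadWrite, without output.
class SharedCounter
{
  int value_ = 0;
  mutable std::shared_timed_mutex mx_;

public:
  // Many readers may hold the shared lock at the same time.
  int read() const
  {
    std::shared_lock<std::shared_timed_mutex> lx(mx_);
    return value_;
  }

  // A writer takes the mutex exclusively, shutting out readers and writers.
  void increase()
  {
    std::lock_guard<std::shared_timed_mutex> lx(mx_);
    ++value_;
  }
};

// Two reader threads poll the counter while the caller increases it.
int runSharedCounter(int writes)
{
  SharedCounter sc;
  std::vector<std::thread> readers;
  for(int r = 0; r < 2; ++r)
    readers.emplace_back([&sc]{ for(int i = 0; i < 1000; ++i) sc.read(); });

  for(int i = 0; i < writes; ++i)
    sc.increase();

  for(auto& t : readers)
    t.join();
  return sc.read();
}
```

Whatever the interleaving of readers and writer, the final value equals the number of increase() calls, because each increment runs under the exclusive lock.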


once_flag and call_once()

We have seen that, if we are developing in C++ for a multithreaded environment, it is not a good idea to use the double checked locking strategy to ensure that a piece of functionality is called just once in the whole life of the process.

C++11 and the Boost Thread library offer a robust alternative approach based on the pair once_flag and call_once().

The same code that we have seen in the previous post could be rewritten like this:
std::shared_ptr<SomethingExpensive> sp; // 1
boost::once_flag ofsp; // 2

void setSomething()
{
  boost::call_once(ofsp, [](){ // 3
    sp.reset(new SomethingExpensive()); // 4
  });
}

1. Using the volatile keyword on smart pointers leads to a number of complications. For this reason the original example was written using a raw pointer. We have seen that there is actually no real reason to use the volatile qualifier, so here we are free to go smart.
2. This is the once_flag associated with the resource we want to protect. Here we are using the Boost implementation, but if this C++11 feature is supported by your current compiler you can use it by just changing the namespace to std.
3. By calling boost::call_once() - again, use std::call_once if available for your compiler - we ensure that the passed function is called only once in the whole life of the process. Here we pass a lambda as the function, to keep the code very compact.
4. That's the code called just once. An object is created on the heap and its address is stored in the smart pointer.

I have added a ctor to the supposedly expensive class to get some feedback:
SomethingExpensive() { std::cout << "ctor" << std::endl; }
And then I have written a test:
TEST(Expensive, Good)
{
  boost::thread t1(&setSomething);
  boost::thread t2(&setSomething);

  // join(), not yield(): yield() would not wait for the workers to finish
  t1.join();
  t2.join();
}
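
As a sketch of the std:: flavour mentioned in the notes, the following self-contained variation counts how many times the constructor actually runs when several threads race to initialize the resource. The names Expensive, ctorCalls, resource, resourceFlag, initResource() and runCallOnce() are assumptions made for this example.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

std::atomic<int> ctorCalls{0}; // counts constructions, for checking only

struct Expensive
{
  Expensive() { ++ctorCalls; }
};

std::shared_ptr<Expensive> resource;
std::once_flag resourceFlag;

void initResource()
{
  // However many threads get here, the lambda runs exactly once.
  std::call_once(resourceFlag, []{ resource = std::make_shared<Expensive>(); });
}

// Calls initResource() from several threads; returns the ctor call count.
int runCallOnce(int threads)
{
  std::vector<std::thread> pool;
  for(int i = 0; i < threads; ++i)
    pool.emplace_back(initResource);
  for(auto& t : pool)
    t.join();
  return ctorCalls.load();
}
```

With any number of threads the counter should stop at one, and resource should be non-null once all the threads have been joined.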


Data race on double checked locking

Double checked locking is a well known pattern used to initialize a singleton. It works fine in some contexts; for instance, it is absolutely fine to use it to implement the Singleton Pattern in Java.

But when the implementation programming language is C++, the double checked locking strategy is often referred to as an anti-pattern - see for instance this wiki page on google code about data races.

The issue arises from the different meaning of the keyword volatile in Java vs. C++. In C++ volatile is just a hint to the compiler, meaning that we don't want any optimization performed on accesses to that variable; it gives no atomicity and no ordering guarantee between threads. In Java it implies constraints that ensure that any thread will read the most recent value of that variable.

So the code that we see working fine in Java is not equivalent to this (bad) rewrite in C++:
class SomethingExpensive
{
    // ...
};

volatile SomethingExpensive* se = nullptr; // !!! BAD IDEA !!!
boost::mutex mxse;

void setSomething()
{
    if(se == nullptr) // !!! BAD IDEA !!!
    {
        boost::lock_guard<boost::mutex> l(mxse);
        if(se == nullptr)
            se = new SomethingExpensive();
    }
}
The worst part of this code is that often, or even almost always, it works. Only occasionally does it show an unexpected behavior that would be very difficult to track down to its real root.
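
One alternative that is not discussed in this post, shown here only as a hedged sketch: since C++11 the language guarantees that a function-local static is initialized exactly once, even if several threads reach its declaration concurrently. The names Costly, built, instance() and runLocalStatic() are made up for the example.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

std::atomic<int> built{0}; // counts constructions, for checking only

class Costly
{
public:
  Costly() { ++built; }
};

// The first caller constructs the object; concurrent callers block until
// the initialization is complete (guaranteed by the language since C++11).
Costly& instance()
{
  static Costly theInstance;
  return theInstance;
}

// Asks for the instance from several threads; returns the ctor call count.
int runLocalStatic(int threads)
{
  std::vector<std::thread> pool;
  for(int i = 0; i < threads; ++i)
    pool.emplace_back([]{ instance(); });
  for(auto& t : pool)
    t.join();
  return built.load();
}
```

No volatile and no explicit mutex are involved; the compiler generates the required synchronization around the initialization.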


unique_lock when lock_guard is not enough

In the previous post we have seen that we can use lock_guard in conjunction with the lock() function (both are available in the Boost Thread library and as part of the new C++11 standard) to avoid deadlocks.

In this case the idiom is: call lock() for all the mutexes that you need to lock for that critical region, then create a lock_guard adopting each of them, so that they are automatically released at the end of the scope - even in case of exceptions.

This should be enough for normal usage, but sometimes we need more flexibility. That is the time to use unique_lock (or equivalently, if you are using its Boost implementation, its boost::mutex::scoped_lock typedef).

It doesn't make much sense in this case, since unique_lock adds a tiny overhead that is not required, but we can rewrite the original code in this way:
boost::lock<boost::mutex, boost::mutex>(m1_, m2_);
boost::unique_lock<boost::mutex> l1(m1_, boost::adopt_lock);
boost::unique_lock<boost::mutex> l2(m2_, boost::adopt_lock);

More interestingly, instead of adopting a mutex that is already locked, we can associate a mutex with a unique_lock without locking it on construction, and then call another overload of the lock() function, this one expecting unique_locks as input:
boost::unique_lock<boost::mutex> l1(m1_, boost::defer_lock);
boost::unique_lock<boost::mutex> l2(m2_, boost::defer_lock);
boost::lock(l1, l2);

Keep in mind that if you use the latter setup you must call the version of lock() that works on unique_locks. Otherwise you would lock the underlying mutex but the unique_lock wouldn't know about it, so it wouldn't unlock it on destruction. That is, the unique_lock would be completely useless.
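
The deferred-lock variant can be sketched end to end with the std:: equivalents. The Account type, transfer() and runTransfers() are hypothetical names introduced only to have two mutexes that are locked in opposite orders by two threads.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

struct Account // hypothetical example type
{
  std::mutex m;
  int balance;
  explicit Account(int b) : balance(b) {}
};

// Moves amount between two distinct accounts, locking both mutexes.
void transfer(Account& from, Account& to, int amount)
{
  std::unique_lock<std::mutex> l1(from.m, std::defer_lock); // not locked yet
  std::unique_lock<std::mutex> l2(to.m, std::defer_lock);
  std::lock(l1, l2); // locks both; the unique_locks now own their mutexes

  from.balance -= amount;
  to.balance += amount;
} // both mutexes are released here by the unique_lock dtors

// Two threads transfer in opposite directions; returns the total balance.
int runTransfers()
{
  Account a(100), b(100);
  std::thread t1([&]{ for(int i = 0; i < 1000; ++i) transfer(a, b, 1); });
  std::thread t2([&]{ for(int i = 0; i < 1000; ++i) transfer(b, a, 1); });
  t1.join();
  t2.join();
  return a.balance + b.balance;
}
```

Because std::lock() is called on the unique_lock objects themselves, each of them knows that it owns its mutex and releases it on destruction.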


How to avoid deadlocks

We have seen how easy it is to write code that could lead to a deadlock, and we have already seen a protocol that, when carefully observed, removes that risk.

We have seen how a deadlock can occur when we need to lock more than one mutex at the same time: a thread that has already locked one mutex can't get its hands on the other (or one of the others) that it needs, since another thread has already locked it for its own use. And that other thread is in the same situation. Both are waiting to lock a mutex held by the other. No easy way out, one would say.

But actually there is a way out, and it is part of the C++11 standard - and, if your compiler does not implement this new feature, you can get it through the Boost Thread library. The trick is done by the free function std::lock() / boost::lock(), which locks all the passed mutexes, or waits until this is possible. No partial locking is left in place, so in this case the risk of deadlock is completely removed.

We can rewrite both f1() and f2() (as seen in the previous post) as this:
void f(char feedback)
{
  for(int i = 0; i < SIZE_; ++i)
  {
    boost::lock<boost::mutex, boost::mutex>(m1_, m2_); // 1

    std::cout << feedback;
    ++v1_;
    ++v2_;

    m1_.unlock(); // 2
    m2_.unlock();
  }
}

1. As you see, lock() is a template function. It returns when it has actually locked both mutexes. There is no yield() call after this line because it has no use anymore, but you can try putting it back (or maybe even a sleep, to force a context switch) and see what happens.
2. Major nuisance: the lock() function performs a simple lock() on the passed mutexes, with no RAII as when lock_guard is used. That means we have to unlock both of them "by hand" at the end of the critical region.

Do you feel uneasy about those explicit unlocks? Right, we can still use lock_guard, specifying that it does not have to acquire the lock, just adopt it:
boost::lock<boost::mutex, boost::mutex>(m1_, m2_);
boost::lock_guard<boost::mutex> l1(m1_, boost::adopt_lock);
boost::lock_guard<boost::mutex> l2(m2_, boost::adopt_lock);

No more need for an explicit unlock() at the end of the scope; the lock_guard dtor takes care of it.
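
Here is a compact sketch of the lock-then-adopt idiom using the std:: names. The globals m1, m2, v1, v2 and the functions bump() and runBump() are assumptions of this example; one thread deliberately passes the mutexes in reverse order to show that the order no longer matters.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>

std::mutex m1, m2;
int v1 = 0, v2 = 0;

// Both threads run this; one passes the mutexes in reverse order on purpose.
void bump(std::mutex& first, std::mutex& second, int times)
{
  for(int i = 0; i < times; ++i)
  {
    std::lock(first, second); // returns only when both are locked
    std::lock_guard<std::mutex> l1(first, std::adopt_lock);
    std::lock_guard<std::mutex> l2(second, std::adopt_lock);
    ++v1;
    ++v2;
  } // the guards unlock both mutexes at the end of each iteration
}

// Runs two competing threads and returns the sum of the two counters.
int runBump(int times)
{
  std::thread t1(bump, std::ref(m1), std::ref(m2), times);
  std::thread t2(bump, std::ref(m2), std::ref(m1), times);
  t1.join();
  t2.join();
  return v1 + v2;
}
```

Each thread increases both counters once per iteration, so with two threads the sum should be four times the iteration count.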


How to deadlock

It is easy to write code that could lead to a deadlock, less easy to see it in action, given that everything related to threads is in some way out of the developer's control: it is the operating system - or maybe the hardware - that has the last word on when and where a thread is executed.

But by manipulating the code a bit we can magnify the effect of bad code.

Let's see a class that is (badly) designed to be used in a multithreading environment:
class Deadlock : boost::noncopyable
{
private:
  int v1_; // 1
  int v2_;
  const int SIZE_;
  boost::mutex m1_;
  boost::mutex m2_;

public:
  Deadlock(int size) : v1_(0), v2_(0), SIZE_(size) {}

  int getV1() { return v1_; }
  int getV2() { return v2_; }

  void f1() // 2
  {
    for(int i = 0; i < SIZE_; ++i)
    {
      boost::lock_guard<boost::mutex> l1(m1_);
      boost::this_thread::yield(); // 3
      boost::lock_guard<boost::mutex> l2(m2_);

      std::cout << '_'; // 4
      ++v1_; // 5
      ++v2_;
    }
  }

  void f2() // 6
  {
    for(int i = 0; i < SIZE_; ++i)
    {
      boost::lock_guard<boost::mutex> l2(m2_);
      boost::this_thread::yield();
      boost::lock_guard<boost::mutex> l1(m1_);

      std::cout << '.';
      ++v1_;
      ++v2_;
    }
  }
};

1. The class is built around a couple of integers, each protected in its usage by a mutex.
2. We have a couple of functions, f1 and f2, that acquire both mutexes and then modify the two private member variables a number of times.
3. This yield() call is made to give a hint to the thread manager that we would be happy to let other threads butt in.
4. Some visual feedback is always welcome.
5. And here is the real job done by the function: increasing both data members.
6. The other function looks almost the same, but the locks are created in reverse order. First we acquire a lock on m2_ and then on m1_. This could easily lead to a deadlock, since it tends to put the two threads in a situation where the first one owns the lock on m1_ and the second one the lock on m2_. In this case both of them wait indefinitely for the other one, and neither releases its own lock to help the other thread carry on with its job.

Here is a test that usually would show the problem:
TEST(Deadlock, Bad)
{
  const int SIZE = 20;
  Deadlock d(SIZE);

  boost::thread t1(&Deadlock::f1, &d);
  boost::thread t2(&Deadlock::f2, &d);

  boost::this_thread::sleep(boost::posix_time::seconds(1)); // 1
  ASSERT_EQ(SIZE * 2, d.getV1());
  ASSERT_EQ(SIZE * 2, d.getV2());
}

1. It wouldn't make much sense to join() the working threads, since we expect them to be deadlocked. So we just wait for a long time (an entire second!) and then check what they did in the meantime.

If a deadlock happens, the assertions fail. On your machine this test could unexpectedly succeed, even more than once. In that case you could increase SIZE, or run the test more times (or both), and in the end you should get the expected behavior.

This code is so simple that it is very easy to find a way to make it right. It is enough to follow a common policy in acquiring locks, so I rewrite the second function:
void f2a()
{
  for(int i = 0; i < SIZE_; ++i)
  {
    boost::lock_guard<boost::mutex> l1(m1_);
    boost::this_thread::yield();
    boost::lock_guard<boost::mutex> l2(m2_);

    std::cout << '.';
    ++v1_;
    ++v2_;
  }
}

And I write another test:
TEST(Deadlock, Good)
{
  const int SIZE = 20;
  Deadlock d(SIZE);

  boost::thread t1(&Deadlock::f1, &d);
  boost::thread t2(&Deadlock::f2a, &d);

  boost::this_thread::sleep(boost::posix_time::seconds(1));
  ASSERT_EQ(SIZE * 2, d.getV1());
  ASSERT_EQ(SIZE * 2, d.getV2());
}

This one should run fine in any circumstance.

This patch is very easy to apply, but it works only for simple code. We'll see a more robust and general solution in a later post.


Why std::stack is not threadsafe

The C++ STL standard containers are not threadsafe, and this is a feature. The point is that threadsafe code is slower than code designed to run on a single thread, so it doesn't make much sense to force a programmer to use a threadsafe container when there is no reason for it.

Let's see the case of std::stack. This class has been designed expressly for single-threaded code, and it can't be easily adapted to multithreading. The main issue is in how its top() and pop() functions are designed. Calling them on an empty stack is undefined behaviour, so we have to call the other member function empty() first to ensure we can actually perform those operations.

Here are a couple of tests written for Google Test that show how top() and pop() are expected to be called:
TEST(StdStack, Top)
{
  const int value = 42;
  std::stack<int> si;
  si.push(value);

  if(si.empty() == false)
  {
    // *** [A] ***

    ASSERT_FALSE(si.empty());
    EXPECT_EQ(value, si.top());
  }
}

TEST(StdStack, Pop)
{
  const int value = 42;
  std::stack<int> si;
  si.push(value);

  if(si.empty() == false)
  {
    // *** [A] ***

    ASSERT_FALSE(si.empty());
    si.pop();
    EXPECT_TRUE(si.empty());
  }
}

We expect both tests to succeed. But what if two threads share the same stack container? What if at [A] the other thread butts in and pop()s an element from the stack? It is easy to redesign the tests to emulate this behaviour and see the result: in both cases the subsequent assertions are bound to fail.

As we see, the problem is in how the std::stack interface is designed. Splitting the functionality across pairs of methods, empty()/top() and empty()/pop(), makes it impossible to adapt it to a multithreading environment. We should provide a different interface where top() and pop() do not require a previous call to empty().
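
To make the idea of "a different interface" concrete, here is one possible sketch, not the only reasonable design: a thin wrapper whose single tryPop() call performs the emptiness check, the read of the top element and its removal under one lock. The names LockedStack, tryPop() and demoLockedStack() are assumptions of this example.

```cpp
#include <cassert>
#include <mutex>
#include <stack>

// Hypothetical wrapper: the empty()/top()/pop() sequence becomes one call.
template <typename T>
class LockedStack
{
  std::stack<T> s_;
  std::mutex m_;

public:
  void push(const T& value)
  {
    std::lock_guard<std::mutex> l(m_);
    s_.push(value);
  }

  // Copies the top element to out and removes it, all under one lock.
  // Returns false, leaving out untouched, if the stack was empty.
  bool tryPop(T& out)
  {
    std::lock_guard<std::mutex> l(m_);
    if(s_.empty())
      return false;
    out = s_.top();
    s_.pop();
    return true;
  }
};

// Single-threaded walk through the interface.
bool demoLockedStack()
{
  LockedStack<int> ls;
  ls.push(42);
  int value = 0;
  bool first = ls.tryPop(value);  // succeeds, value becomes 42
  bool second = ls.tryPop(value); // the stack is empty now
  return first && value == 42 && !second;
}
```

Since no other thread can slip in between the check and the removal, there is no equivalent of point [A] in this interface.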


Automatic join for a thread

When using a thread, we often follow a simple pattern: in a function we create one or more boost::thread objects, do some more work, join all the created threads, and finally terminate the function execution.

It is such a common structure that one could wonder why not use a wrapper class, similar to lock_guard, that joins the thread in its destructor. Besides simplifying our code, we get the free benefit of making it more robust, since we no longer have to worry about what happens if an exception occurs after a working thread is created and before it is joined.

Being so similar to lock_guard in its use and behaviour, I called this utility class ThreadGuard (using the CamelCase naming convention to stress the fact that it is not part of a standard library).

Here it is, just a few lines of code but with some interesting points of discussion in it:

class ThreadGuard : boost::noncopyable // 1
{
private:
  boost::thread t_;
public:
  explicit ThreadGuard(boost::thread& t): t_(std::move(t)) {} // 2

  ~ThreadGuard()
  {
    if(t_.joinable())
      t_.join(); // 3
  }
};

1. No copy of this class is allowed, so we would declare its copy ctor and assignment operator private - to save a bit of typing we derive the class from the Boost utility class expressly designed for this task.
2. We move the boost::thread passed to the class (by reference) to the private member. So, after creating a ThreadGuard, the original boost::thread is left in a not-a-thread status.
3. The dtor joins the thread, after ensuring that it is actually joinable.

We could use this class as shown in this test (written for Google Test):
TEST(ThreadGuard, Simple)
{
  boost::thread t(sayDelayedHallo, 300); // 1
  EXPECT_TRUE(t.joinable());

  ThreadGuard tg(t);
  EXPECT_FALSE(t.joinable()); // 2

  std::cout << "Main thread is ready to leave" << std::endl;
} // 3

1. A boost::thread is created, and then we ensure that its construction was successful.
2. After creating a ThreadGuard for a boost::thread, the latter becomes a not-a-thread.
3. The join on the thread is called implicitly here.

The thread is built to run this minimal function:
void sayDelayedHallo(unsigned int delay)
{
  boost::this_thread::sleep(boost::posix_time::millisec(delay));
  std::cout << "Hello" << std::endl;
}

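Since there is no longer any use in keeping the "raw" boost::thread around, we could create a ThreadGuard from an anonymous temporary object. Note that in standard C++ a temporary cannot bind to the non-const reference taken by the constructor shown above (some compilers accept it as an extension), so for this the constructor should take its argument as an rvalue: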
ThreadGuard tg(boost::thread(sayDelayedHallo, 300));
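
A sketch of how the same guard could look on top of std::thread, taking the thread as an rvalue so that a temporary can be passed directly. The names JoiningThread and demoJoiningThread() are assumptions of this example, and the atomic flag exists only to check that the join really happened.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>
#include <utility>

// Hypothetical std::thread variant that takes ownership through an rvalue.
class JoiningThread
{
  std::thread t_;

public:
  explicit JoiningThread(std::thread&& t) : t_(std::move(t)) {}
  JoiningThread(const JoiningThread&) = delete;
  JoiningThread& operator=(const JoiningThread&) = delete;

  ~JoiningThread()
  {
    if(t_.joinable())
      t_.join();
  }
};

// Returns true if the worker had finished by the time the guard was gone.
bool demoJoiningThread()
{
  std::atomic<bool> done{false};
  {
    JoiningThread jt(std::thread([&done]{
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
      done = true;
    }));
  } // the dtor joins here, so the worker has finished
  return done.load();
}
```

An existing std::thread t would be handed over with JoiningThread jt(std::move(t)), which makes the transfer of ownership visible at the call site.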


Running a daemon thread

You probably know what a daemon process is: a process running in the background in your (UNIX) environment. A daemon thread is something very similar: a thread running in the background of your process.

We have a way to interact with a "normal" thread from the main thread that spawned it, but we have no way to relate to a daemon thread - it just runs freely until its natural end, or until the process is terminated.

One could wonder what the point is of having such a beast as a daemon thread, but here the analogy with the daemon process comes in handy. In the same way that we sometimes want to start a process in our environment and just forget about it, leaving to the operating system the burden of taking care of it, we could be interested in having a job performed in the background of our application, leaving it to the process to take care of it, if someone has to.

We can get this effect by creating a boost::thread (or a std::thread, if your compiler supports this C++11 feature) and then detaching it. It is a simple matter, but it is better to see it in an example, just to clarify.

This is the code that we want to run in a background thread:
void countdown(unsigned int count)
{
   do {
      std::cout << '[' << count << ']';
      boost::this_thread::sleep(boost::posix_time::seconds(1));
   } while(count--);
}
As you see, it just dumps data to the standard output console, sleeps for a while, and then iterates, for a number of times specified by the caller. Notice that the output could get garbled, since we can expect the main thread to write to cout too, so a mutex should be used for this shared resource - but we won't.

Here is the code that is going to use the countdown function:
boost::thread t(countdown, 10);

if(t.joinable()) // 1
{
   std::cout << "Detaching worker.";
   t.detach(); // 2
}

if(t.joinable() == false) // 3
{
   std::cout << "The worker thread now is a daemon, running by itself in background.";
}

std::cout << "Sleeping for a while ... ";
boost::this_thread::sleep(boost::posix_time::seconds(3));
std::cout << " ... terminating main." << std::endl; // 4
1. We should detach only a boost::thread that is alive and kicking or, as they better say, is joinable.
2. That's it. We simply call detach() on the boost::thread object. Actually, not much harm comes if we detach a boost::thread that is not joinable; it would just be a nonsensical call. (A std::thread is stricter: its detach() throws std::system_error in that case.)
3. The subsequent printout is a bit too optimistic. If a boost::thread is not joinable we can't assume that it is still alive and working in the background. It could be anything. We don't have any information on it anymore.
4. Usually we should join all the spawned threads before terminating the main thread, but here we can't. As we said, we have no more control over that working thread. We should just assume that it has been written properly and that no issue will come from its possibly brutal killing when the process ends.
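
The std::thread flavour of detaching can be sketched as follows. One loud assumption: the std::promise/std::future pair is not part of the original example; it is added here as a side channel so that the caller can still learn when the detached worker is done, which the thread object itself no longer allows. The name startDaemon() is hypothetical too.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Starts a detached worker that ticks count times. The returned future is
// an assumption of this sketch: a side channel to get the worker's result.
std::future<int> startDaemon(int count)
{
  std::promise<int> result;
  std::future<int> f = result.get_future();

  std::thread t([count](std::promise<int> p) {
    int ticks = 0;
    for(int i = count; i > 0; --i)
    {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      ++ticks;
    }
    p.set_value(ticks);
  }, std::move(result));

  t.detach(); // from here on t is not joinable and cannot be joined
  return f;
}
```

Calling startDaemon(3).get() blocks until the background thread has published its tick count, even though the thread was detached long before.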


Race condition

When different threads or processes make use of the same data, we should pay attention not to run into race conditions.

We have a race condition when two or more concurrent branches of execution depend on the same mutable shared state, so that the outcome changes with the order in which they happen to run.

Let's see here a simple example of a race condition.

Say that we have to manage directly a linked list of elements, which could be defined in this naive way:
struct Element
{
  Element(int v, Element* n) : value(v), next(n) {}

  int value;
  Element* next;
};

To print all the items in a list of such Element's we could use this raw function:
void dumpList(Element* curr)
{
  while(curr != nullptr)
  {
    std::cout << curr->value << ' ';
    curr = curr->next;
  }
  std::cout << std::endl;
}

Adding an element to the beginning of our linked list is a bit more interesting:
void addFront(Element*& head, int value) // 1
{
  Element* t = head ? head : nullptr; // 2
  Element* e = new Element(value, t); // 3

  head = e; // 4
}

1. We pass to our function the pointer to the current head of the list, and we pass it by reference, since we want to be able to change it: we are going to create a new head of the list.
2. This line could be merged into the next one, since we don't actually need a temporary Element pointer here, but it should make the code a bit clearer. Only if we have a "real" current head do we need to set the next of the new element we are about to create.
3. A new Element is created.
4. The Element becomes the new head of the list.

The normal usage of our list is this:
Element* head = nullptr;

addFront(head, 42);
addFront(head, 24);
dumpList(head);

Everything works fine - if we stick to a single-threaded environment. Using such code in a multithreading environment is asking for trouble.

But let's start by looking at a case where our code still works:

Element* head = nullptr;

boost::thread t1(addFront, std::ref(head), 42); // 1
boost::this_thread::sleep(boost::posix_time::millisec(250)); // 2
boost::thread t2(addFront, std::ref(head), 24); // 3

t1.join();
t2.join();

dumpList(head);

1. This new thread accesses local data by reference: the pointer to the list head. That means that both threads - main and worker - are accessing the same data. We are risking a race condition.
2. By introducing a relatively long sleeping period, we serialize the two calls, actually removing the concurrency from this application.
3. This new thread is using the same head too - the two worker threads are competing for the same state. If we don't acknowledge this situation we can expect big problems.

And it is quite easy to get into trouble - it is enough to remove [2]. If there is no sleep, we should expect a mix-up in addFront(), usually leading to data loss - only one Element would win the insertion in the list.

To better see what is going on, and at the same time to make the issue more visible, let's rewrite the addFront() function. We'll just add a sort of emulation of the classic debugger stepping, designed to work effectively in a multithreading environment:
boost::mutex mio; // 1

void step(int i)
{
  {
    boost::lock_guard<boost::mutex> l(mio); // 2
    std::cout << boost::this_thread::get_id() << "/" << i << std::endl;
  }

  boost::this_thread::sleep(boost::posix_time::millisec(50)); // 3
}

1. A mutex to protect the output console, since it is a shared resource. Actually, we should also use it in the dumpList() function; it is not an issue here, since the dumping is done just at the end of our minimal application, when the working threads have already been joined.
2. Lock the mutex before using the shared resource.
3. Let's make the competing effect more visible by adding a sleep.

Let's use the stepping function in this way:
void addFront(Element*& head, int value)
{
  step(1);
  Element* t = head ? head : nullptr;
  step(2);
  Element* e = new Element(value, t);
  step(3);

  head = e;
  step(4);
}


Now, running the two working threads should lead to a (catastrophic) result like this one:
006E9D00/1
006E9C88/1
006E9D00/2
006E9C88/2
006E9D00/3
006E9C88/3
006E9D00/4
006E9C88/4
42

It is easy to see where the problem arises: both threads read the old head (still null) before either of them updates it, and then they race to assign the element they created to the head of the list, which is a shared resource. The last thread that writes to head is the winner, since it overwrites what the other thread wrote before, which is simply lost.

The solution to the issue is easy: define another mutex, and use it to protect the access to the shared resource.
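
A minimal sketch of that fix, written with the std:: equivalents so that it is self-contained. The mutex name mhead and the helper runAddFront() are assumptions of this example; Element is repeated from above only to make the block compile on its own.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>

struct Element
{
  Element(int v, Element* n) : value(v), next(n) {}
  int value;
  Element* next;
};

std::mutex mhead; // hypothetical name: protects the list head

void addFront(Element*& head, int value)
{
  std::lock_guard<std::mutex> l(mhead);
  // Reading the old head and publishing the new one can no longer be
  // interleaved with another addFront() call.
  head = new Element(value, head);
}

// Adds two elements from two threads and returns the list length.
int runAddFront()
{
  Element* head = nullptr;
  std::thread t1(addFront, std::ref(head), 42);
  std::thread t2(addFront, std::ref(head), 24);
  t1.join();
  t2.join();

  int length = 0;
  while(head != nullptr)
  {
    Element* next = head->next;
    delete head; // the sketch also frees the nodes
    head = next;
    ++length;
  }
  return length;
}
```

The order of the two elements still depends on which thread gets the mutex first, but neither insertion can be lost anymore.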


Producer-consumer on a shared standard container

We want to write a simple multithreaded producer-consumer application that uses the threadsafe wrapper to std::queue we wrote in the previous post.

The main point is to see how to use an STL container in a multithreading context, and we'll also get a glimpse of how to interrupt a thread (at an interruption point).

We have to use a variation on std::queue because the standard containers are not designed for use in a multithreading environment - the logic behind this design is that we don't pay the high cost of a multithread-aware data structure if we are not interested in this feature.
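
The wrapper itself is not shown in this post. To make the code below easier to follow, here is a hypothetical stand-in written with the standard library: its interface is only inferred from the push() and pop() calls used by the producer and consumer, and the real class from the previous post may well differ. The helper runQueue() is made up for the example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical stand-in for the MyQueue wrapper used in this post.
template <typename T>
class MyQueue
{
  std::queue<T> q_;
  std::mutex m_;
  std::condition_variable c_;

public:
  void push(const T& value)
  {
    {
      std::lock_guard<std::mutex> l(m_);
      q_.push(value);
    }
    c_.notify_one(); // wake one consumer blocked in pop()
  }

  // Blocks until an item is available, then removes and returns it.
  T pop()
  {
    std::unique_lock<std::mutex> l(m_);
    c_.wait(l, [this]{ return !q_.empty(); });
    T value = q_.front();
    q_.pop();
    return value;
  }
};

// One producer pushes three items, one consumer pops and sums them.
int runQueue()
{
  MyQueue<int> q;
  int sum = 0;
  std::thread consumer([&]{ for(int i = 0; i < 3; ++i) sum += q.pop(); });
  std::thread producer([&]{ for(int i = 0; i < 3; ++i) q.push(1000 + i); });
  producer.join();
  consumer.join();
  return sum;
}
```

Merging the emptiness check and the removal into a single blocking pop() is what lets several consumers share the queue safely.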

Producer

The idea is that the producer generates a number of items, starting from a value passed to its ctor, and puts them in a queue that is accessible to the consumer too:
class Producer
{
private:
  const int root_; // 1
  std::shared_ptr< MyQueue<int> > q_; // 2

public:
  Producer(int root, std::shared_ptr< MyQueue<int> > q) : root_(root), q_(q) {}

  void run(size_t size) // 3
  {
    for(size_t i = 0; i < size; ++i)
    {
      q_->push(root_ + i);
      boost::this_thread::sleep(boost::posix_time::millisec(50)); 
    }
  } 
};
1. Initial value for the sequence generated by the producer.
2. Shared pointer to the queue where we want to put the data.
3. This method is run in another thread. It takes as input the number of elements the user wants to generate, and loops, each time putting a new item in the queue and then sleeping for a while.

Consumer

The consumer does not differ much from the producer, here is a simple implementation:
class Consumer
{
private:
  const int ID_; // 1
  std::shared_ptr< MyQueue<int> > q_;

public:
  Consumer(int id, std::shared_ptr< MyQueue<int> > q) : ID_(id), q_(q) {}

  void run(size_t size, bool sleep) // 2
  {
    for(size_t i = 0; i < size; ++i)
    {
      std::cout << "Consumer " << ID_ << ": " << q_->pop() << std::endl; // 3
      if(sleep)
        boost::this_thread::sleep(boost::posix_time::millisec(50));
      else
        boost::this_thread::interruption_point();
    }
  }
};
1. Used to see which consumer actually consumed the item.
2. This function runs in a different thread. The bool parameter decides whether this thread should sleep after popping a value from the queue, or just try to read another item. Notice that in the latter case a call to interruption_point() is made, so that the main thread can interrupt this worker thread.
3. This line is flawed. There could be multiple consumers, all of them trying to write to the standard output console, so we could get a mixed-up printout. The fix for this bug is quite easy: add a mutex to this class, shared by all the consumers, and protect this line with a lock on that mutex in each thread executing it. If you need some help to understand how to do it, you could have a look at other posts, like the one where I talk about how to use a functor in multithreading. Incidentally, that same post could give you the idea of rewriting this example using functors. It should be interesting.
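
One possible way to protect the output, sketched here with the C++11 standard library instead of Boost. The LinePrinter class and the print_from_threads() function are hypothetical names introduced only for this example. The stream is passed in, so that we can write to a string stream and inspect the result:

```cpp
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: serializes whole lines written to a shared stream.
class LinePrinter
{
private:
  std::ostream& os_;
  std::mutex m_; // one mutex for all the threads using this printer

public:
  explicit LinePrinter(std::ostream& os) : os_(os) {}

  void print(int id, int value)
  {
    std::lock_guard<std::mutex> l(m_); // held until the line is complete
    os_ << "Consumer " << id << ": " << value << std::endl;
  }
};

// Runs a few threads that print through the same LinePrinter and
// returns everything they wrote.
std::string print_from_threads(int threads, int lines)
{
  std::ostringstream oss;
  LinePrinter printer(oss);
  std::vector<std::thread> pool;
  for(int id = 0; id < threads; ++id)
    pool.emplace_back([&printer, id, lines]() {
      for(int i = 0; i < lines; ++i)
        printer.print(id, i);
    });
  for(auto& t : pool)
    t.join();
  return oss.str();
}
```

The key point is that all the threads must lock the same mutex object. A plain (non-static) mutex member in each Consumer instance would not help, because each consumer would lock only its own. Constructing the printer on std::cout gives the console version.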

Calling producer-consumer

A simple way of running this code is by having a single producer and a single consumer, like this:
std::shared_ptr< MyQueue<int> > spq(new MyQueue<int>()); // 1
size_t size = 10; // 2

Producer p(1000, spq); // 3
boost::thread tp(&Producer::run, &p, size); // 4
boost::this_thread::sleep(boost::posix_time::seconds(1)); // 5

Consumer c(1, spq);
boost::thread tc(&Consumer::run, &c, size, true); // 6

tp.join(); // 7
tc.join();
1. We create a queue and put it in a shared pointer, so that we can pass it to both the producer and the consumer.
2. Number of elements that we want to create.
3. Here is the producer ...
4. ... and here we start its run() method in another thread.
5. We give the producer plenty of time to fill the queue with its data ...
6. ... then we create a new thread on the run() method of a consumer.
7. Finally we join the created threads before completing the main thread execution.

It is fun (if you are that kind of guy) to play around with variations on this model: removing the sleep at [5], adding producers and consumers, changing the number of elements involved, and so on.
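
As an example of such a variation, here is a rough sketch with two producers and two consumers and no sleeping at all. To keep it self-contained it does not use Boost or the classes above: StdQueue is a hypothetical stand-in for MyQueue written with the C++11 standard library, and run_two_by_two() is a made-up name too. Instead of printing, the consumers add up what they pop:

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in for MyQueue, based on the C++11 standard library.
template <typename T>
class StdQueue
{
private:
  std::queue<T> q_;
  std::mutex m_;
  std::condition_variable c_;

public:
  void push(const T& data)
  {
    std::lock_guard<std::mutex> l(m_);
    q_.push(data);
    c_.notify_one();
  }

  T pop()
  {
    std::unique_lock<std::mutex> l(m_);
    while(q_.empty()) // loop, to be safe against spurious wakeups
      c_.wait(l);
    T res = q_.front();
    q_.pop();
    return res;
  }
};

// Two producers and two consumers on the same queue.
// Returns the sum of all the consumed values.
long run_two_by_two(std::size_t size)
{
  auto spq = std::make_shared< StdQueue<int> >();
  std::mutex sum_mutex; // protects sum
  long sum = 0;

  auto produce = [spq, size](int root) {
    for(std::size_t i = 0; i < size; ++i)
      spq->push(root + static_cast<int>(i));
  };
  auto consume = [spq, size, &sum, &sum_mutex]() {
    for(std::size_t i = 0; i < size; ++i)
    {
      int value = spq->pop();
      std::lock_guard<std::mutex> l(sum_mutex);
      sum += value;
    }
  };

  std::vector<std::thread> threads;
  threads.emplace_back(produce, 1000);
  threads.emplace_back(produce, 2000);
  threads.emplace_back(consume);
  threads.emplace_back(consume);
  for(auto& t : threads)
    t.join();
  return sum;
}
```

One thing to watch when changing the numbers: the total number of items popped must not exceed the total number of items pushed, otherwise a consumer would hang forever in pop() and the join would never return.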


Threadsafe std::queue

The standard containers are not threadsafe, and this is a feature, in the sense that we don't have to pay for synchronization if we don't need it. On the other hand, this means that when we need to use a container in a multithreaded environment, we have to take care explicitly of all the synchronization details, and that is not much fun.

I am about to write a simple multithreaded application that needs a queue. One thread will put data in it, the other thread will read it. To implement this well-known producer-consumer model we can't use std::queue as it is; we should wrap it in a structure that makes it threadsafe.

Our application is very simple, and it requires just two threadsafe operations: push() and pop(). Just one case requires a bit of thinking: what is going to happen if we call pop() when the queue is empty? Usually we should give the user a chance to decide whether it is more appropriate to return in an error state, or to hang (maybe just for a while) waiting for the producer to provide a value. In this case I decided to keep it very simple, and just wait.

Here is a possible implementation for such a queue:
template <typename T>
class MyQueue
{
private:
  std::queue<T> q_;
  boost::mutex m_;
  boost::condition_variable c_;

public:
  void push(const T& data)
  {
    boost::lock_guard<boost::mutex> l(m_); // 1
    q_.push(data);
    c_.notify_one(); // 2
  }

  T pop()
  {
    boost::mutex::scoped_lock l(m_); // 3
    while(q_.size() == 0)
    {
      std::cout << "Waiting for data" << std::endl;
      c_.wait(l); // 4
    }

    T res = q_.front();
    q_.pop();
    return res; // 5
  }
};
1. We need just the bare RAII functionality from this lock, so lock_guard is enough. Once the mutex is acquired, we can change the object status, which here means adding the passed data to the queue.
2. We use the condition member variable to notify the other thread that a new item has been inserted in the queue.
3. This lock has to be passed to the member condition, in case we need to wait for incoming data, so a lock_guard is not enough (boost::mutex::scoped_lock is a typedef for boost::unique_lock<boost::mutex>).
4. If the queue is currently empty, we put the current thread in a waiting status using the condition member variable. When the other thread notifies that something has changed, this thread resumes running.
5. The value at the front of the queue is popped and returned to the caller.
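
The two alternatives mentioned before, returning in an error state or waiting just for a while, could look more or less like this. It is only a sketch on the C++11 standard library, not part of MyQueue: the class name PatientQueue and the member functions try_pop() and pop_for() are hypothetical names I am making up here.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical variation of the queue with non-blocking and timed pops.
template <typename T>
class PatientQueue
{
private:
  std::queue<T> q_;
  std::mutex m_;
  std::condition_variable c_;

public:
  void push(const T& data)
  {
    std::lock_guard<std::mutex> l(m_);
    q_.push(data);
    c_.notify_one();
  }

  // Error state: returns false immediately if the queue is empty.
  bool try_pop(T& out)
  {
    std::lock_guard<std::mutex> l(m_);
    if(q_.empty())
      return false;
    out = q_.front();
    q_.pop();
    return true;
  }

  // Waits at most for the given time; returns false on timeout.
  bool pop_for(T& out, std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> l(m_);
    // wait_for() with a predicate returns the value of the predicate,
    // so false means that the queue was still empty when the time expired.
    if(!c_.wait_for(l, timeout, [this]() { return !q_.empty(); }))
      return false;
    out = q_.front();
    q_.pop();
    return true;
  }
};
```

Both functions return the value through an output parameter and use the bool to signal success. This is just one of the possible designs; throwing an exception on failure would be another.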

[edit]
Initially I put the wait() statement above, at the line marked as (4), in an if block:
if(q_.size() == 0) // THIS IS NOT A GOOD IDEA
{
  std::cout << "Waiting for data" << std::endl;
  c_.wait(l);
}
As Sergey noticed (see comments below) this is not enough. Sometimes, in a very erratic way that I couldn't catch in my testing, wait() returns even if the condition has not changed. Patching the code is pretty easy: we need to loop on the check until we actually get a confirmation of the change, as suggested by Adrián (again, below in the comments).

The section on Condition variables in the Synchronization page of the official Boost documentation shows waiting on a condition in exactly this way. Still, I couldn't find there an explanation of the underlying reasons. For that, I'd suggest you read the post named Spurious wakeups on Vladimir Prus's blog.

In a few words, Vladimir remarks that wait() on a condition could return before completing its actual job, and this is expected behavior, mainly for performance reasons. It is much cheaper to introduce a check in the user code than to force wait() to return only on a real wakeup.
[/edit]
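
The loop on the check can also be written using the overload of wait() that accepts a predicate, available both in Boost and in the C++11 standard library. Here is a minimal sketch based on the standard library; PredicateQueue and pop_after_push() are hypothetical names used only for this example:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical variation of MyQueue on the C++11 standard library.
template <typename T>
class PredicateQueue
{
private:
  std::queue<T> q_;
  std::mutex m_;
  std::condition_variable c_;

public:
  void push(const T& data)
  {
    std::lock_guard<std::mutex> l(m_);
    q_.push(data);
    c_.notify_one();
  }

  T pop()
  {
    std::unique_lock<std::mutex> l(m_);
    // Equivalent to: while(q_.empty()) c_.wait(l);
    // so a spurious wakeup just leads to another check and another wait.
    c_.wait(l, [this]() { return !q_.empty(); });

    T res = q_.front();
    q_.pop();
    return res;
  }
};

// A worker thread pops while the caller pushes; returns the popped value.
int pop_after_push(int value)
{
  PredicateQueue<int> q;
  int result = 0;
  std::thread t([&q, &result]() { result = q.pop(); });
  q.push(value);
  t.join(); // after the join it is safe to read result
  return result;
}
```

The predicate version is a matter of taste. It does the same job as the explicit while loop, with the check expressed in the positive form ("go on when the queue is not empty").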

As usual, I wrote a few tests (using Google Test, an xUnit implementation for C++) to drive the development and to ensure that the class works as I expect. This is one of them:
TEST(TestMyQueue, OneOutOneIn)
{
  MyQueue<int> mq;
  int in = 1;

  boost::thread t([&](){ EXPECT_EQ(in, mq.pop()); });

  boost::this_thread::sleep(boost::posix_time::seconds(1));
  mq.push(in);
  t.join(); // wait for the worker, so that the check runs before the test ends
}
The main thread spawns a new thread that runs a lambda function (cool, isn't it?), then sleeps for a second, and finally pushes a value on the queue.

The other thread has access by reference to the variables defined in the caller scope (that is the meaning of the ampersand in the square brackets that introduce the lambda) and uses them in the test macro, comparing the result of popping from the queue with the value stored in the integer variable.

Since the main thread sleeps one big fat second before pushing the value on the queue, we expect pop() to find the queue initially empty, and to get the value only after a while.
