Intro
Condition variables (std::condition_variable, sometimes called conditional variables) are a synchronization primitive in C++ that allows threads to wait until a certain condition is met [1]. This is useful when you want to coordinate the execution of multiple threads. To use a condition variable, you will need:
- A condition variable: std::condition_variable cv
- A mutex, std::mutex mtx, and a lock on it, std::unique_lock<std::mutex> lck(mtx)
- Appropriate wait conditions to check for
- Appropriate notification calls from other threads
Here is a simplified structure of how to use a conditional variable:
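std::mutex mtx;
std::condition_variable cv;
bool condition = false; // shared state, protected by mtx

// Thread 1
{
    // Acquire the lock
    std::unique_lock<std::mutex> lck(mtx);
    // Capture by reference so this also works when `condition` is a local variable
    cv.wait(lck, [&] { return condition; });
}

// Thread 2
{
    // Acquire the lock
    std::unique_lock<std::mutex> lck(mtx);
    condition = true;
    cv.notify_one();
}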
Here, Thread 1 is suspended by the .wait() call until condition == true. However, a condition variable will not automatically wake up the thread the moment the condition becomes true. Instead, the condition is re-checked whenever the waiting thread receives a notification from another thread. This is where .notify_one() and .notify_all() come in. Other threads call either:
- cv.notify_one() (to wake up one of the threads suspended on cv; which one is unspecified)
- cv.notify_all() (to wake up all threads suspended on cv)
Each woken thread then re-acquires the mutex and checks the condition: it continues if the condition is met and goes back to waiting otherwise.
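The simplified structure above leaves out the thread creation, so here is a minimal complete sketch of the same handshake. The function name wait_for_value and the published value are made up for illustration; the calling thread plays the role of Thread 1 and a std::thread plays Thread 2.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: returns the value handed over by the notifying thread.
int wait_for_value() {
    std::mutex mtx;
    std::condition_variable cv;
    bool condition = false;  // shared state, protected by mtx
    int value = 0;           // payload published together with the flag

    // "Thread 2": publishes a value, flips the condition, then notifies.
    std::thread notifier([&] {
        {
            std::lock_guard<std::mutex> lck(mtx);
            value = 42;
            condition = true;
        }
        cv.notify_one();
    });

    // "Thread 1" (the calling thread): waits until the condition holds.
    int result = 0;
    {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [&] { return condition; });
        result = value;  // the lock is held again at this point
    }

    notifier.join();
    return result;
}
```

Calling wait_for_value() returns 42 regardless of which thread gets to the mutex first, because the waiter decides based on the shared flag and not on the notification alone.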
Example
Here we use an example to illustrate the actual use of condition variables. It is abstracted from a real-world scenario I have been working on recently, where we need to instrument performance data for each running thread. Say we have multiple worker threads that have two stages: preparation and payload. A common requirement is to wait for all threads to finish their preparation, do something else in the main thread, and then start the actual payload for all the worker threads.
In this example, we have a Worker class that creates multiple worker threads, where each thread:
- Sets its thread name using pthread_setname_np (Preparation Stage)
- Delays for 1 second (Payload Stage)
We also have a Reporter class that reports the thread IDs and thread names of all worker threads, which requires all threads to finish their preparation stage before reporting. After reporting, we want to start the payload stage for all worker threads.
To achieve this, we use a condition variable cv_ready to wait for all threads to finish their preparation stage. Its condition is num_ready == num_workers, where num_ready is the number of threads that have finished their preparation stage, and num_workers is the total number of worker threads. Each worker thread calls cv_ready.notify_one() so that the main thread re-checks the condition and returns once it holds. After preparation, the worker threads are suspended on a second condition variable, cv, and this is when we initialize the Reporter class and call report() to report the thread information. After reporting, we call cv.notify_all() to start the payload stage for all worker threads.
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <pthread.h>

/**
 * Report information of all worker threads, only works for macOS.
 */
#include <mach/mach.h>

class Reporter {
public:
    Reporter() {
        task_threads(mach_task_self(), &threads, &thread_count);
        std::cout << "[Reporter] Reporter loaded. Found " << thread_count << " threads." << std::endl;
    }

    void report() {
        std::cout << "[Reporter] Reporting all threads:" << std::endl;
        for (mach_msg_type_number_t i = 0; i < thread_count; i++) {
            char thread_name[64];
            uint64_t tid;
            pthread_t pthread = pthread_from_mach_thread_np(threads[i]);
            pthread_getname_np(pthread, thread_name, sizeof(thread_name));
            pthread_threadid_np(pthread, &tid);
            std::cout << "[Reporter] - thread " << i << " name: " << thread_name << " tid: " << tid << std::endl;
        }
    }

private:
    thread_act_array_t threads;
    mach_msg_type_number_t thread_count;
};

class Worker {
public:
    Worker(int16_t n) : num_workers(n), num_ready(0) { }

    void worker_thread(std::string thread_name, uint16_t thread_id) {
        // Thread preparation
        {
            std::lock_guard<std::mutex> lck(mtx);
            pthread_setname_np(thread_name.c_str());
            std::cout << "[Worker] Thread " << thread_name << " ready to run." << std::endl;
            num_ready++;
            cv_ready.notify_one();
        }

        // Suspend all threads until run() sets `started`.
        // Waiting on num_ready == num_workers here would let the last
        // worker skip the wait and start its payload before the report.
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [this] { return started; });
        // Allow parallel execution of worker threads
        lck.unlock();

        // Thread payload
        std::this_thread::sleep_for(std::chrono::seconds(1));
        {
            std::lock_guard<std::mutex> print_lck(mtx);
            std::cout << "[Worker] Thread " << thread_name << " finished." << std::endl;
        }
    }

    void create_workers() {
        std::string main_thread_name = "main";
        pthread_setname_np(main_thread_name.c_str());
        for (int i = 0; i < num_workers; i++) {
            std::string worker_name = "worker-" + std::to_string(i);
            m_threads.emplace_back(std::thread(&Worker::worker_thread, this, worker_name, i));
        }
        // Wait for all threads to be loaded. num_ready is written under mtx,
        // so the waiter must lock the same mutex.
        std::unique_lock<std::mutex> lck(mtx);
        cv_ready.wait(lck, [this] { return num_ready == num_workers; });
    }

    void report() {
        Reporter reporter;
        reporter.report();
    }

    void run() {
        // Notify all threads to start
        {
            std::lock_guard<std::mutex> lck(mtx);
            started = true;
        }
        cv.notify_all();
        for (auto &t: m_threads) {
            t.join();
        }
    }

private:
    int16_t num_workers;
    int16_t num_ready;
    bool started = false;
    std::vector<std::thread> m_threads;
    std::mutex mtx;                    // protects num_ready, started, and std::cout
    std::condition_variable cv;        // signals "start the payload"
    std::condition_variable cv_ready;  // signals "one more worker is ready"
};

int main() {
    // Create threads
    Worker worker(5);
    worker.create_workers();
    // Do something before actual threads execution, after all threads are created and configured.
    worker.report();
    // Start all worker threads
    worker.run();
}
We can get the following output:
[Worker] Thread worker-0 ready to run.
[Worker] Thread worker-3 ready to run.
[Worker] Thread worker-1 ready to run.
[Worker] Thread worker-2 ready to run.
[Worker] Thread worker-4 ready to run.
[Reporter] Reporter loaded. Found 6 threads.
[Reporter] - thread 0 name: main tid: 4454155
[Reporter] - thread 1 name: worker-0 tid: 4454239
[Reporter] - thread 2 name: worker-1 tid: 4454240
[Reporter] - thread 3 name: worker-2 tid: 4454241
[Reporter] - thread 4 name: worker-3 tid: 4454242
[Reporter] - thread 5 name: worker-4 tid: 4454243
[Worker] Thread worker-4 finished.
[Worker] Thread worker-3 finished.
[Worker] Thread worker-0 finished.
[Worker] Thread worker-2 finished.
[Worker] Thread worker-1 finished.
Here, the first 5 lines are the worker threads' preparation stage, and the next 7 lines are the thread information reported by the Reporter class. After reporting, the worker threads start their payload stage simultaneously and finish in no particular order.
If you are using Linux
Note that the code uses macOS system calls in the Reporter. If you are using a Linux system, you may use the following Reporter class instead:
/**
 * Reporter for Linux.
 */
#include <cctype>
#include <dirent.h>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

class Reporter {
public:
    Reporter() {
        // Each thread of the current process has a numeric directory under /proc/self/task
        DIR *dir = opendir("/proc/self/task");
        if (dir) {
            struct dirent *entry;
            while ((entry = readdir(dir)) != nullptr) {
                if (entry->d_type == DT_DIR && isdigit(static_cast<unsigned char>(entry->d_name[0]))) {
                    thread_ids.push_back(entry->d_name);
                }
            }
            closedir(dir);
        }
        thread_count = thread_ids.size();
        std::cout << "[Reporter] Reporter loaded. Found " << thread_count << " threads." << std::endl;
    }

    void report() {
        std::cout << "[Reporter] Reporting all threads:" << std::endl;
        for (int i = 0; i < thread_count; i++) {
            std::string tid = thread_ids[i];
            std::string thread_name = get_thread_name(tid);
            std::cout << "[Reporter] - thread " << i << " name: " << thread_name << " tid: " << tid << std::endl;
        }
    }

private:
    std::vector<std::string> thread_ids;
    int thread_count;

    // The thread name is stored in /proc/self/task/<tid>/comm
    std::string get_thread_name(const std::string &tid) {
        std::ifstream comm_file("/proc/self/task/" + tid + "/comm");
        std::string name;
        if (comm_file.is_open()) {
            std::getline(comm_file, name);
        }
        return name;
    }
};
You also need to change pthread_setname_np(thread_name.c_str()) to pthread_setname_np(pthread_self(), thread_name.c_str()) in the Worker class, since the Linux version takes the target thread as its first argument.
Dive Deeper
Why do we need unique_lock and mutex?
As stated in the cpp reference:
Any thread that intends to wait on a std::condition_variable must acquire a std::unique_lock<std::mutex> on the mutex used to protect the shared variable.
But why? Intuitively, if cv is designed only to suspend a thread until a condition is met, a lock does not seem necessary. If we want to ensure thread safety when evaluating the condition, why couldn't we just use atomic variables in the condition predicate?
Let's look into the details of the process:
{
    // Lock acquired
    std::unique_lock<std::mutex> lck(mtx);

    // If `ready` is not yet true: release the lock, suspend the thread,
    // and wait for notifications from other threads.
    cv.wait(lck, [] { return ready; });
    // When another thread calls notify:
    // cv re-acquires the mutex and checks the condition again.
    // If met, wake up the thread, keep holding the lock, and continue.
    // If not met, **release** the lock, and suspend the thread until notified.

    // Equivalent to:
    while (!ready) {
        // Suspend the thread and release the lock
        cv.wait(lck);
        // When notified, re-acquire the lock
        // so that the condition checking is atomic.
    }
}
The reasons for using a lock revolve around wait and the condition check. Without the lock, the condition could change between the moment it is checked and the moment the thread is actually suspended. If the notifying thread sets the condition and calls notify inside that window, the notification is missed and the waiter may sleep forever. Holding the mutex makes "check the condition, then start waiting" a single atomic step from the notifier's point of view. For this to work, the notifying threads must also hold the same mutex while they change the condition (holding it during the notify call itself is not required).
In other words, the mutex protects the shared state behind the condition, not the suspended thread. It is released as soon as the thread is suspended, so that other threads can change the condition and send notifications [2].
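As a sketch of this rule in practice, here is a tiny mailbox in which the mutex guards the shared queue, the waiter checks the queue under the lock, and the sender modifies it under the same lock before notifying. The names Mailbox, post, take, and sum_posted are invented for this illustration and are not part of the example program above.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical single-consumer mailbox, for illustration only.
class Mailbox {
public:
    void post(int message) {
        {
            // Change the shared state only while holding the mutex...
            std::lock_guard<std::mutex> lck(mtx_);
            messages_.push(message);
        }
        // ...then notify. Holding the lock during notify is not required.
        cv_.notify_one();
    }

    int take() {
        std::unique_lock<std::mutex> lck(mtx_);
        // The predicate runs with the lock held, so a post() cannot slip in
        // between "queue is empty" and "start waiting".
        cv_.wait(lck, [this] { return !messages_.empty(); });
        int message = messages_.front();
        messages_.pop();
        return message;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::queue<int> messages_;
};

// Hypothetical driver: one producer posts 1..count, the caller sums them up.
int sum_posted(int count) {
    Mailbox box;
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i) {
            box.post(i);
        }
    });
    int sum = 0;
    for (int i = 0; i < count; ++i) {
        sum += box.take();
    }
    producer.join();
    return sum;
}
```

With this structure, sum_posted(100) always yields 5050: no message can be lost, whether a post() lands before, during, or after a take().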
What if the condition is met before wait/never met?
When a thread is already suspended and the cv is notified, it checks the condition and either wakes up or suspends the thread again. The first check, however, happens before any suspension: wait with a predicate is defined as while (!pred()) wait(lck), so if the condition is already met when wait is called, the call returns immediately and the thread is never suspended. Only the overload without a predicate suspends unconditionally. It releases the mutex and waits for a notification, whatever the state of the condition:
{
    std::unique_lock<std::mutex> lck(mtx);
    // After calling wait, release the lock, suspend the thread, and wait for notification.
    cv.wait(lck);
}
If the condition is never met, well, the thread stays suspended forever (as expected). In the example above, the condition for cv_ready is num_ready == num_workers. If this is never satisfied, the create_workers function will hang indefinitely, indicating that not all threads are ready.
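Both cases can be tried out in isolation. The sketch below relies on the standard definition of the predicate overload, which evaluates the predicate before suspending, and on wait_for returning the predicate's value once the timeout expires. The two function names and the 50 ms timeout are arbitrary choices for this illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Condition already met: wait() returns at once, although nobody ever notifies.
bool wait_returns_when_already_met() {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = true;  // met before wait is called

    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [&] { return ready; });  // predicate is checked first
    return lck.owns_lock();               // the lock is still held afterwards
}

// Condition never met: a timeout keeps the caller from hanging forever.
bool wait_gives_up_when_never_met() {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;  // nobody will ever set this

    std::unique_lock<std::mutex> lck(mtx);
    // Returns the predicate's final value, i.e. false after the timeout.
    bool met = cv.wait_for(lck, std::chrono::milliseconds(50), [&] { return ready; });
    return !met;
}
```

Both functions return true. In the example program, the second pattern could be used to turn the indefinite hang in create_workers into a reportable error.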
What if notify() is called before wait/never called?
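If we notify before the wait call, the notification is lost, since notifications are not queued. With a predicate-less wait and no further notifications, the thread will hang until a spurious wakeup [3]. With a predicate, an early notification is harmless as long as the condition was changed under the mutex, because the waiter sees the updated condition before it suspends. The bug typically appears when you forget to acquire the mutex in the notifying thread, which again shows how important the mutex is for cvs.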
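Here is a small sketch of the early-notification case. To keep the ordering deterministic it runs in a single thread: the "notifier" part finishes completely before the "waiter" part starts. The struct, the function name, and the 50 ms timeouts are assumptions made for this illustration.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

struct EarlyNotifyResult {
    bool bare_wait_timed_out;    // predicate-less wait never saw the notification
    bool predicate_wait_passed;  // predicate wait observed the condition anyway
};

EarlyNotifyResult notify_before_wait() {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;

    // "Notifier": sets the condition under the mutex, then notifies.
    {
        std::lock_guard<std::mutex> lck(mtx);
        ready = true;
    }
    cv.notify_one();  // nobody is waiting yet, so this notification is dropped

    // "Waiter": arrives only after the notification is gone.
    std::unique_lock<std::mutex> lck(mtx);

    // Without a predicate, only a notification (or a spurious wakeup) ends the
    // wait, so this normally runs into the timeout.
    bool timed_out =
        cv.wait_for(lck, std::chrono::milliseconds(50)) == std::cv_status::timeout;

    // With a predicate, the condition is checked before suspending, so the
    // lost notification does not matter.
    bool passed =
        cv.wait_for(lck, std::chrono::milliseconds(50), [&] { return ready; });

    return {timed_out, passed};
}
```

predicate_wait_passed is always true. bare_wait_timed_out is true in the common case, but a spurious wakeup may end the bare wait early, which is one more reason not to rely on the predicate-less overload.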
If notify is never called (even though the condition has been met), it does not mean the thread will never wake up. There are two possibilities:
- Timeout Expire: If you use wait_for or wait_until, cv will wake up the thread after the timeout expires even if the condition is not met, and return false so you can handle timeout cases:
if (cv.wait_for(lck, std::chrono::seconds(1), [] { return ready; })) {
    // Condition met
} else {
    // Timeout
}

if (cv.wait_until(lck, std::chrono::system_clock::now() + std::chrono::seconds(1), [] { return ready; })) {
    // Condition met
} else {
    // Timeout
}
- Spurious Wakeup: By definition, a spurious wakeup is a situation where the condition variable wakes up a thread without being notified [4]. As a phenomenon driven by platform-specific implementations and OS-level thread scheduling considerations [5], it is neither guaranteed nor predictable. Therefore, we should always check the condition after waking up (this is why the predicate parameter of the wait function is typically necessary).
Suspend vs. Block
Throughout this article, I have been using the term "suspend" instead of "block" to describe the behavior of a thread when it calls cv.wait. I prefer "suspend" because it better describes the behavior of this wait. When a thread is suspended, it does not consume any CPU time until the cv is signaled (as long as the program is built with the -pthread flag on Linux systems) [6]. The thread is unscheduled, with its ID inserted at the tail of a list of waiting threads. In contrast, I use "blocking" for a thread that is kept from making progress by some operation, such as contention for a lock or waiting for I/O, and that may keep consuming CPU resources while it waits, for instance by spinning or polling. For example, a do-while loop can be considered as blocking the thread:
do {
    // Busy waiting for the condition, blocking the thread.
} while (!ready);
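To make the contrast concrete, here is the same hand-off written both ways. This is only a sketch: the function names and the 20 ms delay are made up, and the busy-waiting version uses std::atomic<bool> so that the polling loop is free of data races.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Busy waiting: the waiter polls the flag and keeps a core busy the whole time.
bool wait_by_spinning() {
    std::atomic<bool> ready{false};
    std::thread setter([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        ready.store(true);
    });

    do {
        // Busy waiting for the condition.
    } while (!ready.load());

    setter.join();
    return ready.load();
}

// Suspending: the waiter sleeps inside cv.wait until it is notified.
bool wait_by_suspending() {
    std::mutex mtx;
    std::condition_variable cv;
    bool ready = false;
    std::thread setter([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        {
            std::lock_guard<std::mutex> lck(mtx);
            ready = true;
        }
        cv.notify_one();
    });

    {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [&] { return ready; });
    }

    setter.join();
    return ready;  // safe to read: the setter thread has been joined
}
```

Both functions return true after roughly 20 ms. The difference is what the waiting thread does in the meantime: the first one executes the loop continuously, while the second one is descheduled until the notification arrives.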
Conclusion
That's a lot to take in! I spent some time trying to understand this topic, and I hope this article helps you get a general idea of how condition variables work.
In short, a condition variable suspends a thread at a specific location until a notification arrives; the thread then either wakes up (condition met) or continues to wait (condition not met).