C++ Concurrency
Table of Contents
1 Overview
1.1 Purpose
1.1.1 Separation of concerns
1.1.2 Performance
- Task parallelism
- Data parallelism: Each thread performs the same operation on different parts of the data.
2 Managing threads
2.1 Daemon Threads
std::thread t(do_background_work); t.detach();
2.2 Passing Arguments
The arguments are copied into thread's internal storage by default. Use std::ref to bind a reference argument.
std::thread t(update_ref_data, std::ref(data)); // use std::move for move scenario
2.3 RAII(Resource Acquisition Is Initialization)
Use RAII to avoid use of try/catch blocks. Instances of std::thread are movable, but aren't copyable
class ScopedThread { public: explicit ScopedThread(std::thread t) : t_(std::move(t)) { if (!t_.joinable()) throw std::logic_error("No thread"); } ~ScopedThread() { t_.join(); } ScopedThread(const ScopedThread &) = delete; ScopedThread &operator=(const ScopedThread &) = delete; private: std::thread t_; }; ScopedThread t(std::thread(some_func));
2.4 hardware_concurrency
std::cout << std::thread::hardware_concurrency() << "\n";
3 Sharing data
3.1 Avoiding Deadlock
3.1.1 std::lock
A function that can lock two or more mutexes at once without risk of deadlock
std::lock(mu1, mu2); std::lock_guard<std::mutex> lock_a(mu1, std::adopt_lock); std::lock_guard<std::mutex> lock_b(mu2, std::adopt_lock);
3.2 Lazy Initialization
3.2.1 std::once_flag & std::call_once
std::shared_ptr<some_resource> resource_ptr; void init_resource() { resource_ptr.reset(new some_resource); } void foo() { static std::once_flag resource_flag; std::call_once(resource_flag, init_resource()); resource_ptr->do_something(); }
3.2.2 static(C++11)
SomeClass &get_instance() { static SomeClass instance; return instance; }
3.3 Reader-writer Mutex
- boost::shared_mutex
- readers: shared_lock
- writers: unique_lock
3.4 More Approach
3.4.1 software transactional memory(STM)
like transaction in database, this is an active research area.
4 Paradigm
FP style
Use future can be passed around between threads to allow the result of one computation to depend on the result of another, without any explicit access to shared data.
CSP(Communicating Sequential Processes)
See atm example
5 Synchronizing Concurrent Operations
5.1 Condition Variables
- example: Thread-safe Queue
template <typename T> class threadsafe_queue { public: threadsafe_queue() {} threadsafe_queue(const threadsafe_queue &other) { std::lock_guard<std::mutex> lk(other.mu_); data_queue_ = other.data_queue_; } bool empty() const { std::lock_guard<std::mutex> lk(mu_); return data_queue_.empty(); } void push(T new_value) { std::lock_guard<std::mutex> lk(mu_); data_queue_.push(new_value); data_cond_.notify_one(); // notify_all } void wait_and_pop(T &value) { std::unique_lock<std::mutex> lk(mu_); data_cond_.wait(lk, [this] { !data_queue_.empty(); }); // unlock->wait->lock value = data_queue.front(); data_queue.pop(); } private: mutable std::mutex mu_; // const member func like empty can lock mu_ std::queue<T> data_queue_; std::condition_variable data_cond_; };
5.2 std::future<> & std::async
only moveable, wrapped by unique_ptr
std::future<data> result = std::async(find_the_answer); do_other_things(); std::cout << result.get() << "\n";
options: std::launch::async, std::launch::deferred
5.2.1 std::shared_future
copyable, wrapped by shared_ptr
- operations on copied shared_future are thread-safe
- std::future -> std::shared_future
- access to the same shared state from multiple threads is safe if each thread does it through its own copy of a
shared_futureobject.
5.3 std::packaged_task<>
std::packaged_task<> ties a future to a function or callable object.
std::packaged_task<int(int, int)> some_task(task_func); auto result = some_task.get_future(); std::thread t{std::move(some_task), a, b}; result.get(); t.join();
5.3.1 when to use
- used as a building block for thread pools
- task management schemes
5.3.2 specialization of std::packaged_task<>
template <> class packaged_task<std::string(std::vector<char> *, int)> { public: template <typename Callable> explicit packaged_task(Callable &&f); std::future<std::string> get_future(); void operator()(std::vector<char> *, int); };
5.4 std::promise<>
- std::promise<> stores a return value or exception
std::promise<T> provides a means of setting a value (of type T), which can later be read through an associated std::future<T> object. The waiting thread could block on the future, while the thread providing the data could use the promise half of the pairing to set the associated value and make the future ready.
void do_work(std::promise<int> promise) { prepare_data(); promise.set_value(42); } void do_work(std::promise<void> barrier) { std::this_thread::sleep_for(std::chrono::seconds(1)); barrier.set_value(); } int main() { std::promise<int> promise; std::future<int> future = promise.get_future(); std::thread worker(do_work, std::move(promise)); future.wait(); std::cout << future.get() << "\n"; worker.join(); // Demonstrate using promise<void> to signal state between threads. std::promise<void> barrier; std::future<void> barrier_future = barrier.get_future(); std::thread new_work_thread(do_work, std::move(barrier)); barrier_future.wait(); new_work_thread.join(); return 0; }
5.4.1 set_exception()
try { some_promise.set_value(calculate_value()); } catch (...) { some_promise.set_exception(std::copy_exception(std::logic_error("foo"))); //alternative some_promise.set_exception(std::current_exception()); }
- std::current_exception()
- prefer the
copy_exceptionway: Not only does it simplify the code, but it also provides the compiler with greater opportunity to optimize the code. - a call to get() rethrows that stored exception
6 Memory Model
6.1 Modification Orders
Every object in a C++ program has a defined modification order composed of all the writes to that object from all threads in the program
6.2 Atomic Types
An atomic operation is an indivisible operation. You can't observe such an operation half-done from any thread in the system; it's either done or not done.
- If you do use atomic operations, the compiler is responsible for ensuring that the necessary synchronization is in place.
6.2.1 Operations
is_lock_free(): allows the user to determine whether operations on a given type are done directly with atomic instructionsload()store()exchange()compare_exchange_weak()compare_exchange_strong()=, +=, *=, |=, --, ++, orfetch_add(), fetch_or()etc.
Each of the operations on the atomic types has an optional memory-ordering argument.
- The default ordering for all operations is memory_order_seq_cst.
- Store Operations
which can have memory_order_relaxed, memory_order_release, or memory_order_seq_cst ordering.
- Load Operations
which can have memory_order_relaxed, memory_order_consume, memory_order_acquire, or memory_order_seq_cst ordering
- Read-modify-write Operations
which can have memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, or memory_order_seq_cst ordering
6.2.2 std::atomic_flag
- the simplest standard atomic type, which represents a Boolean flag
- two states: set or clear
std::atomic_flag f = ATOMIC_FLAG_INIT; // clear state f.clear(std::memory_order_release); // store operation bool x = f.test_and_set(); // read-modify-write operation
6.2.3 std::atomic<bool>
std::atomic<bool> b; bool x = b.load(std::memory_order_acquire); b.store(true); x = b.exchange(false, std::memory_order_acq_rel);
std::atomic<bool>may not be lock-free
6.2.4 compare/exchange operation(RMW)
- stores the desired value if expected value is equal to atomic variable
- the expected value is updated with atomic variable if they aren't equal
- return true if the store was performed
- underlying is a CAS(compare-and-swap) instruction if the processor have one
- some platform support a double-word-compare-and-swap (DWCAS) instruction
compare_exchange_weak()
The store might not be successful even if the original value was equal to the expected value, in which case the value of the variable is unchanged and the return value of
compare_exchange_weak()is false. This is due to spurious failure on some platforms where a sequence of instructions are used to implement a single CAS instruction.bool expected{false}; extern atomic<bool> b; // set somewhere else while (!b.compare_exchange_weak(expected, true) && !expected) ;
Why is
!expected inthe exampleIt depends on the situation and its desired semantics, but usually it is not needed for correctness. Omitting it would yield a very similar semantics. Only in a case where another thread might reset the value to false, the semantics could become slightly different (yet I cannot find a meaningful example where you would want that). See Tony D.'s comment for a detailed explanation. It is simply a fast track when another thread writes true: Then the we abort instead of trying to write true again.
compare_exchange_strong()
compare_exchange_strong()is guaranteed to return false only if the actual value wasn't equal to the expected value.- weak or strong
If the calculation of the value to be stored is simple, it may be beneficial to use
compare_exchange_weak()in order to avoid a double loop on platforms wherecompare_exchange_weak()can fail spuriously (and socompare_exchange_strong()contains a loop). On the other hand, if the calculation of the value to be stored is itself time consuming, it may make sense to usecompare_exchange_strong()to avoid having to recalculate the value to store when the expected value hasn't changed.hints:
When a compare-and-exchange is in a loop, the weak version will yield better performance on some platforms. When a weak compare-and-exchange would require a loop and a strong one would not, the strong one is preferable.
- memory-ordering semantics
- one for success
- another for failure
- if you don't specify an ordering for failure, it's assumed to be the same as that for success
6.2.5 atomic<UDT>
The type must not have any virtual functions or virtual base classes and must use the
compiler-generated copy-assignment operator. This essentially permits the compiler to
use memcpy() and memcmp().
- In general, the compiler will use an internal lock for all the operations
6.2.6 std::shared_ptr<>
- atomic operations available for
shared_ptrare load, store, exchange, and compare/exchange
7 Useful tools
- std::ref
- std::distance(iter1, iter2)
- std::advance(iter, distance)
- list::splice:
- std::partition
- std::ostream_iterator
7.1 std::mem_fn
useful for std algorithms
class SomeClass { void some_func() {} }; auto some_func = std::mem_fn(&SomeClass::some_func); SomeClass s; some_func(s); std::some_algo(iter1, iter2, some_func);
7.2 Time Facilities
7.2.1 Clocks
- now
auto time_point = std::chrono::some_clock::now();
- tick period
- std::ratio<1, 25> ticks every 1/25 seconds
- std::ratio<5, 2> ticks every 2.5 seconds
There's no guarantee that the observed tick period in a given run of the program matches the specified period for that clock.
- steady_clock
If a clock ticks at a uniform rate and can't be adjusted, the clock is said to be a steady clock. Steady clocks are important for timeout calculations.(system_clock can be changed)
- high
high_resolution_clock provides the smallest possible tick period of all the library-supplied clocks.
7.2.2 Durations
std::duration<representation, std::ratio<>>
- example millisecond duration: std::chrono::duration<double, std::ratio<1,1000>>
- std::chrono::duration_cast<>: floor(high_resolution)
The time for a duration-based wait is measured using a steady clock internal to the library, even if the system clock was adjusted during the wait
7.2.3 Time points
std::chrono::time_point<clock, duration>
- time_since_epoch() returns a duration value specifying the length of time since the clock epoch to that particular time point
7.2.4 wait_for & wait_until
- Clock adjustments are taken into account by the wait_until