C++ Concurrency

Table of Contents

1 Overview

1.1 Purpose

1.1.1 Separation of concerns

1.1.2 Performance

  • Task parallelism: Each thread performs a different part of the overall task.
  • Data parallelism: Each thread performs the same operation on different parts of the data.
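
A minimal sketch of data parallelism, assuming a hypothetical helper named parallel_sum (it is not part of these notes): every thread runs the same summation on its own slice of the input and writes to its own result slot, so no locking is needed.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical helper: sums `data` by giving each thread its own slice.
long long parallel_sum(const std::vector<int> &data, std::size_t num_threads) {
  assert(num_threads > 0);
  std::vector<long long> partial(num_threads, 0); // one slot per thread
  std::vector<std::thread> workers;
  const std::size_t chunk = (data.size() + num_threads - 1) / num_threads;

  for (std::size_t i = 0; i < num_threads; ++i) {
    const std::size_t begin = std::min(i * chunk, data.size());
    const std::size_t end = std::min(begin + chunk, data.size());
    workers.emplace_back([&data, &partial, i, begin, end] {
      // Same operation in every thread, different part of the data.
      partial[i] =
          std::accumulate(data.begin() + begin, data.begin() + end, 0LL);
    });
  }
  for (auto &w : workers)
    w.join();
  return std::accumulate(partial.begin(), partial.end(), 0LL);
}
```

Joining every worker before the final accumulate is what makes reading the partial results safe.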

2 Managing threads

2.1 Daemon Threads

std::thread t(do_background_work);
t.detach();

2.2 Passing Arguments

By default, arguments are copied into the thread's internal storage, even when the function takes a reference parameter. Wrap the argument in std::ref to pass a genuine reference.

std::thread t(update_ref_data, std::ref(data)); // use std::move instead to hand over a move-only argument
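
The difference between the default copy and std::ref can be sketched as follows. The function names increment_copy, increment_ref, run_with_copy and run_with_ref are made up for this illustration.

```cpp
#include <cassert>
#include <functional>
#include <thread>

void increment_copy(int value) { ++value; } // changes the thread's private copy
void increment_ref(int &value) { ++value; } // changes the caller's object

int run_with_copy() {
  int data = 0;
  std::thread t(increment_copy, data); // data is copied into the thread
  t.join();
  assert(data == 0); // the caller's variable is untouched
  return data;
}

int run_with_ref() {
  int data = 0;
  std::thread t(increment_ref, std::ref(data)); // a reference is passed
  t.join();
  assert(data == 1); // the thread updated the caller's variable
  return data;
}
```

Passing data without std::ref to increment_ref does not compile, because the internal copy is handed over as an rvalue and cannot bind to a non-const reference.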

2.3 RAII(Resource Acquisition Is Initialization)

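RAII guarantees that the thread is joined on every exit path, including exceptions, without scattering try/catch blocks. Instances of std::thread are movable but not copyable, so the wrapper takes ownership by move.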

class ScopedThread {
public:
  explicit ScopedThread(std::thread t) : t_(std::move(t)) {
    if (!t_.joinable())
      throw std::logic_error("No thread");
  }
  ~ScopedThread() { t_.join(); }
  ScopedThread(const ScopedThread &) = delete;
  ScopedThread &operator=(const ScopedThread &) = delete;

private:
  std::thread t_;
};

ScopedThread t{std::thread(some_func)}; // braces avoid the most vexing parse; with parentheses this line declares a function named t

2.4 hardware_concurrency

std::cout << std::thread::hardware_concurrency() << "\n";
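
std::thread::hardware_concurrency() is only a hint and may return 0 when the value cannot be determined, so code that sizes a thread pool from it needs a fallback. A minimal sketch, assuming a hypothetical helper pick_thread_count and an arbitrary fallback of 2 threads:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper: how many threads to use for `work_items` items when
// each thread should get at least `min_per_thread` of them.
std::size_t pick_thread_count(std::size_t work_items,
                              std::size_t min_per_thread) {
  assert(min_per_thread > 0);
  if (work_items == 0)
    return 0;
  // Never start more threads than there is work to keep them busy.
  const std::size_t max_useful =
      (work_items + min_per_thread - 1) / min_per_thread;
  const std::size_t hw = std::thread::hardware_concurrency(); // 0 = unknown
  return std::min<std::size_t>(hw != 0 ? hw : 2, max_useful);
}
```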

3 Sharing data

3.1 Avoiding Deadlock

3.1.1 std::lock

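std::lock locks two or more mutexes at once without risk of deadlock, whatever order other threads lock them in. The std::adopt_lock tag tells each lock_guard that its mutex is already locked.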

std::lock(mu1, mu2);
std::lock_guard<std::mutex> lock_a(mu1, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(mu2, std::adopt_lock);
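
A variant of the same idea is to create the locks first with std::defer_lock and then pass the lock objects to std::lock. The sketch below assumes a hypothetical Account type; two threads transfer in opposite directions, which is the classic deadlock scenario when the mutexes are locked one after the other.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

struct Account { // hypothetical type for illustration
  std::mutex mu;
  int balance = 0;
};

void transfer(Account &from, Account &to, int amount) {
  if (&from == &to)
    return; // locking the same std::mutex twice is undefined behaviour
  std::unique_lock<std::mutex> lock_from(from.mu, std::defer_lock);
  std::unique_lock<std::mutex> lock_to(to.mu, std::defer_lock);
  std::lock(lock_from, lock_to); // acquires both without deadlock
  from.balance -= amount;
  to.balance += amount;
}

int demo_transfers() {
  Account a, b;
  a.balance = b.balance = 100;
  std::thread t1([&a, &b] {
    for (int i = 0; i < 1000; ++i)
      transfer(a, b, 1);
  });
  std::thread t2([&a, &b] {
    for (int i = 0; i < 1000; ++i)
      transfer(b, a, 1);
  });
  t1.join();
  t2.join();
  assert(a.balance + b.balance == 200); // nothing was lost or duplicated
  return a.balance;
}
```

In C++17, std::scoped_lock lock(from.mu, to.mu); expresses the same thing in one line.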

3.2 Lazy Initialization

3.2.1 std::once_flag & std::call_once

std::shared_ptr<some_resource> resource_ptr;

void init_resource() {
  resource_ptr.reset(new some_resource);
}

void foo() {
  static std::once_flag resource_flag;
  std::call_once(resource_flag, init_resource); // pass the callable itself; do not call it here
  resource_ptr->do_something();
}

3.2.2 static(C++11)

SomeClass &get_instance() {
  static SomeClass instance; // since C++11, initialized exactly once even with concurrent callers
  return instance;
}

3.3 Reader-writer Mutex

  • std::shared_mutex (C++17), or boost::shared_mutex before that
  • readers: shared_lock (shared access)
  • writers: unique_lock or lock_guard (exclusive access)
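
A minimal sketch of the reader-writer pattern with the C++17 standard types. The DnsCache class and its members are hypothetical and only illustrate which lock goes where.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>

class DnsCache { // hypothetical example class
public:
  // Readers take a shared lock, so lookups can run concurrently.
  std::string find(const std::string &domain) const {
    std::shared_lock<std::shared_mutex> lk(mu_);
    auto it = entries_.find(domain);
    return it == entries_.end() ? std::string() : it->second;
  }

  // Writers take an exclusive lock and block readers and other writers.
  void update(const std::string &domain, const std::string &address) {
    assert(!domain.empty());
    std::unique_lock<std::shared_mutex> lk(mu_);
    entries_[domain] = address;
  }

private:
  mutable std::shared_mutex mu_;
  std::map<std::string, std::string> entries_;
};
```

This pays off mainly when reads vastly outnumber writes; with frequent writes a plain std::mutex may perform just as well.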

3.4 Other Approaches

3.4.1 software transactional memory(STM)

Reads and writes are grouped into a transaction that is committed in a single step, much like a transaction in a database. This is an active research area.

4 Paradigm

  • FP style

    A future can be passed between threads so that the result of one computation depends on the result of another, without any explicit access to shared data.

  • CSP(Communicating Sequential Processes)

    See atm example

5 Synchronizing Concurrent Operations

5.1 Condition Variables

  • example: Thread-safe Queue
template <typename T> class threadsafe_queue {
public:
  threadsafe_queue() {}
  threadsafe_queue(const threadsafe_queue &other) {
    std::lock_guard<std::mutex> lk(other.mu_);
    data_queue_ = other.data_queue_;
  }

  bool empty() const {
    std::lock_guard<std::mutex> lk(mu_);
    return data_queue_.empty();
  }

  void push(T new_value) {
    std::lock_guard<std::mutex> lk(mu_);
    data_queue_.push(std::move(new_value));
    data_cond_.notify_one(); // or notify_all() to wake every waiting thread
  }
  void wait_and_pop(T &value) {
    std::unique_lock<std::mutex> lk(mu_);
    // wait() unlocks mu_ while blocked and relocks it before checking the predicate
    data_cond_.wait(lk, [this] { return !data_queue_.empty(); });
    value = std::move(data_queue_.front());
    data_queue_.pop();
  }

private:
  mutable std::mutex mu_; // const member func like empty can lock mu_
  std::queue<T> data_queue_;
  std::condition_variable data_cond_;
};

5.2 std::future<> & std::async

std::future is move-only: like std::unique_ptr, exactly one future object refers to a given result.

std::future<data> result = std::async(find_the_answer);
do_other_things();
std::cout << result.get() << "\n";

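launch policies: std::launch::async (run on a new thread), std::launch::deferred (run lazily on the thread that calls get() or wait()). Without an explicit policy the implementation may choose either.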
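
The two policies can be told apart by looking at which thread runs the task. A small sketch, with current_id and demo_launch_policies as made-up names:

```cpp
#include <cassert>
#include <future>
#include <thread>

std::thread::id current_id() { return std::this_thread::get_id(); }

// Returns true when both policies behave as their names suggest.
bool demo_launch_policies() {
  const std::thread::id caller = std::this_thread::get_id();

  // deferred: nothing runs until get() or wait(), then it runs on the caller.
  std::future<std::thread::id> lazy =
      std::async(std::launch::deferred, current_id);
  // async: the task runs on a thread of its own.
  std::future<std::thread::id> eager =
      std::async(std::launch::async, current_id);

  const std::thread::id lazy_id = lazy.get();
  const std::thread::id eager_id = eager.get();
  assert(lazy_id == caller);
  return lazy_id == caller && eager_id != caller;
}
```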

5.2.1 std::shared_future

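std::shared_future is copyable: like std::shared_ptr, several copies can refer to the same shared state.

  • std::future -> std::shared_future: call share() on the future or move-construct the shared_future from it
  • access to the same shared state from multiple threads is safe if each thread does it through its own copy of a shared_future object; concurrent access through one and the same object is not synchronized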
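
A minimal sketch of the "one copy per thread" rule, using a made-up function demo_shared_future. Capturing the shared_future by value in each lambda gives every thread its own copy.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <vector>

int demo_shared_future() {
  std::promise<int> p;
  std::shared_future<int> ready = p.get_future().share();

  std::vector<int> seen(3, 0);
  std::vector<std::thread> waiters;
  for (int i = 0; i < 3; ++i) {
    // `ready` is captured by value: each thread waits on its own copy.
    waiters.emplace_back([ready, &seen, i] { seen[i] = ready.get(); });
  }

  p.set_value(7); // wakes all three waiters
  for (auto &t : waiters)
    t.join();
  assert(seen[0] == 7 && seen[1] == 7 && seen[2] == 7);
  return seen[0] + seen[1] + seen[2];
}
```

Unlike std::future::get(), shared_future::get() can be called more than once because it does not move the result out.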

5.3 std::packaged_task<>

std::packaged_task<> ties a future to a function or callable object.

std::packaged_task<int(int, int)> some_task(task_func);
auto result = some_task.get_future();
std::thread t{std::move(some_task), a, b};
result.get();
t.join();

5.3.1 when to use

  • used as a building block for thread pools
  • task management schemes
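
How packaged_task serves as a building block can be sketched with a tiny task queue. Task, submit and demo_task_queue are hypothetical names, and the single worker with a pre-filled queue is a simplification: a real pool would protect the queue with a mutex and a condition variable.

```cpp
#include <cassert>
#include <deque>
#include <future>
#include <thread>
#include <utility>

using Task = std::packaged_task<int()>;

// Hypothetical helper: wraps `f`, queues it, and returns the future for its result.
template <typename F> std::future<int> submit(std::deque<Task> &queue, F f) {
  Task task(std::move(f));
  std::future<int> result = task.get_future();
  queue.push_back(std::move(task));
  return result;
}

int demo_task_queue() {
  std::deque<Task> queue;
  std::future<int> first = submit(queue, [] { return 6 * 7; });
  std::future<int> second = submit(queue, [] { return 10; });

  // This thread does not touch the queue while the worker runs,
  // so the sketch gets away without a mutex.
  std::thread worker([&queue] {
    while (!queue.empty()) {
      Task task = std::move(queue.front());
      queue.pop_front();
      task(); // running the task makes its future ready
    }
  });
  worker.join();
  assert(first.valid() && second.valid());
  return first.get() + second.get();
}
```

The submitter only ever sees the future, so it does not need to know which thread ran the task or when.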

5.3.2 specialization of std::packaged_task<>

template <> class packaged_task<std::string(std::vector<char> *, int)> {
public:
  template <typename Callable> explicit packaged_task(Callable &&f);
  std::future<std::string> get_future();
  void operator()(std::vector<char> *, int);
};

5.4 std::promise<>

  • std::promise<> stores a return value or exception

std::promise<T> provides a means of setting a value (of type T), which can later be read through an associated std::future<T> object. The waiting thread could block on the future, while the thread providing the data could use the promise half of the pairing to set the associated value and make the future ready.

// The workers have distinct names because std::thread cannot choose
// between overloads when given an overloaded function name.
void compute_answer(std::promise<int> promise) {
  prepare_data();
  promise.set_value(42); // makes the associated future ready
}

void do_work(std::promise<void> barrier) {
  std::this_thread::sleep_for(std::chrono::seconds(1));
  barrier.set_value();
}

int main() {
  std::promise<int> promise;
  std::future<int> future = promise.get_future();
  std::thread worker(compute_answer, std::move(promise));
  future.wait(); // optional: get() blocks until the value is set as well
  std::cout << future.get() << "\n";
  worker.join();

  // Demonstrate using promise<void> to signal state between threads.
  std::promise<void> barrier;
  std::future<void> barrier_future = barrier.get_future();
  std::thread new_work_thread(do_work, std::move(barrier));
  barrier_future.wait();
  new_work_thread.join();
  return 0;
}

5.4.1 set_exception()

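try {
  some_promise.set_value(calculate_value());
} catch (...) {
  some_promise.set_exception(std::current_exception());
}

// When the exception type is known, store it directly without throwing:
some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo")));
  • std::current_exception() captures the exception currently being handled, so it is only meaningful inside a catch block
  • std::make_exception_ptr() (called std::copy_exception() in pre-standard drafts) stores a new exception without a throw; prefer it when the type is known: not only does it simplify the code, it also gives the compiler greater opportunity to optimize
  • a call to get() rethrows that stored exception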
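
The rethrow on get() can be seen in a single-threaded sketch; demo_stored_exception is a made-up name.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>

std::string demo_stored_exception() {
  std::promise<int> p;
  std::future<int> f = p.get_future();

  // Store an exception instead of a value; nothing is thrown here.
  p.set_exception(std::make_exception_ptr(std::logic_error("foo")));

  assert(f.valid());
  try {
    f.get(); // rethrows the stored std::logic_error
  } catch (const std::logic_error &e) {
    return e.what();
  }
  return "no exception";
}
```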

6 Memory Model

6.1 Modification Orders

Every object in a C++ program has a defined modification order composed of all the writes to that object from all threads in the program. All threads must agree on that order.

6.2 Atomic Types

An atomic operation is an indivisible operation. You can't observe such an operation half-done from any thread in the system; it's either done or not done.

  • If you do use atomic operations, the compiler is responsible for ensuring that the necessary synchronization is in place.

6.2.1 Operations

  • is_lock_free(): allows the user to determine whether operations on a given type are done directly with atomic instructions
  • load()
  • store()
  • exchange()
  • compare_exchange_weak()
  • compare_exchange_strong()
  • =, +=, -=, &=, |=, ^=, ++, --, or the named equivalents fetch_add(), fetch_sub(), fetch_and(), fetch_or(), fetch_xor() (there is no atomic *=)

Each of the operations on the atomic types has an optional memory-ordering argument.

  • The default ordering for all operations is memory_order_seq_cst.
  • Store Operations

    which can have memory_order_relaxed, memory_order_release, or memory_order_seq_cst ordering.

  • Load Operations

    which can have memory_order_relaxed, memory_order_consume, memory_order_acquire, or memory_order_seq_cst ordering

  • Read-modify-write Operations

    which can have memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, or memory_order_seq_cst ordering
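
A typical pairing is a release store with an acquire load. In the sketch below (demo_release_acquire, payload and ready are illustrative names) the release store publishes a plain, non-atomic write, and the acquire load that observes it makes that write visible to the reader.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

int demo_release_acquire() {
  int payload = 0; // plain, non-atomic data
  std::atomic<bool> ready{false};

  std::thread producer([&payload, &ready] {
    payload = 42;                                 // (1) write the data
    ready.store(true, std::memory_order_release); // (2) publish it
  });
  std::thread consumer([&payload, &ready] {
    while (!ready.load(std::memory_order_acquire)) // (3) wait for (2)
      std::this_thread::yield();
    assert(payload == 42); // (4) guaranteed because (3) synchronizes with (2)
  });

  producer.join();
  consumer.join();
  return payload;
}
```

With memory_order_relaxed on both sides the flag would still become true eventually, but step (4) would be a data race.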

6.2.2 std::atomic_flag

  • the simplest standard atomic type, which represents a Boolean flag
  • two states: set or clear
std::atomic_flag f = ATOMIC_FLAG_INIT; // clear state
f.clear(std::memory_order_release); // store operation
bool x = f.test_and_set(); // read-modify-write operation
  • simple spinlock_mutex
    class spinlock_mutex {
      std::atomic_flag flag = ATOMIC_FLAG_INIT; // starts in the clear state
    
    public:
      spinlock_mutex() = default;
    
      void lock() {
        while (flag.test_and_set(std::memory_order_acquire))
          ;
      }
    
      void unlock() { flag.clear(std::memory_order_release); }
    };
    

6.2.3 std::atomic<bool>

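std::atomic<bool> b{false}; // initialize explicitly: before C++20 a default-constructed atomic holds an indeterminate value
bool x = b.load(std::memory_order_acquire);
b.store(true);
x = b.exchange(false, std::memory_order_acq_rel);
  • unlike std::atomic_flag, std::atomic<bool> is not guaranteed to be lock-free; check with is_lock_free()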

6.2.4 compare/exchange operation(RMW)

  • stores the desired value if the atomic variable equals the expected value
  • if they aren't equal, the expected value is updated with the current value of the atomic variable
  • returns true if the store was performed
  • implemented with a CAS (compare-and-swap) instruction if the processor has one
  • some platforms support a double-word compare-and-swap (DWCAS) instruction
  • compare_exchange_weak()

    The store might not be successful even if the original value was equal to the expected value, in which case the value of the variable is unchanged and the return value of compare_exchange_weak() is false. This spurious failure occurs on platforms that lack a single CAS instruction and implement the operation as a sequence of instructions.

    bool expected{false};
    extern std::atomic<bool> b; // set somewhere else
    while (!b.compare_exchange_weak(expected, true) && !expected)
      ;
    
    • Why !expected is in the loop condition

      On failure, expected is overwritten with the current value of b. After a spurious failure expected is still false, so the loop retries. If another thread has already written true, expected becomes true and the loop exits at once instead of trying to write true again, so the check is simply a fast track. It is usually not needed for correctness: omitting it yields very similar semantics, which differ only when another thread might reset the value to false in the meantime.

  • compare_exchange_strong()

    compare_exchange_strong() is guaranteed to return false only if the actual value wasn't equal to the expected value.

  • weak or strong

    If the calculation of the value to be stored is simple, it may be beneficial to use compare_exchange_weak() in order to avoid a double loop on platforms where compare_exchange_weak() can fail spuriously (and so compare_exchange_strong() contains a loop). On the other hand, if the calculation of the value to be stored is itself time consuming, it may make sense to use compare_exchange_strong() to avoid having to recalculate the value to store when the expected value hasn't changed.

    • hints:

      When a compare-and-exchange is in a loop, the weak version will yield better performance on some platforms. When a weak compare-and-exchange would require a loop and a strong one would not, the strong one is preferable.

  • memory-ordering semantics
    • one for success
    • another for failure
    • if you don't specify an ordering for failure, it's assumed to be the same as that for success
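
The usual shape of a compare_exchange_weak() loop, sketched with a made-up fetch_multiply helper (atomics offer no multiply operation of their own). The new value is cheap to compute, so the weak form with a retry loop is the natural choice here, and separate orderings are given for success and failure.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: atomically multiplies `target` by `factor` and
// returns the value it held before.
int fetch_multiply(std::atomic<int> &target, int factor) {
  int expected = target.load(std::memory_order_relaxed);
  // On any failure, spurious or not, `expected` is refreshed with the
  // current value, so the desired value is recomputed on each attempt.
  while (!target.compare_exchange_weak(expected, expected * factor,
                                       std::memory_order_acq_rel,   // success
                                       std::memory_order_relaxed)) { // failure
  }
  return expected;
}

int demo_fetch_multiply() {
  std::atomic<int> value{1};
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i)
    threads.emplace_back([&value] { fetch_multiply(value, 2); });
  for (auto &t : threads)
    t.join();
  assert(value.load() == 16); // four doublings, none lost
  return value.load();
}
```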

6.2.5 atomic<UDT>

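The type must be trivially copyable: it must not have any virtual functions or virtual base classes and must use the compiler-generated copy-assignment operator. This essentially permits the compiler to use memcpy() and memcmp().

  • In general, the implementation will use an internal lock for all the operations, unless the type is small enough (typically no larger than an int or a void*) to map onto atomic instructions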

6.2.6 std::shared_ptr<>

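  • atomic operations available for shared_ptr are load, store, exchange, and compare/exchange, provided as free functions such as std::atomic_load() and std::atomic_store() (C++20 adds std::atomic<std::shared_ptr<T>> and deprecates the free functions)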

7 Useful tools

  • std::ref
  • std::distance(iter1, iter2)
  • std::advance(iter, distance)
  • std::list::splice
  • std::partition
  • std::ostream_iterator

7.1 std::mem_fn

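std::mem_fn wraps a pointer to a member function in a callable object, which makes it useful with the standard algorithms.

class SomeClass {
public: // the member must be accessible where its address is taken
  void some_func() {}
};

auto some_func = std::mem_fn(&SomeClass::some_func);
SomeClass s;
some_func(s); // calls s.some_func()
std::some_algo(iter1, iter2, some_func); // some_algo stands for any standard algorithm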
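
A concrete instance with std::for_each. Counter, bump and demo_mem_fn are made-up names for this sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <vector>

struct Counter { // hypothetical type for illustration
  int hits = 0;
  void bump() { ++hits; }
};

int demo_mem_fn() {
  std::vector<Counter> counters(3);
  // Calls bump() on every element; no hand-written lambda is needed.
  std::for_each(counters.begin(), counters.end(),
                std::mem_fn(&Counter::bump));

  int total = 0;
  for (const Counter &c : counters)
    total += c.hits;
  assert(total == 3);
  return total;
}
```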

7.2 Time Facilities

7.2.1 Clocks

  • now
    auto time_point = std::chrono::some_clock::now();
    
  • tick period
    • std::ratio<1, 25> ticks every 1/25 seconds
    • std::ratio<5, 2> ticks every 2.5 seconds

    There's no guarantee that the observed tick period in a given run of the program matches the specified period for that clock.

  • steady_clock

    If a clock ticks at a uniform rate and can't be adjusted, the clock is said to be a steady clock. Steady clocks are important for timeout calculations (system_clock can be adjusted, so it is typically not steady).

  • high_resolution_clock

    high_resolution_clock provides the smallest possible tick period of all the library-supplied clocks.

7.2.2 Durations

std::chrono::duration<representation, std::ratio<>>

  • example millisecond duration: std::chrono::duration<double, std::ratio<1,1000>>
  • std::chrono::duration_cast<>: explicit conversion between durations; converting to a coarser duration truncates toward zero rather than rounding

The time for a duration-based wait is measured using a steady clock internal to the library, even if the system clock was adjusted during the wait.
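
The truncating behaviour of duration_cast and the implicit conversion to a floating-point duration can be checked with a few assertions; demo_duration_cast is a made-up name.

```cpp
#include <cassert>
#include <chrono>

void demo_duration_cast() {
  using namespace std::chrono;

  // Integer conversions to a coarser unit need duration_cast and truncate.
  assert(duration_cast<seconds>(milliseconds(2999)) == seconds(2));
  // Truncation is toward zero, so negative values are not floored.
  assert(duration_cast<seconds>(milliseconds(-1500)) == seconds(-1));

  // Conversions to a floating-point representation are implicit and keep
  // the fractional part.
  duration<double, std::milli> ms = microseconds(1500);
  assert(ms.count() == 1.5);
}
```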

7.2.3 Time points

std::chrono::time_point<clock, duration>

  • time_since_epoch() returns a duration value specifying the length of time since the clock epoch to that particular time point

7.2.4 wait_for & wait_until

  • wait_for takes a duration; wait_until takes a time point, so adjustments to the clock it is measured against are taken into account by wait_until
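
When a wait has to be repeated in a loop, a fixed deadline with wait_until keeps the total waiting time bounded. A minimal sketch, assuming a hypothetical helper wait_for_flag whose flag parameter is only modified while mu is held:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: waits until `flag` becomes true or `budget` elapses.
bool wait_for_flag(std::mutex &mu, std::condition_variable &cv,
                   const bool &flag, std::chrono::milliseconds budget) {
  assert(budget.count() >= 0);
  // Compute the deadline once. Calling wait_for() inside the loop would
  // restart the full budget after every spurious wakeup.
  const auto deadline = std::chrono::steady_clock::now() + budget;
  std::unique_lock<std::mutex> lk(mu);
  while (!flag) {
    if (cv.wait_until(lk, deadline) == std::cv_status::timeout)
      break;
  }
  return flag; // checked one last time after a timeout
}
```

The predicate overload cv.wait_until(lk, deadline, pred) performs the same loop internally.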