[C++并发] C++内存模型

[C++并发] C++内存模型

scene 1

int data = 0;

void entry(MTIndex<0>) { // 0 号线程
  data = 42;
}

void entry(MTIndex<1>) { // 1 号线程
  MTTest::result = data;
}

当前,0号线程写入,1号线程读取,显然会发生数据竞争问题,得到的 result 结果可能为0或42。

scene 2

那么添加一个 while 循环等待呢?直到数据被写入了这时再读取,像这样

int data = 0;

void entry(MTIndex<0>) { // 0 号线程
  data = 42;
}

void entry(MTIndex<1>) { // 1 号线程
  while (data == 0)
    ; 
  MTTest::result = data;
}

这样是不行的,这是因为在不同线程进行副作用的访问属于未定义行为,编译器可以基于此做优化。编译器可能尝试先将 data 的值读出来,然后while循环只判断寄存器的值就可以了,也就是像这样

int data = 0;

void entry(MTIndex<0>) { // 0 号线程
  data = 42;
}

void entry(MTIndex<1>) { // 1 号线程
  auto eax = data;
  while (eax == 0)
    ; 
  MTTest::result = data;
}

这样1号线程在循环时就感知不到0号线程数据的变化了,就会死循环。

scene 3

换个方式来描述当前程序情况:0号线程的修改对1号线程不可见。

现在我们使用atomic修饰来解决。

std::atomic_int data = 0; // 使用 atomic 修饰

void entry(MTIndex<0>) { // 0 号线程
  data = 42; // 相当于 data.store(42, std::memory_order_seq_cst)
}

void entry(MTIndex<1>) { // 1 号线程
  while (data == 0) // 相当于 while (data.load(std::memory_order_seq_cst) == 0)
    ; 
  MTTest::result = data;
}

这样可行的本质是,std::atomic 的操作默认使用 std::memory_order_seq_cst(顺序一致性)。效果是,一旦0号线程写入42,1号线程的while循环一定能看到这个值的更新。要满足这个要求,编译器生成的汇编代码就不会再是下面这样的了。

int data = 0;

void entry(MTIndex<0>) { // 0 号线程
  data = 42;
}

void entry(MTIndex<1>) { // 1 号线程
  auto eax = data;
  while (eax == 0)
    ; 
  MTTest::result = data;
}

也就是说,默认使用的 std::memory_order_seq_cst 的这个内存序,保证了 data=42 这个写入对所有线程可见,且 data==0 读取时能看到所有线程写入的数据。

scene 4

atomic 发挥两个作用,原子性+内存序。可以利用atomic的flag变量,影响其它变量

int data = 0;
char space[64];
std::atomic_int flag = 0;

void entry(MTIndex<0>) { // 0 号线程
  data = 42;
  flag.store(1, std::memory_order_seq_cst);
}

void entry(MTIndex<1>) { // 1 号线程
  while (flag.load(std::memory_order_seq_cst) == 0)
    ; 
  MTTest::result = data;
}

这里的flag产生了内存屏障从而保证了data的线程安全

int data = 0;
char space[64];
std::atomic_int flag = 0;

void entry(MTIndex<0>) { // 0 号线程
  data = 42;
  // barrier
  flag.store(1, std::memory_order_seq_cst);
  // barrier
}

void entry(MTIndex<1>) { // 1 号线程
  // barrier
  while (flag.load(std::memory_order_seq_cst) == 0)
    ; 
  // barrier
  MTTest::result = data;
}

这里可以使用 release 和 acquire 优化

int data = 0;
char space[64];
std::atomic_int flag = 0;

void entry(MTIndex<0>) { // 0 号线程
  data = 42;
  // barrier
  flag.store(1, std::memory_order_release);
}

void entry(MTIndex<1>) { // 1 号线程
  while (flag.load(std::memory_order_acquire) == 0)
    ; 
  // barrier
  MTTest::result = data;
}

release 和 acquire 配合使用,release 之前的写入对于 acquire 后的读取可见。

scene 5

看下面一段最朴素的程序,它是线程安全的吗?

std::atomic<int> data = 0;

void entry(MTIndex<0>) {
  data = data + 1; // data.store(data.load() + 1)
}

void entry(MTIndex<1>) {
  data = data + 1;
}

void teardown() {
  MTTest::result = data;
}

不是,因为对于data = data + 1 ,它是三条指令,并不是原子的。

那么这样是线程安全的吗?

std::atomic<int> data = 0;

void entry(MTIndex<0>) {
  data.fetch_add(1, std::memory_order_relaxed);
}

void entry(MTIndex<1>) {
  data.fetch_add(1, std::memory_order_relaxed);
}

void teardown() {
  MTTest::result = data;
}

是的,因为0号线程和1号线程之间没有依赖关系,fetch_add 可以保证自己的原子性。

scene 6

我们知道对于atomic封装的变量,标准库提供了 fetch_add 等原子性的方法,那么如果我现在要对变量进行平方呢?例如下面这段程序

int data = 2;

void entry(MTIndex<0>) {
  data = data * data;
}

void entry(MTIndex<1>) {
  data = data * data;
}

void teardown() {
  MTTest::result = data;
}

使用 compare_exchange_strong来解决这类问题

std::atomic<int> data = 2;

void entry(MTIndex<0>) {
again:
  auto old_data = data.load();
  auto new_data = old_data * old_data;
  if (!data.compare_exchange_strong(old_data, new_data)) {
    goto again;
  }
}

void entry(MTIndex<1>) {
again:
  auto old_data = data.load();
  auto new_data = old_data * old_data;
  if (!data.compare_exchange_strong(old_data, new_data)) {
    goto again;
  }
}

void teardown() {
  MTTest::result = data;
}

先了解 compare_exchange_strong 的用法,这个函数被称为 CAS 操作,签名如下

bool compare_exchange_strong(T& expected, T desired);
// expected:你“预期”变量现在应该有的值
// desired:如果你预期对了,想要写入的新值
// 返回值:
//   如果 data == expected,则执行 data = desired,返回 true(交换成功)。
//   如果 data != expected,则不进行交换,而是将 expected 更新为 data 的当前实际值,返回 false(交换失败)。

程序逻辑用简单来说就是,查一下旧值data,如果在此期间没有其他线程修改data,交换成功;如果其他线程抢先一步修改了data,交换失败,old_data 被自动更新为新值,goto again 重新计算。

C++ 还有一个 compare_exchange_weak ,和strong的相比是返回值的逻辑有些不一样。strong中如果data ≠ expected,会返回 false,而 weak 中即使 data == expected,也可能会返回失败。对于有 while 或 goto 循环的场景中,可以使用 weak,因为返回 false 再循环判断一次也无伤大雅,也能保证程序运行结果正确。

scene 7 - 实现自旋锁

实现的自旋锁将使用下述代码进行验证

struct SpinMutex {
  void lock() {
    // todo
  }

  void unlock() {
    // todo
  }
};

int data = 0;
SpinMutex mutex;

void entry(MTIndex<0>) {
  mutex.lock();
  data += 1;
  mutex.unlock();
}

void entry(MTIndex<1>) {
  mutex.lock();
  data += 1;
  mutex.unlock();
}

void teardown() {
  MTTest::result = data;
}
struct SpinMutex {
  void lock() {
    while (flag != 0)
      ;
    flag = 1;
  }

  void unlock() {
    flag = 0;
  }

  std::atomic<int> flag{0};
}

这里的操作会失败,原因为 flag 执行了多次操作,这在原子变量的操作中是不允许的,解决方法是可以利用前边的 compare_exchange_strong,将多个操作合为一个。

struct SpinMutex {
  bool try_lock() {
    int old = 0;
    if (compare_exchange_strong(old, 1)) 
      return true;
    return false;
  }

  void lock() {
    while (!try_lock())
      ;
  }
  // 或者这种实现也是可以的
  // void lock() {
  //   int old = 0;
  //   while (!flag.compare_exchange_weak(old, 1))
  //     old = 0;
  // }

  void unlock() {
    flag = 0;
  }

  std::atomic<int> flag{0};
}

标准库的 std::mutex 的实现是先尝试自旋一小会,若仍然未等到则调用系统调用挂起线程,可以使用 C++20 的 atomic wait 来完成

struct SpinMutex {
  void lock() {
    bool old;
#if __cpp_lib_atomic_wait
    int retries = 1000;
    do {
      old = false;
      if (flag.compare_exchange_weak(old, true, std::memory_order_acquire, std::memory_order_relaxed))
        return;
    } while (--retries);
#endif
    do {
#if __cpp_lib_atomic_wait
      flag.wait(true, std::memory_order_relaxed); // wait until flag != true
#endif
      old = false;
    } while (!flag.compare_exchange_weak(old, true, std::memory_order_acquire, std::memory_order_relaxed));
    // load barrier
  }

  void unlock() {
    // store barrier
    flag.store(false, std::memory_order_release);
    flag.notify_one(); // flag changed, notify one of the waiters
  }

  std::atomic<bool> flag{false};
}

scene 8 - 实现线程安全的无锁链表

首先可以得知标准库的链表不是线程安全的,我们这么写有并发问题

std::list<int> data;

void entry(MTIndex<0>) {
  data.push_back(1);
}

void entry(MTIndex<1>) {
  data.push_back(2);
}

void teardown() {
  MTTest::result = data.size(); // 1 or 2
}

如果使用锁的话,可以使用锁来构成 happens-before 关系

std::list<int> data;
std::mutex mutex;

void entry(MTIndex<0>) {
  mutex.lock();
  data.push_back(1);
  mutex.unlock();
}

void entry(MTIndex<1>) {
  mutex.lock();
  data.push_back(2);
  mutex.unlock();
}

void teardown() {
  MTTest::result = data.size(); // must be 2
}

现在使用无锁编程来完成线程安全的无锁链表

struct MyList {
  struct Node {
    Node *next;
    int value;
  };

  std::atomic<Node*> head{nullptr};

  void push_front(int value) {
    Node *new_node = new Node;
    new_node->value = value;

    Node *old_head = head.load(std::memory_order_relaxed);
    do {
      new_node->next = old_head;
    }
    while (!head.compare_exchange_weak(old_head, new_node, std::memory_order_release, std::memory_order_relaxed));
#if __cpp_lib_atomic_wait
    head.notify_one();
#endif
  }

  int pop_front() {
    Node *old_head = head.load(std::memory_order_relaxed);
    do {
      if (old_head == nullptr) 
        return -1;
    } while (!head.compare_exchange_weak(old_head, old_head->next, std::memory_order_acquire, std::memory_order_relaxed));
    int value = old_head->value;
    delete old_head;
    return value;
  }

  int pop_front_wait() {
    Node *old_head = head.load(std::memory_order_relaxed);
    do {
      while (old_head == nullptr) 
#if __cpp_lib_atomic_wait
        int retries = 200;
        if (retries <= 0) {
          head.wait(nullptr, std::memory_order_relaxed);
          --retries;
        }
#endif
        old_head = head.load(std::memory_order_relaxed);
    } while (!head.compare_exchange_weak(old_head, old_head->next, std::memory_order_acquire, std::memory_order_relaxed));
    int value = old_head->value;
    delete old_head;
    return value;
  }

  int size_unsafe() {
    int count = 0;
    Node* node = head.load();
    while (node != nullptr) {
      count++;
      node = node->next;
    }
    return count;
  }
}

为什么第三个参数使用 release 或 acquire,而第四个参数使用 relaxed 即可?

第三个参数是指CAS成功时的内存序,release是为了保证在修改原子变量之前,所有的内存写入,如 new_node->value = value; 都已完成,以便其它线程能看到。

第四个参数是指CAS失败时的内存序,CAS失败只要把 head 自身的值加载回 old_head 即可,不依赖其他非原子变量的可见性,所以 relaxed 已经足够。