在多线程编程环境中,有时为了保证数据处理的效率会使用一些无锁(Lockfree)的数据结构,例如无锁队列(Lockfree Queue)、无锁栈(Lockfree Stack)、环形缓冲(Ring Buffer)、无锁哈希(Lockfree Hash)之类。本文分析Boost库(V1.57.0)中的无锁队列,并将其从复杂的Boost数据结构中剥离出来单独实现。
std::memory_order一些用在本文的知识点
参考文献:http://wilburding.github.io/blog/2013/04/07/c-plus-plus-11-atomic-and-memory-model/
http://bartoszmilewski.com/2008/12/01/c-atomics-and-memory-ordering/
编译器从计算机的远古时代发展至今,已经单纯脱离原始的翻译代码,而是会进行非常多的优化,将“糟糕”的代码改写成高效的代码,在多线程环境下会导致指令重排序happens-befors问题std::memory_order作用就是告诉编译器哪些数据是线程共享的,编译器只在必要的情况下对其保守优化。
enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst};
尽量用一句话在简明概括一下上面每条枚举的用处:
memory_order_relaxed – 不保证顺序性,只保证原子性和修改的一致性。
memory_order_consume – memory_order_acquire弱化版,C++专家特性,一般用不到。
memory_order_acquire – 不会把其他读操作移动它前面,保证读取到(其他线程store(memory_order_release))最新修改。
memory_order_release – 不会把其他写操作移动它后面,保证其他线程(load(memory_order_acquire))可以取到最新修改。
memory_order_acq_rel – 用在对内存值本身操作上,内置acquire/release合体。
memory_order_scq_cst – memory_order_acq_rel强化版,支持处理顺序一致性,最容易用的默认参数。
刚开始提到的memory_order那几个枚举就是告知编译器数据是在线程间共享的,并且是怎样共享的。memory_order_acquire和memory_order_release的关系大概就像mutex的acquire(lock)和release(unlock)。mutex使线程间串行执行,并且上一个线程unlock后另一个线程lock进入,保证可以看到上一个线程的所有数据修改而不会有读写操作被reorder后出问题的情况。
无锁队列Boost库Push的源码和注释
template <bool Bounded>; bool do_push(T const & t) { using detail::likely; node * n = pool.template construct<true, Bounded>(t, pool.null_handle()); handle_type node_handle = pool.get_handle(n); if (n == NULL) return false; for (;;) { tagged_node_handle tail = tail_.load(memory_order_acquire); node * tail_node = pool.get_pointer(tail); tagged_node_handle next = tail_node->next.load(memory_order_acquire); node * next_ptr = pool.get_pointer(next); tagged_node_handle tail2 = tail_.load(memory_order_acquire); //进行比较是为了防止其他线程进行Push导致了tail变化 //内存模型memory_order_acquire保证tail,tail2取值的顺序性 if (likely(tail == tail2)) { //Tail.next必须指向0才进行push下一步关键操作 //若不为0,可能另外一个线程提前push,我们进入else将Tail指向Tail.next if (next_ptr == 0) { tagged_node_handle new_tail_next(node_handle, next.get_next_tag()); //这里next一般指向0,尝试将Tail.next赋值成新node.next的数值 //若是第一个push node那么Head.next也会改变 //赋值的目的是为了将新node与原有node形成链状关系 if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) { //成功赋值,将Tail前移(指向新node) tagged_node_handle new_tail(node_handle, tail.get_next_tag()); tail_.compare_exchange_strong(tail, new_tail); return true; } } else { //移动Tail tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag()); tail_.compare_exchange_strong(tail, new_tail); } } } }
Push图示
无锁队列Boost库Pop的源码和注释
template <typename U> bool pop (U & ret) { using detail::likely; for (;;) { tagged_node_handle head = head_.load(memory_order_acquire); node * head_ptr = pool.get_pointer(head); tagged_node_handle tail = tail_.load(memory_order_acquire); tagged_node_handle next = head_ptr->next.load(memory_order_acquire); node * next_ptr = pool.get_pointer(next); tagged_node_handle head2 = head_.load(memory_order_acquire); //进行比较是为了防止其他线程进行Pop导致了head变化 //内存模型memory_order_acquire保证head,head2取值的顺序性 if (likely(head == head2)) { //头尾一致 if (pool.get_handle(head) == pool.get_handle(tail)) { //下一个node为0,证明没有可以pop的node if (next_ptr == 0) return false; //还有可以pop的node我们先移动tail //这里可能出现的情况是有node在其他线程快速push进来 tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag()); tail_.compare_exchange_strong(tail, new_tail); } else { if (next_ptr == 0) /* this check is not part of the original algorithm as published * by michael and scott * * however we reuse the tagged_ptr part for the freelist and clear * the next part during node allocation. we can observe a null-pointer here. * */ continue; //取出赋值,这里需要注意的是Head永远是用作哨兵,实际有意义数据的节点是Head.next detail::copy_payload(next_ptr->data, ret); tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag()); //为了防止这个节点已经被其他线程pop if (head_.compare_exchange_weak(head, new_head)) { pool.template destruct<true>(head); return true; } } } } }
Push图示
点击查看 – 改写的源码
(全文结束)
转载文章请注明出处:漫漫路 - lanindex.com
1 Trackback