개요
Michael-Scott's queue는 Lock free를 고려한 큐의 구현이다.
기본 원칙
Treiber's stack과 마찬가지로, queue에 push, pop할때 변경이 있는지 없는지 확인하고 없을 경우에만 푸쉬,팝을 하는 구조이다.
Pop은 헤드 포인터에, Push는 테일 포인터에 적용된다. 이때 대원칙은 Treiber's stack과 동일한다.
하나 예외상황은, Tail포인터가 Stale할 수 있다는 점이다. 즉, Pushing이 일어난 직후, Tail pointer가 가르키는 pointer는 테일이 아닐 수도 있기 때문이다. 따라서, Michael-Scott's 큐에서는 테일이 진짜 테일이 아닐 수도 있다. 테일의 정의를 "Head에서 접근 가능한 노드중 하나"로 정의함으로써, 테일에 대한 조건을 Relax하여 정의하게 된다. 나중에 Push하는 상황에서 현재 Tail이 진짜 테일인지 아닌지 확인하여, Tail을 우선 업데이트하고 push하는 식으로 알고리즘이 구현된다.
알고리즘
- POP
- 만약 Head가 Dummy pointer일 경우 tail pointer를 업데이트 한다. Dummy head를 두는 이유는, empty인 상황에서도 Tail이 특정 노드를 가르키도록 하기 위해서이다.
- Head pointer를 업데이트 한다. (treiber's stack과 같은 방식임)
- PUSH
- 우선 테일을 업데이트 한다.
- 새로운 테일 포인터를 업데이트 한다.
- 테일 포인터를 업데이트 한다. 이때 실패해도 괜찮다. 여기서 업데이트를 실패해도 해주는 이유는 Cache적으로 기존에 접근한 테일 포인터를 업데이트 해주는 것이 캐시에 있기 떄문에 좋기 때문이다.
예시
/// Adds `t` to the back of the queue.
pub fn push(&self, t: T, guard: &mut Guard) {
let mut new = Owned::new(Node {
data: MaybeUninit::new(t),
next: Atomic::null(),
});
loop {
guard.repin();
// We push onto the tail, so we'll start optimistically by looking there first.
let tail = self.tail.load(Acquire, guard);
// Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed.
let tail_ref = unsafe { tail.deref() };
let next = tail_ref.next.load(Acquire, guard);
// If `tail` is not the actual tail, try to "help" by moving the tail pointer forward.
if !next.is_null() {
let _ = self
.tail
.compare_exchange(tail, next, Release, Relaxed, guard);
continue;
}
// looks like the actual tail; attempt to link at `tail.next`.
match tail_ref
.next
.compare_exchange(Shared::null(), new, Release, Relaxed, guard)
{
Ok(new) => {
// try to move the tail pointer forward.
let _ = self
.tail
.compare_exchange(tail, new, Release, Relaxed, guard);
break;
}
Err(e) => new = e.new,
}
}
}
/// Attempts to dequeue from the front.
///
/// Returns `None` if the queue is observed to be empty.
pub fn try_pop(&self, guard: &mut Guard) -> Option<T> {
loop {
guard.repin();
let head = self.head.load(Acquire, guard);
let next = unsafe { head.deref() }.next.load(Acquire, guard);
let next_ref = unsafe { next.as_ref() }?;
// Moves `tail` if it's stale. Relaxed load is enough because if tail == head, then the
// messages for that node are already acquired.
let tail = self.tail.load(Relaxed, guard);
if tail == head {
let _ = self
.tail
.compare_exchange(tail, next, Release, Relaxed, guard);
}
if self
.head
.compare_exchange(head, next, Release, Relaxed, guard)
.is_ok()
{
// Since the above `compare_exchange()` succeeded, `head` is detached from `self` so
// is unreachable from other threads.
// SAFETY: `next` will never be the sentinel node, since it is the node after
// `head`. Hence, it must have been a node made in `push()`, which is initialized.
//
// Also, we are returning ownership of `data` in `next` by making a copy of it via
// `assume_init_read()`. This is safe as no other thread has access to `data` after
// `head` is unreachable, so the ownership of `data` in `next` will never be used
// again as it is now a sentinel node.
let result = unsafe { next_ref.data.assume_init_read() };
// SAFETY: `head` is unreachable, and we no longer access `head`. We destroy `head`
// after the final access to `next` above to ensure that `next` is also destroyed
// after.
unsafe { guard.defer_destroy(head) };
return Some(result);
}
}
}