Queues & Stacks

Pool of items:

  • Queue: enq() and deq(), FIFO
  • Stack: push() and pop(), LIFO

Bounded vs. Unbounded

  • Bounded: fixed capacity
  • Unbounded: holds any number of objects

Blocking vs. Non-blocking

  • Cases: remove from empty, add to full
  • Blocking: caller waits until state changes
  • Non-blocking: method throws exception

Queues

Bounded Queue

public class BoundedQueue<T> {
    ReentrantLock enqLock, deqLock;
    Condition notEmptyCondition, notFullCondition;
    AtomicInteger permits;
    Node head, tail;
    int capacity;

    public BoundedQueue() {
        enqLock = new ...();
        deqLock = new ...();
        notFullCondition = enqLock.newCondition();
        notEmptyCondition = deqLock.newCondition();
    }

    public void enq(T x) {
        boolean mustWakeDequeuers = false;
        enqLock.lock();
        try {
            while (permits.get() == 0) {
                // if queue full, give up lock and sleep
                // when await returns, lock is acquired back
                notFullCondition.await();
            }
            Node e = new Node(x);
            tail.next = e;
            tail = e;
            if (permits.getAndDecrement() == capacity) {
                // if queue is empty, wake dequeuers up
                mustWakeDequeuers = true;
            }
        } finally {
            enqLock.unlock();
        }
        if (mustWakeDequeuers) {
            deqLock.lock();
            try {
                notEmptyCondition.signalAll();
            } finally {
                deqLock.unlock();
            }
        }
    }

    public T deq() {
        T result;
        boolean mustWakeEnqueuers = false;
        deqLock.lock();
        try {
            while (permits.get() == capacity) {
                notEmptyCondition.await();
            }
            result = head.next.value;
            head = head.next;
            if (permits.getAndIncrement() == 0) {
                mustWakeEnqueuers = true;
            }
        } finally {
            deqLock.unlock();
        }
        if (mustWakeEnqueuers) {
            enqLock.lock();
            try {
                notFullCondition.signalAll();
            } finally {
                enqLock.unlock();
            }
        }
        return result;
    }
}

Split Counter

  • enq() decrements permit only, cares only if value is zero
  • deq() increaments permit only, cares only if value is capacity

Split counter:

  • enqueuers decrements enqSidePermits
  • dequeuers increments deqSidePermits
  • When enqueuer runs out, they lock deqLock and transfer permits

Unbounded Queue

public void enq(T x) {
    enqLock.lock();
    try {
        Node e = new Node(x);
        tail.next = e;
        tail = e;
    } finally {
        enqLock.unlock();
    }
}

public T deq() throws EmptyException {
    T result;
    deqLock.lock();
    try {
        if (head.next == null) {
            throw new EmptyException();
        }
        result = head.next.value;
        head = head.next;
    } finally {
        deqLock.unlock();
    }
    return result;
}

Lock-Free Queue

Two CAS steps: not atomic

  • update tail.next
  • update tail

What to do if you find a trailing tail?

  • Stop and fix it
  • If tail has non-null next field, CAS tail to tail.next
public void enq(T x) {
    Node e = new Node(x);
    while (true) {
        Node t = tail.get();
        Node n = t.next.get();
        if (t == tail.get()) {
            if (n == null) {
                if (t.next.compareAndSet(n, e)) {
                    tail.compareAndSet(t, e);
                    return;
                }
            } else {
                tail.compareAndSet(t, n);
            }
        }
    }
}

public T deq() throws EmptyException {
    while (true) {
        Node h = head.get();
        Node n = h.next.get();
        Node t = tail.get();
        if (h == head.get()) {
            if (h == t) {
                if (n == null) {
                    throws new EmptyException();
                }
                tail.compareAndSet(t, n);
            } else {
                T result = n.value;
                if (head.compareAndSet(h, n)) {
                    return result;
                }
            }
        }
    }
}

Memory Reuse

Simple solution:

  • Each thread has a free list of unused queue nodes
  • Allocate node: pop from list
  • Free node: push onto list

The dreaded ABA problem:

  • Head pointer has value A, thread reads value A
  • Head pointer has value B, node A freed
  • Head pointer has value A again, node A recycled and reinitialized
  • CAS succeeds because pointer matches even though pointer's meaning has changed

A solution for dreaded ABA:

  • Tag each pointer with a counter
  • Unique over lifetime of node
  • AtomicStampedReference in Java

Stacks

Lock-Free Stack

public class LockFreeStack {
    private AtomicReference top = new AtomicReference(null);
    
    public boolean tryPush(Node node) {
        Node oldTop = top.get();
        node.next = oldTop;
        return top.compareAndSet(oldTop, node);
    }

    public void push(T value) {
        Node node = new Node(value);
        while (true) {
            if (tryPush(node)) {
                return;
            } else {
                backOff(); // wait for another try
            }
        }
    }
}

Benefits: no locking

Drawbacks:

  • without GC, dreaded ABA occurs
  • without backoff, huge contention at top
  • in any case above, no parallelism

Elimination-Backoff Stack

Idea: elimination array

  • if push collides with pop, there is no need to access stack
  • if there is no collision between push and pop, access stack

That is, if CAS at top fails, back off to elimination array. Pick a random range and max time to wait for collision based on level of contention encountered.(当链表头部产生竞争,CAS操作失败时,直接去数组里等别人来找自己就可以了)

Linearizability:

  • Un-eliminated calls: as before
  • Elminated calls: linearize pop() immediately after push()