Queues & Stacks
Pool of items:
- Queue:
enq()
anddeq()
, FIFO - Stack:
push()
andpop()
, LIFO
Bounded vs. Unbounded
- Bounded: fixed capacity
- Unbounded: holds any number of objects
Blocking vs. Non-blocking
- Cases: remove from empty, add to full
- Blocking: caller waits until state changes
- Non-blocking: method throws exception
Queues
Bounded Queue
public class BoundedQueue<T> {
ReentrantLock enqLock, deqLock;
Condition notEmptyCondition, notFullCondition;
AtomicInteger permits;
Node head, tail;
int capacity;
public BoundedQueue() {
enqLock = new ...();
deqLock = new ...();
notFullCondition = enqLock.newCondition();
notEmptyCondition = deqLock.newCondition();
}
public void enq(T x) {
boolean mustWakeDequeuers = false;
enqLock.lock();
try {
while (permits.get() == 0) {
// if queue full, give up lock and sleep
// when await returns, lock is acquired back
notFullCondition.await();
}
Node e = new Node(x);
tail.next = e;
tail = e;
if (permits.getAndDecrement() == capacity) {
// if queue is empty, wake dequeuers up
mustWakeDequeuers = true;
}
} finally {
enqLock.unlock();
}
if (mustWakeDequeuers) {
deqLock.lock();
try {
notEmptyCondition.signalAll();
} finally {
deqLock.unlock();
}
}
}
public T deq() {
T result;
boolean mustWakeEnqueuers = false;
deqLock.lock();
try {
while (permits.get() == capacity) {
notEmptyCondition.await();
}
result = head.next.value;
head = head.next;
if (permits.getAndIncrement() == 0) {
mustWakeEnqueuers = true;
}
} finally {
deqLock.unlock();
}
if (mustWakeEnqueuers) {
enqLock.lock();
try {
notFullCondition.signalAll();
} finally {
enqLock.unlock();
}
}
return result;
}
}
Split Counter
enq()
decrementspermit
only, cares only if value is zerodeq()
increamentspermit
only, cares only if value iscapacity
Split counter:
- enqueuers decrements
enqSidePermits
- dequeuers increments
deqSidePermits
- When enqueuer runs out, they lock
deqLock
and transfer permits
Unbounded Queue
public void enq(T x) {
enqLock.lock();
try {
Node e = new Node(x);
tail.next = e;
tail = e;
} finally {
enqLock.unlock();
}
}
public T deq() throws EmptyException {
T result;
deqLock.lock();
try {
if (head.next == null) {
throw new EmptyException();
}
result = head.next.value;
head = head.next;
} finally {
deqLock.unlock();
}
return result;
}
Lock-Free Queue
Two CAS steps: not atomic
- update
tail.next
- update
tail
What to do if you find a trailing tail?
- Stop and fix it
- If tail has non-null next field, CAS
tail
totail.next
public void enq(T x) {
Node e = new Node(x);
while (true) {
Node t = tail.get();
Node n = t.next.get();
if (t == tail.get()) {
if (n == null) {
if (t.next.compareAndSet(n, e)) {
tail.compareAndSet(t, e);
return;
}
} else {
tail.compareAndSet(t, n);
}
}
}
}
public T deq() throws EmptyException {
while (true) {
Node h = head.get();
Node n = h.next.get();
Node t = tail.get();
if (h == head.get()) {
if (h == t) {
if (n == null) {
throws new EmptyException();
}
tail.compareAndSet(t, n);
} else {
T result = n.value;
if (head.compareAndSet(h, n)) {
return result;
}
}
}
}
}
Memory Reuse
Simple solution:
- Each thread has a free list of unused queue nodes
- Allocate node: pop from list
- Free node: push onto list
The dreaded ABA problem:
- Head pointer has value A, thread reads value A
- Head pointer has value B, node A freed
- Head pointer has value A again, node A recycled and reinitialized
- CAS succeeds because pointer matches even though pointer's meaning has changed
A solution for dreaded ABA:
- Tag each pointer with a counter
- Unique over lifetime of node
AtomicStampedReference
in Java
Stacks
Lock-Free Stack
public class LockFreeStack {
private AtomicReference top = new AtomicReference(null);
public boolean tryPush(Node node) {
Node oldTop = top.get();
node.next = oldTop;
return top.compareAndSet(oldTop, node);
}
public void push(T value) {
Node node = new Node(value);
while (true) {
if (tryPush(node)) {
return;
} else {
backOff(); // wait for another try
}
}
}
}
Benefits: no locking
Drawbacks:
- without GC, dreaded ABA occurs
- without backoff, huge contention at top
- in any case above, no parallelism
Elimination-Backoff Stack
Idea: elimination array
- if push collides with pop, there is no need to access stack
- if there is no collision between push and pop, access stack
That is, if CAS at top fails, back off to elimination array. Pick a random range and max time to wait for collision based on level of contention encountered.(当链表头部产生竞争,CAS操作失败时,直接去数组里等别人来找自己就可以了)
Linearizability:
- Un-eliminated calls: as before
- Elminated calls: linearize
pop()
immediately afterpush()