connor4312/cockatiel

BulkheadPolicy queue stalls when dequeued item has an already-aborted signal

Summary

  • Context: BulkheadPolicy is a concurrency limiter. Up to capacity executions run at once. Up to queueCapacity excess calls wait in a queue and are started as execution slots become free.

  • Bug: When dequeue() takes an item whose signal was aborted while it waited in the queue, execute() throws TaskCancelledError before reaching its try/finally block. That finally block is the only place that calls dequeue() for the next item, so the queue is never advanced.

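  • Actual vs. expected: After a cancelled item is dequeued, queue processing stalls. The remaining items are not dequeued until some other execution completes and calls dequeue(). If no other executions are in flight, they stay queued indefinitely. The expected behavior is that the cancelled item is rejected and processing continues with the next queued item.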

  • Impact: Applications that use BulkheadPolicy with cancellation can end up with queued operations that never execute while an execution slot sits idle. Their promises never settle, which can hang callers and leak resources held while they wait.

Code with bug

// From src/BulkheadPolicy.ts, execute() method:
public async execute<T>(
  fn: (context: IDefaultPolicyContext) => PromiseLike<T> | T,
  signal = neverAbortedSignal,
): Promise<T> {
  if (signal.aborted) {
    throw new TaskCancelledError(); // <-- BUG 🔴 Throws before entering try/finally block
  }

  if (this.active < this.capacity) {
    this.active++;

    try {
      return await fn({ signal });
    } finally {
      this.active--;
      this.dequeue(); // Only called if we entered the try block
    }
  }

  if (this.queue.length < this.queueCapacity) {
    const { resolve, reject, promise } = defer<T>();
    this.queue.push({ signal, fn, resolve, reject });
    return promise;
  }

  this.onRejectEmitter.emit();
  throw new BulkheadRejectedError(this.capacity, this.queueCapacity);
}

private dequeue() {
  const item = this.queue.shift();
  if (!item) {
    return;
  }

  Promise.resolve()
    .then(() => this.execute(item.fn, item.signal)) // Calls execute with potentially aborted signal
    .then(item.resolve)
    .catch(item.reject); // Catches the TaskCancelledError and rejects the promise
    // <-- BUG 🔴 dequeue() is never called again to process next item
}

Logical proof

  1. Precondition: A queued item’s signal is aborted by the time it is dequeued.

  2. dequeue() shifts the item and, in a microtask, calls this.execute(item.fn, item.signal). Because signal.aborted === true, execute() throws TaskCancelledError immediately, before incrementing active or entering its try/finally.

  3. The finally that decrements active and calls dequeue() is never reached, so no follow-up dequeue occurs for remaining items.

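  4. The promise chain’s .catch(item.reject) rejects the cancelled item’s promise but does not continue processing the queue. If no other execution is in flight, active is 0 while the queue is non-empty, and nothing calls dequeue() again until a new execute() call takes the free slot and completes. Without further traffic, the remaining queued items wait indefinitely.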
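The steps above can be reproduced with a small, self-contained model of the two quoted methods. This is a sketch, not cockatiel itself. BuggyBulkhead, SignalLike, QueueItem and runScenario are hypothetical names. SignalLike stands in for AbortSignal, the defer() helper is inlined as a Promise constructor, and queueCapacity/BulkheadRejectedError handling is omitted. It assumes Node.js for setTimeout and console.

```typescript
// Hypothetical model of BulkheadPolicy.execute()/dequeue() as quoted above (not the library code).
// SignalLike stands in for AbortSignal; queueCapacity and BulkheadRejectedError are omitted.
interface SignalLike {
  readonly aborted: boolean;
}

class TaskCancelledError extends Error {
  constructor() {
    super("Operation cancelled");
    this.name = "TaskCancelledError";
  }
}

interface QueueItem {
  fn: () => unknown;
  signal: SignalLike;
  resolve: (value: unknown) => void;
  reject: (err: unknown) => void;
}

class BuggyBulkhead {
  public active = 0;
  public readonly queue: QueueItem[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  public async execute<T>(fn: () => T | Promise<T>, signal: SignalLike = { aborted: false }): Promise<T> {
    if (signal.aborted) {
      throw new TaskCancelledError(); // exits before the try/finally, so dequeue() is skipped
    }
    if (this.active < this.capacity) {
      this.active++;
      try {
        return await fn();
      } finally {
        this.active--;
        this.dequeue();
      }
    }
    // Queue the call (stands in for the original defer<T>() helper).
    return new Promise<T>((resolve, reject) => {
      this.queue.push({ fn, signal, resolve: resolve as (value: unknown) => void, reject });
    });
  }

  private dequeue(): void {
    const item = this.queue.shift();
    if (!item) {
      return;
    }
    Promise.resolve()
      .then(() => this.execute(item.fn, item.signal))
      .then(item.resolve)
      .catch(item.reject); // rejects the cancelled item but never calls dequeue() again
  }
}

// Capacity 1: one running task, one queued task aborted while it waits, one healthy queued task.
async function runScenario() {
  const bulkhead = new BuggyBulkhead(1);
  let releaseFirst: () => void = () => {};
  const first = bulkhead.execute(() => new Promise<void>((r) => { releaseFirst = () => r(); }));

  const signal = { aborted: false };
  let secondCancelled = false;
  bulkhead.execute(() => "second", signal).catch((e) => {
    secondCancelled = e instanceof TaskCancelledError;
  });

  let thirdSettled = false;
  bulkhead.execute(() => "third").then(
    () => { thirdSettled = true; },
    () => { thirdSettled = true; },
  );

  signal.aborted = true; // aborted while still queued
  releaseFirst(); // frees the only slot; its finally block calls dequeue()
  await first;
  await new Promise<void>((r) => setTimeout(() => r(), 0)); // let pending microtasks run
  return { secondCancelled, thirdSettled, active: bulkhead.active, queued: bulkhead.queue.length };
}

runScenario().then((result) => console.log(result));
// Expected under this model: { secondCancelled: true, thirdSettled: false, active: 0, queued: 1 }
```

The idle slot (active 0) next to a non-empty queue (queued 1) is the stall described in step 4.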

Recommended fix

The fix must keep the queue draining even when a dequeued item’s execute() call throws before reaching its try/finally block. Two approaches are possible; the recommended one is shown here.

Option 1: Check signal status in dequeue() before calling execute()

Move the abort check earlier, before calling execute():

private dequeue() {
  const item = this.queue.shift();
  if (!item) {
    return;
  }

  // Check if already cancelled before executing // <-- FIX 🟢
  if (item.signal.aborted) {
    item.reject(new TaskCancelledError());
    this.dequeue(); // Continue processing queue
    return;
  }

  Promise.resolve()
    .then(() => this.execute(item.fn, item.signal))
    .then(item.resolve)
    .catch(item.reject);
}
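To check that Option 1 lets the queue continue, the same scenario can be run against a self-contained model that uses the Option 1 dequeue(). This is a sketch, not the library code. FixedBulkhead, SignalLike, QueueItem and runScenario are hypothetical names. SignalLike stands in for AbortSignal, defer() is inlined, queue-capacity handling is omitted, and Node.js is assumed for setTimeout and console.

```typescript
// Hypothetical model of BulkheadPolicy with the Option 1 dequeue() (not the library code).
interface SignalLike {
  readonly aborted: boolean;
}

class TaskCancelledError extends Error {
  constructor() {
    super("Operation cancelled");
    this.name = "TaskCancelledError";
  }
}

interface QueueItem {
  fn: () => unknown;
  signal: SignalLike;
  resolve: (value: unknown) => void;
  reject: (err: unknown) => void;
}

class FixedBulkhead {
  public active = 0;
  public readonly queue: QueueItem[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  public async execute<T>(fn: () => T | Promise<T>, signal: SignalLike = { aborted: false }): Promise<T> {
    if (signal.aborted) {
      throw new TaskCancelledError();
    }
    if (this.active < this.capacity) {
      this.active++;
      try {
        return await fn();
      } finally {
        this.active--;
        this.dequeue();
      }
    }
    return new Promise<T>((resolve, reject) => {
      this.queue.push({ fn, signal, resolve: resolve as (value: unknown) => void, reject });
    });
  }

  private dequeue(): void {
    const item = this.queue.shift();
    if (!item) {
      return;
    }
    // Option 1: settle an already-aborted item here and move on to the next one.
    if (item.signal.aborted) {
      item.reject(new TaskCancelledError());
      this.dequeue();
      return;
    }
    Promise.resolve()
      .then(() => this.execute(item.fn, item.signal))
      .then(item.resolve)
      .catch(item.reject);
  }
}

// Capacity 1: one running task, one queued task aborted while it waits, one healthy queued task.
async function runScenario() {
  const bulkhead = new FixedBulkhead(1);
  let releaseFirst: () => void = () => {};
  const first = bulkhead.execute(() => new Promise<void>((r) => { releaseFirst = () => r(); }));

  const signal = { aborted: false };
  let secondCancelled = false;
  bulkhead.execute(() => "second", signal).catch((e) => {
    secondCancelled = e instanceof TaskCancelledError;
  });
  let thirdResult: unknown;
  bulkhead.execute(() => "third").then((value) => { thirdResult = value; });

  signal.aborted = true; // aborted while still queued
  releaseFirst();
  await first;
  await new Promise<void>((r) => setTimeout(() => r(), 0)); // let pending microtasks run
  return { secondCancelled, thirdResult, active: bulkhead.active, queued: bulkhead.queue.length };
}

runScenario().then((result) => console.log(result));
// Expected under this model: { secondCancelled: true, thirdResult: 'third', active: 0, queued: 0 }
```

The aborted item is rejected without ever occupying the slot, and the third item runs in the slot the first one released.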

Option 1 is recommended because it:

  • Is simpler and more direct

  • Handles cancellation earlier (fail-fast)

  • Doesn’t create additional promise chain overhead

  • Makes the control flow more explicit

  • Matches the existing pattern of checking cancellation early
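One caveat on Option 1: it checks the signal synchronously inside dequeue(), but execute() only runs one microtask later. A signal aborted inside that gap (for example, by a promise reaction that runs right after the finished task’s finally block) still makes execute() throw TaskCancelledError at its top, and the queue stalls as before. The sketch below is a hypothetical variant, not cockatiel’s code, that performs the check inside the microtask, in the same synchronous step that calls execute(). VariantBulkhead, SignalLike, QueueItem and runScenario are illustrative names. SignalLike stands in for AbortSignal, queue-capacity handling is omitted, and Node.js is assumed for setTimeout and console.

```typescript
// Hypothetical variant of the Option 1 fix (not cockatiel's code): the abort check runs inside
// the microtask, immediately before execute(), so the signal cannot flip between the two checks.
interface SignalLike {
  readonly aborted: boolean;
}

class TaskCancelledError extends Error {
  constructor() {
    super("Operation cancelled");
    this.name = "TaskCancelledError";
  }
}

interface QueueItem {
  fn: () => unknown;
  signal: SignalLike;
  resolve: (value: unknown) => void;
  reject: (err: unknown) => void;
}

class VariantBulkhead {
  public active = 0;
  public readonly queue: QueueItem[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  public async execute<T>(fn: () => T | Promise<T>, signal: SignalLike = { aborted: false }): Promise<T> {
    if (signal.aborted) {
      throw new TaskCancelledError();
    }
    if (this.active < this.capacity) {
      this.active++;
      try {
        return await fn();
      } finally {
        this.active--;
        this.dequeue();
      }
    }
    return new Promise<T>((resolve, reject) => {
      this.queue.push({ fn, signal, resolve: resolve as (value: unknown) => void, reject });
    });
  }

  private dequeue(): void {
    const item = this.queue.shift();
    if (!item) {
      return;
    }
    Promise.resolve().then(() => {
      if (item.signal.aborted) {
        item.reject(new TaskCancelledError());
        this.dequeue(); // skip the cancelled item; the next one is handled in a later microtask
        return;
      }
      // execute() runs its own aborted check synchronously here, so it sees the same value.
      this.execute(item.fn, item.signal).then(item.resolve, item.reject);
    });
  }
}

async function runScenario() {
  const bulkhead = new VariantBulkhead(1);
  let releaseFirst: () => void = () => {};
  const firstWork = new Promise<void>((r) => { releaseFirst = () => r(); });
  const first = bulkhead.execute(() => firstWork);

  const signal = { aborted: false };
  let secondCancelled = false;
  bulkhead.execute(() => "second", signal).catch((e) => {
    secondCancelled = e instanceof TaskCancelledError;
  });
  let thirdResult: unknown;
  bulkhead.execute(() => "third").then((value) => { thirdResult = value; });

  // Registered after execute() awaited firstWork, so with native async/await this reaction runs
  // after the bulkhead's finally block has already dequeued the second task.
  firstWork.then(() => { signal.aborted = true; });
  releaseFirst();
  await first;
  await new Promise<void>((r) => setTimeout(() => r(), 0)); // let pending microtasks run
  return { secondCancelled, thirdResult, active: bulkhead.active, queued: bulkhead.queue.length };
}

runScenario().then((result) => console.log(result));
// Expected under this model: { secondCancelled: true, thirdResult: 'third', active: 0, queued: 0 }
```

Re-queuing through a fresh microtask for each skipped item also avoids deep synchronous recursion when many consecutive queued items have been aborted.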