BlockingQueue核心方法

操作抛异常返回值阻塞超时
插入add(e)offer(e)put(e)offer(e, time, unit)
删除remove()poll()take()poll(time, unit)
检查element()peek()--

核心区别

  • put/take:阻塞,生产者-消费者模式常用
  • offer/poll:不阻塞,适合轮询场景

常用实现类

1. ArrayBlockingQueue

// 有界队列,数组实现,FIFO
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 特点
 有界防止OOM
 性能稳定
 容量固定无法扩展

2. LinkedBlockingQueue

// 可选有界/无界,链表实现
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(10);    // 有界
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();     // 无界

// 特点
 容量可选
 吞吐量高
 无界模式可能OOM

3. PriorityBlockingQueue

// 优先级队列,无界
BlockingQueue<Task> queue = new PriorityBlockingQueue<>();

class Task implements Comparable<Task> {
    int priority;

    @Override
    public int compareTo(Task other) {
        return Integer.compare(other.priority, this.priority);  // 高优先级优先
    }
}

// 特点
 按优先级出队
 无界可能OOM

4. SynchronousQueue

// 不存储元素,直接交付
BlockingQueue<String> queue = new SynchronousQueue<>();

// 特点
 零容量直接交付
 适合传递场景
 生产者必须等待消费者

5. DelayQueue

// 延迟队列
BlockingQueue<DelayedTask> queue = new DelayQueue<>();

class DelayedTask implements Delayed {
    long executeTime;

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.getDelay(TimeUnit.NANOSECONDS),
                           other.getDelay(TimeUnit.NANOSECONDS));
    }
}

// 特点
 延迟执行
 适合定时任务

生产者-消费者模式

public class ProducerConsumer {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

    // 生产者
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    String product = "Product-" + i;
                    queue.put(product);  // 阻塞
                    System.out.println("Produced: " + product);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 消费者
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    String product = queue.take();  // 阻塞
                    System.out.println("Consumed: " + product);
                    // 处理产品
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void start() {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}

性能对比

队列吞吐量是否有界内存占用适用场景
ArrayBlockingQueue通用
LinkedBlockingQueue可选高吞吐
PriorityBlockingQueue优先级
SynchronousQueue最高最低直接交付
DelayQueue延迟任务

实战案例:任务调度

public class TaskScheduler {
    private final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(100);
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    public void submitTask(Task task) {
        try {
            taskQueue.put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void start() {
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                while (!Thread.interrupted()) {
                    try {
                        Task task = taskQueue.take();
                        task.execute();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
    }
}

总结

选择指南

  • 通用场景:ArrayBlockingQueue
  • 高吞吐量:LinkedBlockingQueue
  • 优先级:PriorityBlockingQueue
  • 直接交付:SynchronousQueue
  • 延迟执行:DelayQueue

系列文章