引言:volatile不能保证原子性

回顾之前讲过的例子:

public class Counter {
    private volatile int count = 0;

    public void increment() {
        count++;  // 不是原子操作!
    }
}

问题

count++;
// 实际上是3个操作:
// 1. temp = count     (读)
// 2. temp = temp + 1  (修改)
// 3. count = temp     (写)

// 多线程执行时可能丢失更新

解决方案1:synchronized

public synchronized void increment() {
    count++;  // 原子操作
}
// 缺点:性能开销大

解决方案2:原子类

private AtomicInteger count = new AtomicInteger(0);

public void increment() {
    count.incrementAndGet();  // 原子操作,性能好
}

性能对比

synchronized: 1200ms
AtomicInteger: 280ms
性能提升: 4.3倍

本篇文章将深入Java原子类的实现原理和使用方法。


一、原子类的分类

Java提供了丰富的原子类,位于java.util.concurrent.atomic包:

1.1 基本类型原子类

类名说明
AtomicInteger原子更新整型
AtomicLong原子更新长整型
AtomicBoolean原子更新布尔型

1.2 数组类型原子类

类名说明
AtomicIntegerArray原子更新整型数组的元素
AtomicLongArray原子更新长整型数组的元素
AtomicReferenceArray原子更新引用类型数组的元素

1.3 引用类型原子类

类名说明
AtomicReference原子更新引用类型
AtomicStampedReference原子更新带版本号的引用(解决ABA问题)
AtomicMarkableReference原子更新带标记位的引用

1.4 字段更新器

类名说明
AtomicIntegerFieldUpdater原子更新整型字段
AtomicLongFieldUpdater原子更新长整型字段
AtomicReferenceFieldUpdater原子更新引用类型字段

1.5 累加器(JDK 1.8+)

类名说明
LongAdder长整型累加器(高并发场景性能更好)
LongAccumulator长整型累加器(支持自定义函数)
DoubleAdder双精度累加器
DoubleAccumulator双精度累加器(支持自定义函数)

二、AtomicInteger详解

2.1 基本使用

public class AtomicIntegerDemo {
    public static void main(String[] args) {
        AtomicInteger count = new AtomicInteger(0);

        // 获取当前值
        int value = count.get();  // 0

        // 设置值
        count.set(10);

        // 获取并设置(返回旧值)
        int old = count.getAndSet(20);  // 返回10

        // 获取并自增(i++)
        int old2 = count.getAndIncrement();  // 返回20,count变为21

        // 自增并获取(++i)
        int new1 = count.incrementAndGet();  // 返回22

        // 获取并自减(i--)
        int old3 = count.getAndDecrement();  // 返回22,count变为21

        // 自减并获取(--i)
        int new2 = count.decrementAndGet();  // 返回20

        // 获取并增加(+=)
        int old4 = count.getAndAdd(5);  // 返回20,count变为25

        // 增加并获取
        int new3 = count.addAndGet(5);  // 返回30
    }
}

2.2 CAS操作

compareAndSet(CAS的核心方法):

AtomicInteger count = new AtomicInteger(0);

// CAS:如果当前值是expect,则更新为update
boolean success = count.compareAndSet(0, 1);
// success = true,count = 1

success = count.compareAndSet(0, 2);
// success = false,count 仍然是 1(因为当前值不是0)

工作原理

CAS(内存位置V, 期望值A, 新值B)
├─ 如果 V == A
│  └─ V = B,返回 true
└─ 否则
   └─ 不修改,返回 false

2.3 incrementAndGet的实现

源码分析

public class AtomicInteger {
    private volatile int value;  // 使用volatile保证可见性

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
}

// Unsafe类中的实现
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);  // 读取当前值
    } while (!compareAndSwapInt(o, offset, v, v + delta));  // CAS更新
    return v;
}

流程图

incrementAndGet():

1. 读取当前值 v
2. 计算新值 newV = v + 1
3. CAS(value, v, newV)
   ├─ 成功 → 返回 newV
   └─ 失败 → 回到步骤1(自旋重试)

2.4 性能测试

public class AtomicPerformanceTest {
    private static final int THREAD_COUNT = 10;
    private static final int ITERATIONS = 1_000_000;

    // synchronized版本
    static class SyncCounter {
        private int count = 0;

        public synchronized void increment() {
            count++;
        }

        public int get() {
            return count;
        }
    }

    // AtomicInteger版本
    static class AtomicCounter {
        private AtomicInteger count = new AtomicInteger(0);

        public void increment() {
            count.incrementAndGet();
        }

        public int get() {
            return count.get();
        }
    }

    public static void testSync() throws InterruptedException {
        SyncCounter counter = new SyncCounter();
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        long start = System.currentTimeMillis();

        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                for (int j = 0; j < ITERATIONS; j++) {
                    counter.increment();
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        long time = System.currentTimeMillis() - start;

        System.out.println("synchronized: " + time + "ms");
        System.out.println("Result: " + counter.get());
    }

    public static void testAtomic() throws InterruptedException {
        AtomicCounter counter = new AtomicCounter();
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        long start = System.currentTimeMillis();

        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                for (int j = 0; j < ITERATIONS; j++) {
                    counter.increment();
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        long time = System.currentTimeMillis() - start;

        System.out.println("AtomicInteger: " + time + "ms");
        System.out.println("Result: " + counter.get());
    }

    public static void main(String[] args) throws InterruptedException {
        testSync();
        testAtomic();
    }
}

实测结果(10线程,各100万次操作):

synchronized: 1200ms
Result: 10000000

AtomicInteger: 280ms
Result: 10000000

性能提升: 4.3倍

三、AtomicReference详解

3.1 基本使用

public class AtomicReferenceDemo {
    static class User {
        String name;
        int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "User{name='" + name + "', age=" + age + "}";
        }
    }

    public static void main(String[] args) {
        User user1 = new User("Alice", 25);
        User user2 = new User("Bob", 30);

        AtomicReference<User> ref = new AtomicReference<>(user1);

        // 获取当前引用
        User current = ref.get();
        System.out.println(current);  // User{name='Alice', age=25}

        // CAS更新引用
        boolean success = ref.compareAndSet(user1, user2);
        System.out.println("CAS成功: " + success);  // true
        System.out.println(ref.get());  // User{name='Bob', age=30}

        // 再次CAS(期望值不匹配,失败)
        success = ref.compareAndSet(user1, new User("Charlie", 35));
        System.out.println("CAS成功: " + success);  // false
    }
}

3.2 实现无锁栈

public class LockFreeStack<T> {
    private static class Node<T> {
        T value;
        Node<T> next;

        Node(T value) {
            this.value = value;
        }
    }

    private AtomicReference<Node<T>> top = new AtomicReference<>();

    public void push(T value) {
        Node<T> newTop = new Node<>(value);
        Node<T> oldTop;
        do {
            oldTop = top.get();
            newTop.next = oldTop;
        } while (!top.compareAndSet(oldTop, newTop));
    }

    public T pop() {
        Node<T> oldTop;
        Node<T> newTop;
        do {
            oldTop = top.get();
            if (oldTop == null) {
                return null;
            }
            newTop = oldTop.next;
        } while (!top.compareAndSet(oldTop, newTop));
        return oldTop.value;
    }
}

工作原理

push(5):
1. 创建节点 Node(5)
2. 读取当前top
3. 设置 Node(5).next = top
4. CAS(top, 旧top, Node(5))
   ├─ 成功 → 完成
   └─ 失败 → 回到步骤2(其他线程修改了top)

pop():
1. 读取当前top
2. 如果top == null,返回null
3. 读取newTop = top.next
4. CAS(top, 当前top, newTop)
   ├─ 成功 → 返回当前top.value
   └─ 失败 → 回到步骤1

四、AtomicIntegerArray详解

4.1 基本使用

public class AtomicArrayDemo {
    public static void main(String[] args) {
        AtomicIntegerArray array = new AtomicIntegerArray(10);

        // 设置值
        array.set(0, 100);

        // 获取值
        int value = array.get(0);  // 100

        // 获取并自增
        int old = array.getAndIncrement(0);  // 返回100,array[0]变为101

        // CAS更新
        boolean success = array.compareAndSet(0, 101, 200);
        System.out.println("CAS成功: " + success);  // true
        System.out.println("新值: " + array.get(0));  // 200
    }
}

4.2 实现并行累加

public class ParallelSum {
    private static final int ARRAY_SIZE = 10_000_000;
    private static final int THREAD_COUNT = 10;

    public static void main(String[] args) throws InterruptedException {
        int[] data = new int[ARRAY_SIZE];
        for (int i = 0; i < ARRAY_SIZE; i++) {
            data[i] = i % 1000;
        }

        AtomicIntegerArray result = new AtomicIntegerArray(THREAD_COUNT);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        long start = System.currentTimeMillis();

        for (int i = 0; i < THREAD_COUNT; i++) {
            int threadId = i;
            new Thread(() -> {
                int start_idx = threadId * (ARRAY_SIZE / THREAD_COUNT);
                int end_idx = (threadId + 1) * (ARRAY_SIZE / THREAD_COUNT);

                int sum = 0;
                for (int j = start_idx; j < end_idx; j++) {
                    sum += data[j];
                }

                result.set(threadId, sum);
                latch.countDown();
            }).start();
        }

        latch.await();

        int totalSum = 0;
        for (int i = 0; i < THREAD_COUNT; i++) {
            totalSum += result.get(i);
        }

        long time = System.currentTimeMillis() - start;

        System.out.println("并行累加结果: " + totalSum);
        System.out.println("耗时: " + time + "ms");
    }
}

五、AtomicStampedReference(解决ABA问题)

5.1 什么是ABA问题

场景

初始值: A

线程1: 读到A → 准备CAS(A, B)
线程2: A → B → A (线程1暂停期间完成)
线程1: CAS(A, B) 成功!(但中间值已经变化过)

问题

  • CAS只比较值,不关心中间变化
  • 某些场景下,中间变化很重要

5.2 AtomicStampedReference的解决方案

思路:维护一个版本号(stamp)

public class AtomicStampedReferenceDemo {
    static AtomicStampedReference<Integer> ref =
        new AtomicStampedReference<>(100, 1);  // 初始值100,版本1

    public static void main(String[] args) throws InterruptedException {
        // 线程1:读取初始值和版本号
        int[] stampHolder = new int[1];
        Integer value = ref.get(stampHolder);
        int stamp = stampHolder[0];
        System.out.println("线程1读取: value=" + value + ", stamp=" + stamp);

        // 线程2:修改值(ABA操作)
        Thread t2 = new Thread(() -> {
            System.out.println("线程2: 100 → 200");
            ref.compareAndSet(100, 200, ref.getStamp(), ref.getStamp() + 1);

            System.out.println("线程2: 200 → 100");
            ref.compareAndSet(200, 100, ref.getStamp(), ref.getStamp() + 1);
        });
        t2.start();
        t2.join();

        // 线程1:尝试CAS(值相同但版本号不同)
        boolean success = ref.compareAndSet(value, 300, stamp, stamp + 1);
        System.out.println("线程1 CAS结果: " + success);  // false
        System.out.println("当前版本号: " + ref.getStamp());  // 3(版本号已变化)
    }
}

输出

线程1读取: value=100, stamp=1
线程2: 100 → 200
线程2: 200 → 100
线程1 CAS结果: false  ← 检测到ABA问题
当前版本号: 3

六、LongAdder(高性能累加器)

6.1 AtomicLong的性能瓶颈

高并发场景下的问题

AtomicLong counter = new AtomicLong(0);

// 100个线程同时自增
for (int i = 0; i < 100; i++) {
    new Thread(() -> {
        for (int j = 0; j < 1_000_000; j++) {
            counter.incrementAndGet();  // CAS竞争激烈
        }
    }).start();
}
// 大量CAS失败,性能下降

6.2 LongAdder的优化思路

核心思想:分段累加

AtomicLong:
所有线程竞争同一个变量
┌──────┐
│ 变量 │ ← 100个线程竞争
└──────┘

LongAdder:
每个线程使用独立的Cell
┌──────┐ ← 线程1
│Cell 0│
├──────┤ ← 线程2
│Cell 1│
├──────┤ ← 线程3
│Cell 2│
└──────┘
...
最后求和: sum = Cell[0] + Cell[1] + Cell[2] + ...

6.3 LongAdder使用示例

public class LongAdderDemo {
    public static void main(String[] args) throws InterruptedException {
        LongAdder adder = new LongAdder();

        int threadCount = 100;
        CountDownLatch latch = new CountDownLatch(threadCount);

        long start = System.currentTimeMillis();

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1_000_000; j++) {
                    adder.increment();  // 累加
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        long time = System.currentTimeMillis() - start;

        System.out.println("LongAdder结果: " + adder.sum());
        System.out.println("耗时: " + time + "ms");
    }
}

6.4 性能对比

public class LongAdderVsAtomicLong {
    private static final int THREAD_COUNT = 100;
    private static final int ITERATIONS = 1_000_000;

    public static void testAtomicLong() throws InterruptedException {
        AtomicLong counter = new AtomicLong(0);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        long start = System.currentTimeMillis();

        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                for (int j = 0; j < ITERATIONS; j++) {
                    counter.incrementAndGet();
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        long time = System.currentTimeMillis() - start;

        System.out.println("AtomicLong: " + time + "ms");
    }

    public static void testLongAdder() throws InterruptedException {
        LongAdder adder = new LongAdder();
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        long start = System.currentTimeMillis();

        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                for (int j = 0; j < ITERATIONS; j++) {
                    adder.increment();
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        long time = System.currentTimeMillis() - start;

        System.out.println("LongAdder: " + time + "ms");
    }

    public static void main(String[] args) throws InterruptedException {
        testAtomicLong();
        testLongAdder();
    }
}

实测结果(100线程,各100万次操作):

AtomicLong: 8500ms
LongAdder: 850ms
性能提升: 10倍

6.5 什么时候用LongAdder?

使用LongAdder的场景

  • ✅ 高并发累加(100+线程)
  • ✅ 只需要最终结果(调用sum())
  • ✅ 写多读少

使用AtomicLong的场景

  • ✅ 需要精确的实时值
  • ✅ 并发度不高(<10线程)
  • ✅ 需要CAS操作

七、总结

7.1 核心要点

  1. 原子类的分类

    • 基本类型:AtomicInteger、AtomicLong、AtomicBoolean
    • 引用类型:AtomicReference、AtomicStampedReference
    • 数组类型:AtomicIntegerArray、AtomicLongArray
    • 累加器:LongAdder、LongAccumulator
  2. CAS操作

    • Compare-And-Swap
    • 无锁算法的基础
    • 自旋重试
  3. 性能对比

    • AtomicInteger vs synchronized:4倍提升
    • LongAdder vs AtomicLong:10倍提升(高并发)
  4. ABA问题

    • 中间值变化无法检测
    • AtomicStampedReference解决(版本号)
  5. 使用建议

    • 低并发:AtomicInteger/AtomicLong
    • 高并发累加:LongAdder
    • 引用更新:AtomicReference
    • 需要版本控制:AtomicStampedReference

7.2 最佳实践

  1. 优先使用原子类

    // ✅ 好
    private AtomicInteger count = new AtomicInteger(0);
    
    // ❌ 不好
    private volatile int count = 0;
    public synchronized void increment() { count++; }
    
  2. 选择合适的原子类

    // 计数器(低并发)
    AtomicInteger counter = new AtomicInteger();
    
    // 计数器(高并发)
    LongAdder counter = new LongAdder();
    
    // 引用更新
    AtomicReference<User> userRef = new AtomicReference<>();
    
  3. 注意ABA问题

    // 如果关心中间变化,使用版本号
    AtomicStampedReference<Integer> ref =
        new AtomicStampedReference<>(100, 0);
    

7.3 思考题

  1. 为什么AtomicInteger比synchronized快?
  2. LongAdder为什么比AtomicLong快?
  3. 什么场景下必须使用AtomicStampedReference?

7.4 下一篇预告

在理解了原子类的使用后,下一篇我们将深入学习CAS算法的原理 —— Compare-And-Swap如何实现无锁算法,以及ABA问题的完整解决方案。


扩展阅读

  1. JDK源码

    • java.util.concurrent.atomic包
    • sun.misc.Unsafe类
  2. 书籍

    • 《Java并发编程实战》第15章:原子变量与非阻塞同步
    • 《Java并发编程的艺术》第7章:原子操作类
  3. 论文

    • “Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms” - Maged M. Michael, Michael L. Scott
  4. 工具

    • JMH:性能基准测试

系列文章