引言:volatile不能保证原子性
回顾之前讲过的例子:
public class Counter {
private volatile int count = 0;
public void increment() {
count++; // 不是原子操作!
}
}
问题:
count++;
// 实际上是3个操作:
// 1. temp = count (读)
// 2. temp = temp + 1 (修改)
// 3. count = temp (写)
// 多线程执行时可能丢失更新
解决方案1:synchronized
public synchronized void increment() {
count++; // 原子操作
}
// 缺点:性能开销大
解决方案2:原子类
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子操作,性能好
}
性能对比:
synchronized: 1200ms
AtomicInteger: 280ms
性能提升: 4.3倍
本篇文章将深入Java原子类的实现原理和使用方法。
一、原子类的分类
Java提供了丰富的原子类,位于java.util.concurrent.atomic包:
1.1 基本类型原子类
| 类名 | 说明 |
|---|---|
| AtomicInteger | 原子更新整型 |
| AtomicLong | 原子更新长整型 |
| AtomicBoolean | 原子更新布尔型 |
1.2 数组类型原子类
| 类名 | 说明 |
|---|---|
| AtomicIntegerArray | 原子更新整型数组的元素 |
| AtomicLongArray | 原子更新长整型数组的元素 |
| AtomicReferenceArray | 原子更新引用类型数组的元素 |
1.3 引用类型原子类
| 类名 | 说明 |
|---|---|
| AtomicReference | 原子更新引用类型 |
| AtomicStampedReference | 原子更新带版本号的引用(解决ABA问题) |
| AtomicMarkableReference | 原子更新带标记位的引用 |
1.4 字段更新器
| 类名 | 说明 |
|---|---|
| AtomicIntegerFieldUpdater | 原子更新整型字段 |
| AtomicLongFieldUpdater | 原子更新长整型字段 |
| AtomicReferenceFieldUpdater | 原子更新引用类型字段 |
1.5 累加器(JDK 1.8+)
| 类名 | 说明 |
|---|---|
| LongAdder | 长整型累加器(高并发场景性能更好) |
| LongAccumulator | 长整型累加器(支持自定义函数) |
| DoubleAdder | 双精度累加器 |
| DoubleAccumulator | 双精度累加器(支持自定义函数) |
二、AtomicInteger详解
2.1 基本使用
public class AtomicIntegerDemo {
public static void main(String[] args) {
AtomicInteger count = new AtomicInteger(0);
// 获取当前值
int value = count.get(); // 0
// 设置值
count.set(10);
// 获取并设置(返回旧值)
int old = count.getAndSet(20); // 返回10
// 获取并自增(i++)
int old2 = count.getAndIncrement(); // 返回20,count变为21
// 自增并获取(++i)
int new1 = count.incrementAndGet(); // 返回22
// 获取并自减(i--)
int old3 = count.getAndDecrement(); // 返回22,count变为21
// 自减并获取(--i)
int new2 = count.decrementAndGet(); // 返回20
// 获取并增加(+=)
int old4 = count.getAndAdd(5); // 返回20,count变为25
// 增加并获取
int new3 = count.addAndGet(5); // 返回30
}
}
2.2 CAS操作
compareAndSet(CAS的核心方法):
AtomicInteger count = new AtomicInteger(0);
// CAS:如果当前值是expect,则更新为update
boolean success = count.compareAndSet(0, 1);
// success = true,count = 1
success = count.compareAndSet(0, 2);
// success = false,count 仍然是 1(因为当前值不是0)
工作原理:
CAS(内存位置V, 期望值A, 新值B)
├─ 如果 V == A
│ └─ V = B,返回 true
└─ 否则
└─ 不修改,返回 false
2.3 incrementAndGet的实现
源码分析:
public class AtomicInteger {
private volatile int value; // 使用volatile保证可见性
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
}
// Unsafe类中的实现
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset); // 读取当前值
} while (!compareAndSwapInt(o, offset, v, v + delta)); // CAS更新
return v;
}
流程图:
incrementAndGet():
1. 读取当前值 v
2. 计算新值 newV = v + 1
3. CAS(value, v, newV)
├─ 成功 → 返回 newV
└─ 失败 → 回到步骤1(自旋重试)
2.4 性能测试
public class AtomicPerformanceTest {
private static final int THREAD_COUNT = 10;
private static final int ITERATIONS = 1_000_000;
// synchronized版本
static class SyncCounter {
private int count = 0;
public synchronized void increment() {
count++;
}
public int get() {
return count;
}
}
// AtomicInteger版本
static class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int get() {
return count.get();
}
}
public static void testSync() throws InterruptedException {
SyncCounter counter = new SyncCounter();
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
for (int j = 0; j < ITERATIONS; j++) {
counter.increment();
}
latch.countDown();
}).start();
}
latch.await();
long time = System.currentTimeMillis() - start;
System.out.println("synchronized: " + time + "ms");
System.out.println("Result: " + counter.get());
}
public static void testAtomic() throws InterruptedException {
AtomicCounter counter = new AtomicCounter();
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
for (int j = 0; j < ITERATIONS; j++) {
counter.increment();
}
latch.countDown();
}).start();
}
latch.await();
long time = System.currentTimeMillis() - start;
System.out.println("AtomicInteger: " + time + "ms");
System.out.println("Result: " + counter.get());
}
public static void main(String[] args) throws InterruptedException {
testSync();
testAtomic();
}
}
实测结果(10线程,各100万次操作):
synchronized: 1200ms
Result: 10000000
AtomicInteger: 280ms
Result: 10000000
性能提升: 4.3倍
三、AtomicReference详解
3.1 基本使用
public class AtomicReferenceDemo {
static class User {
String name;
int age;
User(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{name='" + name + "', age=" + age + "}";
}
}
public static void main(String[] args) {
User user1 = new User("Alice", 25);
User user2 = new User("Bob", 30);
AtomicReference<User> ref = new AtomicReference<>(user1);
// 获取当前引用
User current = ref.get();
System.out.println(current); // User{name='Alice', age=25}
// CAS更新引用
boolean success = ref.compareAndSet(user1, user2);
System.out.println("CAS成功: " + success); // true
System.out.println(ref.get()); // User{name='Bob', age=30}
// 再次CAS(期望值不匹配,失败)
success = ref.compareAndSet(user1, new User("Charlie", 35));
System.out.println("CAS成功: " + success); // false
}
}
3.2 实现无锁栈
public class LockFreeStack<T> {
private static class Node<T> {
T value;
Node<T> next;
Node(T value) {
this.value = value;
}
}
private AtomicReference<Node<T>> top = new AtomicReference<>();
public void push(T value) {
Node<T> newTop = new Node<>(value);
Node<T> oldTop;
do {
oldTop = top.get();
newTop.next = oldTop;
} while (!top.compareAndSet(oldTop, newTop));
}
public T pop() {
Node<T> oldTop;
Node<T> newTop;
do {
oldTop = top.get();
if (oldTop == null) {
return null;
}
newTop = oldTop.next;
} while (!top.compareAndSet(oldTop, newTop));
return oldTop.value;
}
}
工作原理:
push(5):
1. 创建节点 Node(5)
2. 读取当前top
3. 设置 Node(5).next = top
4. CAS(top, 旧top, Node(5))
├─ 成功 → 完成
└─ 失败 → 回到步骤2(其他线程修改了top)
pop():
1. 读取当前top
2. 如果top == null,返回null
3. 读取newTop = top.next
4. CAS(top, 当前top, newTop)
├─ 成功 → 返回当前top.value
└─ 失败 → 回到步骤1
四、AtomicIntegerArray详解
4.1 基本使用
public class AtomicArrayDemo {
public static void main(String[] args) {
AtomicIntegerArray array = new AtomicIntegerArray(10);
// 设置值
array.set(0, 100);
// 获取值
int value = array.get(0); // 100
// 获取并自增
int old = array.getAndIncrement(0); // 返回100,array[0]变为101
// CAS更新
boolean success = array.compareAndSet(0, 101, 200);
System.out.println("CAS成功: " + success); // true
System.out.println("新值: " + array.get(0)); // 200
}
}
4.2 实现并行累加
public class ParallelSum {
private static final int ARRAY_SIZE = 10_000_000;
private static final int THREAD_COUNT = 10;
public static void main(String[] args) throws InterruptedException {
int[] data = new int[ARRAY_SIZE];
for (int i = 0; i < ARRAY_SIZE; i++) {
data[i] = i % 1000;
}
AtomicIntegerArray result = new AtomicIntegerArray(THREAD_COUNT);
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
int threadId = i;
new Thread(() -> {
int start_idx = threadId * (ARRAY_SIZE / THREAD_COUNT);
int end_idx = (threadId + 1) * (ARRAY_SIZE / THREAD_COUNT);
int sum = 0;
for (int j = start_idx; j < end_idx; j++) {
sum += data[j];
}
result.set(threadId, sum);
latch.countDown();
}).start();
}
latch.await();
int totalSum = 0;
for (int i = 0; i < THREAD_COUNT; i++) {
totalSum += result.get(i);
}
long time = System.currentTimeMillis() - start;
System.out.println("并行累加结果: " + totalSum);
System.out.println("耗时: " + time + "ms");
}
}
五、AtomicStampedReference(解决ABA问题)
5.1 什么是ABA问题
场景:
初始值: A
线程1: 读到A → 准备CAS(A, B)
线程2: A → B → A (线程1暂停期间完成)
线程1: CAS(A, B) 成功!(但中间值已经变化过)
问题:
- CAS只比较值,不关心中间变化
- 某些场景下,中间变化很重要
5.2 AtomicStampedReference的解决方案
思路:维护一个版本号(stamp)
public class AtomicStampedReferenceDemo {
static AtomicStampedReference<Integer> ref =
new AtomicStampedReference<>(100, 1); // 初始值100,版本1
public static void main(String[] args) throws InterruptedException {
// 线程1:读取初始值和版本号
int[] stampHolder = new int[1];
Integer value = ref.get(stampHolder);
int stamp = stampHolder[0];
System.out.println("线程1读取: value=" + value + ", stamp=" + stamp);
// 线程2:修改值(ABA操作)
Thread t2 = new Thread(() -> {
System.out.println("线程2: 100 → 200");
ref.compareAndSet(100, 200, ref.getStamp(), ref.getStamp() + 1);
System.out.println("线程2: 200 → 100");
ref.compareAndSet(200, 100, ref.getStamp(), ref.getStamp() + 1);
});
t2.start();
t2.join();
// 线程1:尝试CAS(值相同但版本号不同)
boolean success = ref.compareAndSet(value, 300, stamp, stamp + 1);
System.out.println("线程1 CAS结果: " + success); // false
System.out.println("当前版本号: " + ref.getStamp()); // 3(版本号已变化)
}
}
输出:
线程1读取: value=100, stamp=1
线程2: 100 → 200
线程2: 200 → 100
线程1 CAS结果: false ← 检测到ABA问题
当前版本号: 3
六、LongAdder(高性能累加器)
6.1 AtomicLong的性能瓶颈
高并发场景下的问题:
AtomicLong counter = new AtomicLong(0);
// 100个线程同时自增
for (int i = 0; i < 100; i++) {
new Thread(() -> {
for (int j = 0; j < 1_000_000; j++) {
counter.incrementAndGet(); // CAS竞争激烈
}
}).start();
}
// 大量CAS失败,性能下降
6.2 LongAdder的优化思路
核心思想:分段累加
AtomicLong:
所有线程竞争同一个变量
┌──────┐
│ 变量 │ ← 100个线程竞争
└──────┘
LongAdder:
每个线程使用独立的Cell
┌──────┐ ← 线程1
│Cell 0│
├──────┤ ← 线程2
│Cell 1│
├──────┤ ← 线程3
│Cell 2│
└──────┘
...
最后求和: sum = Cell[0] + Cell[1] + Cell[2] + ...
6.3 LongAdder使用示例
public class LongAdderDemo {
public static void main(String[] args) throws InterruptedException {
LongAdder adder = new LongAdder();
int threadCount = 100;
CountDownLatch latch = new CountDownLatch(threadCount);
long start = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
for (int j = 0; j < 1_000_000; j++) {
adder.increment(); // 累加
}
latch.countDown();
}).start();
}
latch.await();
long time = System.currentTimeMillis() - start;
System.out.println("LongAdder结果: " + adder.sum());
System.out.println("耗时: " + time + "ms");
}
}
6.4 性能对比
public class LongAdderVsAtomicLong {
private static final int THREAD_COUNT = 100;
private static final int ITERATIONS = 1_000_000;
public static void testAtomicLong() throws InterruptedException {
AtomicLong counter = new AtomicLong(0);
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
for (int j = 0; j < ITERATIONS; j++) {
counter.incrementAndGet();
}
latch.countDown();
}).start();
}
latch.await();
long time = System.currentTimeMillis() - start;
System.out.println("AtomicLong: " + time + "ms");
}
public static void testLongAdder() throws InterruptedException {
LongAdder adder = new LongAdder();
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
for (int j = 0; j < ITERATIONS; j++) {
adder.increment();
}
latch.countDown();
}).start();
}
latch.await();
long time = System.currentTimeMillis() - start;
System.out.println("LongAdder: " + time + "ms");
}
public static void main(String[] args) throws InterruptedException {
testAtomicLong();
testLongAdder();
}
}
实测结果(100线程,各100万次操作):
AtomicLong: 8500ms
LongAdder: 850ms
性能提升: 10倍
6.5 什么时候用LongAdder?
使用LongAdder的场景:
- ✅ 高并发累加(100+线程)
- ✅ 只需要最终结果(调用sum())
- ✅ 写多读少
使用AtomicLong的场景:
- ✅ 需要精确的实时值
- ✅ 并发度不高(<10线程)
- ✅ 需要CAS操作
七、总结
7.1 核心要点
原子类的分类
- 基本类型:AtomicInteger、AtomicLong、AtomicBoolean
- 引用类型:AtomicReference、AtomicStampedReference
- 数组类型:AtomicIntegerArray、AtomicLongArray
- 累加器:LongAdder、LongAccumulator
CAS操作
- Compare-And-Swap
- 无锁算法的基础
- 自旋重试
性能对比
- AtomicInteger vs synchronized:4倍提升
- LongAdder vs AtomicLong:10倍提升(高并发)
ABA问题
- 中间值变化无法检测
- AtomicStampedReference解决(版本号)
使用建议
- 低并发:AtomicInteger/AtomicLong
- 高并发累加:LongAdder
- 引用更新:AtomicReference
- 需要版本控制:AtomicStampedReference
7.2 最佳实践
优先使用原子类
// ✅ 好 private AtomicInteger count = new AtomicInteger(0); // ❌ 不好 private volatile int count = 0; public synchronized void increment() { count++; }选择合适的原子类
// 计数器(低并发) AtomicInteger counter = new AtomicInteger(); // 计数器(高并发) LongAdder counter = new LongAdder(); // 引用更新 AtomicReference<User> userRef = new AtomicReference<>();注意ABA问题
// 如果关心中间变化,使用版本号 AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(100, 0);
7.3 思考题
- 为什么AtomicInteger比synchronized快?
- LongAdder为什么比AtomicLong快?
- 什么场景下必须使用AtomicStampedReference?
7.4 下一篇预告
在理解了原子类的使用后,下一篇我们将深入学习CAS算法的原理 —— Compare-And-Swap如何实现无锁算法,以及ABA问题的完整解决方案。
扩展阅读
JDK源码:
- java.util.concurrent.atomic包
- sun.misc.Unsafe类
书籍:
- 《Java并发编程实战》第15章:原子变量与非阻塞同步
- 《Java并发编程的艺术》第7章:原子操作类
论文:
- “Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms” - Maged M. Michael, Michael L. Scott
工具:
- JMH:性能基准测试
系列文章:
- 上一篇:Java并发14:synchronized的优化演进
- 下一篇:Java并发16:CAS算法与ABA问题 (即将发布)