下篇

8.共享模型之工具

8.1线程池

基本概述

线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作

线程池作用:

  1. 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
  2. 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
  3. 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

线程池的核心思想:线程复用,同一个线程可以被重复使用,来处理多个任务

池化技术 (Pool) :一种编程技巧,核心思想是资源复用,在请求量大时能优化应用性能,降低系统频繁建连的资源开销

一、自定义线程池

线程池不是越大越好,要与CPU核数适配

img

一步步手写线程池-阻塞队列

1.创建基本类和方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class BlockingQueue<T> {
//1.任务队列
private Deque<T> queue = new ArrayDeque<>();
//2.锁
private ReentrantLock lock = new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capacity;

//阻塞获取
public T take() {
lock.lock();
try {
//判断队列是否为空
while (queue.isEmpty()) {
//队列为空,等待
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//队列不为空,取出元素
T t = queue.removeFirst();
//通知生产者队列中有空位可以存放元素了
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

//阻塞添加
public void put(T element) {
lock.lock();
try {
//判断队列是否已满
while (queue.size() == capacity) {
//队列已满,等待
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//队列未满,队列尾部添加元素
queue.addLast(element);
//通知消费者队列中有元素可以消费了
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}


//获取大小
public int size() {
lock.lock();
try {

return queue.size();
}finally {
lock.unlock();
}
}
}
2.带超时的阻塞获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
//将timeout统一转换为纳秒
long nanos = unit.toNanos(timeout);
//判断队列是否为空
while (queue.isEmpty()) {
//队列为空,等待
try {
//返回的是剩余时间,如果小于等于0,则直接返回null
if(nanos<=0) return null;
//等待,直到被唤醒或者超时(返回剩余时间
nanos=emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//队列不为空,取出元素
T t = queue.removeFirst();
//通知生产者队列中有空位可以存放元素了
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
3.下面是线程池的基本框架:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class ThreadPool {
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心线程数
private int coreSize;
//获取任务的超时时间
private long timeout;

private TimeUnit timeUnit;

public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
}


class Worker {


}

class BlockingQueue<T> {
//1.任务队列
private Deque<T> queue = new ArrayDeque<>();
//2.锁
private ReentrantLock lock = new ReentrantLock();
//3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
//4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
//5.容量
private int capacity;

//构造器
public BlockingQueue(int capacity) {
this.capacity = capacity;
}

//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
//将timeout统一转换为纳秒
long nanos = unit.toNanos(timeout);
//判断队列是否为空
while (queue.isEmpty()) {
//队列为空,等待
try {
//返回的是剩余时间,如果小于等于0,则直接返回null
if (nanos <= 0) return null;
//等待,直到被唤醒或者超时(返回剩余时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//队列不为空,取出元素
T t = queue.removeFirst();
//通知生产者队列中有空位可以存放元素了
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

//阻塞获取
public T take() {
lock.lock();
try {
//判断队列是否为空
while (queue.isEmpty()) {
//队列为空,等待
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//队列不为空,取出元素
T t = queue.removeFirst();
//通知生产者队列中有空位可以存放元素了
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

//阻塞添加
public void put(T element) {
lock.lock();
try {
//判断队列是否已满
while (queue.size() == capacity) {
//队列已满,等待
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//队列未满,队列尾部添加元素
queue.addLast(element);
//通知消费者队列中有元素可以消费了
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}


//获取大小
public int size() {
lock.lock();
try {

return queue.size();
} finally {
lock.unlock();
}
}
}
}
4.完善任务提交 Worker实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Slf4j(topic="c.ThreadPool")
class ThreadPool{
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心线程数
private int coreSize;
//获取任务的超时时间
private long timeout;
//时间单位
private TimeUnit timeUnit;
//执行任务
public void execute(Runnable task){
//当任务数没有超过coreSize时,直接交给worker对象执行
//如果任务数超过coreSize时,加入任务队列暂存
synchronized (workers){
if(workers.size()<coreSize){
Worker worker = new Worker(task);
log.debug("新增 worker{},{}",worker,task);
workers.add(worker);
worker.start();
}else{
log.debug("加入任务队列 {}",task);
taskQueue.put(task);
}
}

}

public ThreadPool( int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity) {
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
}

class Worker extends Thread{
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}
public void run(){
//执行任务
// 1)当task不为空,执行任务
// 2)当task执行完毕,再接着从任务队列获取任务
while(task !=null || (task = taskQueue.take() )!= null){
try{
log.debug("正在执行...{}",task);
task.run();
}catch(Exception e){
e.printStackTrace();
}finally {
task=null;
}
}
synchronized (workers){
log.debug("worker被移除{}",this);
workers.remove(this);
}
}
}

}

下面是执行方法:

1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j(topic="c.TestPool")
public class TestPool {
public static void main(String[] args){
ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MICROSECONDS,1);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(()->{
log.debug("{}",j);
});
}
}
}

因为线程池线程数为2,所以是创建了2个worker线程。

然后把创建出来的2个线程加入到任务队列中等待执行。

然后Thread1和Thread2分别执行。输出结果1和0

执行完一个任务,新的任务会被继续加入任务队列。

总共设置了5个任务,全部比线程执行完毕。

img

现在有一个问题,线程会无限死等:

img

只需要把take方法替换为poll方法即可:

img

自动停止:

img

5.当任务队列已满

现在假如核心线程数为2,队列容量大小为10,假如处理一个任务的耗时很长,生成了15个任务,必然会有10个任务在任务队列中阻塞,而有3个任务等待加入任务队列。

img

应该添加一个拒绝策略。

6. offer增强
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//带超时时间的阻塞添加
public boolean offer(T task,long timeout,TimeUnit timeUnit){
lock.lock();
try{
long nanos = timeUnit.toNanos(timeout);
while(queue.size()==capcity){
try {
log.debug("等待加入任务队列{}...",task);
if(nanos<=0){
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}",task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
7.拒绝策略

如果把拒绝策略写死在执行方法里需要很多的if-else判断,现在的思路是将拒绝策略的选择交给用户端,由用户来决定要用哪种拒绝策略。

img

现在可以用策略模式,把操作抽象为接口,具体的实现由调用者传递进来。

主要是修改main方法和execute方法:

img

img

首先是死等策略,只要队列还有任务就一直死等

img

第二种带超时等待

img

img

第三种,让调用者放弃任务执行

img

第四种让调用者抛出异常

img

第五种让调用者自己执行任务

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
@Slf4j(topic="c.TestPool")
public class TestPool {
public static void main(String[] args){
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,1,(queue,task)->{
//1.死等
// queue.put(task);
// 2)带超时等待
// queue.offer(task,1500,TimeUnit.MILLISECONDS);
// 3)让调用者 放弃任务执行
// log.debug("放弃{}",task); //队列一满就自动放弃执行
// 4)让调用者 抛出异常
// throw new RuntimeException("任务执行失败 "+task);
// 5)让调用者自己执行任务
task.run();
});
for (int i = 0; i < 4; i++) {
int j = i;
threadPool.execute(()->{
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("{}",j);
});

}
}
}
@FunctionalInterface //拒绝策略
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}
@Slf4j(topic="c.ThreadPool")
class ThreadPool{
//任务队列
private BlockingQueue<Runnable> taskQueue;
//线程集合
private HashSet<Worker> workers = new HashSet<>();
//核心线程数
private int coreSize;
//获取任务的超时时间
private long timeout;
//时间单位
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
//执行任务
public void execute(Runnable task){
//当任务数没有超过coreSize时,直接交给worker对象执行
//如果任务数超过coreSize时,加入任务队列暂存
synchronized (workers){
if(workers.size()<coreSize){
Worker worker = new Worker(task);
log.debug("新增 worker{},{}",worker,task);
workers.add(worker);
worker.start();
}else{
taskQueue.tryPut(rejectPolicy,task);
}
}

}

public ThreadPool( int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectPolicy = rejectPolicy;
}

class Worker extends Thread{
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}
public void run(){
//执行任务
// 1)当task不为空,执行任务
// 2)当task执行完毕,再接着从任务队列获取任务
while(task !=null || (task = taskQueue.poll(timeout,timeUnit) )!= null){
try{
log.debug("正在执行...{}",task);
task.run();
}catch(Exception e){
e.printStackTrace();
}finally {
task=null;
}
}
synchronized (workers){
log.debug("worker被移除{}",this);
workers.remove(this);
}
}
}

}

@Slf4j(topic="c.BlockingQueue")
class BlockingQueue<T>{
// 1.任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2.锁
private ReentrantLock lock = new ReentrantLock();
// 3.生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4.消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5.容量
private int capcity;

public BlockingQueue(int capcity) {
this.capcity = capcity;
}

//带超时的阻塞获取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
long nanos = unit.toNanos(timeout);//将timeout时间统一转化为纳秒
while(queue.isEmpty()){
try {
//没等到直接返回
if(nanos<=0){
return null;
}
//返回的是剩余时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally{
lock.unlock();
}
}


//阻塞获取

public T take(){
lock.lock();
try{
while(queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
}finally{
lock.unlock();
}
}
//阻塞添加
public void put(T task){
lock.lock();
try{
while(queue.size()==capcity){
try {
log.debug("等待加入任务队列{}...",task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
log.debug("加入任务队列 {}",task);
emptyWaitSet.signal();
}finally {
lock.unlock();

}
}
//带超时时间的阻塞添加
public boolean offer(T task,long timeout,TimeUnit timeUnit){
lock.lock();
try{
long nanos = timeUnit.toNanos(timeout);
while(queue.size()==capcity){
try {
log.debug("等待加入任务队列{}...",task);
if(nanos<=0){
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}",task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
//获取大小
public int size(){
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}


public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try{
//判断队列是否已满
if(queue.size()==capcity){
rejectPolicy.reject(this,task);
}else{ //队列有空闲
log.debug("加入任务队列 {}",task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally{
lock.unlock();
}
}
}

二、JDK为我们提供的线程池ThreadPoolExecutor

ThreadPoolExecutor 是什么?

ThreadPoolExecutor 是 Java 提供的最基础、最灵活的线程池实现类,几乎所有线程池的底层实现都是基于它来构建的。比如:

  • Executors.newFixedThreadPool() 就是基于它创建固定线程池
  • Executors.newCachedThreadPool() 是可缓存线程池
  • ScheduledThreadPoolExecutor 是它的子类,支持定时调度任务

img

说明:

  • ScheduledThreadPoolExecutor是带调度的线程池
  • ThreadPoolExecutor是不带调度的线程池
1.线程池状态

img

状态名 高3位 接收新任务 处理队列任务 描述
RUNNING 111 正常运行中
SHUTDOWN 000 拒绝新任务,但会继续执行队列中的任务
STOP 001 拒绝所有任务,并中断正在执行的任务
TIDYING 010 无意义 无意义 所有任务已清空,活动线程为 0,准备彻底关闭
TERMINATED 011 无意义 无意义 已彻底终止

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

为何这样设计?

将状态信息与线程数量合并在一起(原子变量ctl),可以通过一次原子操作(CAS)同时更新状态与线程数,提升性能,减少同步成本

1
2
3
4
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.构造方法
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (常驻线程数)
  • maximumPoolSize 最大线程数目(救急线程上限)
  • keepAliveTime 生存时间 - 针对救急线程(
    救急线程多久无任务会销毁)
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列(任务队列,存储待处理任务)
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字(创建线程的工厂,常用于设置线程名)
  • handler 拒绝策略(任务无法处理时的行为)
3.工作方式

img

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排 队,直到有空闲的线程。
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现
    • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
    • CallerRunsPolicy 让调用者运行任务
    • DiscardPolicy 放弃本次任务
    • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
    • Netty 的实现,是创建一个新线程来执行任务
    • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
    • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。

img

拒绝策略(RejectedExecutionHandler)

策略名 行为说明
AbortPolicy(默认) 抛出 RejectedExecutionException 异常
CallerRunsPolicy 由提交任务的线程(调用者)自己执行任务
DiscardPolicy 直接丢弃该任务,不抛异常
DiscardOldestPolicy 丢弃队列中最早的任务,然后执行当前任务
自定义策略 框架常见实现,如:Dubbo、Netty、ActiveMQ、Pinpoint 等

img

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池。

Executors 工厂方法(快速创建线程池)

工厂方法 内部使用 ThreadPoolExecutor
newFixedThreadPool(n) 固定核心线程数,任务多时进入队列
newCachedThreadPool() 没有限制的线程池,空闲线程会被回收
newSingleThreadExecutor() 单线程池,串行执行任务
newScheduledThreadPool(n) 定时线程池(用 ScheduledThreadPoolExecutor)
4.Executors 固定大小线程池newFixedThreadPool
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

线程池特性

  • 核心线程数 = 最大线程数 = n
  • 没有救急线程,不涉及超时销毁逻辑
  • 队列为无界队列(LinkedBlockingQueue
  • 拒绝策略:默认抛异常(AbortPolicy

适用场景

任务量已知且较重,适合持久运行的任务处理,例如日志记录、I/O 任务等。

5. Executors 带缓冲线程池newCachedThreadPool

核心线程数为0,最大线程数为Integer.Max_VALUE

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

特点

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,
    • 意味着全部都是救急线程(60s 后可以回收)
    • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
try {
log.debug("putting {} ", 1);
integers.put(1);
log.debug("{} putted...", 1);
log.debug("putting...{} ", 2);
integers.put(2);
log.debug("{} putted...", 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 1);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 2);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t3").start();

输出

1
2
3
4
5
6
11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...

img

评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况

6.Executors 单线程线程池newSingleThreadExecutor

特性说明:

  • 核心线程数 = 最大线程数 = 1
  • 无救急线程,线程死后自动补一个
  • 使用 LinkedBlockingQueue,任务串行排队
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

使用场景:

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程 也不会被释放。

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

img

  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
    • FinalizableDelegatedExecutorService 应用的是装饰器模式,在调用构造方法时将ThreadPoolExecutor对象传给了内部的ExecutorService接口。只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法,也不能重新设置线程池的大小。
  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
    • 对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

为什么使用 FinalizableDelegatedExecutorService

  • 它包装了 ThreadPoolExecutor,只暴露出 ExecutorService 接口,屏蔽修改线程数的能力,防止用户破坏单线程语义。

img

总结Executors 工厂类提供的三种线程池工具方法
方法 核心线程 最大线程 队列类型 特点 适用场景
newFixedThreadPool(n) n n 无界队列 固定线程数,不会销毁线程 任务量已知、执行时间较长
newCachedThreadPool() 0 SynchronousQueue 救急线程多、空闲销毁 短任务量大、并发压力突发
newSingleThreadExecutor() 1 1 无界队列 单线程串行执行,线程异常会被重建 顺序执行任务、串行日志写入等
7.ThreadPoolExecutor 中任务的提交方法(submit/invokeAll/invokeAny)
  1. execute(Runnable) — 最简单提交任务(无结果)
  • 没有返回值
  • 适用于只关心任务是否运行,而不关心其结果的情况
1
pool.execute(() -> System.out.println("do work"));

2.submit(Callable) — 提交任务并获得结果(Future)

可使用 future.get() 阻塞等待结果

返回 Future<T> 可异步获取执行结果(Callable与Runnable的差异在于,Callable会返回结果。)

img

1
2
3
4
5
Future<String> future = pool.submit(() -> {
Thread.sleep(1000);
return "ok";
});
System.out.println(future.get())

img

特点:

  • Callable 支持返回值,优于 Runnable
  • get() 方法会阻塞直到结果返回,或异常抛出

3.invokeAll(...) — 提交一组任务,等待全部完成

img

1
2
3
4
List<Future<String>> results = pool.invokeAll(tasks);
for (Future<String> f : results) {
System.out.println(f.get());
}

img

特点:

  • 所有任务并发执行
  • 会阻塞直到所有任务完成
  • 可设置超时时间,超时任务会被取消

4.invokeAny(...) — 提交一组任务,谁先成功就返回谁的结果

1
String result = pool.invokeAny(tasks);

特点:

  • 返回第一个完成的任务结果
  • 其它任务取消
  • 遇到异常或全部失败会抛异常
  • 可设置超时限制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j(topic = "c.Test1")
public class Test1 {
public static void main(String[] args) throws ExecutionException,InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
String result = pool.invokeAny(Arrays.asList(
()->{
log.debug("begin");
Thread.sleep(1000);
log.debug("end");
return "1";
},
()->{
log.debug("begin");
Thread.sleep(500);
log.debug("end");
return "2";
},
()->{
log.debug("begin");
Thread.sleep(2000);
log.debug("end");
return "3";
}
));
log.debug("{}",result);
}
}

img

img

8.线程池关闭机制

线程池一旦关闭不能再提交任务,否则抛出 RejectedExecutionException

1.shutdown() — 平缓关闭

  • 拒绝新任务
  • 执行完队列中的任务
  • 不会中断正在执行的线程
1
pool.shutdown();

img

下面的例子可以说明:调用shutdown之后,已经提交的任务会执行完,但不能加入新的任务:

img

2.shutdownNow() — 立即强制关闭

  • 拒绝新任务
  • 中断正在执行的线程
  • 返回队列中尚未执行的任务
1
2
List<Runnable> pending = pool.shutdownNow();
System.out.println(pending);

img

如下图可见调用shutdownNow之后已有的任务便不会再执行,也不会再加入新的任务:

img

3.awaitTermination(timeout) — 等待线程池完全终止

  • 阻塞调用线程直到线程池关闭
  • 返回是否在规定时间内成功终止
1
2
pool.shutdown();
pool.awaitTermination(5, TimeUnit.SECONDS);

img

img

总结:

方法 特点 是否阻塞主线程 是否返回结果
execute() 无返回值,提交 Runnable
submit() 返回 Future,可调用 get() ✅(通过 get)
invokeAll() 一组任务,等待全部完成
invokeAny() 一组任务,返回第一个成功结果

关闭方法:

关闭方法 拒绝新任务 取消运行中任务 等待已有任务
shutdown()
shutdownNow() ❌(立即返回)

img

9.任务调度线程池

主要是 ScheduledExecutorService 的使用方式和替代传统 Timer 的优势。

定时任务的前世今生

1.java.util.Timer(早期做法)

  • 最早用于 JDK 1.3 之前的定时任务调度工具
  • 所有任务由同一个线程顺序执行:一个任务阻塞会导致下一个任务延迟
  • 无法应对任务执行时间不确定、异常等复杂情况

所有任务都是由同一个线程来调度,因此所有的任务都是串行执行的。

只要前面有任务存在延迟或者异常,都会影响到后面的任务。

img

Timer的问题

img

2.现代替代:ScheduledExecutorService

JDK 1.5 引入,解决 Timer 缺陷,支持线程池执行定时任务。

1
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
  • 多线程调度,互不阻塞
  • 支持延时执行、周期执行
  • 可设置固定速率或固定延迟
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j(topic = "c.Test1")
public class Test1 {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
pool.schedule(()->{
log.debug("task1");
sleep(2);
// int i = 1/0;
},1,TimeUnit.SECONDS);
pool.schedule(()->{
log.debug("task2");
},1,TimeUnit.SECONDS);
}
}

可以发现尽管任务1存在延时,但两个线程都是并行执行的。

3.三种使用方式讲解

img

img

img

方法 启动延时 间隔计算方式 特点
schedule() 仅执行一次 延时执行一次
scheduleAtFixedRate() 固定周期调度(不考虑任务执行时间) 任务慢会被“挤压”
scheduleWithFixedDelay() 每次任务执行完后延迟一段时间再执行 任务执行完再计时,稳定不堆积

img

10.线程池中如何正确处理任务执行过程中发生的异常

默认情况下,Java 线程池并不会直接抛出任务执行的异常,而是悄悄“吞掉”了,如果不做额外处理就会导致异常“沉没”,不利于排查 bug 或做出补偿机制。

1.问题现象回顾

线程池中任务出错,不会抛异常?

当你用线程池执行任务,任务内部抛出异常时:

  • 控制台不会显示错误
  • 主线程不会收到通知
  • 没有 try/catchget() 的话,异常就“消失了”

2.两种正确处理方式

方法一:主动在任务中 try-catch 捕获异常

这是最直接、推荐的方式。

1
2
3
4
5
6
7
8
9
10
ExecutorService pool = Executors.newFixedThreadPool(1);

pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0; // 抛出异常
} catch (Exception e) {
log.error("error:", e);
}
});

输出

1
2
3
[pool-1-thread-1] - task1
[pool-1-thread-1] - error:
java.lang.ArithmeticException: / by zero

优点:

  • 能看到错误堆栈
  • 可进行日志记录、报警、重试等处理
  • 最通用、最灵活的方式

方法二:借助 Future.get() 捕获异常

适用于你通过 submit() 提交任务,并希望在主线程中统一处理异常的场景。

1
2
3
4
5
6
7
Future<Boolean> f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});

log.debug("result:{}", f.get()); // 会抛出异常

输出:

1
2
3
[pool-1-thread-1] - task1
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.ArithmeticException: / by zero

特点:

  • 如果任务抛异常,Future.get() 会包装成 ExecutionException 抛出
  • 可以在调用 get() 的地方统一处理异常

注意:

若 lambda 中没有返回值,就不会自动识别为 Callable,那返回的 Future 实际上没法获取到异常。

3.总结

方式 是否能看到异常 异常处理位置 推荐场景
try-catch 包裹任务体 任务线程内部 日志记录、补偿逻辑等需要
Future.get() 捕获 提交任务的主线程 多任务批量处理统一异常汇总
不做任何处理 无法获取 非推荐方式,会导致“异常沉没”

4.建议

  1. 所有通过线程池执行的任务,都应保证异常被显式处理
  2. 如果是并发场景,推荐结合 Future.get() 做统一异常控制和聚合
  3. 复杂系统中,可封装统一的线程池提交工具类,内部统一捕获/记录异常
11.应用之定时任务

如何让每周四 18:00:00 定时执行任务?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 获得当前时间
LocalDateTime now = LocalDateTime.now();
// 获取本周四 18:00:00.000
LocalDateTime thursday =
now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);
// 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000
if(now.compareTo(thursday) >= 0) {
thursday = thursday.plusWeeks(1);
}
// 计算时间差,即延时执行时间
long initialDelay = Duration.between(now, thursday).toMillis();
// 计算间隔时间,即 1 周的毫秒值
long oneWeek = 7 * 24 * 3600 * 1000;
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
System.out.println("开始时间:" + new Date());
executor.scheduleAtFixedRate(() -> {
System.out.println("执行时间:" + new Date());
}, initialDelay, oneWeek, TimeUnit.MILLISECONDS);

img

img

img

img

12.Tomcat 线程池

1.Tomcat 中线程池的整体架构

Tomcat 在哪里用到了线程池呢

img

模块角色说明:

模块 职责说明
Acceptor 只负责接收 socket 连接(监听端口)
Poller 监听连接的 I/O 事件(是否可读),基于 NIO
Executor 线程池,负责真正处理请求逻辑(servlet 调用等)

图中流程说明:

  1. 用户连接发起请求
  2. Acceptor 接收连接
  3. Poller 发现可读,封装为 SocketProcessor
  4. 提交 SocketProcessorExecutor 线程池
  5. 线程池中工作线程处理业务逻辑
  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

2.Tomcat 的线程池行为特点

Tomcat 的 Executor 本质上是对 ThreadPoolExecutor 的扩展

行为稍有不同:

img

相关源码分析:

源码 tomcat-7.0.42

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
// 尝试再次强行放入队列
if (!queue.force(command, timeout, unit)) {
throw new RejectedExecutionException("Queue capacity is full.");
}
} else {
throw rx;
}
}
}

重点理解:

  • force() 是 Tomcat 特有逻辑,不是 Java 原生线程池的默认行为
  • 它可以避免请求高峰时任务被直接拒绝,提高系统的鲁棒性和服务承载能力

3.连接器 Connector 配置解析

Tomcat 通过 <Connector><Executor> 进行线程池配置。

Connector 中可调参数(简版)

配置项 默认值 说明
acceptorThreadCount 1 负责 accept socket 的线程数量
pollerThreadCount 1 负责监听 socket channel 的线程数
minSpareThreads 10 核心线程数(corePoolSize)
maxThreads 200 最大线程数(maximumPoolSize)
executor - 对应的 Executor 名称

Executor 中可调参数(高级配置)

配置项 默认值 说明
threadPriority 5 线程优先级
deamon true 是否守护线程
minSpareThreads 25 核心线程数,即corePoolSize
maxThreads 200 最大线程数,即 maximumPoolSize
maxIdleTime 60000 线程生存时间( 线程闲置多久可回收),单位是毫秒,默认值即 1 分钟
maxQueueSize Integer.MAX_VALUE 任务队列容量(默认无限)
prestartminSpareThreads false 是否在启动时预热核心线程

img

图中展示的是 Tomcat 请求到达后的处理流程:

  1. 客户端发起连接
  2. Acceptor 接收后传给 Poller
  3. Poller 监听读事件后将任务包装为 SocketProcessor
  4. 提交到线程池(Executor)
  5. 线程池中的工作线程开始执行实际的 Servlet 处理逻辑

整个模型体现了解耦 + 高并发分工的思想。

4.总结

模块 作用 特点
Acceptor 只接连接 非阻塞,高性能
Poller NIO监听,触发事件 I/O 线程
Executor 真正业务处理 可调线程池,支持拒绝策略优化

Tomcat 的线程池不仅支持核心线程池扩容机制,还支持通过 force() 增强容错能力,非常适合大并发场景。

三、Fork/Join

1.Fork/Join概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算

Fork/Join 是 Java 提供的一种分治并发编程模型,通过将一个大任务递归拆分成多个小任务并行执行,最后合并结果,提高效率。

特性 说明
分治思想 将大任务分解成子任务,递归执行
多线程并行 每个子任务可以并行执行
工作窃取算法 空闲线程会从其他线程偷任务做,提高资源利用率
自动任务调度 线程池自动调度任务执行,开发者无需干预
2.应用之求和

交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下 面定义了一个对 1~n 之间的整数求和的任务

img

结果

img

用图来表示

img

  • 每次拆一个子任务(深度递归)
  • 形成一条长链,不利于并发

问题:

这个版本虽然用了 fork/join,但每次只 fork 一个子任务,线程利用率不高,多核 CPU 难以发挥。

优化版本:区间划分递归(AddTask3)

img

用图来表示

img

  • 每次拆成两半(平衡递归)
  • 构成二叉树状,线程可并发执行左右子任务,效率更高

补充:异步模式之工作线程模式

重点在于:

  • 工作线程的基本思想
  • 饥饿问题的出现及示例
  • 线程池分工的优化方案
  • 如何根据任务类型(CPU密集型 vs I/O密集型)合理配置线程池大小

1.什么是 Worker Thread 模式?

定义:

让有限数量的工作线程轮流处理无限的任务

这是线程池(ThreadPoolExecutor)的核心思想,也是一种资源复用、任务分发的经典设计模式

  • 每个请求不再配备一个线程(那样太浪费)
  • 而是通过线程池的几个线程重复复用,轮流处理任务
  • 类似“服务员”轮流为不同的顾客点餐

举例类比:

  • 海底捞只有几个服务员(线程)
  • 客人很多(任务)
  • 服务员轮流接待,完成“点餐 → 上菜”的整个流程

2.饥饿问题(Deadlock-like starvation)

问题引入:

你定义了一个固定大小的线程池,假设线程数是 2。

1
ExecutorService executorService = Executors.newFixedThreadPool(2);

你设计的工作流是:

  • 线程 A:处理“点餐”任务,并提交“做菜”任务(再由其他线程做)
  • 点餐线程等待“做菜”任务的结果(调用 .get()
  • 如果此时两个线程都在做“点餐”任务,那么就没人能做菜了 ——> 饥饿!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j(topic = "c.Test1")
public class Test1 {
static final List<String> MENU=Arrays.asList("地三鲜","宫保鸡丁","辣子鸡丁","烤鸡翅");
static Random RANDOM = new Random();
static String cooking(){
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) throws ExecutionException,InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(()->{
log.debug("处理点餐...");
Future<String> f=pool.submit(()->{
log.debug("做菜");
return cooking();
});
try{
log.debug("上菜:{}",f.get());
}catch (InterruptedException|ExecutionException e){
e.printStackTrace();
}
});
pool.execute(()->{
log.debug("处理点餐...");
Future<String> f=pool.submit(()->{
log.debug("做菜");
return cooking();
});
try{
log.debug("上菜:{}",f.get());
}catch (InterruptedException|ExecutionException e){
e.printStackTrace();
}
});

}
}

一个客人可以完美处理,2个客人就处理不动了:

img

img

img

3.解决方案:任务分类,线程池分工

可以设置2种类型的线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j(topic = "c.Test1")
public class Test1 {
static final List<String> MENU=Arrays.asList("地三鲜","宫保鸡丁","辣子鸡丁","烤鸡翅");
static Random RANDOM = new Random();
static String cooking(){
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) throws ExecutionException,InterruptedException {
ExecutorService waitpool = Executors.newFixedThreadPool(1);
ExecutorService cookpool = Executors.newFixedThreadPool(1);
waitpool.execute(()->{
log.debug("处理点餐...");
Future<String> f=cookpool.submit(()->{
log.debug("做菜");
return cooking();
});
try{
log.debug("上菜:{}",f.get());
}catch (InterruptedException|ExecutionException e){
e.printStackTrace();
}
});
}
}

img

4.线程池大小如何设置最合适?

过小的问题:

  • 线程不够 → 任务堆积 → 饥饿、延迟、吞吐低

过大的问题:

  • 线程上下文切换成本大
  • 内存占用高,可能 OOM

一般建议:

1)CPU 密集型任务(比如加解密、计算):

  • 线程数 = CPU 核数 + 1
  • 额外线程是为了防止调度/IO 阻塞带来的 CPU 空转

2)I/O 密集型任务(比如数据库、文件、网络):

经验公式:

1
线程数 = CPU核数 * 期望CPU利用率 * (计算时间 + 等待时间) / 计算时间

举例:

  • 4 核 CPU,计算时间 10%,等待时间 90%
1
线程数 = 4 * 1 * (10% + 90%) / 10% = 40
场景 建议线程数配置 原因
CPU 密集型任务 核心数 + 1 保证 CPU 不空闲
I/O 密集型任务 看公式(一般远大于核心数) 利用 IO 等待时 CPU 空闲时间
任务依赖嵌套时 分多个线程池 避免线程池内部递归调用导致饥饿

8.2JUC

img

1.AQS原理

概述

AQS 全称是 AbstractQueuedSynchronizer,它是 JDK 提供的一个构建锁和同步器的基础框架,很多并发工具类(如 ReentrantLockSemaphoreCountDownLatch 等)都基于它构建。

特点:

  • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
    • getState - 获取 state 状态
    • setState - 设置 state 状态
    • compareAndSetState - cas 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

img

  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

核心思想:状态 + 队列 + 阻塞

模块 描述
state 用于表示资源状态(如 0 表示未占用,1 表示被占用)
FIFO 队列 等待线程都会被构造成节点挂入队列(CLH 队列)
park/unpark 用来阻塞/唤醒线程,替代废弃的 suspend/resume

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

获取锁的姿势(独占模式)

1
2
3
4
5
if (!tryAcquire(arg)) {
// 获取失败,加入队列并阻塞
enqueue();
park(); // 直到被唤醒
}

释放锁的姿势(独占模式)

1
2
3
4
if (tryRelease(arg)) {
// 成功释放,唤醒队列中的下一个等待线程
unparkSuccessor();
}

自定义不可重入锁实现

通过自定义同步器 MySync 继承 AQS:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
if (acquires == 1){
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int acquires) {
if(acquires == 1) {
if(getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
return false;
}
protected Condition newCondition() {
return new ConditionObject();
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
自定义锁

有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MyLock implements Lock {
static MySync sync = new MySync();
@Override
// 尝试,不成功,进入等待队列
public void lock() {
sync.acquire(1);
}
@Override
// 尝试,不成功,进入等待队列,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
// 尝试一次,不成功返回,不进入队列
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
// 尝试,不成功,进入等待队列,有时限
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
// 释放锁
public void unlock() {
sync.release(1);
}
@Override
// 生成条件变量
public Condition newCondition() {
return sync.newCondition();
}
}

测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
sleep(1);
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t2").start();

输出

1
2
3
4
22:29:28.727 c.TestAqs [t1] - locking... 
22:29:29.732 c.TestAqs [t1] - unlocking...
22:29:29.732 c.TestAqs [t2] - locking...
22:29:29.732 c.TestAqs [t2] - unlocking...

输出说明:

  • t1 先获取锁并持有 1 秒
  • t2 阻塞等待,直到 t1 释放后再执行

不可重入测试

如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)

1
2
3
4
lock.lock();
log.debug("locking...");
lock.lock();
log.debug("locking...");

AQS 队列模型(基于 CLH)

img

目标

AQS 要实现的功能目标

  • 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire
  • 获取锁超时机制
  • 通过打断取消机制
  • 独占机制及共享机制
  • 条件不满足时的等待机制

要实现的性能目标

Instead, the primary performance goal here is scalability: to predictably maintain efficiency even, or especially, when synchronizers are contended.

设计

AQS 的基本思想其实很简单

获取锁的逻辑

1
2
3
4
5
6
while(state 状态不允许获取) {
if(队列中还没有此线程) {
入队并阻塞
}
}
当前线程出队

释放锁的逻辑

1
2
3
if(state 状态允许了) {
恢复阻塞的线程(s)
}

要点

  • 原子维护 state 状态
  • 阻塞及恢复线程
  • 维护队列
  1. state 设计

    • state 使用 volatile 配合 cas 保证其修改时的原子性
    • state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
  2. 阻塞恢复设计

    • 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到
    • 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没 问题
    • park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
    • park 线程还可以通过 interrupt 打断
  3. 队列设计

    • 使用了 FIFO 先入先出队列,并不支持优先级队列
    • 设计时借鉴了 CLH 队列,它是一种单向无锁队列

img

  • 类似一个链表结构,每个线程通过节点 Node 加入队列
  • 使用 park/unpark 方式挂起/唤醒线程,效率更高、可中断

队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态 入队伪代码,只需要考虑 tail 赋值的原子性

1
2
3
4
5
do {
// 原来的 tail
Node prev = tail;
// 用 cas 在原来 tail 的基础上改为 node
} while(tail.compareAndSet(prev, node))

出队伪代码

1
2
3
4
5
// prev 是上一个节点
while((Node prev=node.prev).state != 唤醒状态) {
}
// 设置头节点
head = node;

CLH 好处:

  • 无锁,使用自旋
  • 快速,无阻塞

AQS 在一些方面改进了 CLH

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 队列中还没有元素 tail 为 null
if (t == null) {
// 将 head 从 null -> dummy
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将 node 的 prev 设置为原来的 tail
node.prev = t;
// 将 tail 从原来的 tail 设置为 node
if (compareAndSetTail(t, node)) {
// 原来 tail 的 next 设置为 node
t.next = node;
return t;
}
}
}
}
主要用到 AQS 的并发工具类

img

2.ReentrantLock 原理

先回顾一下上篇学过的ReentrantLock的特性

img

img

非公平锁实现原理

什么是非公平锁?

非公平锁指的是线程在获取锁时不关心队列中是否有等待线程,只要锁是空闲的,谁先 CAS 成功谁就获得锁

ReentrantLock 默认是 非公平锁

1
2
3
public ReentrantLock() {
sync = new NonfairSync(); // 默认使用非公平策略
}
加锁解锁流程

NonfairSync 继承自 AQS 没有竞争时

img

第一个竞争出现时

img

Thread-1 执行了

  1. CAS 尝试将 state 由 0 改为 1,结果失败
  2. 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败

img

3.接下来进入 addWaiter 逻辑,构造 Node 队列

img

图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态

Node 的创建是懒惰的

其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

img

4.当前线程进入 acquireQueued 逻辑

img

img

1.acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞

2.如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

3.进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

img

4.shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

5.当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true

6.进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

img

再次有多个线程经历上述过程竞争失败,变成这个样子

img

Thread-0 释放锁,进入 tryRelease 流程,如果成功

  • 设置 exclusiveOwnerThread 为 null
  • state = 0

img

当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程

找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1

回到 Thread-1 的 acquireQueued 流程

img

如果加锁成功(没有竞争),会设置

  • exclusiveOwnerThread 为 Thread-1,state = 1
  • head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
  • 原本的 head 因为从链表断开,而可被垃圾回收

如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

img

如果不巧又被 Thread-4 占了先

  • Thread-4 被设置为 exclusiveOwnerThread,state = 1
  • Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

// 加锁实现
final void lock() {
// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果尝试失败,进入 ㈠
acquire(1);
}

// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
// ㈡ tryAcquire
if (
!tryAcquire(arg) &&
// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}

// ㈡ 进入 ㈢
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

// ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败, 回到调用处
return false;
}

// ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
//将当前node加入等待队列末尾等待,并返回当前node
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
Node node = new Node(Thread.currentThread(), mode);
//非公平同步器中有head和tail两个引用分别指向了等待队列的第一个和最后一个节点
//pred指的是node的前驱,从队尾插入,所以pred为tail
Node pred = tail;
// 如果 tail 不为 null, 说明已经有了等待队列了,cas 尝试将 Node 对象加入 AQS 队列尾部
if (pred != null) {
//将node的前驱节点设置为pred
node.prev = pred;
//尝试将队列的tial从当前的pred修改为node
if (compareAndSetTail(pred, node)) {
// 双向链表
pred.next = node;
return node;
}
}
//如果pred为null,说明等待队列还未创建,调用enq方法创建队列
// 尝试将 Node 加入 AQS, 进入 ㈥
enq(node);
return node;
}

// ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
//该方法就是创建等待队列,并将node插入队列的尾部。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
if (compareAndSetHead(new Node())) {
//将head赋值给tail,head和tail同时指向哨兵节点
tail = head;
}
} else {
// cas 尝试将 Node 对象加入 AQS 队列尾部
//设置node的前驱节点为队列的最后一个节点
node.prev = t;
//尝试将队列的尾部从当前的tail设置为node
if (compareAndSetTail(t, node)) {
//将node设为上一个tail的后继节点
t.next = node;
return t;
}
}
}
}

// ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
//在队列中循环等待,只有当排队排到第一名并且获得了锁才能出队并从方法中退出。
//返回打断状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//找到当前node的前驱节点
final Node p = node.predecessor();
// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
if (p == head && tryAcquire(arg)) {
// 获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
// 上一个节点 help GC
p.next = null;
failed = false;
// 返回中断标记 false
return interrupted;
}
if (
// 判断是否应当 park, 进入 ㈦
shouldParkAfterFailedAcquire(p, node) &&
// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}

// ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
//判断acquire失败以后是否应该阻塞等待。从规则上来讲:
//1.如果前驱节点都阻塞了,那么当前节点也应该阻塞
//2.如果前驱节点取消,那么应该将前驱节点前移,直到其状态不为取消为止。
//3.如果前两种情况都不是,尝试将前驱节点状态设为SIGNAL,返回false(不用阻塞,等到下次在阻塞)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// 上一个节点都在阻塞, 那么自己也阻塞好了
return true;
}
// > 0 表示取消状态
if (ws > 0) {
// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这次还没有阻塞
// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

// ㈧ 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}

注意

是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的 waitStatus 决定

总结:

  • 调用

    1
    lock

    ,尝试将state从0修改为1

    • 成功:将owner设为当前线程

    • 失败:调用

      1
      acquire

      ->

      1
      tryAcquire

      ->

      1
      nonfairTryAcquire

      ,判断state=0则获得锁,或者state不为0但当前线程持有锁则重入锁,以上两种情况

      1
      tryAcquire

      返回true,剩余情况返回false。

      • true:获得锁

      • false:调用

        1
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

        ,其中

        1
        addwiter

        将关联线程的节点插入AQS队列尾部,进入

        1
        acquireQueued

        中的for循环:

        • 如果当前节点是头节点,并尝试获得锁成功,将当前节点设为头节点,清除此节点信息,返回打断标记。
        • 调用shoudParkAfterFailure,第一次调用返回false,并将前驱节点改为-1,第二次循环如果再进入此方法,会进入阻塞并检查打断的方法。
解锁源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// 解锁实现
public void unlock() {
sync.release(1);
}

// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
// 尝试释放锁, 进入 ㈠
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
if (
// 队列不为 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
// unpark AQS 中等待的线程, 进入 ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}

// ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}

总结:

  • unlock->syn.release(1)->tryRelease(1),如果当前线程并不持有锁,抛异常。state减去1,如果之后state为0,解锁成功,返回true;如果仍大于0,表示解锁不完全,当前线程依旧持有锁,返回false。

  • 返回true:检查AQS队列第一个节点状态图是否为

    1
    SIGNAL

    (意味着有责任唤醒其后记节点),如果有,调用

    1
    unparkSuccessor

    • unparkSuccessor中,不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点,如果有,将其唤醒。
  • 返回false:

img

img

非公平性体现在哪?

非公平锁不会保证队列中“排在最前面的线程”优先获取锁。

例如,当 Thread-1 刚被唤醒,还未再次抢到锁时,若这时 Thread-4 新来并立即执行 compareAndSetState(0, 1) 成功,Thread-1 就要继续排队,Thread-4 获得锁

img

img

state为锁的重入量

img

阶段 方法 说明
加锁入口 lock() 非公平锁尝试获取锁
CAS失败 acquire(1) 进入 AQS 获取流程
加入队列 addWaiter(Node.EXCLUSIVE) 将当前线程包装成 Node 入队
自旋尝试 acquireQueued(...) 死循环中不断尝试获取锁
是否阻塞 shouldParkAfterFailedAcquire 判断是否该 park 当前线程
阻塞当前线程 parkAndCheckInterrupt 线程进入等待
解锁入口 unlock() 调用 release(1)
成功释放 tryRelease() 修改 state,并唤醒
唤醒队列 unparkSuccessor() 找到下一个有效节点并唤醒
可重入原理

可重入锁表示:一个线程获取了锁后,如果再次请求该锁,不会被阻塞,而是会直接获得锁并将计数加一。释放锁时则需多次 unlock(),直到重入次数清零。

当持有锁的线程再次尝试获取锁时,会将state的值加1,state表示锁的重入量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static final class NonfairSync extends Sync {
// ...

// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}

img

img

可打断原理

1.不可打断模式(默认 lock)

在此模式下,即使它被打断,仍会驻留在 AQS 队列中,并将打断信号存储在一个interrupt变量中。一直要等到获得锁后方能得知自己被打断了,并且调用selfInterrupt方法打断自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// ...

private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// interrupted 会清除打断标记
return Thread.interrupted();
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// 还是需要获得锁后, 才能返回打断状态
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// 如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}

public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 如果打断状态为 true
selfInterrupt();
}
}

//响应打断标记,打断自己
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
}

img

可打断模式

此模式下即使线程在等待队列中等待,一旦被打断,就会立刻抛出打断异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁, 进入 ㈠
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

// ㈠ 可打断的获取锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}

img

img

img

img

公平锁实现原理

与之前默认的非公平锁(NonfairSync)相比,公平锁(FairSync)的核心变化在于它避免“插队”,遵循“先来先得”原则

简而言之,公平与非公平的区别在于,公平锁中的tryAcquire方法被重写了,新来的线程即便得知了锁的state为0,也要先判断等待队列中是否还有线程等待,只有当队列没有线程等待式,才获得锁。

公平锁 VS 非公平锁的区别

特性 非公平锁(默认) 公平锁
锁竞争策略 可以直接抢锁 必须排队
插队行为 允许插队 禁止插队
tryAcquire 是否判断队列 ✅是(核心)

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}

// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
//存疑
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
(
// (s = h.next) == null 表示队列中还有没有老二
(s = h.next) == null ||
// 或者队列中老二线程不是此线程
s.thread != Thread.currentThread()
);
}
}

代码解读:

img

img

img

img

img

条件变量Condition实现原理

什么是 Condition?

Condition 是配合 ReentrantLock 使用的一个等待/通知机制。每个 Condition 维护了一个条件等待队列(单向链表结构),称为 Condition队列

img

每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

await 流程

img

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程

创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

img

接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

img

unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

img

park 阻塞 Thread-0

img

img

总结:

  • 创建一个节点,关联当前线程,并插入到当前Condition队列的尾部
  • 调用fullRelease,完全释放同步器中的锁,并记录当前线程的锁重入数
  • 唤醒(park)AQS队列中的第一个线程
  • 调用park方法,阻塞当前线程。

signal 流程

img

假设 Thread-1 要来唤醒 Thread-0

img

进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

img

执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1

img

Thread-1 释放锁,进入 unlock 流程,略

img

总结:

  • 当前持有锁的线程唤醒等待队列中的线程,调用doSignal或doSignalAll方法,将等待队列中的第一个(或全部)节点插入到AQS队列中的尾部。
  • 将插入的节点的状态从Condition设置为0,将插入节点的前一个节点的状态设置为-1(意味着要承担唤醒后一个节点的责任)
  • 当前线程释放锁,parkAQS队列中的第一个节点线程。

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;

// 第一个等待节点
private transient Node firstWaiter;

// 最后一个等待节点
private transient Node lastWaiter;
public ConditionObject() { }
// ㈠ 添加一个 Node 至等待队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个关联当前线程的新 Node, 添加至队列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 唤醒 - 将没取消的第一个节点转移至 AQS 队列
private void doSignal(Node first) {
do {
// 已经是尾节点了
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (
// 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环 ㈢
!transferForSignal(first) &&
// 队列还有节点
(first = firstWaiter) != null
);
}

// 外部类方法, 方便阅读, 放在此处
// ㈢ 如果节点状态是取消, 返回 false 表示转移失败, 否则转移成功
final boolean transferForSignal(Node node) {
// 如果状态已经不是 Node.CONDITION, 说明被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入 AQS 队列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (
// 上一个节点被取消
ws > 0 ||
// 上一个节点不能设置状态为 Node.SIGNAL
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
) {
// unpark 取消阻塞, 让线程重新同步状态
LockSupport.unpark(node.thread);
}
return true;
}
// 全部唤醒 - 等待队列的所有节点转移至 AQS 队列
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

// ㈡
private void unlinkCancelledWaiters() {
// ...
}
// 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 不可打断等待 - 直到被唤醒
public final void awaitUninterruptibly() {
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁, 见 ㈣
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 仅设置打断状态
if (Thread.interrupted())
interrupted = true;
}
// 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

// ㈡
private void unlinkCancelledWaiters() {
// ...
}
// 唤醒 - 必须持有锁才能唤醒, 因此 doSignal 内无需考虑加锁
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 全部唤醒 - 必须持有锁才能唤醒, 因此 doSignalAll 内无需考虑加锁
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 不可打断等待 - 直到被唤醒
public final void awaitUninterruptibly() {
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁, 见 ㈣
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 仅设置打断状态
if (Thread.interrupted())
interrupted = true;
}
// 唤醒后, 尝试竞争锁, 如果失败进入 AQS 队列
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

// 外部类方法, 方便阅读, 放在此处
// ㈣ 因为某线程可能重入,需要将 state 全部释放
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 打断模式 - 在退出等待时重新设置打断状态
private static final int REINTERRUPT = 1;
// 打断模式 - 在退出等待时抛出异常
private static final int THROW_IE = -1;
// 判断打断模式
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// ㈤ 应用打断模式
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 等待 - 直到被唤醒或打断
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式, 见 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//向Condition中的等待队列中新增节点,并将此节点返回
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

//判断当前节点是否在同步器中的队列中等待锁
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
// 等待 - 直到被唤醒或打断或超时
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 添加一个 Node 至等待队列, 见 ㈠
Node node = addConditionWaiter();
// 释放节点持有的锁
int savedState = fullyRelease(node);
// 获得最后期限
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果该节点还没有转移至 AQS 队列, 阻塞
while (!isOnSyncQueue(node)) {
// 已超时, 退出等待队列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// park 阻塞一定时间, spinForTimeoutThreshold 为 1000 ns
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 如果被打断, 退出等待队列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 退出等待队列后, 还需要获得 AQS 队列的锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 从队列链表删除, 见 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 应用打断模式, 见 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
public final boolean awaitUntil(Date deadline) throws InterruptedException {
// ...
}
// 等待 - 直到被唤醒或打断或超时, 逻辑类似于 awaitNanos
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
// ...
}
// 工具方法 省略 ...
}

两类等待队列对比

类型 队列名称 作用
Condition 队列 firstWaiter 保存 await 的线程
AQS 队列 CLH 队列 正在等待锁的线程

重要:await 线程必须先从 Condition 队列被 signal 唤醒 → 再转移到 AQS 队列参与锁竞争。

img

img

img

3.读写锁之ReentrantReadWriteLock

ReentrantReadWriteLock介绍

什么是 ReentrantReadWriteLock

1.定义

它将锁分为两种类型:

  • 读锁 ReadLock:多个线程可以同时获得,适用于只读操作。
  • 写锁 WriteLock:同一时刻只能被一个线程获得,适用于写操作。

读-读不互斥,读-写/写-写互斥

当读操作远远高于写操作时,这时候使用读写锁读-读可以并发,提高性能。 类似于数据库中的select ... from ... lock in share mode

2.读写锁的应用:数据容器示例

提供一个数据容器类内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
sleep(1);
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}
public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
sleep(1);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}

img

(1)测试读锁-读锁可以并发

1
2
3
4
5
6
7
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();

输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响(说明 读-读不互斥

1
2
3
4
5
6
14:05:14.341 c.DataContainer [t2] - 获取读锁... 
14:05:14.341 c.DataContainer [t1] - 获取读锁...
14:05:14.345 c.DataContainer [t1] - 读取
14:05:14.345 c.DataContainer [t2] - 读取
14:05:15.365 c.DataContainer [t2] - 释放读锁...
14:05:15.386 c.DataContainer [t1] - 释放读锁...

(2)测试读锁-写锁相互阻塞

1
2
3
4
5
6
7
8
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
Thread.sleep(100);
new Thread(() -> {
dataContainer.write();
}, "t2").start();

输出结果(写线程必须等待读线程释放锁后才能进行)

1
2
3
4
5
6
14:04:21.838 c.DataContainer [t1] - 获取读锁... 
14:04:21.838 c.DataContainer [t2] - 获取写锁...
14:04:21.841 c.DataContainer [t2] - 写入
14:04:22.843 c.DataContainer [t2] - 释放写锁...
14:04:22.843 c.DataContainer [t1] - 读取
14:04:23.843 c.DataContainer [t1] - 释放读锁...

(3)写锁-写锁也是相互阻塞的,这里就不测试了

3.注意事项

  • 读锁不支持条件变量

你不能对 readLock() 使用 await() / signal(),仅写锁支持条件等待机制。

  • 重入时升级不支持(读 → 写):即持有读锁的情况下去获取写锁,会导致获取写锁永久等待

img

  • 重入时降级支持(写 → 读):即持有写锁的情况下去获取读锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class CachedData {
Object data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获取写锁前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
}
}
// 自己用完数据, 释放读锁
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

img

img

总结:

情况 是否互斥 特性
读 vs 读 并发执行
读 vs 写 写操作等待
写 vs 写 排他操作
读 → 写升级 ❌ 死锁 不允许
写 → 读降级 支持缓存优化等场景
读锁支持条件变量 不支持 await/signal
写锁支持条件变量 可用于线程协调

ReentrantReadWriteLock应用到缓存

这部分内容讲的是使用 ReentrantReadWriteLock 实现一致性缓存 的应用示例,同时也探讨了“缓存更新策略”的两个选择:先删缓存 vs 先改数据库

缓存更新策略

1.先删缓存,再改数据库(推荐)

  • 流程:先删除缓存 → 更新数据库
  • 可能的问题:
    • 删除缓存后,还没来得及更新数据库,此时有查询请求进入,查不到缓存就去查数据库,查到旧值后又写入缓存,造成脏数据回写缓存

即“缓存被删 + 查询刚好到达”的小概率问题

先删缓存,写库慢

img

→脏数据写回缓存

查询请求刚好穿透缓存失效(假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询)

img

→缓存脏回写(极小概率)

2.先改数据库,再删缓存

  • 流程:先更新数据库 → 删除缓存
  • 可能的问题:
    • 如果删除缓存失败(例如 Redis 宕机),新旧数据同时存在于缓存和数据库,造成数据不一致

先更新数据库,删缓存失败

img

→ 数据不一致

使用 ReentrantReadWriteLock 实现缓存一致性

基本设计思路:

  • 使用 HashMap 作为缓存,非线程安全 → 用 读写锁保护
  • 写操作加写锁: 只能一个线程写入缓存(或清空缓存)
  • 读操作加读锁: 允许多个线程同时读缓存
  • 如果缓存未命中:
    • 升级为写锁,再次检查缓存
    • 如果依旧未命中 → 查询数据库 → 写入缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class GenericCachedDao<T> {
// HashMap 作为缓存非线程安全, 需要保护
HashMap<SqlPair, T> map = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
GenericDao genericDao = new GenericDao();
public int update(String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
int rows = genericDao.update(sql, params);
map.clear();
return rows;
} finally {
lock.writeLock().unlock();
}
}
public T queryOne(Class<T> beanClass, String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加读锁, 防止其它线程对缓存更改
lock.readLock().lock();
try {
T value = map.get(key);
if (value != null) {
return value;
}
} finally {
lock.readLock().unlock();
}
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
// get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
// 为防止重复查询数据库, 再次验证
T value = map.get(key);
if (value == null) {
// 如果没有, 查询数据库
value = genericDao.queryOne(beanClass, sql, params);
map.put(key, value);
}
return value;
} finally {
lock.writeLock().unlock();
}
}
// 作为 key 保证其是不可变的
class SqlPair {
private String sql;
private Object[] params;
public SqlPair(String sql, Object[] params) {
this.sql = sql;
this.params = params;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return sql.equals(sqlPair.sql) &&
Arrays.equals(params, sqlPair.params);
}
@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result + Arrays.hashCode(params);
return result;
}
}
}

核心代码解读

img

img

存在的问题

问题点 描述
写频繁性能低 写锁独占,阻塞所有读操作
粗暴清缓存 每次写操作都清空整个 map,效率差
缓存无容量控制 可能 OOM
无过期机制 数据一直驻留内存
单机可用 不适合分布式部署
并发不高 所有数据共用一把锁,容易成为瓶颈
无缓存更新粒度 可以设计更细的 key 维度

ReentrantReadWriteLock原理

核心原理回顾

  • ReentrantReadWriteLock 读写锁使用同一个 Sync 同步器(继承自 AQS)。
  • 状态 state 的高 16 位用于读锁计数,低 16 位用于写锁计数
  • 所有锁请求(读或写)都排队进入 AQS 的等待队列(双向链表结构)。
图解流程

读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个

阶段 1:t1 获取写锁,t2 尝试获取读锁失败 t1 w.lock,t2 r.lock

1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位

img

2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写锁占据,那么 tryAcquireShared 返回 -1 表示失败

tryAcquireShared 返回值表示

  • -1 表示失败
  • 0 表示成功,但后继节点不会继续唤醒
  • 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1

img

3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态

img

4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁

5)如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park

img

img

阶段 2:t3 再次尝试获取读锁,t4 尝试获取写锁 t3 r.lock,t4 w.lock

img

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

img

阶段 3:t1 释放写锁,唤醒后继节点(t2) t1 w.unlock

img

这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子

img

接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一

img

阶段 4:t2 调用 setHeadAndPropagate() 传播唤醒

img

这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

img

事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行

img

这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一

img

这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点

img

下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点

阶段 5:t2 和 t3 分别释放读锁 t2 r.unlock,t3 r.unlock

img

t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零

img

t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即

img

阶段 6:t4 是写锁请求,被唤醒后获取锁成功

img

之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束

img

关键词理解

概念 含义
state 高 16 位是读锁计数,低 16 位是写锁计数
tryAcquireShared() 读锁尝试获取逻辑,-1 表示失败
doAcquireShared() 读锁失败后入队,并阻塞等待
unparkSuccessor() 唤醒队列中下一个节点
setHeadAndPropagate() 设置新 head 并传播唤醒
Node.SHARED AQS 共享节点标记

【阶段 1】t1 获取写锁成功
├── 状态:state = 0x0001(低 16 位为写锁计数 = 1)
├── 锁持有者:t1(独占写锁)
└── t2 尝试获取读锁失败 → 进入 AQS 队列(共享模式)

当前队列:Head → t2[共享]


【阶段 2】t3 和 t4 相继加入队列
├── t3 获取读锁失败 → 加入 AQS 队列(共享模式)
├── t4 获取写锁失败 → 加入 AQS 队列(独占模式)
└── 所有尝试都会 CAS 修改前驱节点的 waitStatus = SIGNAL 并阻塞

当前队列:Head → t2[共享] → t3[共享] → t4[独占]


【阶段 3】t1 释放写锁(unlock)
├── tryRelease 成功 → state = 0x0000(写锁计数归 0)
├── 进入 unparkSuccessor → 唤醒 t2(第一个等待节点)
└── t2 醒来后调用 tryAcquireShared 成功 → 获取读锁

更新状态:

  • state = 0x00010000(高 16 位读锁计数 = 1)
  • 锁持有者:t2(共享读锁)

当前队列:Head → t3[共享] → t4[独占]


【阶段 4】t2 执行 setHeadAndPropagate
├── 把自己设为新的 head 节点
├── 判断下一个节点 t3 是共享模式
└── 直接唤醒 t3(共享传播)

t3 醒来后:

  • 再次尝试获取读锁,成功
  • state = 0x00020000(读锁计数 +1)
  • 锁持有者:t2、t3(共享)

当前队列:Head → t4[独占]


【阶段 5】t2、t3 分别释放读锁
├── t2 调用 unlock → state = 0x00010000(仍有 1 个读锁)
│ └── 不传播唤醒
├── t3 调用 unlock → state = 0x00000000(全部读锁释放)
└── doReleaseShared() → 唤醒后继节点 t4

当前队列:Head → t4[独占]


【阶段 6】t4 被唤醒后获取写锁
├── t4 醒来后调用 tryAcquire(独占)成功
├── 更新 state = 0x0001(写锁计数 = 1)
└── 锁持有者:t4(独占写锁)

AQS 队列为空,流程结束

img

源码分析

写锁上锁流程(非公平)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
static final class NonfairSync extends Sync {
// ... 省略无关代码

// 外部类 WriteLock 方法, 方便阅读, 放在此处
public void lock() {
sync.acquire(1);
}

// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (
// 尝试获得写锁失败
!tryAcquire(arg) &&
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
// 进入 AQS 队列阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}

// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryAcquire(int acquires) {

Thread current = Thread.currentThread();
int c = getState();
// 获得低 16 位, 代表写锁的 state 计数
int w = exclusiveCount(c);
//表示有写锁或者有读锁
if (c != 0) {
if (
// c != 0 and w == 0 表示有读锁, 或者
w == 0 ||
// 如果 exclusiveOwnerThread 不是自己
current != getExclusiveOwnerThread()
) {
// 获得锁失败
return false;
}
// 写锁计数超过低 16 位, 报异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写锁重入, 获得锁成功
setState(c + acquires);
return true;
}
if (
// 判断写锁是否该阻塞, 或者
//非公平锁下,总是返回false
writerShouldBlock() ||
// 尝试更改计数失败
!compareAndSetState(c, c + acquires)
) {
// 获得锁失败
return false;
}
// 获得锁成功
setExclusiveOwnerThread(current);
return true;
}

// 非公平锁 writerShouldBlock 总是返回 false, 无需阻塞
final boolean writerShouldBlock() {
return false;
}
}

总结:

  • lock
    
    1
    2
    3

    ->

    syn.acquire
    1
    2
    3

    ->

    tryAquire
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

    - 如果有锁:
    - 如果是写锁或者锁持有者不为自己,返回false
    - 如果时写锁且为自己持有,则重入
    - 如果无锁:
    - 判断无序阻塞并设置state成功后,将owner设为自己,返回true

    - 成功,则获得了锁

    - 失败:

    - 调用`acquireQueued(addWaiter(Node.EXCLUSIVE), arg)`进入阻塞队列,将节点状态设置为EXCLUSIVE,之后的逻辑与之前的aquireQueued类似。

    ![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1671.png)

    **写锁释放流程**

static final class NonfairSync extends Sync {
// … 省略无关代码

// WriteLock 方法, 方便阅读, 放在此处
public void unlock() {
    sync.release(1);
}

// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
    // 尝试释放写锁成功
    if (tryRelease(arg)) {
        // unpark AQS 中等待的线程
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    // 因为可重入的原因, 写锁计数为 0, 才算释放成功
    boolean free = exclusiveCount(nextc) == 0;
    if (free) {
        setExclusiveOwnerThread(null);
    }
    setState(nextc);
    return free;
}

}

1
2
3
4
5

总结:

- ```
unlock

->

1
syn.release

->

1
tryRelease
  • state状态减少

    • 如果减为零,表示解锁成功,返回true
    • 没有减为0,当前线程依旧持有锁
  • 成功:解锁成功

    • 如果ASQ队列不为空,则唤醒第一个节点。
  • 失败:解锁失败。

img

读锁上锁流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
static final class NonfairSync extends Sync {

// ReadLock 方法, 方便阅读, 放在此处
public void lock() {
sync.acquireShared(1);
}

// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquireShared(int arg) {
// tryAcquireShared 返回负数, 表示获取读锁失败
//大于0的情况在读写锁这里无区别,后面信号量会做进一步处理。
if (tryAcquireShared(arg) < 0) {
doAcquireShared(arg);
}
}

// Sync 继承过来的方法, 方便阅读, 放在此处
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果是其它线程持有写锁, 获取读锁失败
if (
exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current
) {
return -1;
}
int r = sharedCount(c);
if (
// 读锁不该阻塞(如果老二是写锁,读锁该阻塞), 并且
!readerShouldBlock() &&
// 小于读锁计数, 并且
r < MAX_COUNT &&
// 尝试增加计数成功
compareAndSetState(c, c + SHARED_UNIT)
) {
// ... 省略不重要的代码
return 1;
}
return fullTryAcquireShared(current);
}

// 非公平锁 readerShouldBlock 看 AQS 队列中第一个节点是否是写锁
// true 则该阻塞, false 则不阻塞
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}

// AQS 继承过来的方法, 方便阅读, 放在此处
// 与 tryAcquireShared 功能类似, 但会不断尝试 for (;;) 获取读锁, 执行过程中无阻塞
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// ... 省略不重要的代码
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// ... 省略不重要的代码
return 1;
}
}
}

// AQS 继承过来的方法, 方便阅读, 放在此处
private void doAcquireShared(int arg) {
// 将当前线程关联到一个 Node 对象上, 模式为共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再一次尝试获取读锁
int r = tryAcquireShared(arg);
// 成功
if (r >= 0) {
// ㈠
// r 表示可用资源数, 在这里总是 1 允许传播
//(唤醒 AQS 中下一个 Share 节点)
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (
// 是否在获取读锁失败时阻塞(前一个阶段 waitStatus == Node.SIGNAL)
shouldParkAfterFailedAcquire(p, node) &&
// park 当前线程
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}

// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 设置自己为 head
setHead(node);

// propagate 表示有共享资源(例如共享读锁或信号量)
// 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
// 现在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果是最后一个节点或者是等待共享读锁的节点
if (s == null || s.isShared()) {
// 进入 ㈡
doReleaseShared();
}
}
}

// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE, 为了解决 bug, 见后面分析
for (;;) {
Node h = head;
// 队列还有节点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 下一个节点 unpark 如果成功获取读锁
// 并且下下个节点还是 shared, 继续 doReleaseShared
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}

总结:

  • lock
    
    1
    2
    3

    ->

    syn.acquireShare
    1
    2
    3

    ->

    tryAcquireShare
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    - 如果其他线程持有写锁:则失败,返回-1
    - 否则:判断无需等待后,将state加上一个写锁的单位,返回1

    - 返回值大于等于0:成功

    - 返回值小于0:

    - 调用doAcquireShare,类似之前的aquireQueued,将当前线程关联节点,状态设置为SHARE,插入AQS队列尾部。在for循环中判断当前节点的前驱节点是否为头节点

    - 是:调用

    tryAcquireShare
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    - 如果返回值大于等于0,则获取锁成功,并调用`setHeadAndPropagate`,出队,并不断唤醒AQS队列中的状态为SHARE的节点,直到下一个节点为EXCLUSIVE。记录打断标记,之后退出方法(不返回打断标记)

    - 判断是否在失败后阻塞

    - 是:阻塞住,并监测打断信号。
    - 否则:将前驱节点状态设为-1。(下一次循环就又要阻塞了)

    ![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1673.png)

    **读锁释放流程**

static final class NonfairSync extends Sync {

// ReadLock 方法, 方便阅读, 放在此处
public void unlock() {
    sync.releaseShared(1);
}

// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryReleaseShared(int unused) {
    // ... 省略不重要的代码
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc)) {
            // 读锁的计数不会影响其它获取读锁线程, 但会影响其它获取写锁线程
            // 计数为 0 才是真正释放
            return nextc == 0;
        }
    }
}

// AQS 继承过来的方法, 方便阅读, 放在此处
private void doReleaseShared() {
    // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
    // 如果 head.waitStatus == 0 ==> Node.PROPAGATE 
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果有其它线程也在释放读锁,那么需要将 waitStatus 先改为 0
            // 防止 unparkSuccessor 被多次执行
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            // 如果已经是 0 了,改为 -3,用来解决传播性,见后文信号量 bug 分析
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
} 

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

总结:

- `unlock`->`releaseShared`->`tryReleaseShared`,将state减去一个share单元,最后state0则返回true,不然返回false。
- 返回tue:调用`doReleaseShare`,唤醒队列中的节点。
- 返回false:解锁不完全。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1674.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1675.png)

- [ 写锁加锁流程 ]
WriteLock.lock()

├──> AQS.acquire(1)
│ │
│ └──> tryAcquire(1)
│ ├── 若 state != 0
│ │ ├── 当前线程为 ownerThread → 支持重入,加 state,成功
│ │ └── 否则 → 加锁失败,进入 AQS 队列等待
│ └── 若 state == 0
│ ├── CAS 修改 state 成功 → 获得写锁,设置 ownerThread
│ └── 失败 → 入队等待

└── 获取写锁成功
- [ 写锁释放流程 ]
WriteLock.unlock()

├──> AQS.release(1)
│ │
│ └──> tryRelease(1)
│ ├── state1(可重入释放)
│ └── 若 state == 0
│ ├── 清空 ownerThread
│ └── unparkSuccessor() 唤醒后继线程

└── 写锁释放完成,唤醒下一个(读或写)
- [ 读锁加锁流程 ]
ReadLock.lock()

├──> AQS.acquireShared(1)
│ │
│ └──> tryAcquireShared(1)
│ ├── 若存在其它线程持有写锁 → 返回 -1,失败,入队等待
│ └── 否则:
│ ├── CAS 增加高 16 位读锁计数
│ └── 返回 1,获取成功

├── 获取读锁成功
└── 调用 setHeadAndPropagate()
└── 若下一个节点是共享模式 → 传播唤醒(读锁共享)
- [ 读锁释放流程 ]
ReadLock.unlock()

├──> AQS.releaseShared(1)
│ │
│ └──> tryReleaseShared(1)
│ ├── CAS 减少读锁计数(高 16 位)
│ └── 若 state == 0(全部释放):
│ └── 调用 doReleaseShared()
│ └── 唤醒下一个节点(共享或独占)

└── 读锁释放完成,若为最后一个读线程则唤醒后继线程

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1676.png)

### 4.读写锁之StampedLock

`StampedLock`它是 JDK 8 中引入的一种**高性能读写锁机制**,支持乐观读,并通过“戳(stamp)”机制实现锁状态的校验和控制。

#### 什么是 `StampedLock`

`StampedLock` 是 `java.util.concurrent.locks` 包下的类,提供了:

- 独占写锁 `writeLock()`
- 共享读锁 `readLock()`
- **乐观读** `tryOptimisticRead()`

区别于 `ReentrantReadWriteLock` 的是,它使用一个 `long` 类型的戳(stamp)来代表锁状态,而不是直接依赖 `Lock` 接口。

#### 核心思想:“戳 + 校验”

乐观读

long stamp = lock.tryOptimisticRead();
// 读取数据…
if (!lock.validate(stamp)) {
// 戳校验失败,需要升级为读锁
}

1
2
3
4
5
6
7
8

- 乐观读不会阻塞写线程,性能高
- 但需要在读取后**校验戳**是否仍然有效(即这段时间有没有写线程插入)

锁升级

如果乐观读失败(验戳失败),就会升级成读锁来保证数据一致性:

stamp = lock.readLock();
// …读取数据…
lock.unlockRead(stamp);

1
2
3
4
5

#### 完整示例说明

数据容器类:`DataContainerStamped`

class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();

public int read(int readTime) {
    long stamp = lock.tryOptimisticRead();     // 尝试乐观读
    sleep(readTime);
    if (lock.validate(stamp)) {                // 戳校验
        return data;                           // 校验通过,直接返回
    }
    // 校验失败,升级为读锁
    stamp = lock.readLock();
    try {
        sleep(readTime);
        return data;
    } finally {
        lock.unlockRead(stamp);
    }
}

public void write(int newData) {
    long stamp = lock.writeLock();             // 加写锁
    try {
        sleep(2);
        this.data = newData;
    } finally {
        lock.unlockWrite(stamp);               // 解写锁
    }
}

}

1
2
3
4
5

#### 两种测试场景分析

场景 1:读 - 读(乐观读成功)

// t1 执行 read(1),t2 稍后执行 read(0)

1
2
3
4
5

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1677.png)

场景 2:读 - 写(乐观读失败 → 升级)

// t1 执行 read(1),中途 t2 执行 write(100)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1678.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1679.png)

总结

| 模式 | 特点 | 用途 |
| ------------------- | ---------------- | -------------------------- |
| writeLock() | 独占、阻塞 | 严格写入控制 |
| readLock() | 共享、阻塞 | 多线程读 |
| tryOptimisticRead() | 非阻塞、乐观策略 | 提高读性能(读多写少场景) |

### 5.Semaphore(信号量)

这部分内容介绍了 Java 并发工具类 **`Semaphore`(信号量)** 的基本用法,它是一种用于限制同时访问某资源的线程数量的机制。

#### Semaphore 概念

- **信号量(Semaphore)**:用于控制同时访问某一资源的线程数量。
- 它维护一个**许可计数器 permits**,线程获取许可才能执行,否则就阻塞。
- 可用于实现资源池、限流器、连接池等场景。

案例代码解析

Semaphore semaphore = new Semaphore(3);

1
2
3

初始化信号量,最多允许 **3 个线程**同时执行关键业务。

for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug(“running…”);
sleep(1);
log.debug(“end…”);
} finally {
semaphore.release(); // 释放许可
}
}).start();
}

1
2
3
4
5
6
7
8
9
10
11
12
13

流程解释:

1. 同时启动 10 个线程;
2. 最多只能有 3 个线程成功 `acquire()` → 开始执行;
3. 其余线程被阻塞;
4. 每个线程 `sleep(1秒)` 后释放许可 → 其他线程继续获取许可并运行;
5. 最终 10 个线程都执行完毕。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1680.png)

构造器补充说明

Semaphore(int permits)
Semaphore(int permits, boolean fair)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

| 参数 | 说明 |
| ------- | ----------------------------- |
| permits | 同时允许多少个线程访问 |
| fair | 是否公平(FIFO 排队获取许可) |

非公平模式:性能高,可能插队;

公平模式:线程先来先得,调度公平。

应用场景举例

| 场景 | 使用 Semaphore 的原因 |
| ---------------------- | ----------------------------- |
| 数据库连接池 | 限制同时能连接数据库的连接数 |
| 限流控制(如上传并发) | 控制接口同时执行的最大线程数 |
| 资源访问限流(如IO) | 限制同时访问硬盘/网络的线程数 |
| 限定停车位(经典例子) | 每次只能停固定数量的车 |

#### Semaphore 应用-在**连接池实现中的应用**



semaphore 限制对共享资源的使用(`Semaphore` 控制资源访问并防止并发冲突)

- 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机 线程数量,**并且仅是限制线程数,而不是限制资源数**(例如连接数,请对比 Tomcat LimitLatch 的实现)
- 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的

我们需要构建一个**线程安全的连接池类 `Pool`**

- 同一时刻最多只有 `N` 个线程可以访问池中的资源;
- 多于 `N` 的线程必须阻塞等待;
- 当资源归还后,其他线程再继续获取。

这就是 `Semaphore` 最适合的用武之地。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1681.png)

@Slf4j(topic = “c.Pool”)
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
private Semaphore semaphore;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
// 让许可数与资源数一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection(“连接” + (i+1));
}
}
// 5. 借连接
public Connection borrow() {// t1, t2, t3
// 获取许可
try {
semaphore.acquire(); // 没有许可的线程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug(“borrow {}”, connections[i]);
return connections[i];
}
}
}
// 不会执行到这里
return null;
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
log.debug(“free {}”, conn);
semaphore.release();
break;
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

使用流程详解

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1682.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1683.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1684.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1685.png)

#### Semaphore 原理

Semaphore 是什么?

- 它是一种**共享锁(Shared Lock)**,用于控制同时访问某个资源的线程数量;
- 类比“停车场”:`permits` 就是车位,线程获取资源就像抢车位,获取不到就得等。

#### 加锁解锁流程

Semaphore有点像一个停车场,permits就好像停车位数量,当线程获得了permits就像是获得了停车位,然后停车场显示空余车位减一。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1690.png)

刚开始,permits(state)为 3,这时 5 个线程来获取资源

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1686-1024x555.png)

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1687-1024x276.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1691.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1692.png)

这时 Thread-4 释放了 permits,状态如下

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1693.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1688-1024x302.png)

接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1689-1024x371.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1694.png)

方法回顾

| 方法 | 说明 |
| -------------------------------- | ----------------------------------------- |
| `acquire()` | 获取一个许可,阻塞直到成功 |
| `release()` | 释放一个许可 |
| `tryAcquireShared()` | 非公平抢占许可(CAS) |
| `doAcquireSharedInterruptibly()` | 加入队列等待 |
| `doReleaseShared()` | 唤醒队列中第一个节点(如 Thread-0) |
| `setHeadAndPropagate()` | 将当前成功线程设置为新 head,继续传播唤醒 |
| `Node.SIGNAL` | 前驱节点告诉后继节点要阻塞 |
| `Node.PROPAGATE` | 用于共享传播唤醒 |

总结:Semaphore 原理

| 流程步骤 | 行为 |
| ------------ | ----------------------------------------- |
| 获取许可成功 | `CAS(state)` 成功后直接返回 |
| 获取许可失败 | 入队列,park 阻塞 |
| 许可释放 | `state++`,唤醒下一个等待线程 |
| 唤醒流程 | `doReleaseShared()` + `unparkSuccessor()` |

### 6.CountDownLatch(倒计时器)

#### `CountDownLatch` 是什么?

`CountDownLatch` 是一个倒计时器,用于**等待多个线程完成任务后,再继续主线程或其他任务的执行**

核心方法说明

| 方法 | 作用说明 |
| ----------------------- | ------------------------------ |
| `new CountDownLatch(n)` | 初始化倒计时器,内部计数器为 n |
| `countDown()` | 每次调用将计数器减 1 |
| `await()` | 阻塞当前线程,直到计数器为 0 |

代码演示

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
log.debug(“begin…”);
sleep(1);
latch.countDown();
log.debug(“end…{}”, latch.getCount());
}).start();
new Thread(() -> {
log.debug(“begin…”);
sleep(2);
latch.countDown();
log.debug(“end…{}”, latch.getCount());
}).start();
new Thread(() -> {
log.debug(“begin…”);
sleep(1.5);
latch.countDown();
log.debug(“end…{}”, latch.getCount());
}).start();
log.debug(“waiting…”);
latch.await();
log.debug(“wait end…”);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1695.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1696.png)

#### 对比:CountDownLatch vs join

| 特性 | `CountDownLatch` | `join()` |
| ---------------- | ---------------- | -------------------- |
| 可结合线程池使用 | ✅ 是 | ❌ 不支持 |
| 控制多个线程同步 | ✅ 更灵活 | ⛔ 必须对每个线程调用 |
| 编程风格 | 面向任务 | 面向线程 |

线程池场景下使用

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1697.png)

#### CountDownLatch应用-同步等待多个线程准备完毕

模拟场景:游戏加载进度条(王者荣耀)

AtomicInteger num = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(10, ® -> {
return new Thread(r, “t” + num.getAndIncrement());
});
CountDownLatch latch = new CountDownLatch(10);
String[] all = new String[10];
Random r = new Random();
for (int j = 0; j < 10; j++) {
int x = j;
service.submit(() -> {
for (int i = 0; i <= 100; i++) {
try {
//随机休眠,模拟网络延迟
Thread.sleep(r.nextInt(100));
} catch (InterruptedException e) {
}
all[x] = Thread.currentThread().getName() + “(” + (i + “%”) + “)”;
//\r可以让当前输出覆盖上一次的输出。
System.out.print(“\r” + Arrays.toString(all));
}
latch.countDown();
});
}
latch.await();
System.out.println(“\n游戏开始…”);
service.shutdown();

1
2
3

中间输出

[t0(52%), t1(47%), t2(51%), t3(40%), t4(49%), t5(44%), t6(49%), t7(52%), t8(46%), t9(46%)]

1
2
3

最后输出

[t0(100%), t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%),
t9(100%)]
游戏开始…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1698.png)

效果说明:

- 每个线程独立更新自己的进度;
- 主线程通过 `await()` 等待所有加载完成;
- 等到 `count == 0`,主线程打印“游戏开始”。

这是一个典型的“**等待多个任务准备完毕再开始**”的场景。

#### CountDownLatch应用-同步等待多个远程调用结束

模拟场景:并发调用多个 REST 接口

@RestController
public class TestCountDownlatchController {
@GetMapping(“/order/{id}”)
public Map<String, Object> order(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put(“id”, id);
map.put(“total”, “2300.00”);
sleep(2000);
return map;
}
@GetMapping(“/product/{id}”)
public Map<String, Object> product(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
if (id == 1) {
map.put(“name”, “小爱音箱”);
map.put(“price”, 300);
} else if (id == 2) {
map.put(“name”, “小米手机”);
map.put(“price”, 2000);
}
map.put(“id”, id);
sleep(1000);
return map;
}
@GetMapping(“/logistics/{id}”)
public Map<String, Object> logistics(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put(“id”, id);
map.put(“name”, “中通快递”);
sleep(2500);
return map;
}
private void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

1
2
3

rest远程调用

RestTemplate restTemplate = new RestTemplate();
log.debug(“begin”);
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
Future<Map<String,Object>> f1 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject(“http://localhost:8080/order/{1}”, Map.class, 1);
return r;
});
Future<Map<String, Object>> f2 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject(“http://localhost:8080/product/{1}”, Map.class, 1);
return r;
});
Future<Map<String, Object>> f3 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject(“http://localhost:8080/product/{1}”, Map.class, 2);
return r;
});
Future<Map<String, Object>> f4 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject(“http://localhost:8080/logistics/{1}”, Map.class, 1);
return r;
});
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
log.debug(“执行完毕”);
service.shutdown();

1
2
3

执行结果

19:51:39.711 c.TestCountDownLatch [main] - begin
{total=2300.00, id=1}
{price=300, name=小爱音箱, id=1}
{price=2000, name=小米手机, id=2}
{name=中通快递, id=1}
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

说明:

- 这种等待多个带有返回值的任务的场景,还是用future比较合适,CountdownLatch适合任务没有返回值的场景。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1699.png)

- 每个 `Future` 是一个异步请求;
- `get()` 会阻塞直到该请求结束;
- 所以虽然没有用 `CountDownLatch.await()`,但效果是类似的;
- 不过此场景**更推荐使用 `Future` 或 `CompletableFuture`**

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1700.png)

总结

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1701.png)

### 7.`CyclicBarrier`(循环栅栏)

#### 为什么需要 `CyclicBarrier`?

CountdownLatch的缺点在于不能重用,见下:

private static void test1() {
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 3; i++) {
CountDownLatch latch = new CountDownLatch(2);
service.submit(() -> {
log.debug(“task1 start…”);
sleep(1);
latch.countDown();
});
service.submit(() -> {
log.debug(“task2 start…”);
sleep(2);
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug(“task1 task2 finish…”);
}
service.shutdown();
}

1
2
3
4
5
6
7
8
9

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1702.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1703.png)

想要重复使用CountdownLatch进行同步,必须创建多个CountDownLatch对象。

引入 CyclicBarrier(循环屏障)

CyclicBarrier barrier = new CyclicBarrier(2);

1
2
3
4
5
6
7

- 意思是:**等到两个线程都执行到 `barrier.await()` 处,才会继续向下执行**
- 达不到数量,所有线程都阻塞;
- 达到后一起继续,就像“人满发车”。

#### 代码示例

CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
new Thread(()->{
System.out.println(“线程1开始…”+new Date());
try {
cb.await(); // 当个数不足时,等待
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(“线程1继续向下运行…”+new Date());
}).start();
new Thread(()->{
System.out.println(“线程2开始…”+new Date());
try { Thread.sleep(2000); } catch (InterruptedException e) { }
try {
cb.await(); // 2 秒后,线程个数够2,继续运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(“线程2继续向下运行…”+new Date());
}).start();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99

行为说明:

- 线程1先运行,遇到 `await()` 进入阻塞;
- 线程2延迟2秒后运行,调用 `await()`;
- 此时到达2个线程,**屏障打开**,两线程继续运行;
- 实现了“**阶段性等待同步**”。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1704.png)

| 工具 | 类比说明 |
| -------------- | ---------------------------- |
| CountDownLatch | 一次性使用的门闩(一次倒计) |
| CyclicBarrier | 自动复位的栅栏(人满再发车) |

使用场景:

例如进行 3 轮比赛,每轮都等选手准备就绪再出发 → 用 `CyclicBarrier` 最合适!

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1705.png)

#### CyclicBarrier与`CountDownLatch`对比

| 特性 | `CountDownLatch` | `CyclicBarrier` |
| -------------- | ------------------------- | ------------------------- |
| 能否复用 | ❌ 一次性使用 | ✅ 可重复使用 |
| 使用方式 | `countDown()` + `await()` | `await()` 统一阻塞点 |
| 是否有回调功能 | ❌ 没有 | ✅ 可设置 barrierAction |
| 应用场景 | 主线程等待子线程完成 | 多线程互相等待 → 同步继续 |

- `CyclicBarrier` 非常适合**阶段性线程同步**;
- 它比 `CountDownLatch` 更加灵活,**可复用 + 可回调**;
- 在多轮任务协作、多人游戏准备就绪、批量处理同步等场景中非常实用。

### 8.线程安全集合类概述

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1706-1024x220.png)

线程安全集合类可以分为三大类:

1.遗留类(Legacy)

- 如 `Hashtable`、`Vector`
- 在方法层面直接使用 `synchronized` 修饰,线程安全
- 安全,但并发性能差(竞争激烈时性能急剧下降)

2.Collections 工具类包装

通过 `Collections.synchronizedXXX()` 对非线程安全的集合进行包装:

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1708.png)

- `Collections.synchronizedCollection`
- `Collections.synchronizedList`
- `Collections.synchronizedMap`
- `Collections.synchronizedSet`
- `Collections.synchronizedNavigableMap`
- `Collections.synchronizedNavigableSet`
- `Collections.synchronizedSortedMap`
- `Collections.synchronizedSortedSet`

3.java.util.concurrent.*(JUC包)

这是主流推荐用法,支持更高性能的并发集合类。分为三类: Blocking、CopyOnWrite、Concurrent

-

> #### Blocking 类型
>
> - 如 `BlockingQueue`、`BlockingDeque`
> - 基于锁实现,支持 `put/take` 等阻塞方法
> - 应用于:**线程间通信/生产消费模型**

#### CopyOnWrite 类型

- 如 `CopyOnWriteArrayList`、`CopyOnWriteArraySet`
- 写操作复制数据,读操作无锁
- 应用于:**读多写少**场景(如监听器列表)

#### Concurrent 类型

- 如 `ConcurrentHashMap`、`ConcurrentLinkedQueue`
- 内部大量使用 **CAS + 分段锁**
- 提供弱一致性(下文详解)

##### 弱一致性说明(Concurrent 系列的特性)

| 操作 | 含义说明 |
| ------------- | ------------------------------------------------------------ |
| 遍历 | 弱一致性迭代器:遍历中容器被修改,仍然可以继续访问旧值,不报错 |
| size/contains | 可能返回非精确值(因为查询时未加锁,或数据正在变化) |
| 读取 | 读到的可能是稍早之前的值(非强一致性) |

这种行为可以换来极大的吞吐性能(比加锁方式轻量很多),适合多数场景。

#### 非线程安全容器的 fail-fast 机制

当使用非线程安全集合(如 `ArrayList`)进行遍历时,若容器被其他线程修改,就会抛出异常:

Exception in thread “main” java.util.ConcurrentModificationException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

- 原理:容器中维护一个 `modCount` 修改次数字段;
- 迭代器记录初始值,遍历中检测是否变化;
- 如果发现有改动 → 报错。

#### 总结对比

| 类型 | 是否线程安全 | 是否推荐 | 并发性能 | 特点 |
| ------------------------------- | ------------ | -------- | -------------- | -------------------------- |
| `Hashtable` / `Vector` | ✅ 是 | ❌ 不推荐 || 粗粒度锁,性能低 |
| `Collections.synchronizedXXX()` | ✅ 是 | ❌ 不推荐 || 包装实现,锁住整个容器方法 |
| JUC 并发集合类 | ✅ 是 | ✅ 推荐 || 精细锁、CAS、弱一致性 |
| `CopyOnWriteXXX` | ✅ 是 | ✅ 特殊用 | 读高效,写低效 | 适合读多写少 |
| `BlockingQueue` 等 | ✅ 是 | ✅ 推荐 || 支持阻塞、线程间通信 |

### 9.ConcurrentHashMap

ConcurrentHashMap 是Java中用于并发场景的哈希表,提供线程安全的键值对存储功能。其核心设计兼顾效率与安全性,主要应用于多线程环境以提升性能。

#### 应用之单词计数

应用背景:并发统计单词出现次数

- 共 26 个线程,每个线程从不同的文件中读取单词;
- 把所有单词统计进一个共享的 `Map` 中;
- 使用了 `CountDownLatch` 来确保主线程等待所有线程完成再输出结果。

**搭建练习环境:**

public class Test {
public static void main(String[] args){
//在main方法中实现两个接口
}

//开启26个线程,每个线程调用get方法获取map,从对应的文件读取单词并存储到list中,最后调用accept方法进行统计。
public static <V> void  calculate(Supplier<Map<String,V>> supplier, BiConsumer<Map<String,V>, List<String>> consumer) {
    Map<String, V> map = supplier.get();
    CountDownLatch count = new CountDownLatch(26);
    for (int i = 1; i < 27; i++) {
        int k = i;
        new Thread(()->{
            ArrayList<String> list = new ArrayList<>();
            read(list,k);
            consumer.accept(map,list);
            count.countDown();
        }).start();
    }
    try {
        count.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(map.toString());
}
//读单词方法的实现
public static void read(List<String> list,int i){
    try{
        String element;
        BufferedReader reader = new BufferedReader(new FileReader(i + ".txt"));
        while((element = reader.readLine()) != null){
            list.add(element);
        }
    }catch (IOException e){

    }
}
//生成测试数据
public void construct(){
    String str = "abcdefghijklmnopqrstuvwxyz";
    ArrayList<String> list = new ArrayList<>();
    for (int i = 0; i < str.length(); i++) {
        for (int j = 0; j < 200; j++) {
            list.add(String.valueOf(str.charAt(i)));
        }
    }
    Collections.shuffle(list);
    for (int i = 0; i < 26; i++) {
        try (PrintWriter out = new PrintWriter(new FileWriter(i + 1 + ".txt"))) {
            String collect = list.subList(i * 200, (i + 1) * 200).stream().collect(Collectors.joining("\n"));
            out.println(collect);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

}

1
2
3

##### 实现一:

demo(
// 创建 map 集合
// 创建 ConcurrentHashMap 对不对?
() -> new ConcurrentHashMap<String, Integer>(),
// 进行计数
(map, words) -> {
for (String word : words) {
Integer counter = map.get(word);
int newValue = counter == null ? 1 : counter + 1;
map.put(word, newValue);
}
}
);

1
2
3

输出:

{a=186, b=192, c=187, d=184, e=185, f=185, g=176, h=185, i=193, j=189, k=187, l=157, m=189, n=181, o=180, p=178, q=185, r=188, s=181, t=183, u=177, v=186, w=188, x=178, y=189, z=186}
47

1
2
3
4
5
6
7
8
9

错误原因:

- ConcurrentHashMap虽然每个方法都是线程安全的,但是多个方法的组合并不是线程安全的。(你get到一个值这个时候肯定是原子的 但是另外一个线程来的时候把他改了 你又在get的这个值上操作这个时候就是有问题了)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1709.png)

##### 正确答案一:

demo(
() -> new ConcurrentHashMap<String, LongAdder>(),
(map, words) -> {
for (String word : words) {
// 注意不能使用 putIfAbsent,此方法返回的是上一次的 value,首次调用返回 null
map.computeIfAbsent(word, (key) -> new LongAdder()).increment();
}
}
);

1
2
3
4
5
6
7
8
9
10

说明:

- computIfAbsent方法的作用是:当map中不存在以参数1key对应的value时,会将参数2函数式接口的返回值作为value,putmap中,然后返回该value。如果存在key,则直接返回value
- 以上两部均是线程安全的。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1710.png)

##### 正确答案二:

demo(
() -> new ConcurrentHashMap<String, Integer>(),
(map, words) -> {
for (String word : words) {
// 函数式编程,无需原子变量
map.merge(word, 1, Integer::sum);
}
}
);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1711.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1712.png)

#### JDK 7 HashMap 并发死链

在讲ConcurrentHashMap 原理前,首先先讲讲HashMap,因为面试的时候经常会问到hashmap这种不完全的实现,它都有那些并发问题

问题背景:HashMap 并发死链

场景:

- 多线程同时操作同一个 `HashMap`
- 在触发 **扩容(resize)** 的过程中;
- 由于链表搬迁顺序不同,可能导致链表形成 **环形结构(死链)**
- 最终:`put()` 死循环、CPU 飙升,程序卡死。

##### 死链复现步骤(基于 JDK 7)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1713.png)

public static void main(String[] args) {
// 测试 java 7 中哪些数字的 hash 结果相等
System.out.println(“长度为16时,桶下标为1的key”);
for (int i = 0; i < 64; i++) {
if (hash(i) % 16 == 1) {
System.out.println(i);
}
}
System.out.println(“长度为32时,桶下标为1的key”);
for (int i = 0; i < 64; i++) {
if (hash(i) % 32 == 1) {
System.out.println(i);
}
}
// 1, 35, 16, 50 当大小为16时,它们在一个桶内
final HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
// 放 12 个元素
map.put(2, null);
map.put(3, null);
map.put(4, null);
map.put(5, null);
map.put(6, null);
map.put(7, null);
map.put(8, null);
map.put(9, null);
map.put(10, null);
map.put(16, null);
map.put(35, null);
map.put(1, null);
System.out.println(“扩容前大小[main]:”+map.size());
new Thread() {
@Override
public void run() {
// 放第 13 个元素, 发生扩容
map.put(50, null);
System.out.println(“扩容后大小[Thread-0]:”+map.size());
}
}.start();
new Thread() {
@Override
public void run() {
// 放第 13 个元素, 发生扩容
map.put(50, null);
System.out.println(“扩容后大小[Thread-1]:”+map.size());
}
}.start();
}
final static int hash(Object k) {
int h = 0;
if (0 != h && k instanceof String) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}

1
2
3
4
5
6
7
8
9

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1716.png)

##### 死链复现

调试工具使用 idea

在 HashMap 源码 590 行加断点

int newCapacity = newTable.length;

1
2
3

断点的条件如下,目的是让 HashMap 在扩容为 32 时,并且线程为 Thread-0Thread-1 时停下来

newTable.length==32 &&
(
Thread.currentThread().getName().equals(“Thread-0”)||
Thread.currentThread().getName().equals(“Thread-1”)
)

1
2
3
4
5

断点暂停方式选择 Thread,否则在调试 Thread-0 时,Thread-1 无法恢复运行

运行代码,程序在预料的断点位置停了下来,输出

长度为16时,桶下标为1的key
1
16
35
50
长度为32时,桶下标为1的key
1
35
扩容前大小[main]:12

1
2
3
4
5
6
7

接下来进入扩容流程调试

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1717.png)

在 HashMap 源码 594 行加断点

Entry<K,V> next = e.next; // 593
if (rehash) // 594
// …

1
2
3
4
5

这是为了观察 e 节点和 next 节点的状态,Thread-0 单步执行到 594 行,再 594 处再添加一个断点(条件 Thread.currentThread().getName().equals("Thread-0"))

这时可以在 Variables 面板观察到 e 和 next 变量,使用`view as -> Object`查看节点状态

e (1)->(35)->(16)->null
next (35)->(16)->null

1
2
3

在 Threads 面板选中 Thread-1 恢复运行,可以看到控制台输出新的内容如下,Thread-1 扩容已完成

newTable[1] (35)->(1)->null
扩容后大小:13

1
2
3

这时 Thread-0 还停在 594 处, Variables 面板变量的状态已经变化为

e (1)->null
next (35)->(1)->null

1
2
3
4
5
6
7

为什么呢,因为 Thread-1 扩容时链表也是后加入的元素放入链表头,因此链表就倒过来了,但 Thread-1 虽然结 果正确,但它结束后 Thread-0 还要继续运行

接下来就可以单步调试(F8)观察死链的产生了

下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1] (1)->null
e (35)->(1)->null
next (1)->null

1
2
3

下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1] (35)->(1)->null
e (1)->null
next null

1
2
3
4
5

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1718.png)

再看看源码

e.next = newTable[1];
// 这时 e (1,35)
// 而 newTable[1] (35,1)->(1,35) 因为是同一个对象
newTable[1] = e;
// 再尝试将 e 作为链表头, 死链已成
e = next;
// 虽然 next 是 null, 会进入下一个链表的复制, 但死链已经形成了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

##### 死链形成模拟过程

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1722.png)

模拟两个线程 Thread-0 和 Thread-1 的交替过程

我们以“断点”方式描述:

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1723.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1724.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1725.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1726.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1727.png)

##### **源码分析**

HashMap 的并发死链发生在扩容时

// 将 table 迁移至 newTable
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;
// 1 处
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
// 2 处
// 将新元素加入 newTable[i], 原 newTable[i] 作为新元素的 next
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}

1
2
3

假设 map 中初始元素是

原始链表,格式:[下标] (key,next)
[1] (1,35)->(35,16)->(16,null)
线程 a 执行到 1 处 ,此时局部变量 e 为 (1,35),而局部变量 next 为 (35,16) 线程 a 挂起
线程 b 开始执行
第一次循环
[1] (1,null)
第二次循环
[1] (35,1)->(1,null)
第三次循环
[1] (35,1)->(1,null)
[17] (16,null)
切换回线程 a,此时局部变量 e 和 next 被恢复,引用没变但内容变了:e 的内容被改为 (1,null),而 next 的内
容被改为 (35,1) 并链向 (1,null)
第一次循环
[1] (1,null)
第二次循环,注意这时 e 是 (35,1) 并链向 (1,null) 所以 next 又是 (1,null)
[1] (35,1)->(1,null)
第三次循环,e 是 (1,null),而 next 是 null,但 e 被放入链表头,这样 e.next 变成了 35 (2 处)
[1] (1,35)->(35,1)->(1,35)
已经是死链了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

**小结**

- 究其原因,是因为在多线程环境下使用了非线程安全的 map 集合
- JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能 够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1719.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1720.png)

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1721.png)

#### JDK 8 ConcurrentHashMap原理

##### 重要属性和内部类

// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的 新 hash 表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}

1
2
3
4
5

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1729.png)

##### 重要方法

// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)

// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)

// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)

1
2
3
4
5

##### 构造器分析

initialCapacity初始容量,float loadFactor负载因子,concurrencyLevel并发度

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 …
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

1
2
3
4
5
6
7
8
9

初始容量小于并发度的时候,就会把初始容量改成并发度,最少要保证并发度这么大

计算大小的时候,jdk8ConcurrentHashMap实现了懒惰初始化,table不是一上来就把数字创建出来,在构造方法中仅仅计算了下一次table容量的大小,以后在第一次使用时才会真正创建(区别于jdk7)

tableSizeFor保证计算出来的大小是2的n次方

##### get流程(无锁)

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// spread 方法能确保返回结果是正数
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头结点已经是要查找的 key
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find 方法来查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 正常遍历链表, 用 equals 比较
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

总结:

- 如果table不为空且长度大于0且索引位置有元素
- if 头节点key的hash值相等
- 头节点的key指向同一个地址或者equals
- 返回value
- else if 头节点的hash为负数(bin在扩容或者是treebin)
- 调用find方法查找
- 进入循环(e不为空):
- 节点key的hash值相等,且key指向同一个地址或equals
- 返回value
- 返回null

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1730.png)

##### put 流程

以下数组简称(table),链表简称(bin)

public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 其中 spread 方法会综合高位低位, 具有更好的 hash 性
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
// f 是链表头节点
// fh 是链表头结点的 hash
// i 是链表在 table 中的下标
Node<K,V> f; int n, i, fh;
// 要创建 table
if (tab == null || (n = tab.length) == 0)
// 初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环
tab = initTable();
// 要创建链表头节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 添加链表头使用了 cas, 无需 synchronized
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
// 帮忙扩容
else if ((fh = f.hash) == MOVED)
// 帮忙之后, 进入下一轮循环
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 锁住链表头节点
synchronized (f) {
// 再次确认链表头节点没有被移动
if (tabAt(tab, i) == f) {
// 链表
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到相同的 key
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 更新
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 已经是最后的节点了, 新增 Node, 追加至链表尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNode
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
// 释放链表头节点的锁
}

        if (binCount != 0) { 
            if (binCount >= TREEIFY_THRESHOLD)
                // 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树
                treeifyBin(tab, i);
            if (oldVal != null)
                return oldVal;
            break;
        }
    }
}
// 增加 size 计数
addCount(1L, binCount);
return null;

}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield();
// 尝试将 sizeCtl 设置为 -1(表示初始化 table)
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 获得锁, 创建 table, 这时其它线程会在 while() 循环中 yield 直至 table 创建
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
// check 是之前 binCount 的个数
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if (
// 已经有了 counterCells, 向 cell 累加
(as = counterCells) != null ||
// 还没有, 向 baseCount 累加
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (
// 还没有 counterCells
as == null || (m = as.length - 1) < 0 ||
// 还没有 cell
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
// cell cas 增加计数失败
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
) {
// 创建累加单元数组和cell, 累加重试
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 获取元素个数
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// newtable 已经创建了,帮忙扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 需要扩容,这时 newtable 未创建
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1731.png)

总结:

- 进入for循环:

- if table为null或者长度 为0

- 初始化表

- else if 索引处无节点

- 创建节点,填入key和value,放入table,退出循环

- else if 索引处节点的hash值为MOVE(ForwardingNode),表示正在扩容和迁移

- 帮忙

- else

- 锁住头节点

- if 再次确认头节点没有被移动

- if 头节点hash值大于0(表示这是一个链表)

- 遍历链表找到对应key,如果没有,创建。

- else if 节点为红黑树节点

- 调用

  putTreeVal
  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

查看是否有对应key的数节点

- 如果有且为覆盖模式,将值覆盖,返回旧值
- 如果没有,创建并插入,返回null

- 解锁

- if binCount不为0

- 如果binCount大于树化阈值8

- 树化

- 如果旧值不为null

- 返回旧值

- break

- 增加size计数

- return null

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1733.png)

##### size 计算流程

size 计算实际发生在 put,remove 改变集合元素的操作之中

- 没有竞争发生,向 baseCount 累加计数
- 有竞争发生,新建 counterCells,向其中的一个 cell 累加计
- counterCells 初始有两个 cell
- 如果计数竞争比较激烈,会创建新的 cell 来累加计数

public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
// 将 baseCount 计数与所有 cell 计数累加
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1732.png)

##### 总结

**Java 8** 数组(Node) +( 链表 Node | 红黑树 TreeNode ) 以下数组简称(table),链表简称(bin)

- 初始化,使用 cas 来保证并发安全,懒惰初始化 table
- 树化,当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程 会用 synchronized 锁住链表头
- put,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素 添加至 bin 的尾部
- get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新 table 进行搜索
- 扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时妙的是其它竞争线程也不是无事可做,它们会帮助把其它 bin 进行扩容,扩容时平均只有 1/6 的节点会把复制到新 table 中
- size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中。最后统计数量时累加 即可

JDK8 `ConcurrentHashMap` 优势

| 特性 | 说明 |
| ---------------- | -------------------------------------- |
| 高并发 | CAS + synchronized 局部加锁(锁 bin) |
| 分离锁 | 链表/红黑树加 synchronized,避免全表锁 |
| 支持协作扩容 | 多线程一起搬迁桶位,效率高 |
| 无锁读取 | `get()` 过程无锁,保证可见性即可 |
| 分段 size 累加器 | 高效统计元素个数,避免热点竞争 |
| 红黑树优化 | 链表过长自动转为红黑树,提高查找效率 |

#### JDK 7 ConcurrentHashMap原理

它维护了一个 segment 数组,每个 segment 对应一把锁

- 优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 jdk8 中是类似的
- 缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化

##### 构造器分析

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// ssize 必须是 2^n, 即 2, 4, 8, 16 … 表示了 segments 数组的大小
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// segmentShift 默认是 32 - 4 = 28
this.segmentShift = 32 - sshift;
// segmentMask 默认是 15 即 0000 0000 0000 1111
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 创建 segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1734.png)

构造完成,如下图所示

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1735.png)

可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好

其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment

例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1736.png)

结果再与 this.segmentMask 做位于运算,最终得到 1010 即下标为 10 的 segment

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1737.png)

##### put 流程

public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 计算出 segment 下标
int j = (hash >>> segmentShift) & segmentMask;

// 获得 segment 对象, 判断是否为 null, 是则创建该 segment
if ((s = (Segment<K,V>)UNSAFE.getObject 
     (segments, (j << SSHIFT) + SBASE)) == null) {
    // 这时不能确定是否真的为 null, 因为其它线程也发现该 segment 为 null,
    // 因此在 ensureSegment 里用 cas 方式保证该 segment 安全性
    s = ensureSegment(j);
}
// 进入 segment 的put 流程
return s.put(key, hash, value, false);

}

1
2
3

segment 继承了可重入锁(ReentrantLock),它的 put 方法为

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 尝试加锁
HashEntry<K,V> node = tryLock() ? null :
// 如果不成功, 进入 scanAndLockForPut 流程
// 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程
// 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来
scanAndLockForPut(key, hash, value);

// 执行到这里 segment 已经被成功加锁, 可以安全执行
V oldValue;
try {
    HashEntry<K,V>[] tab = table;
    int index = (tab.length - 1) & hash;
    HashEntry<K,V> first = entryAt(tab, index);
    for (HashEntry<K,V> e = first;;) {
        if (e != null) {
            // 更新
            K k;
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) { 
                oldValue = e.value;
                if (!onlyIfAbsent) {
                    e.value = value;
                    ++modCount;
                } break;
            }
            e = e.next;
        }
        else {
            // 新增
            // 1) 之前等待锁时, node 已经被创建, next 指向链表头
            if (node != null)
                node.setNext(first);
            else
                // 2) 创建新 node
                node = new HashEntry<K,V>(hash, key, value, first);
            int c = count + 1; 
            // 3) 扩容
            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                rehash(node);
            else
                // 将 node 作为链表头
                setEntryAt(tab, index, node);
            ++modCount;
            count = c;
            oldValue = null;
            break;
        }
    }
} finally {
    unlock();
}
return oldValue;

}

1
2
3
4
5
6
7

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1738.png)

##### rehash 流程

发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全

private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 过一遍链表, 尽可能把 rehash 后 idx 不变的节点重用
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// 剩余节点需要新建
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 扩容完成, 才加入新的节点
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;

// 替换为新的 HashEntry table
table = newTable;

}

1
2
3
4
5

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1739.png)

附,调试代码

public static void main(String[] args) {
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 1000; i++) {
int hash = hash(i);
int segmentIndex = (hash >>> 28) & 15;
if (segmentIndex == 4 && hash % 8 == 2) {
System.out.println(i + “\t” + segmentIndex + “\t” + hash % 2 + “\t” + hash % 4 +
“\t” + hash % 8);
}
}
map.put(1, “value”);
map.put(15, “value”); // 2 扩容为 4 15 的 hash%8 与其他不同
map.put(169, “value”);
map.put(197, “value”); // 4 扩容为 8
map.put(341, “value”);
map.put(484, “value”);
map.put(545, “value”); // 8 扩容为 16
map.put(912, “value”);
map.put(941, “value”);
System.out.println(“ok”);
}
private static int hash(Object k) {
int h = 0;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
int v = h ^ (h >>> 16);
return v;
}

1
2
3
4
5

##### get 流程(无锁)

get 时并未加锁,用了 UNSAFE 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新 表取内容

public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
// u 为 segment 对象在数组中的偏移量
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// s 即为 segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

1
2
3
4
5
6
7
8

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1740.png)

##### size 计算流程(重试+加锁)

- 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回
- 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回

public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn’t retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
// 超过重试次数, 需要创建所有 segment 并加锁
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1741.png)

##### 与 JDK8 的差异对比

| 特性 | JDK7 | JDK8 |
| -------- | ------------------------ | ----------------------------- |
| 锁粒度 | Segment 级(锁 segment) | Node 级(锁 bin) |
| 并发级别 | segment 数量(默认 16) | 无限制(理论上所有 bin 并发) |
| 扩容 | segment 自扩容 | 整体 table 协作扩容 |
| 懒加载 | ❌ segments 全部初始化 | ✅ table 延迟初始化 |
| 数据结构 | 哈希链表 | 链表 + 红黑树 |

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1742.png)

### 10.LinkedBlockingQueue 原理

这是一个基于链表的阻塞队列,广泛用于生产者-消费者模型中,支持**高并发入队和出队操作**

#### 整体结构

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1743.png)

| 组件 | 描述 |
| ---------- | ----------------------------------------- |
| `head` | 哨兵节点,不存实际数据 |
| `last` | 指向队尾的最后一个元素节点 |
| `count` | 当前元素个数,`AtomicInteger` 实现 |
| `putLock` | 入队操作使用的锁 |
| `takeLock` | 出队操作使用的锁 |
| `notFull` | 条件变量:队列未满,用于阻塞/唤醒入队线程 |
| `notEmpty` | 条件变量:队列非空,用于阻塞/唤醒出队线程 |

#### 基本的入队出队

public class LinkedBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {
static class Node {
E item;
/**
* 下列三种情况之一
* - 真正的后继节点
* - 自己, 发生在出队时
* - null, 表示是没有后继节点, 是最后了
*/
Node next;
Node(E x) { item = x; }
}
}

1
2
3
4
5
6
7
8
9

##### 初始化链表

`last = head = new Node(null);`Dummy 节点用来占位,item 为 null

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1744.png)

##### 当一个节点入队

last = last.next = node;

1
2
3
4
5
6
7
8
9

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1745-1024x267.png)

再来一个节点入队`last = last.next = node;`

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1746.png)

##### 出队

//临时变量h用来指向哨兵
Node h = head;
//first用来指向第一个元素
Node first = h.next;
h.next = h; // help GC
//head赋值为first,表示first节点就是下一个哨兵。
head = first;
E x = first.item;
//删除first节点中的数据,表示真正成为了哨兵,第一个元素出队。
first.item = null;
return x;
h = head

1
2
3

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1747.png)

first = h.next

1
2
3

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1748.png)

h.next = h

1
2
3

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1749.png)

head = first

1
2
3

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1750.png)

E x = first.item;
first.item = null;
return x;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1751.png)

#### 加锁分析

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1755.png)

**高明之处**在于用了两把锁和 dummy 节点

- 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行
- 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
- 消费者与消费者线程仍然串行
- 生产者与生产者线程仍然串行

线程安全分析

- 当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争
- 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争
- 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞

// 用于 put(阻塞) offer(非阻塞)
private final ReentrantLock putLock = new ReentrantLock();
// 用户 take(阻塞) poll(非阻塞)
private final ReentrantLock takeLock = new ReentrantLock();

1
2
3

put 操作

public void put(E e) throws InterruptedException {
//LinkedBlockingQueue不支持空元素
if (e == null) throw new NullPointerException();
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
// count 用来维护元素计数
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 满了等待
while (count.get() == capacity) {
// 倒过来读就好: 等待 notFull
notFull.await();
}
// 有空位, 入队且计数加一
enqueue(node);
c = count.getAndIncrement();
// 除了自己 put 以外, 队列还有空位, 由自己叫醒其他 put 线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果队列中有一个元素, 叫醒 take 线程
if (c == 0)
// 这里调用的是 notEmpty.signal() 而不是 notEmpty.signalAll() 是为了减少竞争
signalNotEmpty();
}

1
2
3
4
5

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1756.png)

take 操作

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果队列中只有一个空位时, 叫醒 put 线程
// 如果有多个线程进行出队, 第一个线程满足 c == capacity, 但后续线程 c < capacity
if (c == capacity)
// 这里调用的是 notFull.signal() 而不是 notFull.signalAll() 是为了减少竞争
signalNotFull()
return x;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115

> 由 put 唤醒 put 是为了避免信号不足
>
> ![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1757.png)

#### 性能比较

主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较

- Linked 支持有界,Array 强制有界
- Linked 实现是链表,Array 实现是数组
- Linked 是懒惰的,而 Array 需要提前初始化 Node 数组
- Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的
- Linked 两把锁,Array 一把锁

#### 总结:

| 特点 | LinkedBlockingQueue 描述 |
| -------- | ------------------------------------------------------------ |
| 内部结构 | 基于链表实现,dummy 节点哨兵结构 |
| 锁机制 | 两把独立锁,入队锁 `putLock`,出队锁 `takeLock` |
| 线程安全 | 入队出队互不干扰,分别控制 `last` 和 `head` |
| 等待机制 | 使用 Condition 的 `await` 和 `signal` 进行精确线程阻塞/唤醒 |
| 性能优势 | 支持高并发下一个线程入队 + 一个线程出队同时进行(比单锁结构高效) |
| 典型应用 | 生产者-消费者模型中的数据缓冲区、异步任务队列等 |

| 特性 | LinkedBlockingQueue | ArrayBlockingQueue |
| ------------------ | ----------------------------- | ------------------ |
| 数据结构 | 链表 | 数组 |
| 容量控制 | 可无界 / 有界(构造指定容量) | 强制有界 |
| 锁机制 | 两把锁(入/出) | 一把锁 |
| 初始化开销 | 懒加载 | 预分配数组 |
| 入队开销 | 每次新建 Node | 复用数组槽位 |
| 性能(高并发吞吐) | 较高(入出分离) | 略低(串行) |

### 11.ConcurrentLinkedQueue

#### 基本特点

它是 Java 提供的一种**高性能无锁并发队列**,适用于高并发场景下的数据共享,如 Netty、Tomcat 中就有广泛应用。

ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是

- 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
- dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
- 只是这【锁】使用了 cas 来实现

| 特性 | 描述 |
| ---------- | -------------------------------------------------- |
| 数据结构 | 单向链表 |
| 线程安全性 | 无锁,使用 **CAS 原子操作** 实现并发安全 |
| 类型 | **非阻塞队列**(Non-blocking),适用于自旋等待场景 |
| 内部机制 | 引入 **dummy 哨兵节点**,区分头尾,减少并发冲突 |
| JDK 包 | `java.util.concurrent.ConcurrentLinkedQueue` |

#### 内部结构设计

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1759.png)

#### 入队和出队流程

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1760.png)

#### CAS 替代锁带来的优势

| 对比点 | `LinkedBlockingQueue` | `ConcurrentLinkedQueue` |
| -------- | ------------------------- | ----------------------- |
| 同步方式 | ReentrantLock + Condition | **CAS 原子操作** |
| 阻塞性 | 阻塞 | 非阻塞 |
| 并发性能 |||
| 场景 | 线程通信(生产者消费者) | 高并发共享数据传递 |

#### dummy 节点的作用

- 初始时,`head = tail = dummy` 节点
- 实际第一个元素为 `head.next`
- 避免边界判断,提高操作一致性
- 便于 `CAS` 操作进行比较和替换

实际应用:Tomcat 中的使用

在 Tomcat 的 NIO 模型中:

- Acceptor 线程不断接收新的 SocketChannel
- 并通过 `ConcurrentLinkedQueue` 传递给 Poller
- Poller 再注册读写事件,进行业务处理

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1758.png)

#### 总结

| 特性 | 描述 |
| -------- | ----------------------------------- |
| 无锁 | 全程使用 `CAS`,避免线程挂起与唤醒 |
| 高性能 | 允许多个线程并发读写 |
| 非阻塞 | 线程不会因为队列满或空而阻塞 |
| 应用场景 | 线程间传递事件/任务,高并发消息队列 |

### 12.CopyOnWriteArrayList

#### 什么是 CopyOnWriteArrayList?

`CopyOnWriteArrayList` 是 Java 并发包中的一种线程安全集合,核心特点是:

- **读写分离**:读操作无锁,写操作时复制原数组,操作新数组。
- **写入时复制(Copy-On-Write)**:每次写操作如 `add/remove/set` 都会创建数组副本。

它非常适用于「读多写少」的场景,例如:配置列表、观察者模式等。

`CopyOnWriteArraySet`是它的马甲 底层实现采用了 写入时拷贝 的思想,增删改操作会将底层数组拷贝一份,更改操作在新数组上执行,这时不影响其它线程的**并发读****读写分离**

#### 写时复制机制详解

以新增为例:

public boolean add(E e) {
synchronized (lock) {
// 获取旧的数组
Object[] es = getArray();
int len = es.length;
// 拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)
es = Arrays.copyOf(es, len + 1);
// 在新数组末尾添加元素
es[len] = e;
// 用新数组替换旧数组
setArray(es);
return true;
}
}

1
2
3
4
5
6
7
8
9
10
11

注意点:

- 写操作使用 `synchronized`(Java 11 版本),保证写操作串行。
- 复制数组开销较大,但**不会影响读线程**,因为读线程拿到的是旧数组的引用。
- `setArray()` 是一个 `volatile` 写操作,能保证写后对其它线程可见。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1762.png)

其它读操作并未加锁,例如:

public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
for (Object x : getArray()) {
@SuppressWarnings(“unchecked”) E e = (E) x;
action.accept(e);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

适合『读多写少』的应用场景

#### 弱一致性问题(Weak Consistency)

##### 1.get 弱一致性

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1761.png)

假设以下时间点:

| 时间点 | 操作 |
| ------------------------------------------------------- | --------------------------------------------- |
| T1 0拿数据(此时还没拿到) | Thread-0 调用 `getArray()` 拿到旧数组 |
| T2 此时间点1要调用remove,先要拷贝 | Thread-1 调用 `getArray()` 拿到同一个数组副本 |
| T3 删除完后新引用替换掉原来的引用 | Thread-1 执行 `setArray()` 替换成新数组 |
| T4 0此时拿到了数据却还是旧的还是能拿到1这个被删除的数字 | Thread-0 继续从旧数组中读元素 |

> 此时 Thread-0 读到的数据可能是旧值,**这就是弱一致性**
>
> 📌 **解释**
>
> - 弱一致性不是 bug,是设计:读操作总是拿当下引用的数组。
> - 不保证读到的是最新数据,但读是快速、安全的。

##### 2.迭代器弱一致性

CopyOnWriteArrayList list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);
Iterator iter = list.iterator();
new Thread(() -> {
list.remove(0);// 触发复制并替换数组
System.out.println(list);
}).start();
sleep1s();
//此时主线程的iterator依旧指向旧的数组。
while (iter.hasNext()) {
System.out.println(iter.next());// 依旧遍历旧数组
}


结论:

- 读的是旧数据,但**读的行为是安全的**。
- `iter` 拿到的是旧数组的快照。
- 主线程不会报 `ConcurrentModificationException`。

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1763.png)

#### 优缺点总结

| 优点                 | 说明                                   |
| -------------------- | -------------------------------------- |
| 读操作无锁,性能高   | 所有读取操作直接读数组                 |
| 不会出现并发修改异常 | 不抛 `ConcurrentModificationException` |
| 非阻塞读             | 写不影响读                             |
| 简单易用             | 线程安全,不依赖复杂机制               |

| 缺点         | 说明                     |
| ------------ | ------------------------ |
| 写操作开销大 | 每次写操作都复制整个数组 |
| 占用内存     | 写频繁时旧数组不容易回收 |
| 一致性弱     | 读取操作读到的不是最新值 |

![img](https://www.legendkiller.xyz/wp-content/uploads/2025/07/image-1764.png)

# 下篇完结