JUC
本博客根据黑马juc教程学习而做的笔记,链接如下:黑马程序员全面深入学习Java并发编程,JUC并发编程全套教程
# 一、基本概念
# 进程
进程是程序分配的最小单位 一个进程里包含多个线程
# 线程
线程是程序执行的最小单位,是进程的子集
并发:是一个CPU在不同的时间段,切换多个线程执行命令 并行:是多个CPU同时执行命令。
同步:需要等待结果,才能继续执行后续的操作 异步:不需要等待结果,就能执行后续的操作
# 二、线程的创建
private static void create3() throws ExecutionException, InterruptedException {
FutureTask<Object> futureTask = new FutureTask<>(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
Thread thread = new Thread(futureTask);
thread.start();
futureTask.get();// 阻塞,等待结果返回
}
private static void create2() {
Thread thread = new Thread(new Runnable(){
@Override
public void run() {
System.out.println("执行任务");
}
});
thread.start();
}
private static void create1() {
Thread thread = new Thread();
thread.start();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 线程方法
run():运行方法
start():真正的启动方法
join():阻塞,等待当前线程执行完
interrupt():打断,有打断标记
isInterrupted():判断是否有打断标记
sleep():不会释放锁
# 线程的状态
操作系统:5种
java:6种
# 三、synchronized
# monitor
普通对象
数组对象
32 位虚拟机 Mark Word
64 位虚拟机 Mark Word
monitor
每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针
- 刚开始 Monitor 中 Owner 为 null
- 当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor中只能有一 个 Owner
- 在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行 synchronized(obj),就会进入 EntryList BLOCKED
- Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争的时是非公平的
- 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲 wait-notify 时会分析
注意:
- synchronized 必须是进入同一个对象的 monitor 才有上述的效果
- 不加 synchronized 的对象不会关联监视器,不遵从以上规则
static final Object lock = new Object();
static int counter = 0;
public static void main(String[] args) {
synchronized (lock) {
counter++;
}
}
2
3
4
5
6
7
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: getstatic #2 // <- lock引用 (synchronized开始)
3: dup
4: astore_1 // lock引用 -> slot 1
5: monitorenter // 将 lock对象 MarkWord 置为 Monitor 指针
6: getstatic #3 // <- i
9: iconst_1 // 准备常数 1
10: iadd // +1
11: putstatic #3 // -> i
14: aload_1 // <- lock引用
15: monitorexit // 将 lock对象 MarkWord 重置, 唤醒 EntryList
16: goto 24
19: astore_2 // e -> slot 2
20: aload_1 // <- lock引用
21: monitorexit // 将 lock对象 MarkWord 重置, 唤醒 EntryList
22: aload_2 // <- slot 2 (e)
23: athrow // throw e
24: return
Exception table:
from to target type
6 16 19 any
19 22 19 any
LineNumberTable:
line 8: 0
line 9: 6
line 10: 14
line 11: 24
LocalVariableTable:
Start Length Slot Name Signature
0 25 0 args [Ljava/lang/String;
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 19
locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
流程
- 创建锁记录(Lock Record)对象,每个线程都的栈帧都会包含一个锁记录的结构,内部可以存储锁定对象的 Mark Word
- 让锁记录中 Object reference 指向锁对象,并尝试用 cas 替换 Object 的 Mark Word,将 Mark Word 的值存 入锁记录
- 如果 cas 替换成功,对象头中存储了 锁记录地址和状态 00 ,表示由该线程给对象加锁,这时图示如下
- 如果 cas 失败,有两种情况
- 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
- 如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数
- 当退出 synchronized 代码块(解锁时)如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重 入计数减一
- 当退出 synchronized 代码块(解锁时)锁记录的值不为 null,这时使用 cas 将 Mark Word 的值恢复给对象
头
- 成功,则解锁成功
- 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
# 偏向锁
解决多次锁重入的问题
一个对象创建的时候,默认开启偏向锁,mark word 对象头存储线程id
注意:程序启动延迟4s才能看到,或者设置偏向锁延迟时间
-XX:BiasedLockingStartupDelay=0 0延迟
-XX:-UseBiasedLocking 禁用偏向锁
已经是偏向锁的对象,调用 hashCode()方法会改成无锁,因为对象头存了线程id,存不下hashCode码
偏向锁撤销:
1.调用 hashCode()方法
2.有其他线程竞争,会撤销偏向锁,升级为轻量级锁
撤销偏向锁超过20次,jvm会做批量重偏向,把原先的线程id改成新的线程id
撤销偏向锁超过40次,jvm会把整个类改成不可偏向,新建的对象就不是偏向锁
# 轻量级锁
锁记录存在线程栈中
# 重量级锁
monitor
自旋优化:多尝试几次获取锁
# 锁膨胀
加轻量级锁失败时, 会进行锁膨胀(锁升级),将轻量级锁变为重量级锁。
- 当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
- 这时 Thread-1 加轻量级锁失败,进入锁膨胀流程
- 即为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址
- 然后自己进入 Monitor 的 EntryList BLOCKED
- 当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,失败。这时会进入重量级解锁 流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程
# 锁消除
jit 会对字节码进行优化,无效的加锁会被优化掉
# 锁重入
进入同一个对象锁
park/unpark 与wait/notify的区别
unpark 可以指定恢复的线程 unpark 可以在线程阻塞之前调用 park不会释放锁,而wait会释放锁
原理:每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex
park:会检查 counter是否为0, 是,去mutex的cond队列等待 不是,继续运行,把 counter 设置为0
unpark:给 counter设置1 ,去唤醒mutex对象的cond中的线程
死锁:两个线程都在阻塞,等待对方释放锁, 活锁:两个线程都在运行,互相改变对方的结束条件,后谁也无法结束。 线程饥饿:长时间得不到运行
# 四、ReentrantLock
- 可中断,可以防止死等
- 可以设置超时时间
- 可以设置为公平锁 (先到先得)
- 支持多个条件变量( 具有多个waitset)
# 五、volatile
java 内存模型(jmm)
有序性:防止指令重排,只是本线程内的
可见性:每次查询都从主内查
内存屏障
// 其他代码1
volatile int a = 0;
// =====================写屏障============================
// 保证在该屏障之前,对共享变量a的改动,都同步到主存中
// 其他代码2
// =====================读屏障============================
// 保证在该屏障之后,对共享变量a的查询都是去主存查的
int b = a;
// 其他代码3
2
3
4
5
6
7
8
9
10
11
12
13
java代码
public class Singleton {
private static volatile Singleton INSTANCE = null;
private Singleton() {}
public static Singleton getInstance() {
// 首次访问会同步,之后不会进入synchronized中
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (Singleton.class) {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
public static void main(String[] args) {
Singleton.getInstance();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class 文件 反编译命令:javap -v Singleton.class
0: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
3: ifnull 10
6: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
9: areturn
10: ldc #3 // class com/syyo/juc/c4/Singleton
12: dup
13: astore_0
14: monitorenter
15: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
18: ifnull 27
21: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
24: aload_0
25: monitorexit
26: areturn
27: new #3 // class com/syyo/juc/c4/Singleton // 创建对象
30: dup // 复制引用地址
31: invokespecial #4 // Method "<init>":()V // 调用构造方法
34: putstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton; // 赋值给变量
37: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
40: aload_0
41: monitorexit
42: areturn
43: astore_1
44: aload_0
45: monitorexit
46: aload_1
47: athrow
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
没加 volatile, 17 20 24 21 这4步在多线程下,会有线程乱序的问题,如下图
// ---------------------------------------> 读屏障
0: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
3: ifnull 10
6: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
9: areturn
10: ldc #3 // class com/syyo/juc/c4/Singleton
12: dup
13: astore_0
14: monitorenter
15: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
18: ifnull 27
21: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
24: aload_0
25: monitorexit
26: areturn
27: new #3 // class com/syyo/juc/c4/Singleton // 创建对象
30: dup // 复制引用地址
31: invokespecial #4 // Method "<init>":()V // 调用构造方法
34: putstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton; // 赋值给变量
// ---------------------------------------> 写屏障
37: getstatic #2 // Field INSTANCE:Lcom/syyo/juc/c4/Singleton;
40: aload_0
41: monitorexit
42: areturn
43: astore_1
44: aload_0
45: monitorexit
46: aload_1
47: athrow
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
加了 volatile, 17 20 24 21 这4步保证不会出现多线程乱序
happens-before
定义了对共享变量可见性的规则
# 六、CAS
compareAndSwap 的意思是比较并交换,使用自旋方式不断尝试去修改。底层是lock cmpxchg指令(x86架构) 在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
CAS 是基于乐观锁的思想:乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。 synchronized 是基于悲观锁的思想:悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会
实现cas的类:
AtomicInteger
AtomicBoolean
AtomicLong
AtomicReference 引用类型
AtomicStampedReference 引用类型(用整数记录版本号,可以统计版本次数)
AtomicMarkableReference 引用类型(用布尔值记录版本号,不统计版本次数)
AtomicIntegerArray 数组
AtomicLongArray
AtomicReferenceArray
AtomicReferenceFieldUpdater 字段更新器
AtomicIntegerFieldUpdater
AtomicLongFieldUpdate
public class MyUnsafe {
public static void main(String[] args) throws NoSuchFieldException {
// 通过反射拿到 Unsafe 的实例
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
// 使用 Unsafe拿到 Teacher类的属性偏移量
long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
Teacher t = new Teacher();
// 通过cas 命令修改内容,原子性
unsafe.compareAndSwapInt(t,idOffset,0,1);
unsafe.compareAndSwapObject(t,nameOffset,null,"张三");
System.out.println(t);
System.out.println(idOffset);
}
@Data
static class Teacher{
volatile int id;
volatile String name;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
LongAdder:拆分,使用多个累加单元来累加,Thread-0 用Cell[0],Thread-1 用Cell[1],最后结果进行汇总。
@Contended:防止缓存行伪共享,在修饰类的属性a前加了多个空白的占位属性,防止因为多线程的多个a属性会存在同一个内存行
cpu的缓存级别
cpu读取速度
cpu的缓存行
# 七、线程池
@Slf4j
public class T04MyThreadPool {
public static void main(String[] args) {
ThreadPoll pool = new ThreadPoll(1,
1000,
TimeUnit.MILLISECONDS,
1, (queue, task) -> {
// 1.死等
// queue.put(task);
// 2.超时
// queue.putTimeout(task,1500,TimeUnit.MILLISECONDS);
// 3.放弃
// log.debug("放弃任务{}",task);
// 4.抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5.让调用者自己执行任务
task.run();
});
for (int i = 0; i < 4; i++) {
int index = i;
pool.execute(() -> {
log.debug("=====" + index);
sleep(1000);
});
}
}
}
// 线程池
@Slf4j
class ThreadPoll{
private BlockingQueue<Runnable> taskQueue; // 任务队列
private HashSet<Runnable> workers = new HashSet<>();// 线程集合
private int coreSize;// 核心线程数
private long timeout;// 获取任务的超时时间
private TimeUnit timeUnit;// 超时的时间单位
private RejectPolicy<Runnable> rejectPolicy;// 拒绝策略
public ThreadPoll(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectPolicy = rejectPolicy;
}
// 执行任务
public void execute(Runnable task){
synchronized(workers){
// 当任务数小于核心数,就创建新的线程,加入到加入任务队列
if (workers.size() < coreSize){
Worker worker = new Worker(task);
log.debug("execute===新增worker: {},任务对象: {}",worker,task);
workers.add(worker);
worker.start();
}else {
// 当任务数大于核心数,使用拒绝策略
taskQueue.tryPut(rejectPolicy,task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 当task不为空,执行任务
// 当task执行完毕,再去执行任务队列的任务
while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null) {// 超时
// while (task != null || (task = taskQueue.take()) != null) {// 死等
try {
log.debug("run===正在执行任务 {}",task);
task.run();
} catch (Exception e) {
e.printStackTrace();
}finally {
task = null;
}
}
// 移除执行完的任务
synchronized(workers){
log.debug("run===移除任务 {}",this);
workers.remove(this);
}
}
}
}
// 阻塞队列
@Slf4j
class BlockingQueue<T>{
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 容量上限
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 阻塞获取
public T take(){
lock.lock();
try{
// 没有任务时,消费者线程就等待
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取队列头部元素
T t = queue.removeFirst();
// 唤醒生产者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
// 阻塞获取,带超时
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
long nanos = unit.toNanos(timeout);
// 没有任务时,消费者线程就等待
while (queue.isEmpty()){
try {
if (nanos <= 0){
return null;
}
// 返回的是剩余时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取队列头部元素
T t = queue.removeFirst();
// 唤醒生产者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task){
lock.lock();
try{
// 队列满了,生产者线程阻塞
while (queue.size() == capcity){
try {
log.debug("put====等待加入任务队列 {}",task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 添加任务到队列尾部
log.debug("put===加入任务队列 {}",task);
queue.addLast(task);
// 唤醒消费者
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
// 阻塞添加,带超时
public boolean putTimeout(T task,long timeout,TimeUnit timeUnit){
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity){
try {
log.debug("putTimeout===等待加入任务队列 {}",task);
if (nanos <= 0){
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("putTimeout===加入任务队列 {}",task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
// 获取大小
public int size(){
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}
// 拒绝策略
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try{
if (queue.size() == capcity){
// 任务队列已满
log.debug("tryPut====队列已满,拒绝 :{}",task);
rejectPolicy.reject(this ,task);
}else {
// 没满,加入任务队列,唤醒消费者线程
log.debug("tryPut====加入任务队列 {}",task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
// 拒绝策略
@FunctionalInterface
interface RejectPolicy<T>{
void reject(BlockingQueue<T> queue,T task);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
ThreadPoolExecutor 参数
/**
*
* corePoolSize: 核心线程数
* 核心线程:corePoolSize,永不会销毁
* 救济线程:maximumPoolSize - corePoolSize,会销毁,(配置有界队列使用的)
* maximumPoolSize: 最大线程数
* keepAliveTime: 生存时间(救济线程)
* unit: 时间单位 (救济线程)
* workQueue: 阻塞队列(存放任务)
* threadFactory: 线程工厂(给线程取名字)
* handler: 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作
进行赋值
提交任务方法
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
2
3
4
5
6
7
8
9
10
11
12
io运算:磁盘、网络 cpu运算:计算
线程数 = 核数 * 期望cpu利用率 * 总时间(cpu计算时间+等待时间)/cpu计算时间
4核cpu计算时间是 50%,其他等待时间是 50%,期望cpu被100%利用
4 * 100% * 100%/50% = 8;
4核cpu计算时间是 10%,其他等待时间是 90%,期望cpu被100%利用
4 * 100% * 100%/10% = 40;
# 八、AQS
AbstractQueuedSynchronizer: 是阻塞式锁和相关的同步器工具的框架
用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取
锁和释放锁
getState - 获取 state 状态
setState - 设置 state 状态
compareAndSetState - cas 机制设置 state 状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于 FIFO(first input first output) 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类需要实现的方法
tryAcquire:尝试一次获取锁,返回ture表示成功
tryRelease:释放锁,返回ture表示成功
tryAcquireShared
tryReleaseShared
isHeldExclusively
ReentrantLock:原理
加锁解锁流程图
第一个竞争出现时
Thread-1 执行了
- CAS 尝试将 state 由 0 改为 1,结果失败
- 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
- 接下来进入 addWaiter 逻辑,构造 Node 队列
- 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
- Node 的创建是懒惰的
- 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquireQueued 逻辑
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
- 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0 释放锁,进入 tryRelease 流程,如果成功
- 设置 exclusiveOwnerThread 为 null
- state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为 Thread-1,state = 1
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
- 原本的 head 因为从链表断开,而可被垃圾回收 如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁解锁源码
// 加锁流程
// 第一步
final void lock() {
if (compareAndSetState(0, 1))
// cas 修改 State 成功,加锁成功 设置 Owner线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 加锁失败
// 第二步
acquire(1);
}
// 第二步
public final void acquire(int arg) {
// !tryAcquire(arg) 加锁失败 走这个流程 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
// 第三步
if (!tryAcquire(arg) &&
// 第四步
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 第五步
// 获得到锁,执行打断线程
selfInterrupt();
}
}
// 第三步
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 没有上锁,先去阻塞队列查询,(这里是公平锁)
if (!hasQueuedPredecessors() &&
// 执行cas操作
compareAndSetState(0, acquires)) {
// 设置当前线程为owner线程
setExclusiveOwnerThread(current);
return true;
}
}
// 判断是不是锁重入,如果是锁计数+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 第四步
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 判断当前线程的节点是否是第二位,如果是,再次竞争锁
if (p == head && tryAcquire(arg)) {
// 设置当前线程节点为头节点
setHead(node);
p.next = null; // help GC
failed = false;
// 执行返回打断状态,(这里是不可打断)
return interrupted;
}
// shouldParkAfterFailedAcquire()把前一个节点的状态改成-1,-1的节点有职责唤醒后一个节点
if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt() 使用park阻塞线程,返回打断标记
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 第六步,解锁
public final boolean release(int arg) {
// 解锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 判断头节点的状态是不是-1,是的话,唤醒下一个线程
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
可重入原理
/**
* 可重入原理总结:
* 当一个线程来获取锁,会判断这个线程和已上锁的线程是不是同一个,
* 如果是同一个就用一个锁计数+1,后面释放锁的时候会把锁计数-1,
* 当锁计数为0的时候再会释放锁,把owner线程设置为空
*/
// 加锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 0:表示没上锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程和已经上锁的线程是同一个,锁计数+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 解锁
protected final boolean tryRelease(int releases) {
// 锁计数-1
int c = getState() - releases;
// 如果加锁线程和当前线程不是同一个,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 锁计数是0,进行解锁
free = true;
setExclusiveOwnerThread(null);
}
// 不是0,锁计数-1
setState(c);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
公平锁原理
/**
* 公平锁原理总结:
* 当一个线程来获取锁,会判断这个线程是否上锁
* 如果没有上锁,会先去阻塞队列查询有没有线程,没有就进行上锁操作
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 没有上锁,先去阻塞队列查询有没有线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* 非公平锁原理总结:
* 当一个线程来获取锁,会判断这个线程是否上锁
* 如果没有上锁,直接竞争锁
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 没有上锁,直接
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 查询阻塞队列有没有线程
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
// 表示队列中没有老二
((s = h.next) == null ||
// 队列中的老二不是当前线程
s.thread != Thread.currentThread());
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
不可打断原理
/**
* 不可打断原理总结:
* 当一个线程a获取不到锁,会进入队列等待(park),
* 线程b使用interrupt打断线程a,会唤醒线程a,并保存线程的打断标记true,
* 直到线程a获取到锁才会返回true,最后才会执行线程a的打断
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// 获得到锁才会返回打断标记
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 使用park阻塞
parkAndCheckInterrupt())
// 重置打断标记,保证在获取到锁之前,不会被打断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 可打断原理总结:
* 当一个线程a获取不到锁,会进入队列等待(park),
* 线程b使用interrupt打断线程a,会唤醒线程a,
* 并保存线程的打断标记true,抛出打断异常
*
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 抛出打断异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
await
// 线程阻塞
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 把当前线程加入到条件变量的双向链表中,把节点状态设置为-2(等待状态)
Node node = addConditionWaiter();
// 把节点对应的所有线程的锁释放掉,唤醒下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 当前线程park
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 线程唤醒
public final void signal() {
// 检查当前线程是不是owner线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 条件表里第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 清理之前最后一节的节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// transferForSignal() 将条件变量里的节点转移到等待队列中,唤醒等待队列
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
ReentrantReadWriteLock 原理
// 加写锁
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 已加的锁是读锁,和需要加的写锁是互斥的
if (w == 0 ||
// 已加的锁是写锁,判断是当前线程和加锁线是否是同一个
current != getExclusiveOwnerThread())
return false;
// 锁重入超过范围
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 可重入,锁计数+1
setState(c + acquires);
return true;
}
// 没加锁,
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 加锁成功,设置当前线程为owner线程
setExclusiveOwnerThread(current);
return true;
}
// 解写锁
public final boolean release(int arg) {
// 尝试解锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后一个节点
unparkSuccessor(h);
return true;
}
return false;
}
// 第一步
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
// 不是当前线程抛异常
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 锁计数为0时,设置owner线程为空,锁状态为0
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 加读锁
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 没有获取到锁,执行
doAcquireShared(arg);
}
// 第一步
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 判断是否加过写锁,exclusiveCount(c) 获取低16位的值:1 << 16
if (exclusiveCount(c) != 0 &&
// 已加写锁的线程和owner线程是同一个,就进行锁降级
// 注意:同一个线程先加写锁,再加读锁,可以锁降级
getExclusiveOwnerThread() != current)
// 加锁失败返回-1
return -1;
// sharedCount(c) 获取高16位的值: 1 >>> 16 (三个>表示无符号右移)
int r = sharedCount(c);
// 读锁不阻塞
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// 高位+1 = 1 + 1 << 16
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 读锁计数
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
// 加锁成功返回 1
return 1;
}
return fullTryAcquireShared(current);
}
// 第二步
private void doAcquireShared(int arg) {
// 加入2个节点,头节点占位用的,第二个节点是包装当前线程的节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 当前节点是不是第二个节点
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取到锁,将当前节点改成头节点,并判断下一个节点是不是共享节点(读锁的节点)
// 如果是 就执行 doReleaseShared(); 唤醒下一个节点的操作
// 这个方法是读读并发的核心
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 没有获取到锁,会park,修改节点的状态为-1
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 解读锁
public final boolean releaseShared(int arg) {
// 尝试释放读锁,
if (tryReleaseShared(arg)) {
// 当锁的计数 = 0时才执行unpark唤醒下个节点
doReleaseShared();
return true;
}
return false;
}
// 第一步
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// 高位-1 = c - 1 << 16
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
StampedLock :速度快,但是不支持条件变量和可重入 ReentrantReadWriteLock :支持条件变量和可重入
Semaphore:信号量,用来限制能同时访问共享资源的线程上限
public class T03Semaphore {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
// 获取许可
semaphore.acquire();
log.debug("running");
sleep(1000);
log.debug("end");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 释放许可
semaphore.release();
}
}).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CountdownLatch:倒计时锁,用来进行线程同步协作,等待所有线程完成倒计时。
































