typora-copy-images-to: JUC.assets
线程
创建线程
继承Thread类
Thread01 thread = new Thread01();
thread.start();//启动线程
// 继承Thread
public static class T1 extends Thread{
@Override
public void run(){
System.out.println("当前线程:"+Thread.currentThread().getId());
System.out.println("T1");
}
}
实现Runnable接口
Runable01 runable01 = new Runable01();
new Thread(runable01).start();
// 实现Runnable接口
public static class R1 implements Runnable{
@Override
public void run(){
System.out.println("当前线程:"+Thread.currentThread().getId());
System.out.println("R1");
}
}
实现Callable接口
//实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start();
//阻塞等待整个线程执行完成,获取返回结果
Integer integer = futureTask.get();
// 实现Callable接口 (可以拿到返回结果,可以处理异常)
public static class C1 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("当前线程:"+Thread.currentThread().getId());
System.out.println("C1");
return 66;
}
}
线程状态
-
新建状态(New):
- 新创建了一个线程对象,但还没有调用start()方法。实现Runnable接口和继承Thread可以得到一个线程类,new一个实例出来,线程就进入了新建状态
-
就绪状态(Runnable):
- 新建状态的线程,调用线程的start()方法,此线程进入就绪状态
-
运行状态(Running):
- 当线程获得CPU时间后,它才进入运行状态,真正开始执行run()方法
-
阻塞状态(Blocked):
- 线程调用一个在I/O上被阻塞的操作,即该操作在输入输出操作完成之前不会返回到它的调用者;阻塞状态是正在运行的线程没有运行结束,暂时让出CPU,这时其他处于就绪状态的线程就可以获得CPU时间,进入运行状态
-
等待状态/超时等待(Waiting/Timed_Waiting) ,线程进入等待状态有三种方式:
-
cpu调度给优先级更高的线程
-
线程要等待获得资源或者信号
-
时间片的轮转,时间片到了,进入等待状态
-
在可执行状态下,如果调用 sleep()、 wait()等方法,线程都将进入等待状态
-
-
终止状态(Terminated):
- 表示该线程已经执行完毕。当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。
-
sleep() 和 wait() 的区别:
- sleep方法是Thread类里面的,主要的意义就是让当前线程停止执行,让出CPU给其他的线程,但是不会释放对象锁资源以及监控的状态,当指定的时间到了之后又会自动恢复运行状态
- wait方法是Object类里面的,主要的意义就是让线程放弃当前的对象的锁,进入等待此对象的等待锁定池,只有针对此对象调动notify方法后本线程才能够进入对象锁定池准备获取对象锁进入运行状态
线程的生命周期
-
join:将调用方法的线程加入当前线程,说人话,让别的线程插队执行,完成后自己再执行
-
demo:
public static void main(String[] args) {
Thread t = new Thread(() -> System.out.println("test"));
t.start();
System.out.println("main1");
try {
t.join();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("main2");
}
运行结果:
main2一定是最后才执行的
线程分类
- 用户线程:main线程,新创建的线程一般都是用户线程(除非通过构造方法指定)
- 守护线程:等用户线程执行完结束后,它会自动结束
线程池
- 线程池能干什么?
- 线程池自己内部维护的线程可以复用,减少了每次创建和销毁线程的消耗
- 通过控制线程的数量,可以控制并发数
- 对维护的线程统一管理和监控
创建线程池
- Executors工具类中封装了ThreadPoolExecutor的构造方法
- API:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
一般使用这三个方法来创建线程池,其它的方法也是对参数的设置
-
new FixedThreadPool:创建固定线程数的线程池
- 若新任务提交时,有空闲线程就立刻执行,没有则把任务放在阻塞队列中,等待线程执行
-
new CachedThreadPool:创建线程数量可调整线程池
- 若线程都在执行,此时又提交了新任务,就会创建新的线程来执行
-
new SingleThreadExecutor:创建一个线程的线程池,参照固定数量的线程池工作流程
-
我们在工作中使用那种方式创建线程池?
这三种哪个都不用!!!
FixedThreadPool和SingleThreadPool,创建任务队列时,队列的长度为Integer.MAX_VALUE,可能堆积大量请求导致 OOM
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数为 Integer.MAX_VALUE ,可能会创建大量线程导致 OOM
七大参数
- ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor 3 个最重要的参数:
● corePoolSize : 相当于“值班”的线程数,核心线程
● maximumPoolSize : 同时工作的最大线程数,包括核心和非核心线程
● workQueue: 暂存新任务的队列
ThreadPoolExecutor其他4个参数:
● keepAliveTime:非核心线程,在这个时间后,如果没有任务提交,就会自动销毁
● unit : keepAliveTime 参数的时间单位
● threadFactory :executor 创建新线程的时候会用到
● handler :拒绝策略,当任务队列满时,对于新任务的处理方式,后面会展开讲
拒绝策略
- AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
- CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不
会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。一般回退到main线程让它来完成 - DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中
尝试再次提交当前任务 - DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的一种策略
任务提交
- void execute(Runnable command):只是提交任务
- future submit(Runnable task):提交任务后返回future对象,你可以通过get方法获取任务的返回值
Future
 :是否取消
-
cancle : 取消任务
-
get()有参 : 再给定的时间内获得结果,如果没有获取到就报异常
-
get() : 一直阻塞,等待结果
-
isDone () :是否完成任务
-
注意事项:get方法会一直阻塞进程,等待结果
Demo
获得结果案例
FutureTask<String> futureTask = new FutureTask<String>(()->{
System.out.println("线程开始执行");
Thread.sleep(5000);
return "执行结束";
});
Thread t1 = new Thread(futureTask, "t1");
t1.start();
Thread.sleep(550);
System.out.println("主线程执行其他任务");
while (true){
if (futureTask.isDone()){
System.out.println(futureTask.get());
break;
}else {
Thread.sleep(1000);
System.out.println("正在处理中");
}
}
- get方法会一直阻塞进程,所以可以配合isDone来获得结果,但是这样还是有CPU的浪费
异步执行案例
@Test
public void TestAsyn() throws InterruptedException {
long begin = System.currentTimeMillis();
FutureTask<String> futureTask1 = new FutureTask<String>(()->{
System.out.println("线程1开始执行");
Thread.sleep(300);
return "1执行结束";
});
threadPool.submit(futureTask1);
FutureTask<String> futureTask2 = new FutureTask<String>(()->{
System.out.println("线程2开始执行");
Thread.sleep(500);
return "2执行结束";
});
threadPool.submit(futureTask2);
Thread.sleep(400);
long end = System.currentTimeMillis();
System.out.println(end-begin);
threadPool.shutdown();
}
@Test
public void TestSync() throws InterruptedException {
long begin = System.currentTimeMillis();
Thread.sleep(200);
Thread.sleep(200);
Thread.sleep(400);
long end = System.currentTimeMillis();
System.out.println(end-begin);
}
- 配合线程池可以完成异步任务处理,明显比同步执行更节省时间
锁
两种思想
悲观锁
认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改,synchronized关键字和Lock的实现类都是悲观锁,适应写操作频繁的场景
乐观锁
认为自己在使用数据时不会有别的线程修改数据或资源,所以不会添加锁。 在Java中是通过使用无锁编程来实现,只是在更新数据的时候去判断,之前有没有别的线程更新了这个数据
如果这个数据没有被更新,当前线程将自己修改的数据成功写入。 如果这个数据已经被其它线程更新,则根据不同的实现执行不同的操作,比如放弃修改、重试抢锁等等
synchronized
对象锁
作用于普通方法,只和当前实例有关,同一个实例 访问方法会阻塞等待,不同实例则不会阻塞
类锁
作用于静态方法,我们知道静态方法只和类有关,所以不管该类有多个实例来访问这个方法,都会被阻塞
代码块锁
作用于代码块,任意一个对象都可以作为锁放入
底层实现原理
对两个方法来说,通过反编译字节码文件可知,方法上会有这两个标记ACC_STATIC,ACC_SYNCHRONIZED这两个会告诉JVM该方法是否为静态同步方法
对代码块来说,使用的是monitorenter和monitorexit指令,来保证持有锁,释放锁,会而外多加一个monitorexit,为了出现异常时也能释放锁
ReentrantLock
ReentrantReadWriteLock
为了优化使用Synchronized或ReentrantLock独占锁时的并发性能
独占锁时只有一个线程来读或写,现在可以多个线程来读了
-
适用在读多写少的情况下,提高并发性能
-
多个线程可以并发读读,但是不能并发读写或写写
-
demo:
class MyResource //资源类,模拟一个简单的缓存
{
Map<String,String> map = new HashMap<>();
//=====ReentrantLock 等价于 =====synchronized,之前讲解过
Lock lock = new ReentrantLock();
//=====ReentrantReadWriteLock 一体两面,读写互斥,读读共享
ReadWriteLock rwLock = new ReentrantReadWriteLock();
public void write(String key ,String value)
{
rwLock.writeLock().lock();
try
{
System.out.println(Thread.currentThread().getName()+"\t"+"正在写入");
map.put(key,value);
//暂停毫秒
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"完成写入");
}finally {
rwLock.writeLock().unlock();
}
}
public void read(String key)
{
rwLock.readLock().lock();
try
{
System.out.println(Thread.currentThread().getName()+"\t"+"正在读取");
String result = map.get(key);
//暂停200毫秒
try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"完成读取"+"\t"+result);
}finally {
rwLock.readLock().unlock();
}
}
}
/**
* @auther zzyy
* @create 2022-04-08 18:18
*/
public class ReentrantReadWriteLockDemo
{
public static void main(String[] args)
{
MyResource myResource = new MyResource();
for (int i = 1; i <=10; i++) {
int finalI = i;
new Thread(() -> {
myResource.write(finalI +"", finalI +"");
},String.valueOf(i)).start();
}
for (int i = 1; i <=10; i++) {
int finalI = i;
new Thread(() -> {
myResource.read(finalI +"");
},String.valueOf(i)).start();
}
}
}
- 运行结果:可以看出写的时候只有一个线程在写,而读的时候可以多个线程并发读
- 读写锁还存在锁饥饿问题,多个线程读,少数线程写,可能会造成写线程长时间获取不到锁而饥饿等待
- 锁降级机制:获取写锁后,再获取读锁,然后释放写锁,再释放读锁,前三部完成后,该线程持有的写锁就变成了读锁,利用这种机制可以在高并发情况下写缓存后,立刻返回这次的数据,而不是再和其他线程争夺读锁再返回缓存数据了
StampedLock
邮戳锁新增了一种乐观读锁,解决了上述的饥饿问题
- 获取任何类型的锁时都会返回一个long类型的Stamp(返回0表示获取失败),释放锁时要传入获取锁时的Stamp
- StampedLock不可重入,不支持条件变量Condition,锁代码中不要调用interrupt方法
- StampedLock的读写锁和之前的ReentrantLock一样,这里就不再演示,只演示乐观读
- 获取乐观读后,允许其他的写线程并发,而不是阻塞等待
- 乐观读其实不是真正的读锁,在获取乐观锁后,需要对代码中调用validate(long stamp),true表示读过程中没有写线程来修改数据,false表示有写线程修改数据,此时就需手动将乐观读升级为悲观读(正常的读锁)
- demo:乐观读时有写线程修改
public class StampedLockDemo
{
static int number = 37;
static StampedLock stampedLock = new StampedLock();
public void write()
{
long stamp = stampedLock.writeLock();
System.out.println(Thread.currentThread().getName()+"\t"+"写线程准备修改");
try
{
number = number + 13;
}finally {
stampedLock.unlockWrite(stamp);
}
System.out.println(Thread.currentThread().getName()+"\t"+"写线程结束修改");
}
//乐观读,读的过程中也允许获取写锁介入
public void tryOptimisticRead()
{
long stamp = stampedLock.tryOptimisticRead();
int result = number;
//故意间隔4秒钟,很乐观认为读取中没有其它线程修改过number值,具体靠判断
System.out.println("4秒前stampedLock.validate方法值(true无修改,false有修改)"+"\t"+stampedLock.validate(stamp));
for (int i = 0; i < 4; i++) {
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"正在读取... "+i+" 秒" +
"后stampedLock.validate方法值(true无修改,false有修改)"+"\t"+stampedLock.validate(stamp));
}
if(!stampedLock.validate(stamp))
{
System.out.println("有人修改过------有写操作");
stamp = stampedLock.readLock();//从乐观读 升级为 悲观读
try
{
System.out.println("从乐观读 升级为 悲观读");
result = number;
System.out.println("重新悲观读后result:"+result);
}finally {
stampedLock.unlockRead(stamp);
}
}
System.out.println(Thread.currentThread().getName()+"\t"+" finally value: "+result);
}
public static void main(String[] args)
{
StampedLockDemo resource = new StampedLockDemo();
new Thread(() -> {
resource.tryOptimisticRead();
},"readThread").start();
//暂停2秒钟线程,读过程可以写介入,演示
try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
resource.write();
},"writeThread").start();
}
}
- 运行结果:
4秒前stampedLock.validate方法值(true无修改,false有修改) true
readThread 正在读取... 0 秒后stampedLock.validate方法值(true无修改,false有修改) true
writeThread ----come in
writeThread 写线程准备修改
writeThread 写线程结束修改
readThread 正在读取... 1 秒后stampedLock.validate方法值(true无修改,false有修改) false
readThread 正在读取... 2 秒后stampedLock.validate方法值(true无修改,false有修改) false
readThread 正在读取... 3 秒后stampedLock.validate方法值(true无修改,false有修改) false
有人修改过------有写操作
从乐观读 升级为 悲观读
重新悲观读后result:50
readThread finally value: 50
辅助类
CountDownLatch
保证任务线程都执行完才执行主线程的任务
- 指定计数器
new CountDownLatch(int count)
- 当一个或多个线程调用await方法时,这些线程会阻塞,一般主线程调用,等待任务线程完成
- 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
- 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行
- demo:等所有同学离开教室,班长才能关门
public class CountDownLatchDemo
{
public static void main(String[] args) throws InterruptedException
{
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6; i++) //6个上自习的同学,各自离开教室的时间不一致
{
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 号同学离开教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t****** 班长关门走人,main线程是班长");
}
}
运行结果:
某某某离开教室
某某某离开教室
某某某离开教室
某某某离开教室
某某某离开教室
某某某离开教室
班长关门
- 源码
1.CountDownLatch构造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//初始化AQS的state
this.sync = new Sync(count);
}
2.await方法调用AQS的acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
//成立,就把阻塞的线程封装node节点
(tryAcquireShared(arg) < 0 &&
//获取不到state的线程封装并入队,挂起···,详细参见ReentrantLock获取锁流程
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}
3.CountDownLatch的tryAcquireShared
protected int tryAcquireShared(int acquires) {
//一般情况下都返回-1
return (getState() == 0) ? 1 : -1;
}
4.countDown方法调用AQS的releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);
return true;
}
return false;
}
5.CountDownLatch的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
//每次state减一,大多数返回false,只有当state为一,nextc为0.唤醒入队等待的线程
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
CyclicBarrier
使不同线程都执行到一个同步点
- 指定屏障数,也可以指定在到达同步点后优先执行的任务
new CyclicBarrier(int parties)
new CyclicBarrier(int parties, Runnable barrierAction)
- 线程进入屏障通过CyclicBarrier的await()方法,会使屏障数+1,并且使自身阻塞
- 当指定个数线程都到达屏障时,被阻塞的线程才会执行之后的代码
- demo:集齐七颗龙珠才能召唤神龙
public class CyclicBarrierDemo
{
private static final int NUMBER = 7;
public static void main(String[] args)
{
//CyclicBarrier(int parties, Runnable barrierAction)
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{System.out.println("*****集齐7颗龙珠就可以召唤神龙");}) ;
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+"\t 星龙珠被收集 ");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
运行结果:
龙珠被收集
龙珠被收集
龙珠被收集
龙珠被收集
龙珠被收集
龙珠被收集
龙珠被收集
召唤神龙
- CountDownLatch和CyclicBarrier都是线程同步的工具类,可以发现这两者的等待主体是不一样的
- CountDownLatch调用await()通常是主线程/调用线程
- CyclicBarrier调用await()是在任务线程调用的,所以,CyclicBarrier中的阻塞的是任务的线程,而主线程是不受影响的
Semaphore
- acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1)
-
要么一直等下去,直到有线程释放信号量,或超时
- release(释放)实际上会将信号量的值加1,然后唤醒等待的线程
- 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制
- 信号量为1时,相当于synchronized
锁机制
公平锁和非公平锁
在获取锁时,可以通过构造方法指定
公平锁:是指多个线程按照申请锁的顺序来获取锁
非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,在高并 发环境下,有可能造成优先级翻转或者饥饿的状态
默认情况下,获取的锁是非公平锁,因为回避CPU时间的浪费,多个线程来回执行会有上下文切换,开销大
隐式锁和显式锁
隐式锁:不用显式的lock和unlock
显式锁:代码中写明lock和unlock
可重入锁
是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞
可以在锁的方法内部再次获取锁
ReentrantLock 和 synchronized都是可重入锁
死锁
死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁
//死锁代码
public class JucTest {
public static void main(String[] args) {
final Object objectA = new Object();
final Object objectB = new Object();
new Thread(() -> {
synchronized (objectA) {
System.out.println(Thread.currentThread().getName() + "\t自己持有A锁,希望获得B锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (objectB) {
System.out.println(Thread.currentThread().getName() + "\t成功获得B锁");
}
}
}, "A").start();
new Thread(() -> {
synchronized (objectB) {
System.out.println(Thread.currentThread().getName() + "\t自己持有B锁,希望获得A锁");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (objectA) {
System.out.println(Thread.currentThread().getName() + "\t成功获得A锁");
}
}
}, "B").start();
}
}
排查死锁:
jps -l , jstack 进程编号
jconsole图形化界面
并发容器
JDK原生
- 原生的Hashtable,Vector
- 使用Collections修饰的线程安全集合
JUC提供
- JUC并发包下的,Blocking类型,CopyOnWrite类型,Concurrent类型
- 并发容器
- 并发容器中使用volatile修饰属性和CAS操作修改属性
- 具体源码就不分析了
中断协商机制
在java中如何停止一条线程的执行?
java中提供了一种中断机制,注意这只是一种协商机制,好比两个人之间商量
如何协商?
每个线程对象中都有一个属性,来表示别人当前线程是否和别的线程协商过
希望你中断,则设置为true,默认式false
API
-
interrupt:将线程的(中断状态)interrupted属性修改为true
- 如果线程2调用时,线程1处于被阻塞状态,如sleep,wait,join时,线程1会立即退出阻塞状态,并且抛出interruptedException,还会把interrupted属性重置为false
- 如果线程1此时已经结束了,此时中断状态就不会设置为ture,变为false
-
interrupted:返回中断状态true还是false
-
static interrupted:返回中断状态true还是false,并且重置为false
Demo
volatile中断线程
AtomicBoolean中断线程
interrupt和isInterrupted
public class InterruptDemo {
//案例一:volatile
static volatile boolean isStop = false;
//案例二:atomicBoolean
static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
//案例三:isInterrupted
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) { //还可以有isStop或者atomicBoolean.get()作为条件
System.out.println(Thread.currentThread().getName() + "\t isInterrupted()被修改为true,程序停止");
break;
}
System.out.println("t1 -----hello interrupt api");
}
}, "t1");
t1.start();
System.out.println("-----t1的默认中断标志位:" + t1.isInterrupted());
//暂停毫秒
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
//t2向t1发出协商,将t1的中断标志位设为true希望t1停下来
new Thread(() -> {
t1.interrupt();
//isStop=true
//atomicBoolean.set(true);
}, "t2").start();
}
}
这里为什么要用volatile来修饰变量和new一个AtomicBoolean呢?
往后看······
LockSupport
synchronized唤醒和等待
object.wait() : 阻塞线程,释放锁
object.notify() : 唤醒一个阻塞中的线程,并不是立刻唤醒,而是通知它可以去竞争锁了
object.notifyAll() : 唤醒所有阻塞中的线程
注意这些方法必须在锁中使用,而且必须先wait后notify,前者会报异常,后者会报异常
Lock唤醒和等待
Condition condition = lock.newCondition();
condition.await() : 阻塞线程
condition.signal() : 唤醒线程
LockSupport唤醒和等待
- static void park : 阻塞方法,检查当前线程自己是否有许可证,没有就阻塞
- static void unpark : 唤醒方法,给当前指定线程一张许可证,最多有一个,也即是说,无论调多少次方法,只会有一个许可证
- permit :默认是0,调用unpark会+1,但最多是1
解决了上述的两大痛点:无须在锁中的代码执行,也无须按照顺序(你也可以先给一张通行证)
public class LockSupportDemo
{
public static void main(String[] args)
{
Object objectLock = new Object();
/*Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();**/
new Thread(() -> {
synchronized (objectLock){
//lock.lock();
// LockSupport.park();
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t ----come in");
try {
objectLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
//lock.unlock();
}
System.out.println(Thread.currentThread().getName()+"\t ----被唤醒");
}
},"t1").start();
//暂停几秒钟线程
//try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
new Thread(() -> {
synchronized (objectLock){
objectLock.notify();
System.out.println(Thread.currentThread().getName()+"\t ----发出通知");
}
},"t2").start();
/*
new Thread(() -> {
lock.lock();
try
{
condition.signal();
System.out.println(Thread.currentThread().getName()+"\t ----发出通知");
}finally {
lock.unlock();
}
},"t2").start();
**/
/*
new Thread(() -> {
LockSupport.unpark(t1);
System.out.println(Thread.currentThread().getName() + "\t ----发出通知");
}, "t2").start();
**/
}
}
JMM
三大特性
可见性:是指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道该变更,JMM规定了所有的变量都存储在主内存中
原子性:指一个操作是不可打断的,即多线程环境下,操作不能被其他线程干扰
有序性:JVM线程内部要维持顺序化语义,即程序最终的执行结果 和顺序化执行的结果相等,那么指令的执行顺序可能和代码的顺序不一致,此时就发生了指令的重排序
多线程读写
多线程读写操作都是讲主存中的变量拷贝一份放到自己的(工作内存)私有域中,线程与线程之间无法互相访问私有域,所以线程之间通信(写回变量)要经过主存
happens-before
-
总原则:如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前; 两个操作之间存在happens-before关系,并不意味着一定要按照happens-before原则制定的顺序来执行。 如果重排序之后的执行结果与按照happens-before关系来执行的结果一致,那么这种重排序并不非法
-
8种细分原则
-
次序规则
前一个操作的结果可以被后续的操作获取。讲白点就是前面一个操作把变量X赋值为1,那后面一个操作肯定能知道X已经变成了1。
-
锁定规则
一个unLock操作先行发生于后面 (这里的“后面”是指时间上的先后) 对同一个锁的Iock操作。
-
volatile变量规则
对一个volatile变量的写操作先行发生于后面对这个变量的读操作,前面的写对后面的读是可见的,这里的”后面"同样是指时间上的先后。
-
传递规则
如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;
-
线程启动规则(Thread Start Rule)
Thread对象的start()方法先行发生于此线程的每一个动作
-
线程中断规则(Thread Interruption Rule)
对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生 可以通过Thread.interrupted()检测到是否发生中断 也就是说你要先调用interrupt()方法设置过中断标志位,我才能检测到中断发送。
-
线程终止规则(Thread Termination Rule)
线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过isAlive()等手段检测线程是否已经终止执行。
-
对象终结规则(Finalizer Rule)
一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始
-
volatile
可见性有序性
volatile满足了JMM的可见性,有序性,没有满足原子性
-
可见性:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值立即刷新回主内存中,并及时发出通知,大家可以去主内存拿最新版,前面的修改对后面所有线程可见。 当读一个volatile变量时,JMM会把该线程对应的本地内存设置为无效,重新回到主内存中读取最新共享变量所以volatile的写内存语义是直接刷新到主内存中,读的内存语义是直接从主内存中读取
-
有序性:禁止将有些指令(需要按顺序执行的指令)重排
如何保证可见性和有序性?
通过内存屏障
- 内存屏障:内存屏障其实是一种JVM指令,Java内存模型的重排规则会要求Java编译器在生成JVM指令时插入特定的内存屏障指令,通过这些内存屏障指令,volatile实现了Java内存模型中的可见性和有序性
内存屏障
读屏障(Load Memory Barrier):在读指令之前插入读屏障,让工作内存或CPU高速缓存当中的缓存数据失效,重新回到主内存中获取最新数据
写屏障( Store Memory Barrier) :在写指令之后插入写屏障,强制把写缓冲区的数据刷回到主内存中
不管加的什么屏障,屏障之前的所有写操作都要回写到主内存;屏障之后的所有读操作都能获得所有写操作的最新结果(实现了可见性)
粗分的两种屏障就保证了可见性
四大屏障
屏障类型 | 指令示例 | 说明 |
---|---|---|
LoadLoad | Load1; LoadLoad; Load2 | 保证load1的读取操作在load2及后续读取操作之前执行 |
StoreStore | Store1; StoreStore; Store2 | 在store2及其后的写操作执行前, 保证store1的写操作已刷新到主内存 |
LoadStore | Load1; LoadStore; Store2 | 在stroe2及其后的写操作执行前, 保证load1的读操作已读取结束 |
StoreLoad | Store1; StoreLoad; Load2 | 保证store1的写操作已刷新到主内存之 后,load2及 其后的读操作才能执行 |
细分的四大屏障就实现了有序性
无原子性
对于volatile变量具备可见性,JVM只是保证从主内存加载到线程工作内存的值是最新的,也仅是数据加载时是最新的,但是加载之后的操作如果不是原子操作,那么在多线程的环境下,有线程比你快提交了数据到主存中,根据可见性,你需要重新加载最新值所以你的这次操作就不生效了,这样就发生了写丢失
所以在多线程场景下,所以我们需要对共享变量加锁来保证
适用场景
单一赋值,I++操作底层执行了三个指令
布尔类型作为标志,来执行后续逻辑
读多写少的场景,读用volatile修饰变量,写用同步锁实现原子性
CAS
思想理念
- Compare And Swap:比较并交换,包含三个操作数,内存位置的值(现在的值),预期值(相当于旧的值),新值,如果内存位置的值和预期值相等,就将新值赋给现值,不相等就会自旋
CAS这种理念规范,最终落实到了CPU指令这里,,也就是说会调用操作系统的cmpxchg指令(原子性指令)来完成,涉及到汇编指令就不再继续深入了、比起使用锁,更节省了CPU的资源
- 自旋:重新这次操作
CAS底层是怎么实现的?
UnSafe类
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
public final native boolean compareAndSetInt(Object o, long offset,int expected,int x);
do-while循环是自旋的体现
compareAndSetInt方法是Native方法,这里就会去掉操作系统的函数或者是第三方C语言函数库的函数,也就是前面讲的最后会执行一个CPU的原语指令cmpxchg
数据库乐观锁
如何更好的理解?结合数据库的乐观锁
数据库的乐观锁就是CAS的具体实现
试想这样一种场景:有一个秒杀业务(高并发的环境下),多个请求同时对数据库中的商品库存更新,一般情况下业务代码是这样的:
商品库存 = queryByID
flag = update
这两个操作不具备原子性,线程1有可能执行完第一步以后,线程2已经执行完这两步,恰巧库存为0了,此时该线程1执行了,它直接对库存字段修改,此时就会发生超卖
乐观锁其实是一种代码上的实现,不像悲观锁一样是真正的锁
sql:update tb_shops set stock=stock-1 where stock = #old
带上where条件后,相当于CAS中做了一次比较,如果相等,是不是这条语句就执行成功返回ture,也就完成了交换(更新)
自旋锁
//实现自旋锁
public class JucTest {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void lock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t" + "come in");
while (!atomicReference.compareAndSet(null, thread)) {
}
}
public void unLock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t" + "task over, unLock");
}
public static void main(String[] args) {
JucTest jucTest = new JucTest();
new Thread(() -> {
jucTest.lock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
jucTest.unLock();
}, "A").start();
//保证A先于B
try {
TimeUnit.MICROSECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
jucTest.lock();
jucTest.unLock();
}, "B").start();
}
}
利用CAS自旋的特性,我们可以自定义一个自旋锁
简单说一下,加锁代码中更新值就相当于获取锁,获取不成功就会再次尝试,但是这样对CPU的消耗过大
原子类
原子类也是CAS思想的体现
被volatile修饰的变量保证可见性和有序性,但是没有实现原子性,所以出现了原子类,用原子类类型声明一个变量
基本类型
AtomicInteger
AtomicLong
AtomicBoolean
AtomicReference:可以原子更新对象的引用
数组类型
数组类型的使用只不过加了索引,大多数API还是一样的
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
引用类型
AtomicReference:申明时指定泛型就可以对自己写的资源类进行前面一样操作
AtomicStampedReference:相比于前面,增加了一个Int类型的版本号属性,可以结合数据库悲观锁理解
AtomicMarkableReference:相比于前面,增加了一个布尔类型的标志属性,
对象的属性类型
AtomicIntegerFieldUpdater:基于反射,对指定的类的volatile int 字段更新
AtomicLongFieldUpdater:基于反射,对指定的类的volatile long字段更新
AtomicReferenceFieldUpdater:基于反射,对指定的类的volatile 引用字段字段更新
增强类型
LongAdder:比AtomicLong占用更多的内存空间,但具有更好的性能,但不保证数据的精度
DoubleAdder
LongAccumulator:LongAdder只能算加法,这个更强大,可以去实现函数式接口
DoubleAccumulator
- LongAdder为什么比AtomicLong更快?
LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。 sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong个value的更新压力分散到多个value中去,从而降级更新热点(sum不能保证精度,只能保证最终一致性)
源码有点复杂知道大概该过程就行了······
ThreadLocal
每一个Thread对象都维护一个ThreadLocal.ThreadLocalMap类型的字段,它是ThreadLocal类中的一个静态内部类
API
-
get:获取线程局部变量的值
-
set:将变量存入
-
withIitial:初始化变量值为null
-
remove:清除
-
getMap:获取当前线程的ThreadLocalMap
ThreadLocalMap
使用ThreadLocal的API最后会调用ThreadLocalMap的API
ThreadLocal
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
//
ThreadLocalMap
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
Entry
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
每个Entry里都维护了ThreadLocal<?> key, Object value,ThreadLocal对象和线程局部变量
但这个Map不是真正的Map,只是通过代码维护的线程Map
对象内存布局
Object obj=new Object();
//new Object()实例 放在堆空间
//Object 模板放在方法区
//obj 对象引用放在栈中
对象头
-
对象头由两部分组成:共占16个字节,各占8个字节
- MarkWord :Mark Word 用于存储对象自身的运行时数据,如 HashCode、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID等等
- 类型指针:虚拟机通过这个指针确定该对象是哪个类的实例,指向方法区的类模板
-
MarkWord存储结构
- 存储 (哈希值(HashCode )、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID即JavaThread、偏向时间戳epoch)等信息
实例数据
- 对象真正存储的有效信息,包括程序代码中定义的各种类型的字段(包括从父类继承下来的和本身拥有的字段)
对齐填充
- 虚拟机要求对象起始地址必须是8字节的整数倍,假如一个对象应该是15字节,但是虚拟机会自动把1字节补上,变成16字节;填充数据不是必须存在的,仅仅是为了字节对齐
Demo
从代码级别验证对象内存布局到底是什么样
- 引入JOL依赖
<!--
JAVA object layout
官网:http://openjdk.java.net/projects/code-tools/jol/
定位:分析对象在JVM的大小和分布
-->
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
</dependency>
- demo
public class JOLDemo{
public static void main(String[] args){
Object o = new Object();
System.out.println( ClassLayout.parseInstance(o).toPrintable());
}
}
- 运行结果:
- MarkWord占了8个字节,指针占了4个字节?
JVM默认开启了类型指针压缩!,所以被压缩到4个字节,自己可以手动关闭,这里就不做演示
这里Object没有定义字段,所以实例数据就为0字节
因为保证对象的大小为8字节的整数倍,所以对齐填充就补了4个字节,Object实例就是16个字节了
Synchronized锁升级
无锁
当前对象没有被作为锁对象放到Synchronized同步代码块中
- demo
@Test
public void testNoLock() {
Object o = new Object();
System.out.println("10进制hash码:"+o.hashCode());
System.out.println("16进制hash码:"+Integer.toHexString(o.hashCode()));
System.out.println("2进制hash码:"+Integer.toBinaryString(o.hashCode()));
System.out.println( ClassLayout.parseInstance(o).toPrintable());
}
- 运行结果:
- hashcode值和对象头中的对应,而且锁标志位为01也能对应
- 无锁情况下对象头如上图所示
偏向锁
-
程序运行时,第一个进入同步代码块的线程,会把锁对象的对象头中的前54位修改为自己的线程ID,并且修改偏向锁标志位和锁标志位,此时这个锁对象就变成了偏向了次线程
-
如果没有其他线程来抢锁,这个线程就不会释放锁,下次进入同步代码块方法是也不需要获取锁,而是去锁对象中的对象头找是否有自己的线程ID,这样就提高了性能
-
假如,此时有第二个线程运行,发生了竞争,持有偏向锁的线程才会释放锁,竞争的线程会使用CAS来修改锁对象中的线程ID
-
如果竞争成功,就把锁对象中的ID修改成自己的线程ID
-
竞争失败,锁对象会升级为轻量级锁
-
demo:模拟多个窗口售票
class Ticket //资源类,模拟3个售票员卖完50张票
{
private int number = 50;
Object lockObject = new Object();
public void sale()
{
synchronized (lockObject) {
if(number > 0)
{
System.out.println(Thread.currentThread().getName()+"卖出第:\t"+(number--)+"\t 还剩下:"+number);
}
}
}
}
@Test
public void testBiasLock() {
Ticket ticket = new Ticket();
new Thread(() -> { for (int i = 0; i <55; i++) ticket.sale(); },"a").start();
new Thread(() -> { for (int i = 0; i <55; i++) ticket.sale(); },"b").start();
new Thread(() -> { for (int i = 0; i <55; i++) ticket.sale(); },"c").start();
}
-
运行结果:多数行为都是某一个线程完成的
-
注意JDK15后,偏向锁默认是关闭的,此处测试须手动开启偏向锁(直接百度)
-
偏向锁撤销有两种情况:撤销需要等待全局安全点,没有字节码在执行
- 撤销时,如果持有偏向锁的线程还在执行代码块的方法,锁就会升级为轻量级锁,锁还是由之前持有偏向锁的线程持有,竞争的线程会自旋的方式获取轻量锁
- 如果碰巧方法执行完,该线程就会把对象锁的对象头设置为无锁,可以重新偏向了,轮到竞争的线程上位了
轻量锁
当偏向锁关闭或者上述线程竞争锁偏向锁失败的情况下,锁升级
轻量级锁是针对于两个线程来说的
-
加锁操作:
- 线程获得轻量锁成功时,会把锁的MarkWord复制到自己的Displaced Mark Word(每个线程的栈帧中存储锁记录的空间),并将锁对象的对象头的前62位设置一个指针,指向自己Displaced Mark Word
- 失败时,就自旋获取锁
- 注意,和偏向锁不同的是,轻量锁每次执行完同步代码块会释放锁
-
释放锁操作:当前线程会使用CAS操作将Displaced Mark Word的内容复制回锁的Mark Word里面
- 如果没有发生竞争,那么这个复制的操作会成功
- 如果有其他线程因为自旋多次导致轻量级锁升级成了重量级锁那么CAS操作会失败,此时会释放锁并唤醒被阻塞的线程
-
自旋次数:一般情况下都是两个线程交替执行,但是有可能一个线程自旋次数太多,java6之前是10次,java6之后变成自适应自旋次数,也就是说次数是不固定的(如果线程上次自旋成功,下次自旋的次数上限就会增加,相反······),达到上限就会升级为重量级锁
重量级锁
Demo
-
当一个对象已经计算过一致性哈希码后,它就再也无法进入偏向锁状态了,直接升级为轻量级锁
-
而当一个对象当前正处于偏向锁状态,又收到需要 计算其一致性哈希码请求时,它的偏向状态会被立即撤销,并且锁会膨胀为重量级锁
-
demo:
@Test
public void testLockHash() {
//先睡眠5秒,保证开启偏向锁
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
Object o = new Object();
System.out.println("本应是偏向锁");
System.out.println(ClassLayout.parseInstance(o).toPrintable());
o.hashCode();
synchronized (o){
System.out.println("本应是偏向锁,但是由于计算过一致性哈希,会直接升级为轻量级锁");
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}
@Test
public void testLockHash() {
//先睡眠5秒,保证开启偏向锁
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
Object o = new Object();
synchronized (o){
o.hashCode();//没有重写,一致性哈希,重写后无效
System.out.println("偏向锁过程中遇到一致性哈希计算请求,立马撤销偏向模式,膨胀为重量级锁");
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}
- 运行结果:通过锁标志位可得出直接由无锁变为轻量锁
- 运行结果:此时偏向锁直接变为了重量锁
HashCode
-
锁对象变为轻量锁时,原本对象头的MarkWord被保存在当前线程的栈帧中的LockRecord
-
锁对象变为重量锁时,原本对象头的MarkWord被保存在当前锁对象的Monitor类中的字段
AQS
- 已ReentrantLock为例的架构
AbstractQueuedSynchronizer
- 维护一个内部类
- Node中维护前后节点,当前线程,线程的状态
- 维护三个属性
- 前后节点和锁的状态位
- state表示当前这把锁的重入次数,0表示锁还没被获取
- Node类就可以实现双向队列,而每一个Node节点中维护的线程,就是排队等待获取锁的线程
ReentrantLock获取锁
- 源码
ReentrantLock类的lock方法
@ReservedStackAccess
final void lock() {
if (!initialTryLock())
acquire(1);
}
1.NonfairSync的initialTryLock:
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt is unguarded
//直接抢锁,设置锁的线程拥有者为当前线程
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
//锁是当前线程拥有的,重入次数+1
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
2.FairSync的initialTryLock:
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//当前队列是否有线程排队,没有线程排队就尝试一次
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
3.AQS的acquire,FairSync或...的tryAcquire
public final void acquire(int arg) {
if (!tryAcquire(arg))
//第三次尝试
acquire(null, arg, false, false, false, 0L);
}
//第二次尝试
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
4.两者不同的区别在于非公平锁会直接抢锁,而不是排队,但最终都调用AQS的acquire
AQS的accquire方法
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
1.第一次for,再一次尝试获取锁,获取失败就创建一个空的Node节点,此时还没赋值
2.第二次for,再一次尝试获取锁,获取失败,对当前创建好的节点的属性赋值,并初始化双向队列,创建一个哨兵节点,head和tail指向它
3.第三次for,再一次尝试获取锁,获取失败,每个线程对应的node的prev指向tail(也就是指向了哨兵节点),接着自旋将tail指向一个node节点,并将哨兵节点的next指向它,多线程下只有一个成功,执行完也就是入队成功了,失败的线程对应的节点的prev又被置为null
4.第四次for,对于已经入队的节点:再一次尝试获取锁,成功则使自己变为哨兵节点,head指向它,失败则将状态设置为WAITING;对于未入队的node:执行像第三次for的操作(入队操作),成功就入队,失败再尝试入队
5.第五次for,对于已经入队的第一个节点,再一次尝试获取锁,成功则是自己变为哨兵节点,失败调用LockSupport.park()阻塞自己,第二个节点,不尝试获取锁,而是将自己的状态设置为WAITING,,下一次也就执行阻塞,为入队的节点继续入队
6.线程中断情况,
7.cancelAcquire,某种情况下会取消获取
ReentrantLock释放锁
- 源码
AQS的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
1.Sync的tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
//重置锁的持有线程weinull,修改AQS的状态位
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
2.Sync的signalNext
private static void signalNext(Node h) {
Node s;
//拿到头结点,找到第一个排队的节点,设置状态位,并唤醒
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}