Administrator
Published on 2024-06-08 / 38 Visits
0
0

JUC


typora-copy-images-to: JUC.assets

线程

创建线程

继承Thread类

    Thread01 thread = new Thread01();
    thread.start();//启动线程
    // 继承Thread
    public static class T1 extends Thread{
        @Override
        public void run(){
            System.out.println("当前线程:"+Thread.currentThread().getId());
            System.out.println("T1");
        }

    }

实现Runnable接口

Runable01 runable01 = new Runable01();
    new Thread(runable01).start();
    // 实现Runnable接口
    public static class R1 implements Runnable{
        @Override
        public void run(){
            System.out.println("当前线程:"+Thread.currentThread().getId());
            System.out.println("R1");
        }

    }

实现Callable接口

    //实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
    FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
    new Thread(futureTask).start();
    //阻塞等待整个线程执行完成,获取返回结果
    Integer integer = futureTask.get();    


    // 实现Callable接口 (可以拿到返回结果,可以处理异常)
    public static class C1 implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            System.out.println("当前线程:"+Thread.currentThread().getId());
            System.out.println("C1");
            return 66;
        }
    }

线程状态

  • 新建状态(New):

    • 新创建了一个线程对象,但还没有调用start()方法。实现Runnable接口和继承Thread可以得到一个线程类,new一个实例出来,线程就进入了新建状态
  • 就绪状态(Runnable):

    • 新建状态的线程,调用线程的start()方法,此线程进入就绪状态
  • 运行状态(Running):

    • 当线程获得CPU时间后,它才进入运行状态,真正开始执行run()方法
  • 阻塞状态(Blocked):

    • 线程调用一个在I/O上被阻塞的操作,即该操作在输入输出操作完成之前不会返回到它的调用者;阻塞状态是正在运行的线程没有运行结束,暂时让出CPU,这时其他处于就绪状态的线程就可以获得CPU时间,进入运行状态
  • 等待状态/超时等待(Waiting/Timed_Waiting) ,线程进入等待状态有三种方式:

    • cpu调度给优先级更高的线程

    • 线程要等待获得资源或者信号

    • 时间片的轮转,时间片到了,进入等待状态

    • 在可执行状态下,如果调用 sleep()、 wait()等方法,线程都将进入等待状态

  • 终止状态(Terminated):

    • 表示该线程已经执行完毕。当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。
  • sleep() 和 wait() 的区别:

    • sleep方法是Thread类里面的,主要的意义就是让当前线程停止执行,让出CPU给其他的线程,但是不会释放对象锁资源以及监控的状态,当指定的时间到了之后又会自动恢复运行状态
    • wait方法是Object类里面的,主要的意义就是让线程放弃当前的对象的锁,进入等待此对象的等待锁定池,只有针对此对象调动notify方法后本线程才能够进入对象锁定池准备获取对象锁进入运行状态

线程的生命周期

image-20221201124230995

  • join:将调用方法的线程加入当前线程,说人话,让别的线程插队执行,完成后自己再执行

  • demo:

	public static void main(String[] args) {
        Thread t = new Thread(() -> System.out.println("test"));
        t.start();
        System.out.println("main1");
        try {
            t.join();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("main2");
    }
运行结果:
    main2一定是最后才执行的

线程分类

  • 用户线程:main线程,新创建的线程一般都是用户线程(除非通过构造方法指定)
  • 守护线程:等用户线程执行完结束后,它会自动结束

线程池

image-20221201101743624

  • 线程池能干什么?
    • 线程池自己内部维护的线程可以复用,减少了每次创建和销毁线程的消耗
    • 通过控制线程的数量,可以控制并发数
    • 对维护的线程统一管理和监控

创建线程池

  • Executors工具类中封装了ThreadPoolExecutor的构造方法
  • API:

image-20221201102906056

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • 一般使用这三个方法来创建线程池,其它的方法也是对参数的设置

  • new FixedThreadPool:创建固定线程数的线程池

    • 若新任务提交时,有空闲线程就立刻执行,没有则把任务放在阻塞队列中,等待线程执行
  • new CachedThreadPool:创建线程数量可调整线程池

    • 若线程都在执行,此时又提交了新任务,就会创建新的线程来执行
  • new SingleThreadExecutor:创建一个线程的线程池,参照固定数量的线程池工作流程

  • 我们在工作中使用那种方式创建线程池?

这三种哪个都不用!!!

FixedThreadPool和SingleThreadPool,创建任务队列时,队列的长度为Integer.MAX_VALUE,可能堆积大量请求导致 OOM

CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数为 Integer.MAX_VALUE ,可能会创建大量线程导致 OOM

七大参数

  • ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
ThreadPoolExecutor 3 个最重要的参数:
● corePoolSize : 相当于“值班”的线程数,核心线程
● maximumPoolSize : 同时工作的最大线程数,包括核心和非核心线程
● workQueue: 暂存新任务的队列
ThreadPoolExecutor其他4个参数:
● keepAliveTime:非核心线程,在这个时间后,如果没有任务提交,就会自动销毁
● unit : keepAliveTime 参数的时间单位
● threadFactory :executor 创建新线程的时候会用到
● handler :拒绝策略,当任务队列满时,对于新任务的处理方式,后面会展开讲

拒绝策略

  • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不
    会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。一般回退到main线程让它来完成
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中
    尝试再次提交当前任务
  • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的一种策略

任务提交

  • void execute(Runnable command):只是提交任务
  • future submit(Runnable task):提交任务后返回future对象,你可以通过get方法获取任务的返回值

Future

![image-20221127100610921](C:\Users\顾森湘\Desktop\md\JUC.assets\image-20221127100610921.png

image-20221127102241037

  • 该接口定义了异步任务的的一些方法

  • 通过类图可知,FutureTask实现了RunnableFuture接口,继承了Future和Runnable接口

  • 构造方法可以传入Callable或者Runnable接口的实现类

接口规范

image-20221127100945235

  • isCancelled() :是否取消

  • cancle : 取消任务

  • get()有参 : 再给定的时间内获得结果,如果没有获取到就报异常

  • get() : 一直阻塞,等待结果

  • isDone () :是否完成任务

  • 注意事项:get方法会一直阻塞进程,等待结果

Demo

获得结果案例

FutureTask<String> futureTask = new FutureTask<String>(()->{
            System.out.println("线程开始执行");
            Thread.sleep(5000);
            return "执行结束";
        });

        Thread t1 = new Thread(futureTask, "t1");
        t1.start();
        Thread.sleep(550);
        System.out.println("主线程执行其他任务");
        while (true){
            if (futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }else {
                Thread.sleep(1000);
                System.out.println("正在处理中");
            }
        }
  • get方法会一直阻塞进程,所以可以配合isDone来获得结果,但是这样还是有CPU的浪费

异步执行案例

@Test
    public  void TestAsyn() throws InterruptedException {
        long begin = System.currentTimeMillis();
        FutureTask<String> futureTask1 = new FutureTask<String>(()->{
            System.out.println("线程1开始执行");
            Thread.sleep(300);
            return "1执行结束";
        });
        threadPool.submit(futureTask1);
        FutureTask<String> futureTask2 = new FutureTask<String>(()->{
            System.out.println("线程2开始执行");
            Thread.sleep(500);
            return "2执行结束";
        });
        threadPool.submit(futureTask2);
        Thread.sleep(400);
        long end = System.currentTimeMillis();
        System.out.println(end-begin);
        threadPool.shutdown();
    }
    @Test
    public  void TestSync() throws InterruptedException {
        long begin = System.currentTimeMillis();
        Thread.sleep(200);
        Thread.sleep(200);
        Thread.sleep(400);
        long end = System.currentTimeMillis();
        System.out.println(end-begin);

    }
  • 配合线程池可以完成异步任务处理,明显比同步执行更节省时间

两种思想

悲观锁

认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改,synchronized关键字和Lock的实现类都是悲观锁,适应写操作频繁的场景

乐观锁

认为自己在使用数据时不会有别的线程修改数据或资源,所以不会添加锁。 在Java中是通过使用无锁编程来实现,只是在更新数据的时候去判断,之前有没有别的线程更新了这个数据

如果这个数据没有被更新,当前线程将自己修改的数据成功写入。 如果这个数据已经被其它线程更新,则根据不同的实现执行不同的操作,比如放弃修改、重试抢锁等等

synchronized

对象锁

作用于普通方法,只和当前实例有关,同一个实例 访问方法会阻塞等待,不同实例则不会阻塞

类锁

作用于静态方法,我们知道静态方法只和类有关,所以不管该类有多个实例来访问这个方法,都会被阻塞

代码块锁

作用于代码块,任意一个对象都可以作为锁放入

底层实现原理

对两个方法来说,通过反编译字节码文件可知,方法上会有这两个标记ACC_STATIC,ACC_SYNCHRONIZED这两个会告诉JVM该方法是否为静态同步方法

对代码块来说,使用的是monitorenter和monitorexit指令,来保证持有锁,释放锁,会而外多加一个monitorexit,为了出现异常时也能释放锁

ReentrantLock

ReentrantReadWriteLock

为了优化使用Synchronized或ReentrantLock独占锁时的并发性能

独占锁时只有一个线程来读或写,现在可以多个线程来读了

  • 适用在读多写少的情况下,提高并发性能

  • 多个线程可以并发读读,但是不能并发读写或写写

  • demo:

class MyResource //资源类,模拟一个简单的缓存
{
    Map<String,String> map = new HashMap<>();
    //=====ReentrantLock 等价于 =====synchronized,之前讲解过
    Lock lock = new ReentrantLock();
    //=====ReentrantReadWriteLock 一体两面,读写互斥,读读共享
    ReadWriteLock rwLock = new ReentrantReadWriteLock();

    public void write(String key ,String value)
    {
        rwLock.writeLock().lock();
        try
        {
            System.out.println(Thread.currentThread().getName()+"\t"+"正在写入");
            map.put(key,value);
            //暂停毫秒
            try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName()+"\t"+"完成写入");
        }finally {
            rwLock.writeLock().unlock();
        }
    }

    public void read(String key)
    {
        rwLock.readLock().lock();
        try
        {
            System.out.println(Thread.currentThread().getName()+"\t"+"正在读取");
            String result = map.get(key);
            //暂停200毫秒
            try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName()+"\t"+"完成读取"+"\t"+result);
        }finally {
            rwLock.readLock().unlock();
        }
    }


}


/**
 * @auther zzyy
 * @create 2022-04-08 18:18
 */
public class ReentrantReadWriteLockDemo
{
    public static void main(String[] args)
    {
        MyResource myResource = new MyResource();

        for (int i = 1; i <=10; i++) {
            int finalI = i;
            new Thread(() -> {
                myResource.write(finalI +"", finalI +"");
            },String.valueOf(i)).start();
        }

        for (int i = 1; i <=10; i++) {
            int finalI = i;
            new Thread(() -> {
                myResource.read(finalI +"");
            },String.valueOf(i)).start();
        }

    }
}
  • 运行结果:可以看出写的时候只有一个线程在写,而读的时候可以多个线程并发读
  • 读写锁还存在锁饥饿问题,多个线程读,少数线程写,可能会造成写线程长时间获取不到锁而饥饿等待
  • 锁降级机制:获取写锁后,再获取读锁,然后释放写锁,再释放读锁,前三部完成后,该线程持有的写锁就变成了读锁,利用这种机制可以在高并发情况下写缓存后,立刻返回这次的数据,而不是再和其他线程争夺读锁再返回缓存数据了

StampedLock

邮戳锁新增了一种乐观读锁,解决了上述的饥饿问题

  • 获取任何类型的锁时都会返回一个long类型的Stamp(返回0表示获取失败),释放锁时要传入获取锁时的Stamp
  • StampedLock不可重入,不支持条件变量Condition,锁代码中不要调用interrupt方法
  • StampedLock的读写锁和之前的ReentrantLock一样,这里就不再演示,只演示乐观读
  • 获取乐观读后,允许其他的写线程并发,而不是阻塞等待
  • 乐观读其实不是真正的读锁,在获取乐观锁后,需要对代码中调用validate(long stamp),true表示读过程中没有写线程来修改数据,false表示有写线程修改数据,此时就需手动将乐观读升级为悲观读(正常的读锁)
  • demo:乐观读时有写线程修改
public class StampedLockDemo
{
    static int number = 37;
    static StampedLock stampedLock = new StampedLock();

    public void write()
    {
        long stamp = stampedLock.writeLock();
        System.out.println(Thread.currentThread().getName()+"\t"+"写线程准备修改");
        try
        {
            number = number + 13;
        }finally {
            stampedLock.unlockWrite(stamp);
        }
        System.out.println(Thread.currentThread().getName()+"\t"+"写线程结束修改");
    }



    //乐观读,读的过程中也允许获取写锁介入
    public void tryOptimisticRead()
    {
        long stamp = stampedLock.tryOptimisticRead();
        int result = number;
        //故意间隔4秒钟,很乐观认为读取中没有其它线程修改过number值,具体靠判断
        System.out.println("4秒前stampedLock.validate方法值(true无修改,false有修改)"+"\t"+stampedLock.validate(stamp));
        for (int i = 0; i < 4; i++) {
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(Thread.currentThread().getName()+"\t"+"正在读取... "+i+" 秒" +
                    "后stampedLock.validate方法值(true无修改,false有修改)"+"\t"+stampedLock.validate(stamp));
        }
        if(!stampedLock.validate(stamp))
        {
            System.out.println("有人修改过------有写操作");
            stamp = stampedLock.readLock();//从乐观读 升级为 悲观读
            try
            {
                System.out.println("从乐观读 升级为 悲观读");
                result = number;
                System.out.println("重新悲观读后result:"+result);
            }finally {
                stampedLock.unlockRead(stamp);
            }
        }
        System.out.println(Thread.currentThread().getName()+"\t"+" finally value: "+result);
    }


    public static void main(String[] args)
    {
        StampedLockDemo resource = new StampedLockDemo();


        new Thread(() -> {
            resource.tryOptimisticRead();
        },"readThread").start();

        //暂停2秒钟线程,读过程可以写介入,演示
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }


        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"\t"+"----come in");
            resource.write();
        },"writeThread").start();


    }
}
  • 运行结果:
4秒前stampedLock.validate方法值(true无修改,false有修改)	true
readThread	正在读取... 0 秒后stampedLock.validate方法值(true无修改,false有修改)	true
writeThread	----come in
writeThread	写线程准备修改
writeThread	写线程结束修改
readThread	正在读取... 1 秒后stampedLock.validate方法值(true无修改,false有修改)	false
readThread	正在读取... 2 秒后stampedLock.validate方法值(true无修改,false有修改)	false
readThread	正在读取... 3 秒后stampedLock.validate方法值(true无修改,false有修改)	false
有人修改过------有写操作
从乐观读 升级为 悲观读
重新悲观读后result:50
readThread	 finally value: 50

辅助类

CountDownLatch

保证任务线程都执行完才执行主线程的任务

  • 指定计数器
new CountDownLatch(int count)
  • 当一个或多个线程调用await方法时,这些线程会阻塞,一般主线程调用,等待任务线程完成
  • 其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
  • 当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行
  • demo:等所有同学离开教室,班长才能关门
public class CountDownLatchDemo
{
   public static void main(String[] args) throws InterruptedException
   {
         CountDownLatch countDownLatch = new CountDownLatch(6);
       
       for (int i = 1; i <=6; i++) //6个上自习的同学,各自离开教室的时间不一致
       {
          new Thread(() -> {
              System.out.println(Thread.currentThread().getName()+"\t 号同学离开教室");
              countDownLatch.countDown();
          }, String.valueOf(i)).start();
       }
       countDownLatch.await();
       System.out.println(Thread.currentThread().getName()+"\t****** 班长关门走人,main线程是班长");
          
   }
 
 
}
运行结果:
某某某离开教室
某某某离开教室
某某某离开教室
某某某离开教室
某某某离开教室
某某某离开教室
班长关门
  • 源码
1.CountDownLatch构造方法
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
    //初始化AQS的state
        this.sync = new Sync(count);
    }
2.await方法调用AQS的acquireSharedInterruptibly方法
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            //成立,就把阻塞的线程封装node节点
            (tryAcquireShared(arg) < 0 &&
             //获取不到state的线程封装并入队,挂起···,详细参见ReentrantLock获取锁流程
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }
3.CountDownLatch的tryAcquireShared
    protected int tryAcquireShared(int acquires) {
    //一般情况下都返回-1
            return (getState() == 0) ? 1 : -1;
        }
4.countDown方法调用AQS的releaseShared
    public final boolean releaseShared(int arg) {
    
        if (tryReleaseShared(arg)) {
            signalNext(head);
            return true;
        }
        return false;
    }
5.CountDownLatch的tryReleaseShared
    protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                //每次state减一,大多数返回false,只有当state为一,nextc为0.唤醒入队等待的线程
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

CyclicBarrier

使不同线程都执行到一个同步点

  • 指定屏障数,也可以指定在到达同步点后优先执行的任务
new CyclicBarrier(int parties) 
new CyclicBarrier(int parties, Runnable barrierAction) 
  • 线程进入屏障通过CyclicBarrier的await()方法,会使屏障数+1,并且使自身阻塞
  • 当指定个数线程都到达屏障时,被阻塞的线程才会执行之后的代码
  • demo:集齐七颗龙珠才能召唤神龙
public class CyclicBarrierDemo
{
  private static final int NUMBER = 7;
  
  public static void main(String[] args)
  {
     //CyclicBarrier(int parties, Runnable barrierAction) 
     CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER, ()->{System.out.println("*****集齐7颗龙珠就可以召唤神龙");}) ;
     
     for (int i = 1; i <= 7; i++) {
       new Thread(() -> {
          try {
            System.out.println(Thread.currentThread().getName()+"\t 星龙珠被收集 ");
            cyclicBarrier.await();
          } catch (InterruptedException | BrokenBarrierException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
          }
       
       }, String.valueOf(i)).start();
     }
     

运行结果:
龙珠被收集
龙珠被收集
龙珠被收集
龙珠被收集
龙珠被收集
龙珠被收集
龙珠被收集
召唤神龙
  • CountDownLatch和CyclicBarrier都是线程同步的工具类,可以发现这两者的等待主体是不一样的
    • CountDownLatch调用await()通常是主线程/调用线程
    • CyclicBarrier调用await()是在任务线程调用的,所以,CyclicBarrier中的阻塞的是任务的线程,而主线程是不受影响的

Semaphore

  • acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1)
  •   要么一直等下去,直到有线程释放信号量,或超时
    
  • release(释放)实际上会将信号量的值加1,然后唤醒等待的线程
  • 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制
  • 信号量为1时,相当于synchronized

锁机制

公平锁和非公平锁

在获取锁时,可以通过构造方法指定

公平锁:是指多个线程按照申请锁的顺序来获取锁

非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,在高并 发环境下,有可能造成优先级翻转或者饥饿的状态

默认情况下,获取的锁是非公平锁,因为回避CPU时间的浪费,多个线程来回执行会有上下文切换,开销大

隐式锁和显式锁

隐式锁:不用显式的lock和unlock

显式锁:代码中写明lock和unlock

可重入锁

是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞

可以在锁的方法内部再次获取锁

ReentrantLock 和 synchronized都是可重入锁

死锁

死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁

//死锁代码
public class JucTest {
    public static void main(String[] args) {
        final Object objectA = new Object();
        final Object objectB = new Object();
        new Thread(() -> {
            synchronized (objectA) {
                System.out.println(Thread.currentThread().getName() + "\t自己持有A锁,希望获得B锁");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (objectB) {
                    System.out.println(Thread.currentThread().getName() + "\t成功获得B锁");
                }
            }
        }, "A").start();

        new Thread(() -> {
            synchronized (objectB) {
                System.out.println(Thread.currentThread().getName() + "\t自己持有B锁,希望获得A锁");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (objectA) {
                    System.out.println(Thread.currentThread().getName() + "\t成功获得A锁");
                }
            }
        }, "B").start();
    }
}

排查死锁:

jps -l , jstack 进程编号

jconsole图形化界面

并发容器

JDK原生

  • 原生的Hashtable,Vector
  • 使用Collections修饰的线程安全集合

JUC提供

  • JUC并发包下的,Blocking类型,CopyOnWrite类型,Concurrent类型
  • 并发容器

image-20230102191655908

  • 并发容器中使用volatile修饰属性和CAS操作修改属性
  • 具体源码就不分析了

中断协商机制

在java中如何停止一条线程的执行?

java中提供了一种中断机制,注意这只是一种协商机制,好比两个人之间商量

如何协商?

每个线程对象中都有一个属性,来表示别人当前线程是否和别的线程协商过

希望你中断,则设置为true,默认式false

API

image-20221127191140748

  • interrupt:将线程的(中断状态)interrupted属性修改为true

    • 如果线程2调用时,线程1处于被阻塞状态,如sleep,wait,join时,线程1会立即退出阻塞状态,并且抛出interruptedException,还会把interrupted属性重置为false
    • 如果线程1此时已经结束了,此时中断状态就不会设置为ture,变为false
  • interrupted:返回中断状态true还是false

  • static interrupted:返回中断状态true还是false,并且重置为false

Demo

volatile中断线程

AtomicBoolean中断线程

interrupt和isInterrupted

public class InterruptDemo {
    //案例一:volatile
    static volatile boolean isStop = false;
    //案例二:atomicBoolean
    static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
    //案例三:isInterrupted
    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            while (true) {
                if (Thread.currentThread().isInterrupted()) { //还可以有isStop或者atomicBoolean.get()作为条件
                    System.out.println(Thread.currentThread().getName() + "\t isInterrupted()被修改为true,程序停止");
                    break;
                }
                System.out.println("t1 -----hello interrupt api");
            }
        }, "t1");
        t1.start();

        System.out.println("-----t1的默认中断标志位:" + t1.isInterrupted());

        //暂停毫秒
        try {
            TimeUnit.MILLISECONDS.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //t2向t1发出协商,将t1的中断标志位设为true希望t1停下来
        new Thread(() -> {
            t1.interrupt();
            //isStop=true
            //atomicBoolean.set(true);
        }, "t2").start();

    }
}

这里为什么要用volatile来修饰变量和new一个AtomicBoolean呢?

往后看······

LockSupport

synchronized唤醒和等待

object.wait() : 阻塞线程,释放锁

object.notify() : 唤醒一个阻塞中的线程,并不是立刻唤醒,而是通知它可以去竞争锁了

object.notifyAll() : 唤醒所有阻塞中的线程

注意这些方法必须在锁中使用,而且必须先wait后notify,前者会报异常,后者会报异常

Lock唤醒和等待

Condition condition = lock.newCondition();

condition.await() : 阻塞线程

condition.signal() : 唤醒线程

LockSupport唤醒和等待

  • static void park : 阻塞方法,检查当前线程自己是否有许可证,没有就阻塞
  • static void unpark : 唤醒方法,给当前指定线程一张许可证,最多有一个,也即是说,无论调多少次方法,只会有一个许可证
  • permit :默认是0,调用unpark会+1,但最多是1

解决了上述的两大痛点:无须在锁中的代码执行,也无须按照顺序(你也可以先给一张通行证)

public class LockSupportDemo
{

    public static void main(String[] args)
    {
        Object objectLock = new Object();
		/*Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();**/
        new Thread(() -> {
           
            synchronized (objectLock){
                //lock.lock();
                // LockSupport.park();
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println(Thread.currentThread().getName()+"\t ----come in");
                try {
                    objectLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    //lock.unlock();
                }
                System.out.println(Thread.currentThread().getName()+"\t ----被唤醒");
            }
        },"t1").start();

        //暂停几秒钟线程
        //try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

        new Thread(() -> {
            synchronized (objectLock){
                objectLock.notify();
                System.out.println(Thread.currentThread().getName()+"\t ----发出通知");
            }
        },"t2").start();
        /*
        new Thread(() -> {
            lock.lock();
            try
            {
                condition.signal();
                System.out.println(Thread.currentThread().getName()+"\t ----发出通知");
            }finally {
                lock.unlock();
            }
        },"t2").start();
        **/
        
        /*
         new Thread(() -> {
            LockSupport.unpark(t1);
            System.out.println(Thread.currentThread().getName() + "\t ----发出通知");
        }, "t2").start();
        **/

    }
}


JMM

三大特性

可见性:是指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道该变更,JMM规定了所有的变量都存储在主内存中

原子性:指一个操作是不可打断的,即多线程环境下,操作不能被其他线程干扰

有序性:JVM线程内部要维持顺序化语义,即程序最终的执行结果 和顺序化执行的结果相等,那么指令的执行顺序可能和代码的顺序不一致,此时就发生了指令的重排序

多线程读写

image-20221128003635180

多线程读写操作都是讲主存中的变量拷贝一份放到自己的(工作内存)私有域中,线程与线程之间无法互相访问私有域,所以线程之间通信(写回变量)要经过主存

happens-before

  • 总原则:如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前; 两个操作之间存在happens-before关系,并不意味着一定要按照happens-before原则制定的顺序来执行。 如果重排序之后的执行结果与按照happens-before关系来执行的结果一致,那么这种重排序并不非法

  • 8种细分原则

    • 次序规则

      前一个操作的结果可以被后续的操作获取。讲白点就是前面一个操作把变量X赋值为1,那后面一个操作肯定能知道X已经变成了1。

    • 锁定规则

      一个unLock操作先行发生于后面 (这里的“后面”是指时间上的先后) 对同一个锁的Iock操作。

    • volatile变量规则

      对一个volatile变量的写操作先行发生于后面对这个变量的读操作,前面的写对后面的读是可见的,这里的”后面"同样是指时间上的先后。

    • 传递规则

      如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;

    • 线程启动规则(Thread Start Rule)

      Thread对象的start()方法先行发生于此线程的每一个动作

    • 线程中断规则(Thread Interruption Rule)

      对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生 可以通过Thread.interrupted()检测到是否发生中断 也就是说你要先调用interrupt()方法设置过中断标志位,我才能检测到中断发送。

    • 线程终止规则(Thread Termination Rule)

      线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过isAlive()等手段检测线程是否已经终止执行。

    • 对象终结规则(Finalizer Rule)

      一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始

volatile

可见性有序性

volatile满足了JMM的可见性,有序性,没有满足原子性

  • 可见性:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值立即刷新回主内存中,并及时发出通知,大家可以去主内存拿最新版,前面的修改对后面所有线程可见。 当读一个volatile变量时,JMM会把该线程对应的本地内存设置为无效,重新回到主内存中读取最新共享变量所以volatile的写内存语义是直接刷新到主内存中,读的内存语义是直接从主内存中读取

  • 有序性:禁止将有些指令(需要按顺序执行的指令)重排

如何保证可见性和有序性?

通过内存屏障

  • 内存屏障:内存屏障其实是一种JVM指令,Java内存模型的重排规则会要求Java编译器在生成JVM指令时插入特定的内存屏障指令,通过这些内存屏障指令,volatile实现了Java内存模型中的可见性和有序性

内存屏障

读屏障(Load Memory Barrier):在读指令之前插入读屏障,让工作内存或CPU高速缓存当中的缓存数据失效,重新回到主内存中获取最新数据

写屏障( Store Memory Barrier) :在写指令之后插入写屏障,强制把写缓冲区的数据刷回到主内存中

不管加的什么屏障,屏障之前的所有写操作都要回写到主内存;屏障之后的所有读操作都能获得所有写操作的最新结果(实现了可见性)

粗分的两种屏障就保证了可见性

四大屏障

屏障类型指令示例说明
LoadLoadLoad1; LoadLoad; Load2保证load1的读取操作在load2及后续读取操作之前执行
StoreStoreStore1; StoreStore; Store2在store2及其后的写操作执行前, 保证store1的写操作已刷新到主内存
LoadStoreLoad1; LoadStore; Store2在stroe2及其后的写操作执行前, 保证load1的读操作已读取结束
StoreLoadStore1; StoreLoad; Load2保证store1的写操作已刷新到主内存之 后,load2及 其后的读操作才能执行

细分的四大屏障就实现了有序性

无原子性

对于volatile变量具备可见性,JVM只是保证从主内存加载到线程工作内存的值是最新的,也仅是数据加载时是最新的,但是加载之后的操作如果不是原子操作,那么在多线程的环境下,有线程比你快提交了数据到主存中,根据可见性,你需要重新加载最新值所以你的这次操作就不生效了,这样就发生了写丢失

所以在多线程场景下,所以我们需要对共享变量加锁来保证

适用场景

单一赋值,I++操作底层执行了三个指令

布尔类型作为标志,来执行后续逻辑

读多写少的场景,读用volatile修饰变量,写用同步锁实现原子性

CAS

思想理念

  • Compare And Swap:比较并交换,包含三个操作数,内存位置的值(现在的值),预期值(相当于旧的值),新值,如果内存位置的值和预期值相等,就将新值赋给现值,不相等就会自旋

CAS这种理念规范,最终落实到了CPU指令这里,,也就是说会调用操作系统的cmpxchg指令(原子性指令)来完成,涉及到汇编指令就不再继续深入了、比起使用锁,更节省了CPU的资源

  • 自旋:重新这次操作

CAS底层是怎么实现的?

UnSafe类
public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!weakCompareAndSetInt(o, offset, v, v + delta));
        return v;
    }
public final native boolean compareAndSetInt(Object o, long offset,int expected,int x);

do-while循环是自旋的体现

compareAndSetInt方法是Native方法,这里就会去掉操作系统的函数或者是第三方C语言函数库的函数,也就是前面讲的最后会执行一个CPU的原语指令cmpxchg

数据库乐观锁

如何更好的理解?结合数据库的乐观锁

数据库的乐观锁就是CAS的具体实现

试想这样一种场景:有一个秒杀业务(高并发的环境下),多个请求同时对数据库中的商品库存更新,一般情况下业务代码是这样的:

商品库存 = queryByID
flag   =  update

这两个操作不具备原子性,线程1有可能执行完第一步以后,线程2已经执行完这两步,恰巧库存为0了,此时该线程1执行了,它直接对库存字段修改,此时就会发生超卖

乐观锁其实是一种代码上的实现,不像悲观锁一样是真正的锁

sql:update tb_shops set stock=stock-1 where stock = #old

带上where条件后,相当于CAS中做了一次比较,如果相等,是不是这条语句就执行成功返回ture,也就完成了交换(更新)

自旋锁

//实现自旋锁
public class JucTest {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();
    public void lock() {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName() + "\t" + "come in");
        while (!atomicReference.compareAndSet(null, thread)) {

        }
    }
    public void unLock() {
        Thread thread = Thread.currentThread();
        atomicReference.compareAndSet(thread, null);
        System.out.println(Thread.currentThread().getName() + "\t" + "task over, unLock");
    }
    public static void main(String[] args) {
        JucTest jucTest = new JucTest();
        new Thread(() -> {
            jucTest.lock();
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            jucTest.unLock();
        }, "A").start();
        //保证A先于B
        try {
            TimeUnit.MICROSECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(() -> {
            jucTest.lock();
            jucTest.unLock();
        }, "B").start();
    }
}

利用CAS自旋的特性,我们可以自定义一个自旋锁

简单说一下,加锁代码中更新值就相当于获取锁,获取不成功就会再次尝试,但是这样对CPU的消耗过大

原子类

原子类也是CAS思想的体现

被volatile修饰的变量保证可见性和有序性,但是没有实现原子性,所以出现了原子类,用原子类类型声明一个变量

基本类型

AtomicInteger

AtomicLong

AtomicBoolean

AtomicReference:可以原子更新对象的引用

数组类型

数组类型的使用只不过加了索引,大多数API还是一样的

AtomicIntegerArray

AtomicLongArray

AtomicReferenceArray

引用类型

AtomicReference:申明时指定泛型就可以对自己写的资源类进行前面一样操作

AtomicStampedReference:相比于前面,增加了一个Int类型的版本号属性,可以结合数据库悲观锁理解

AtomicMarkableReference:相比于前面,增加了一个布尔类型的标志属性,

对象的属性类型

AtomicIntegerFieldUpdater:基于反射,对指定的类的volatile int 字段更新

AtomicLongFieldUpdater:基于反射,对指定的类的volatile long字段更新

AtomicReferenceFieldUpdater:基于反射,对指定的类的volatile 引用字段字段更新

增强类型

LongAdder:比AtomicLong占用更多的内存空间,但具有更好的性能,但不保证数据的精度

DoubleAdder

LongAccumulator:LongAdder只能算加法,这个更强大,可以去实现函数式接口

DoubleAccumulator

  • LongAdder为什么比AtomicLong更快?

LongAdder的基本思路就是分散热点,将value值分散到一个Cell数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。 sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong个value的更新压力分散到多个value中去,从而降级更新热点(sum不能保证精度,只能保证最终一致性)

源码有点复杂知道大概该过程就行了······

ThreadLocal

image-20221128210243134

每一个Thread对象都维护一个ThreadLocal.ThreadLocalMap类型的字段,它是ThreadLocal类中的一个静态内部类

image-20221128210520704

API

  • get:获取线程局部变量的值

  • set:将变量存入

  • withIitial:初始化变量值为null

  • remove:清除

  • getMap:获取当前线程的ThreadLocalMap

ThreadLocalMap

使用ThreadLocal的API最后会调用ThreadLocalMap的API

ThreadLocal
public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

//    
ThreadLocalMap
private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }
Entry
static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

每个Entry里都维护了ThreadLocal<?> key, Object value,ThreadLocal对象和线程局部变量

但这个Map不是真正的Map,只是通过代码维护的线程Map

对象内存布局

image-20221129135922670

Object obj=new Object();
//new Object()实例 放在堆空间
//Object 模板放在方法区
//obj 对象引用放在栈中

image-20221129140255213

对象头

  • 对象头由两部分组成:共占16个字节,各占8个字节

    • MarkWord :Mark Word 用于存储对象自身的运行时数据,如 HashCode、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID等等
    • 类型指针:虚拟机通过这个指针确定该对象是哪个类的实例,指向方法区的类模板
  • MarkWord存储结构

image-20221129140649906

  • 存储 (哈希值(HashCode )、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID即JavaThread、偏向时间戳epoch)等信息

实例数据

  • 对象真正存储的有效信息,包括程序代码中定义的各种类型的字段(包括从父类继承下来的和本身拥有的字段)

对齐填充

  • 虚拟机要求对象起始地址必须是8字节的整数倍,假如一个对象应该是15字节,但是虚拟机会自动把1字节补上,变成16字节;填充数据不是必须存在的,仅仅是为了字节对齐

Demo

从代码级别验证对象内存布局到底是什么样

  • 引入JOL依赖
	<!--
	JAVA object layout
	官网:http://openjdk.java.net/projects/code-tools/jol/
	定位:分析对象在JVM的大小和分布
	-->
	<dependency>
	    <groupId>org.openjdk.jol</groupId>
	    <artifactId>jol-core</artifactId>
	    <version>0.9</version>
	</dependency>
  • demo
public class JOLDemo{
    public static void main(String[] args){
        Object o = new Object();
        System.out.println( ClassLayout.parseInstance(o).toPrintable());
    }
}

  • 运行结果:

image-20221129142705504

  • MarkWord占了8个字节,指针占了4个字节?

JVM默认开启了类型指针压缩!,所以被压缩到4个字节,自己可以手动关闭,这里就不做演示

这里Object没有定义字段,所以实例数据就为0字节

因为保证对象的大小为8字节的整数倍,所以对齐填充就补了4个字节,Object实例就是16个字节了

Synchronized锁升级

image-20221129140649906

image-20221129174312877

2f788106-49dd-43b5-8e8d-ae2a7887551b_

无锁

当前对象没有被作为锁对象放到Synchronized同步代码块中

  • demo
@Test
    public void testNoLock() {
        Object o = new Object();
        System.out.println("10进制hash码:"+o.hashCode());
        System.out.println("16进制hash码:"+Integer.toHexString(o.hashCode()));
        System.out.println("2进制hash码:"+Integer.toBinaryString(o.hashCode()));
        System.out.println( ClassLayout.parseInstance(o).toPrintable());
    }
  • 运行结果:

image-20221129175137022

  • hashcode值和对象头中的对应,而且锁标志位为01也能对应
  • 无锁情况下对象头如上图所示

偏向锁

  • 程序运行时,第一个进入同步代码块的线程,会把锁对象的对象头中的前54位修改为自己的线程ID,并且修改偏向锁标志位和锁标志位,此时这个锁对象就变成了偏向了次线程

  • 如果没有其他线程来抢锁,这个线程就不会释放锁,下次进入同步代码块方法是也不需要获取锁,而是去锁对象中的对象头找是否有自己的线程ID,这样就提高了性能

  • 假如,此时有第二个线程运行,发生了竞争,持有偏向锁的线程才会释放锁,竞争的线程会使用CAS来修改锁对象中的线程ID

  • 如果竞争成功,就把锁对象中的ID修改成自己的线程ID

  • 竞争失败,锁对象会升级为轻量级锁

  • demo:模拟多个窗口售票

class Ticket //资源类,模拟3个售票员卖完50张票
    {
        private int number = 50;

        Object lockObject = new Object();

        public void sale()
        {
            synchronized (lockObject) {
                if(number > 0)
                {
                    System.out.println(Thread.currentThread().getName()+"卖出第:\t"+(number--)+"\t 还剩下:"+number);
                }
            }
        }
    }

    @Test
    public void testBiasLock() {
        Ticket ticket = new Ticket();
        new Thread(() -> { for (int i = 0; i <55; i++)  ticket.sale(); },"a").start();
        new Thread(() -> { for (int i = 0; i <55; i++)  ticket.sale(); },"b").start();
        new Thread(() -> { for (int i = 0; i <55; i++)  ticket.sale(); },"c").start();
    }
  • 运行结果:多数行为都是某一个线程完成的

  • 注意JDK15后,偏向锁默认是关闭的,此处测试须手动开启偏向锁(直接百度)

  • 偏向锁撤销有两种情况:撤销需要等待全局安全点,没有字节码在执行

    • 撤销时,如果持有偏向锁的线程还在执行代码块的方法,锁就会升级为轻量级锁,锁还是由之前持有偏向锁的线程持有,竞争的线程会自旋的方式获取轻量锁
    • 如果碰巧方法执行完,该线程就会把对象锁的对象头设置为无锁,可以重新偏向了,轮到竞争的线程上位了

轻量锁

当偏向锁关闭或者上述线程竞争锁偏向锁失败的情况下,锁升级

轻量级锁是针对于两个线程来说的

  • 加锁操作:

    • 线程获得轻量锁成功时,会把锁的MarkWord复制到自己的Displaced Mark Word(每个线程的栈帧中存储锁记录的空间),并将锁对象的对象头的前62位设置一个指针,指向自己Displaced Mark Word
    • 失败时,就自旋获取锁
    • 注意,和偏向锁不同的是,轻量锁每次执行完同步代码块会释放锁
  • 释放锁操作:当前线程会使用CAS操作将Displaced Mark Word的内容复制回锁的Mark Word里面

    • 如果没有发生竞争,那么这个复制的操作会成功
    • 如果有其他线程因为自旋多次导致轻量级锁升级成了重量级锁那么CAS操作会失败,此时会释放锁并唤醒被阻塞的线程
  • 自旋次数:一般情况下都是两个线程交替执行,但是有可能一个线程自旋次数太多,java6之前是10次,java6之后变成自适应自旋次数,也就是说次数是不固定的(如果线程上次自旋成功,下次自旋的次数上限就会增加,相反······),达到上限就会升级为重量级锁

重量级锁

Demo

  • 当一个对象已经计算过一致性哈希码后,它就再也无法进入偏向锁状态了,直接升级为轻量级锁

  • 而当一个对象当前正处于偏向锁状态,又收到需要 计算其一致性哈希码请求时,它的偏向状态会被立即撤销,并且锁会膨胀为重量级锁

  • demo:

 @Test
    public void testLockHash() {
        //先睡眠5秒,保证开启偏向锁
        try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        Object o = new Object();
        System.out.println("本应是偏向锁");
        System.out.println(ClassLayout.parseInstance(o).toPrintable());
        o.hashCode();
        synchronized (o){
            System.out.println("本应是偏向锁,但是由于计算过一致性哈希,会直接升级为轻量级锁");
            System.out.println(ClassLayout.parseInstance(o).toPrintable());
        }
    }
@Test
    public void testLockHash() {
        //先睡眠5秒,保证开启偏向锁
        try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        Object o = new Object();

        synchronized (o){
            o.hashCode();//没有重写,一致性哈希,重写后无效
            System.out.println("偏向锁过程中遇到一致性哈希计算请求,立马撤销偏向模式,膨胀为重量级锁");
            System.out.println(ClassLayout.parseInstance(o).toPrintable());
        }
    }
  • 运行结果:通过锁标志位可得出直接由无锁变为轻量锁

image-20221129221101676

  • 运行结果:此时偏向锁直接变为了重量锁

image-20221129221448251

HashCode

  • 锁对象变为轻量锁时,原本对象头的MarkWord被保存在当前线程的栈帧中的LockRecord

  • 锁对象变为重量锁时,原本对象头的MarkWord被保存在当前锁对象的Monitor类中的字段

AQS

image-20230101174844799

  • 已ReentrantLock为例的架构

AbstractQueuedSynchronizer

image-20230101173907645

image-20230101174027488

  • 维护一个内部类
    • Node中维护前后节点,当前线程,线程的状态
    • image-20230101174538278
  • 维护三个属性
    • 前后节点和锁的状态位
    • state表示当前这把锁的重入次数,0表示锁还没被获取

image-20230101175316505

  • Node类就可以实现双向队列,而每一个Node节点中维护的线程,就是排队等待获取锁的线程

ReentrantLock获取锁

  • 源码
ReentrantLock类的lock方法
    @ReservedStackAccess
        final void lock() {
            if (!initialTryLock())
                acquire(1);
        }
	1.NonfairSync的initialTryLock:
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            if (compareAndSetState(0, 1)) { // first attempt is unguarded
                //直接抢锁,设置锁的线程拥有者为当前线程
                setExclusiveOwnerThread(current);
                return true;
            } else if (getExclusiveOwnerThread() == current) {
                //锁是当前线程拥有的,重入次数+1
                int c = getState() + 1;
                if (c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            } else
                return false;
        }
    2.FairSync的initialTryLock:
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //当前队列是否有线程排队,没有线程排队就尝试一次
                if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
            return false;
        }
	3.AQS的acquire,FairSync或...的tryAcquire
        public final void acquire(int arg) {
        if (!tryAcquire(arg))
            //第三次尝试
            acquire(null, arg, false, false, false, 0L);
    	}
		//第二次尝试
		protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
	4.两者不同的区别在于非公平锁会直接抢锁,而不是排队,但最终都调用AQS的acquire
        
        
    
AQS的accquire方法
final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;                // predecessor of node when enqueued

        /*
         * Repeatedly:
         *  Check if node now first
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if node not yet created, create it
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */

        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }
1.第一次for,再一次尝试获取锁,获取失败就创建一个空的Node节点,此时还没赋值
2.第二次for,再一次尝试获取锁,获取失败,对当前创建好的节点的属性赋值,并初始化双向队列,创建一个哨兵节点,head和tail指向它
3.第三次for,再一次尝试获取锁,获取失败,每个线程对应的node的prev指向tail(也就是指向了哨兵节点),接着自旋将tail指向一个node节点,并将哨兵节点的next指向它,多线程下只有一个成功,执行完也就是入队成功了,失败的线程对应的节点的prev又被置为null
4.第四次for,对于已经入队的节点:再一次尝试获取锁,成功则使自己变为哨兵节点,head指向它,失败则将状态设置为WAITING;对于未入队的node:执行像第三次for的操作(入队操作),成功就入队,失败再尝试入队
5.第五次for,对于已经入队的第一个节点,再一次尝试获取锁,成功则是自己变为哨兵节点,失败调用LockSupport.park()阻塞自己,第二个节点,不尝试获取锁,而是将自己的状态设置为WAITING,,下一次也就执行阻塞,为入队的节点继续入队
6.线程中断情况,
7.cancelAcquire,某种情况下会取消获取

ReentrantLock释放锁

  • 源码
AQS的release
	public final boolean release(int arg) {
        if (tryRelease(arg)) {
            signalNext(head);
            return true;
        }
        return false;
    }
1.Sync的tryRelease
     protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            boolean free = (c == 0);
    //重置锁的持有线程weinull,修改AQS的状态位
            if (free)
                setExclusiveOwnerThread(null);
            setState(c);
            return free;
        }
2.Sync的signalNext
     private static void signalNext(Node h) {
        Node s;
    //拿到头结点,找到第一个排队的节点,设置状态位,并唤醒
        if (h != null && (s = h.next) != null && s.status != 0) {
            s.getAndUnsetStatus(WAITING);
            LockSupport.unpark(s.waiter);
        }
    }

Comment