java concurrent

Table of Contents

java 并发

本文为对源码部分解读
先给出一些链接:
美团ReentrantLock,对于一些难以理解的代码,由于水平有限未解决

  1. AQS的acquire方法(linux的jdk无注释,而且有变动,属实看不懂)

catalogue

  1. ConcurrentHashMap
  2. Thread
  3. AQS
  4. ReentrantLock
  5. CAS

ConcurrentHashMap

spread

    //计算key的扰动函数
    static final int spread(int h) {
        // 异或 低16位和高16位 & Integer>MAX_VALUE的作用是保证 > 0
        return (h ^ h >>> 16) & Integer.MAX_VALUE;
    }

put

    //这个前面存在一个代理方法
      // concurrentHashMap中不存在 null值
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key != null && value != null) {
            int hash = spread(key.hashCode());
            int binCount = 0;
            Node<K, V>[] tab = this.table;

            while(true) {
                int n;
                while(tab == null || (n = tab.length) == 0) {
                    // 初始化 HashTable
                    tab = this.initTable();
                }

                Node f;
                int i;
                // 此位置是否存在节点
                if ((f = tabAt(tab, i = n - 1 & hash)) == null) {
                    // 尝试对此位置 addNode
                    if (casTabAt(tab, i, (Node)null, new Node(hash, key, value))) {
                        break;
                    }
                } else {
                    int fh;
                    if ((fh = f.hash) == -1) {
                        tab = this.helpTransfer(tab, f);
                    } else {
                        Object fk;
                        Object fv;
                        // 存在值的时候直接返回
                        if (onlyIfAbsent && fh == hash && ((fk = f.key) == key || fk != null && key.equals(fk)) && (fv = f.val) != null) {
                            return fv;
                        }
                        // 旧值
                        V oldVal = null;
                        // 锁Node节点,
                        // jdk1.7从分段锁升级到jdk1.8,锁粒度进一步减少
                        synchronized(f) {
                            if (tabAt(tab, i) == f) {
                                if (fh < 0) {
                                    if (f instanceof TreeBin) {
                                        binCount = 2;
                                        TreeNode p;
                                        // 添加树节点
                                        if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) {
                                            oldVal = p.val;
                                            if (!onlyIfAbsent) {
                                                p.val = value;
                                            }
                                        }
                                    } else if (f instanceof ReservationNode) {
                                        throw new IllegalStateException("Recursive update");
                                    }
                                } else {
                                    label124: {
                                        binCount = 1;

                                        Node e;
                                        Object ek;
                                        // 链表加节点,记录链表长度
                                        for(e = f; e.hash != hash || (ek = e.key) != key && (ek == null || !key.equals(ek)); ++binCount) {
                                            Node<K, V> pred = e;
                                            if ((e = e.next) == null) {
                                                pred.next = new Node(hash, key, value);
                                                break label124;
                                            }
                                        }

                                        oldVal = e.val;
                                        if (!onlyIfAbsent) {
                                            e.val = value;
                                        }
                                    }
                                }
                            }
                        }
                        // 如果达到了可以转化为红黑树的条件,关注这个 binCount
                        if (binCount != 0) {
                            if (binCount >= 8) {
                                this.treeifyBin(tab, i);
                            }
                            // concurrentHashMap中不存在 null值
                            if (oldVal != null) {
                                return oldVal;
                            }
                            break;
                        }
                    }
                }
            }

            this.addCount(1L, binCount);
            return null;
        } else {
            throw new NullPointerException();
        }
    }

get

    //get 不加锁
    public V get(Object key) {
        int h = spread(key.hashCode());
        Node[] tab;
        Node e;
        int n;
        if ((tab = this.table) != null && (n = tab.length) > 0 && (e = tabAt(tab, n - 1 & h)) != null) {
            int eh;
            Object ek;
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || ek != null && key.equals(ek)) {
                    return e.val;
                }
             // 红黑树查找   
            } else if (eh < 0) {
                Node p;
                return (p = e.find(h, key)) != null ? p.val : null;
            }
            // 链表查找
            while((e = e.next) != null) {
                if (e.hash == h && ((ek = e.key) == key || ek != null && key.equals(ek))) {
                    return e.val;
                }
            }
        }

        return null;
    }

CompletableFuture

Future

  1. new Thread() 对于接口只支持 Runnable ,但是 Runnable 无法返回结果,不抛出异常
  2. Callable 接口返回结果但是无法构造

RunnableFuture

  1. Java可以接口多继承,所以增强 Runnable 接口只需使用 RunnableFuture 接口继Runnable && Future

FutureTask

  1. 实现 RunnableFuture接口, 相当于实现 Runnable ,Future接口,但是未实现Callable
  2. 如果需要异步返回结果,因此采用构造注入思想 传入Callable
  3. A FutureTask can be used to wrap a Callable or Runnable object 原文是这样的
  FutureTask task = new FutureTask((Callable<Integer>) () -> 13);
        task.run();
        task.get();

上面这段代码是对于主线程是阻塞的,可以理解为new FutureTask就是自动创建线程执行,get()时是 同步的,瞅一眼源码,看不懂,但是内部用到了LockSupport.park(),while(true){}大概能知道应该是这样的

CompletableFuture

  1. 实现Future && CompletionStage
  2. 不推荐 new CompletableFuture(),直接CompletableFuture.XXX就ok了
  3. 静态构造方法: 无指定 Executor的方法,默认使用ForkJoinPool.commonPool() 作为线程池执行异步
  4. 优点 异步任务结束时,自动回调某对象的方法 主线程设置回调后,不用注意异步任务执行,异步任务之间可以顺序执行 异步任务出错后,自动回调某对象方法
  5. get()&&join() 异常处理方面,join() 提供了一种更为宽松的方式,不需要显示地处理检查异常,而 get() 则要求开发者更加明确地处理可能出现的异常情况
  6. 阿里规范
  • 建议开发者不使用 Executors 供的静态工厂方法,而是通过自定义 ThreadPoolExecutor 方式来创建线程,Executors存在潜在问题
  • 固定的大小与拒绝策略- Executors.newFixedThreadPool 创建的线程池大小固若任务队列已满且线程数达到上限时,会采用默认的拒绝策略(通常是 AbortPolicy)接抛出 RejectedExecutionException,生产环境而言不够灵活且不够健壮。
  • 无界队列- Executors.newCachedThreadPoolExecutornewSingleThreadExecutor 使用的是无界的 SynchronousQueue 作为工作队列,味着如果有大量任务提交过来而无法及时处理时, OOM(Out Of Memory)。
  • 资源消耗- Executors.newSingleThreadExecutorExecutornewFixedThreadPool 如果遇到线程池中的线程被阻塞,无法释放资源,可能导致线程资法利用,同时对于IO密集型任务,线程池大小不适合
  • 缺乏细粒度控制- 使用 Executors 创建的线程池往往默认配置较为简单,难以针对场景(如CPU密集型、IO密集型、混合型任务)进行细致的参数调优,如线程池大小、队列容拒绝策略等。 综上所述,通过直接自定义 ThreadPoolExecutor 可以更明确地设定线程池的初始数,例如设置合适的线程池大小、采用有界队列、自定义拒绝策略等,从而降低资源耗尽的风提高系统稳定性和性能表现。

Thread

1. java的线程状态

 public static enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;

        private State() {
        }
    }

2. 中断

java不支持立刻对线程进行stop操作,suspend操作deprecated, interrupt操作在线程处于 sleep,wait,join状态时,会抛InterruptedException,中断标志位会设置为false

 //此方法逻辑很明确,宏观效果就是interrupted被设置为false
 public void interrupt() {
        if (this != currentThread()) {
            this.checkAccess();
            synchronized(this.blockerLock) {
                Interruptible b = this.blocker;
                if (b != null) {
                    this.interrupted = true;
                    this.interrupt0();
                    b.interrupt(this);
                    return;
                }
            }
        }

        this.interrupted = true;
        this.interrupt0();
    }
	//判断是否被中断,也就是 interrupted == true,然后设置interrupted == false
    public static boolean interrupted() {
        Thread t = currentThread();
        boolean interrupted = t.interrupted;
        // 这里无锁,
        if (interrupted) {
            t.interrupted = false;
            clearInterruptEvent();
        }

        return interrupted;
    }
	// 检查是否被中断
    public boolean isInterrupted() {
        return this.interrupted;
    }
  • 测试代码
    /**
    可测试,即使中断后,被中断线程也不会立刻stop
    */
    public static void main(String[] args) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 500; i++) {
                    System.out.println(Thread.currentThread().getName() + i);
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println("Interrupted...........");
                    }
                }
            }
        }, "AqsTest");
        t.start();
        try {
            TimeUnit.MILLISECONDS.sleep(8);
            t.interrupt();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

3. 线程阻塞和唤醒机制

Objectwaitnotify,配合synchroized的对象锁,这两个方法都是native,会从用户态切换至内核态,这里可能是性能瓶颈

   //测试
   public static void main(String[] args) {
        Object lock = new Object();
        // 用这里控制t1和t2的抢占锁顺序,t2先获取锁t1无限等待
        AtomicBoolean isThread1First = new AtomicBoolean(false);
        Thread t1 = new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                if (isThread1First.get()) {
                    TimeUnit.SECONDS.sleep(1);
                }
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() +" get lock");
                    lock.wait();
                    System.out.println(Thread.currentThread().getName() +" finish");
                }
            }
        }, "thread-1");
        Thread t2 = new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                synchronized (lock) {
                    System.out.println(Thread.currentThread().getName() +" get lock");
                    TimeUnit.SECONDS.sleep(1);
                    lock.notify();
                    System.out.println(Thread.currentThread().getName() +" finish");
                }
            }
        }, "thread-2");
        t1.start();
        t2.start();
    }

为什么synchronizedwait notify协同使用,主要是和synchronized的锁优化相关,对象监控 64bit的jvm的对象头

|-----------------------------------------------------------------------------------------------------------------|
|                                             Object Header(128bits)                                              |
|-----------------------------------------------------------------------------------------------------------------|
|                                   Mark Word(64bits)               |  Klass Word(64bits)    |      State         |
|-----------------------------------------------------------------------------------------------------------------|
|    unused:25|identity_hashcode:31|unused:1|age:4|biase_lock:0| 01 | OOP to metadata object |      Nomal         |
|-----------------------------------------------------------------------------------------------------------------|
|    thread:54|      epoch:2       |unused:1|age:4|biase_lock:1| 01 | OOP to metadata object |      Biased        |
|-----------------------------------------------------------------------------------------------------------------|
|                        ptr_to_lock_record:62                 | 00 | OOP to metadata object | Lightweight Locked |
|-----------------------------------------------------------------------------------------------------------------|
|                       ptr_to_heavyweight_monitor:62          | 10 | OOP to metadata object | Heavyweight Locked |
|-----------------------------------------------------------------------------------------------------------------|
|                                                              | 11 | OOP to metadata object |    Marked for GC   |
|-----------------------------------------------------------------------------------------------------------------|

juc包的Condition,也就是AQS的接口,使用上和前者相同

juc的locks包下的Condition接口,具体实现是在 AQS(AbstractQueuedSynchronizer)里的ConditionObject,其中用到了LockSupport

	//测试
 public static void main(String[] args) {

        ReentrantLock lock = new ReentrantLock();
        Condition con = lock.newCondition();
        AtomicBoolean isThread1First = new AtomicBoolean(false);
        Thread t1 = new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                if (!isThread1First.get()) {
                    TimeUnit.SECONDS.sleep(1);
                }
                try{
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() +" get lock");
                    con.await();
                    System.out.println(Thread.currentThread().getName() +" finish");
                }finally {
                    lock.unlock();
                }
            }
        }, "thread-1");
        Thread t2 = new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                try {
                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + " get lock");
                    TimeUnit.SECONDS.sleep(1);
                    con.signal();
                    System.out.println(Thread.currentThread().getName() + " finish");
                }finally {
                    lock.unlock();
                }
            }
        }, "thread-2");
        t1.start();
        t2.start();
        System.out.println(System.getProperty("os.name"));
        System.out.println(System.getProperty("sun.arch.data.model"));
    }

lockSupport locksuppot对线程挂起和唤醒顺序无要求

public class LockSupport {
	// 这里是 jdk.internal.misc.Unsafe;
	// 之前的Unsafe类是这个sun.misc.Unsafe,JDK9以后已经将sun.misc.Unsafe弃用
	//同时改进了lib文件的存储方式,将sun.misc.Unsafe全部存储在了jdk.unsupported里面
    private static final Unsafe U = Unsafe.getUnsafe();
    // ...

	public static void park() {
        U.park(false, 0L);
    }
  	public static void unpark(Thread thread) {
        if (thread != null) {
            U.unpark(thread);
        }
    }
}
    // 测试
    public static void main(String[] args) {
        AtomicBoolean isThread1First = new AtomicBoolean(true);
        Thread t1 = new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                if (!isThread1First.get()) {
                    TimeUnit.SECONDS.sleep(1);
                }
                    System.out.println(Thread.currentThread().getName() +" get lock");
                    LockSupport.park();
                    System.out.println(Thread.currentThread().getName() +" finish");
            }
        }, "thread-1");
        Thread t2 = new Thread(new Runnable() {
            @SneakyThrows
            public void run() {
                    System.out.println(Thread.currentThread().getName() + " get lock");
                    TimeUnit.SECONDS.sleep(1);
                    LockSupport.unpark(t1);
                    System.out.println(Thread.currentThread().getName() + " finish");
            }
        }, "thread-2");
        t1.start();
        t2.start();
    }

4.本类

public class Thread implements Runnable {
	// ... 
	// 线程名
    private volatile String name;
    //是否守护
    private boolean daemon;
    // 逻辑
    private Runnable target;
    //线程组
    private ThreadGroup group;
    // 状态枚举
	public static enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;

        private State() {
        }
    }    
}

AQS

Node

AQS内部的阻塞队列实际是个双向链表,内部抽象静态类Node,作为基类,其中ConditionNode就是在LockSupport中使用到,Node的实现有 ExclusiveNode (独占) SharedNode(共享) ConditionNode

abstract static class Node {
        volatile Node prev;
        volatile Node next;
        Thread waiter;
        volatile int status;
        private static final long STATUS;
        private static final long NEXT;
        private static final long PREV;
    }

基类

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable {
    /**
     * 
    懒加载的
    如果并发不高,那么一次CAS就加锁成功了
    所以如果初始化一个CLH队列,就会一直使用不到,浪费内存
    */
    private transient volatile Node head;
    private transient volatile Node tail;
    // 同步状态
    private volatile int state;
    // UNSAFE类,直接内存操作
    private static final Unsafe U = Unsafe.getUnsafe();
    private static final long STATE;
    private static final long HEAD;
    private static final long TAIL;
	// ...
	// 核心的 CAS 操作
	 protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSetInt(this, STATE, expect, update);
    }

    // 最核心的 aquire操作
    // 这里我看不懂,借助 tongyi的力量,做了一些解释
    // 加个todo
    final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0;
        byte postSpins = 0;
        boolean interrupted = false;
        boolean first = false;
        Node pred = null;
        // 跳出多重循环的的位置,基本语法别忘了
        label127:
        do {
            /**
                第一层:确保线程在获取锁失败时能够不断重试
                第二层:处理前驱节点的状态变化
                第三层:确保前驱节点的状态有效
                第四层:处理前驱节点的取消和中断
            */
            while(true) {
                while(true) {
                    while(true) {
                        while(true) {
                            // 先跳过这段逻辑
                            while(!first) {
                                Node var10000 = node == null ? null : ((Node)node).prev;
                                pred = var10000;
                                if (var10000 == null || (first = this.head == pred)) {
                                    break;
                                }

                                if (pred.status < 0) {
                                    this.cleanQueue();
                                } else {
                                    if (pred.prev != null) {
                                        break;
                                    }
                                    //提示 JIT 编译器优化忙等待的调用
                                    Thread.onSpinWait();
                                }
                            }
                            // 
                            if (first || pred == null) {
                                boolean acquired;
                                // 再次执行一次CAS
                                // 这里继续执行一次,
                                try {
                                    if (shared) {
                                        acquired = this.tryAcquireShared(arg) >= 0;
                                    } else {
                                        // 非公平
                                        acquired = this.tryAcquire(arg);
                                    }
                                } catch (Throwable var16) {
                                    Throwable ex = var16;
                                    this.cancelAcquire((Node)node, interrupted, false);
                                    throw ex;
                                }
                                // 如果拿到锁
                                if (acquired) {
                                    // 如果是第一个CLH节点
                                    if (first) {
                                        ((Node)node).prev = null;
                                        //设置头和尾
                                        this.head = (Node)node;
                                        pred.next = null;
                                        ((Node)node).waiter = null;
                                        if (shared) {
                                            signalNextIfShared((Node)node);
                                        }

                                        if (interrupted) {
                                            current.interrupt();
                                        }
                                    }

                                    return 1;
                                }
                            }
                            // 第一次进 Node == null
                            if (node != null) {
                                // 前驱不为null,也就是队列存在之
                                if (pred != null) {
                                    if (!first || spins == 0) {
                                        if (((Node)node).status != 0) {
                                            spins = postSpins = (byte)(postSpins << 1 | 1);
                                            if (!timed) {
                                                LockSupport.park(this);
                                            } else {
                                                long nanos;
                                                if ((nanos = time - System.nanoTime()) <= 0L) {
                                                    return this.cancelAcquire((Node)node, interrupted, interruptible);
                                                }e
                                            ((Node)node).clearStatus();
                                            continue label127;
                                        }

                                        ((Node)node).status = 1;
                                    } else {
                                        --spins;
                                        Thread.onSpinWait();
                                    }
                                } else {
                                    ((Node)node).waiter = current;
                                    Node t = this.tail;
                                    ((Node)node).setPrevRelaxed(t);
                                    if (t == null) {
                                        this.tryInitializeHead();
                                    } else if (!this.casTail(t, (Node)node)) {
                                        ((Node)node).setPrevRelaxed((Node)null);
                                    } else {
                                        t.next = (Node)node;
                                    }
                                }
                            } else if (shared) {
                                node = new SharedNode();
                            } else {
                                // node == null 时,新来一个独占锁
                                node = new ExclusiveNode();
                            }
                        }
                    }
                }
            }
        } while(!(interrupted |= Thread.interrupted()) || !interruptible);

        return this.cancelAcquire((Node)node, interrupted, interruptible);
    }
 }

ReentrantLock

与synchronized对比

jdk15取消biased-lock之后,synchronzied性能提升不少,但是到底那个更快呢?本人做了不严谨测试,代码如下:

// 这里准备3个类
public class SynLock {
    public long a = -100000L;

    public synchronized void add(long args) {
        a += args;
    }
}
public class ReeLock {
    ReentrantLock lock = new ReentrantLock();

    public long a = -100000L;

    public final void add(long args){
        lock.lock();
        a += args;
        lock.unlock();
    }
}
public class LockMain {
    public static void main(String[] args) throws Exception {
        ReeLock reeLock = new ReeLock();
        SynLock synLock = new SynLock();

        List<Long> synList = new ArrayList<>();
        List<Long> lockList = new ArrayList<>();
        for(int j = 0; j < 100; j++) {
            new Thread(()->{
                long start = System.currentTimeMillis();
                List<CompletableFuture> futures = new ArrayList<>();
                for (int i = 0; i < 10000000; i++) {
                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                        reeLock.add(1L);
                    });
                    futures.add(future);
                }
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
                System.out.println(reeLock.a);
                long end = System.currentTimeMillis();
                System.out.println("test reentrantLock");
                System.out.println(end - start);
                lockList.add(end-start);
            }).run();
            new Thread(()->{
                List<CompletableFuture> futures = new ArrayList<>();
                long start = System.currentTimeMillis();
                for (int i = 0; i < 10000000; i++) {
                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                        synLock.add(1L);
                    });
                    futures.add(future);
                }
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
                System.out.println(synLock.a);
                long end = System.currentTimeMillis();
                System.out.println("test synchronizedLock");
                System.out.println(end - start);
                synList.add(end- start);
            }).run();
        }
        Double syn = synList.stream().collect(Collectors.averagingDouble(Long::longValue));
        Double lock = lockList.stream().collect(Collectors.averagingLong(Long::longValue));
        System.out.println("syn module cost average time"+syn);
        System.out.println("lock module cost average time"+lock);
        TimeUnit.SECONDS.sleep(1);
        HashMap<Object, Object> map = new HashMap<>();
        map.put(1,1);
    }
}

可以自行测试,这段代码的耗时还是很长的,最后还是synchronzied更快一点,不过这个方法也不太准确,而且二者的差距很小,我的猜测是lockjava层面,虽然jit优化,但是可能jvm层面的修改对象头更快?

Lock

首先实现的是 Lock接口,其实 Lock是对锁操作的抽象,是约束 ReentrantLock的行为接口, 但是后续的AQS不是为了Lock而生,所以可以理解为 ReentrantLock使用了AQS,不是继承自AQS

public interface Lock {
    void lock();
	// 等待锁时被中断,抛出异常,线程获取锁时调用的
    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();
	
    boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;

    void unlock();
		/**
		ReentrantLock + Condition await() && signal() 等同于
		Synchronized + Object wait() && notifyAll()
		*/
    Condition newCondition();
}

Sync

ReentrantLock 的内部实现是依赖与Sync ,Sync又是继承AQS,Sync的实现有FairSyncNonFairSync, 本文基于后者

 // 实现AQS,或者说 ReentrantLock基于AQS
 abstract static class Sync extends AbstractQueuedSynchronizer {

		// 此注解用于通知jvm编译器为此方法预留固定的栈槽,减少栈帧的分配
        //这个接口实现的是 Lock 的行为,其实这里是不依赖于CAS的
        @ReservedStackAccess
        final boolean tryLock() {
            Thread current = Thread.currentThread();
            int c = this.getState();
            if (c == 0) {
            	//CAS尝试获取锁
                if (this.compareAndSetState(0, 1)) {
                	//成功设置当前锁Owner线程
                    this.setExclusiveOwnerThread(current);
                    return true;
                }
                // 可重入
            } else if (this.getExclusiveOwnerThread() == current) {
                ++c;
                // 这里判断是否溢出,感觉有点草率,不可能重入这么多次吧
                if (c < 0) {
                    throw new Error("Maximum lock count exceeded");
                }

                this.setState(c);
                return true;
            }

            return false;
        }
		// 在 NonFairSync FairSync实现,
        abstract boolean initialTryLock();

        @ReservedStackAccess
        // 这个方法是真正的逻辑组织
        final void lock() {
            // 用一次 tryLock的逻辑,尝试获取锁
            // 获取不到,执行较重的acquire操作,就要入队了
            if (!this.initialTryLock()) {
                //这里的 acquire方法还是AQS提供的
                this.acquire(1);
            }

        }

   		/**
   		尝试释放锁
   		*/
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = this.getState() - releases;
            // 判断加锁和解锁是否是同一线程
            if (this.getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            } else {
                boolean free = c == 0;
                if (free) {
                	// 设置占有线程 null
                    this.setExclusiveOwnerThread((Thread)null);
                }
				//state = 0;后续isLock()有用
                this.setState(c);
                return free;
            }
        }
    }

NonfairLock

非公平锁的实现,注意这个 initialTryLock方法.有点和Sync的tryLock方法相同,不要搞混

static final class NonfairSync extends Sync {
        //lock方法的ops,和Sync的tryLock操作相同
        final boolean initialTryLock() {
            Thread current = Thread.currentThread();
            // 只有state == 0的时候,是无锁的
            if (this.compareAndSetState(0, 1)) {
                this.setExclusiveOwnerThread(current);
                return true;
                // 判断是不是当前线程已经加锁
            } else if (this.getExclusiveOwnerThread() == current) {
                int c = this.getState() + 1;
                if (c < 0) {
                    throw new Error("Maximum lock count exceeded");
                } else {
                    this.setState(c);
                    return true;
                }
            } else {
                return false;
            }
        }
        //Lock的行为
        protected final boolean tryAcquire(int acquires) {
            if (this.getState() == 0 && this.compareAndSetState(0, acquires)) {
                this.setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else {
                return false;
            }
        }
    }

基类

默认是非公平实现

   public ReentrantLock() {
        this.sync = new NonfairSync();
    }

CAS

cas是类似与无锁化,是采用操作系统的原语cmpxchg,cpu执行此指令时,处理器自动锁定总线,禁止其他cpu访问共享变量.执行期间,cpu自动禁止中断,位于java的Unsafe类中 cas的自旋是对性能的损耗,如何量化cas和锁之间的优劣呢?AQS基于CLH队列实现线程的park,所以肯定优于直接加锁,并且AQS中尽可能的利用CAS操作, 最大程度的利用乐观思想避免使用 LockSupport.park()方法

package jdk.internal.misc;
public final class Unsafe {
	// 以此类方法compareAndSetXXX作为介绍
	//最终的本地方法,用于比较并设置对象的一个字段。如果当前值等于预期值,则将其设置为新的值,并返回 true;否则不做任何更改并返回 false
  @IntrinsicCandidate
    public final native boolean compareAndSetReference(Object var1, long var2, Object var4, Object var5);
	//执行比较和交换操作,并返回旧值
    @IntrinsicCandidate
    public final native Object compareAndExchangeReference(Object var1, long var2, Object var4, Object var5);

    @IntrinsicCandidate
    //获取操作上加了一个内存屏障,确保了后续对其他变量的读取不会重排序到这个获取操作之前
    public final Object compareAndExchangeReferenceAcquire(Object o, long offset, Object expected, Object x) {
        return this.compareAndExchangeReference(o, offset, expected, x);
    }
	
    @IntrinsicCandidate
    //release 版本,意味着在释放操作上加了一个内存屏障,确保了之前的写操作不会重排序到这个释放操作之后。
    public final Object compareAndExchangeReferenceRelease(Object o, long offset, Object expected, Object x) {
        return this.compareAndExchangeReference(o, offset, expected, x);
    }

    @IntrinsicCandidate
    //一个弱版本的比较并设置方法,它可能不会总是成功,即便预期值正确。它适合于那些可以容忍偶尔失败的应用场景,比如乐观锁
    public final boolean weakCompareAndSetReferencePlain(Object o, long offset, Object expected, Object x) {
        return this.compareAndSetReference(o, offset, expected, x);
    }
    // ....
}