# ReentrantLock

# ReentrantLock介绍

ReentrantLock是java util concurrent包中提供的一个锁实现,相对于synchronized关键字锁, ReentrantLock提供了具备超时时间的tryLock、可中断的lockInterruptibly,并且能够通过newCondition创建多个等待条件。 ReentrantLock能够保证和synchronized一致的内存模型规则(happen before)。 ReentrantLock中的Reentrant的意思是可重入,也就是一个线程对一个锁加锁后还可以在锁中再加这个锁。

# ReentrantLock使用

# 加锁

ReentrantLock实现了Lock接口。 lock的常见使用方式如下,创建完ReentrantLock后,可以对lock对象调用lock方法加锁, 使用时要注意的是要注意将加锁后的代码加上try finally代码块,并在finally中调用lock.unlock()保证最后锁能unlock释放。

lock加锁代码中的逻辑能独占运行,和其他要加锁的代码间,保证原子性、可见性和防止重排序等问题。默认的构造函数创建的是非公平锁(有可能出现插队问题但是整体的吞吐量更高),如果要创建 公平锁,可以通过构造函数ReentrantLock(boolean fair)传入true。

除了基本的lock方法,Lock接口中还提供了其他的加锁方法。

  • tryLock: 尝试一次加锁,如果加锁成功返回true可以继续执行代码,注意unlock。否则返回false,表示加锁失败。
  • lockInterruptibly: 和lock不同,lockInterruptibly可以响应中断请求,当加锁等待过程中被其他线程中断,lockInterruptibly会抛出InterruptedException
  • boolean tryLock(long time, TimeUnit unit): 带有超时等待时间的加锁,如果超过这个时间还没加到锁,会返回false,否则为true。如果等待过程中被中断,抛出InterruptedException
final ReentrantLock lock = new ReentrantLock();

public void m() {
     assert lock.getHoldCount() == 0;
     lock.lock();
     try {
       // ... method body
     } finally {
     lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11

# 条件等待

Lock能够创建Condition对象,用来实现类似wait/notify/notifyAll的条件等待通知功能。

Condition对象的等待方法是await(),通知方法是signal/signalAll(注意不要和wait/notify用混,因为wait/notify是每个对象都具有的方法。) 调用await和signal方法需要保证当前在和Condition相同的Lock加锁的前提下,否则会抛出IllegalMonitorStateException异常。 并且条件的状态的读取修改都要在相同的锁的保护下,lock加锁后要在一个while循环中判断条件,不满足条件调用await等待。

可以参考ArrayBlockingQueue中对条件等待通知的经典应用。

ReentrantLock lock = new ReentrantLock()
Condition notFull = lock.newCondition();
public void m() throws InterruptedException {
    lock.lock();
    try {
       while(notFull条件不满足) {
            notFull.await();
       }  
    } finally {
       lock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

# ReentrantLock实现原理

ReentrantLock基于AQS实现(AQS的实现原理参考另一篇AQS文章)。

ReentrantLock的基本实现策略是将AQS的state等于0作为未加锁状态,大于0作为加锁状态,因为ReentrantLock要实现可重入 所以state记录当前锁重入的次数,加锁state+1,释放锁state-1。 当state从1减到0时,说明锁已经release,就会触发AQS中的signalNext机制唤醒可能在AQS等待队列中等待的其他尝试加锁线程。

为了实现非公平锁和公平锁,ReentrantLock类定义了两个AQS的子类,区别是在tryAcquire方法时是否允许插队, 公平锁是不能插队的,所以在执行前要判断当前AQS等待队列中没有有效等待的Node节点。

公平锁和非公平锁因为有很多逻辑相同,所以定义了一个共同的父类Sync

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

    /**
     * tryLock是非公平锁的尝试加锁的实现,如果当前锁没有人使用,则尝试加锁,加锁成功返回true,否则返回false,没有加锁不会阻塞线程。
     */
    @ReservedStackAccess
    final boolean tryLock() {
        // 获取当前线程
        Thread current = Thread.currentThread();
        // 获取当前的stat
        int c = getState();
        // 如果state为0,则尝试通过cas修改state来实现加锁
        if (c == 0) {
            // 使用AQS的compareAndSetState修改state状态为1
            if (compareAndSetState(0, 1)) {
                // 修改成功说明当前线程获得了锁,设置exclusiveOwnerThread,在后面判断重入时有用
                setExclusiveOwnerThread(current);
                return true;
            }
        } 
        // 如果state不等于0,则判断下是否是重入,即持有锁的线程和当前线程是否是同一个
        else if (getExclusiveOwnerThread() == current) {
            // 如果是,增加state的值,这里不需要使用cas,是因为在加锁中,其他线程无法对state进行修改,没有原子性问题
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // 修改state
            setState(c);
            return true;
        }
        // 否则state != 0且已经加锁的不是当前线程,返回false表示尝试加锁失败
        return false;
    }

    /**
     * Checks for reentrancy and acquires if lock immediately
     * available under fair vs nonfair rules. Locking methods
     * perform initialTryLock check before relaying to
     * corresponding AQS acquire methods.
     */
    /**
     * 一次尝试加锁,公平锁和非公平锁提供不同的实现。在lock方法(也包括lockInterruptibly等)中会先调用initialTryLock,如果false才会调用AQS的acquire
     * 这个不同于tryAcquire是因为tryAcquire在很多情况下都会调用,而initialTryLock明确在调用acquire之前调用,对于公平锁和tryAcquire有区别,后面会讲到
     */
    abstract boolean initialTryLock();

    @ReservedStackAccess
    final void lock() {
        // 先第一次initialTryLock
        if (!initialTryLock())
            // initialTryLock不成功再acquire
            acquire(1);
    }

    @ReservedStackAccess
    final void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!initialTryLock())
            acquireInterruptibly(1);
    }

    @ReservedStackAccess
    final boolean tryLockNanos(long nanos) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return initialTryLock() || tryAcquireNanos(1, nanos);
    }

    // 定义公平锁和非公平锁一致的tryRelease释放方法, ReentrantLock的unlock方法会调用AQS的release(1),然后会调用到这里的tryRelease方法,参数是1。
    @ReservedStackAccess
    protected final boolean tryRelease(int releases) {
        // 获取当前的state,计算减去之后的state值
        int c = getState() - releases;
        // 判断下当前线程是否已经加了这个锁,如果没有抛出异常
        if (getExclusiveOwnerThread() != Thread.currentThread())
            throw new IllegalMonitorStateException();
        // state减到0时,会释放整个锁,需要清除exclusiveOwnerThread
        boolean free = (c == 0);
        if (free)
            setExclusiveOwnerThread(null);
        // 修改state值,整个方法不需要考虑原子性问题,因为是还在加锁状态下进行的,执行过程中,其他线程无法修改state值。
        setState(c);
        return free;
    }

    // 判断同步器是否是被当前线程正独占持有
    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    // 创建ConditionObject的方法
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121

# 非公平锁实现

先看下非公平锁的实现,非公平锁不需要考虑先到后到的问题,只需要在state为0的情况下cas就可以了,并且在第一次尝试获取锁时判断下锁的线程是否是当前线程,如果是进行重入。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    // initialTryLock实现,在lock方法中xian调用initialTryLock,不成功调用tryAcquire
    final boolean initialTryLock() {
        // 获取当前线程
        Thread current = Thread.currentThread();
        // 先cas,从0到1
        if (compareAndSetState(0, 1)) { // first attempt is unguarded
            // 如果成功,加锁成功,修改exclusiveOwnerThread
            setExclusiveOwnerThread(current);
            return true;
        }
        // cas失败判断下线程是否同一个
        else if (getExclusiveOwnerThread() == current) {
            int c = getState() + 1;
            if (c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        } else
            // cas失败并且也不是重入场景,返回false
            return false;
    }

    /**
     * tryAcquire方法,会在调用完initialTryLock(返回false)之后调用acquire方法,在acquire方法中调用
     * 走到这里,肯定能够判断不是重入的场景了,所以不需要在判断exclusiveOwnerThread是否是当前线程,能够提升性能
     */
    protected final boolean tryAcquire(int acquires) {
        // 判断state为0的前提下进行cas从0到1
        if (getState() == 0 && compareAndSetState(0, acquires)) {
            // 如果成功,修改exclusiveOwnerThread为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 公平锁实现

公平锁要保证在队列中的Node优先加锁的原则。

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    /**
     * lock方法中第一次调用,此时当前线程没有加入到AQS等待队列中,所以判断优先级只需要判断AQS队列中是否有有效的Node即可
     */
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        // 获取当前state
        int c = getState();
        // state为0时尝试加锁
        if (c == 0) {
            // 判断AQS队列是否有等待的线程,如果没有通过cas尝试修改state状态
            if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                // 获取成功设置exclusiveOwnerThread
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 判断是否是相同线程,重入的情况
        else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }

    /**
     * initialTryLock失败后,lock方法会调用acquire,acquire中多个时机都会调用tryAcquire
     */
    protected final boolean tryAcquire(int acquires) {
        // tryAcquire先判断state为0,还需要判断队列中是否有比当前Node更靠前的Node,因为可能有其他线程在当前线程之前把Node加入到AQS队列中了
        // 在判断AQS队列中当前没有更靠前的Node后,尝试cas
        if (getState() == 0 && !hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 成功后修改exclusiveOwnerThread
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

AQS中的hasQueuedThreads()方法实现,从tail向前遍历(原因在AQS文章中进行了详细分析),如果找到一个status >=0的Node(非head),说明队列中有有效节点返回true,否则返回false

public final boolean hasQueuedThreads() {
    for (Node p = tail, h = head; p != h && p != null; p = p.prev)
        if (p.status >= 0)
            return true;
    return false;
}
1
2
3
4
5
6

AQS中的hasQueuedPredecessors()方法返回AQS队列中是否有比当前线程更靠前的节点。

实现方案为尝试获取first节点(head的next),判断first节点的线程是否和当前线程相同。

从head开始获取first节点更加快速(不需要像tail那样遍历)但是head节点判断可能会因为头结点更换等因素影响数据一致性。 这些情况下都需要从tail向前遍历查找,因为如果当前线程在队列中,从tail向前一定能找到当前的线程,并且当前线程不会从队列中被移出(没有从head开始查找的问题)

  • 如果head为null,说明现在队列还没有创建,返回false(此时first是null)
  • 如果head不是null,head.next是null,说明队列已经初始化完成,但是可能这个head已经出队不再是最新的head
  • 如果head.next不是null,说明队列中有节点了,但是s.waiter是null,说明有其他线程正在修改head,s从first节点变成head了
  • 如果s.waiter不是null,但是s.prev是null,说明s变成了新的head节点,first节点也发生了变化。

前面这些情况下,都需要再从tail向前遍历一遍(FIXME WHY),找到first节点,通过调用getFirstQueuedThread()获得first节点的线程, 并和当前线程比较。

public final boolean hasQueuedPredecessors() {
    Thread first = null; Node h, s;
    if ((h = head) != null && ((s = h.next) == null ||
                               (first = s.waiter) == null ||
                               s.prev == null))
        first = getFirstQueuedThread(); // retry via getFirstQueuedThread
    return first != null && first != Thread.currentThread();
}
1
2
3
4
5
6
7
8
public final Thread getFirstQueuedThread() {
    Thread first = null, w; Node h, s;
    // if判断还是和hasQueuedPredecessors中的一样
    if ((h = head) != null && ((s = h.next) == null ||
                               (first = s.waiter) == null ||
                               s.prev == null)) {
        // 从tail向前查找,找到最后一个waiter不是null的节点,将这个waiter返回
        // traverse from tail on stale reads
        for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
            if ((w = p.waiter) != null)
                first = w;
    }
    return first;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# jdk17相对jdk8版本的变化

# 不区分第一次尝试加锁和之后的尝试加锁

在jdk8中,lock方法调用acquire,acquire中的每次调用tryAcquire都需要判断是否重入、公平锁是否有靠前的节点。

jdk17版本的实现中区分了第一次和之后的尝试加锁,可以针对各自进行优化,比如之后的tryAcquire不再判断exclusiveOwnerThread是否是自己等。

# hasQueuedThreads变化

jdk8中的hasQueuedThreads只判断了 head != tail,而jdk17中还额外对节点的status进行了判断过滤掉CANCELLED节点。

jdk8版本

public final boolean hasQueuedThreads() {
    return head != tail;
}
1
2
3

jdk17版本

public final boolean hasQueuedThreads() {
    for (Node p = tail, h = head; p != h && p != null; p = p.prev)
        if (p.status >= 0)
            return true;
    return false;
}
1
2
3
4
5
6

# hasQueuedPredecessors()方法变化

jdk17版本的hasQueuedPredecessors方法相比下面Jdk8版本的更加严谨,对更多的并发情况进行了判断处理。

比如下面jdk8代码中,如果在执行 h != t时,当前线程的Node是first.next,在执行到s = h.next时,旧的head已经出队, 所以head.next返回null,则(s = h.next) == null会返回true,此时其实当前线程已经是first节点了,hasQueuedPredecessor并不准确。

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
1
2
3
4
5
6
7
8
9
10