Java中的读写锁ReentrantReadWriteLock介绍与使用 C/C++

信息发布员 5月前 75

导读

这篇文章我们来Java中的读写锁。阅读完本篇文章,你将了解到:

1、读写锁的使用场景和优缺点

2、读写锁的实现原理

3、如何使用读写锁

1、Java中的读写锁

有这样一种场景:

如果对一个共享资源的写操作没有读操作那么频繁,这个时候可以允许多个线程同时读取共享资源;

但是如果有一个线程想去写这些共享资源,那么其他线程此刻就不应该对这些资源进行读和写操作了。

Java中的ReentrantReadWriteLock正是为这种场景提供的锁。该类里面包括了读锁和写锁。

1.1、可获取读锁的情况

没有其他线程正在持有写锁;

尝试获取读锁的线程同时持有写锁。

1.2、可获取写锁的情况

没有其他线程正在持有读锁;

没有其他线程正在持有写锁。

1.3、读写锁特点

允许并发读:只要没有线程正在更新数据,那么多个线程就可以同时读取数据;

只能独占写:只要有一个线程正在写数据,那么就会导致其他线程的读或者写均被阻塞;但写的线程可以获取读锁,并通过释放写锁,让锁降级为读锁;(不能由读锁升级为写锁)

只要有一个线程正在读数据,那么其他线程的写入就会阻塞,直到读锁被释放;

公平性:支持非公平锁和公平锁,非公平锁吞吐量较高;

可重入:无论是读锁还是写锁都是支持可重入的。

读写锁可以增加更新不频繁而读取频繁的共享数据结构的吞吐量。

2、实现原理

ReentrantReadWriteLock是可重入读写锁的实现。我们先来看看涉及到的类:

我们可以看到,ReentrantReadWriteLock中也具有非公平锁NonfairSync和公平锁FairSync的实现。同时ReentrantReadWriteLock组合了两把锁:写锁WriteLock和读锁ReadLock。

我们来看看具体的构造函数:

public ReentrantReadWriteLock(boolean fair) {
  sync = fair ? new FairSync() : new NonfairSync();
  readerLock = new ReadLock(this);
  writerLock = new WriteLock(this);
}

可以发现,通过参数fair控制是创建非公平锁还是公平锁。同时ReentrantReadWriteLock持有了写锁和读锁。

而本质上,读锁和写锁都是通过持有ReentrantReadWriteLock.sync来进行加锁和释放锁的,用的是同一个AQS,Sync类提供类对ReentrantReadWriteLock的支持:

protected ReadLock(ReentrantReadWriteLock lock) {
  sync = lock.sync; // 引用的是ReentrantReadWriteLock的sync实例
}
protected WriteLock(ReentrantReadWriteLock lock) {
  sync = lock.sync; // 引用的是ReentrantReadWriteLock的sync实例
}

基于对AQS原理的理解,我们知道sync是读写锁实现的关键,而aqs中核心是state字段和双端等待队列。下面我们来看看具体的实现。

2.1、看代码之前您必须了解的内容

在查看ReentrantReadWriteLock之前,您需要了解以下内容:

2.1.1、SYNC.HOLDCOUNTER类

读锁计数器类,为每个获取读锁的线程进行计数。Sync类中有一个cachedHoldCounter字段,该字段主要是缓存上一个线程的读锁计数器,节省ThreadLocal查找次数。

static final class HoldCounter {
  // 某个读线程的重入次数
  int count = 0;
  // 某个线程的tid字段
  // Use id, not reference, to avoid garbage retention
  final long tid = getThreadId(Thread.currentThread());
}

2.1.2、SYNC.THREADLOCALHOLDCOUNTER类

当前线程持有的可重入读锁的数量,当数量下降到0的时候进行删除。


static final class ThreadLocalHoldCounter
  extends ThreadLocal<HoldCounter> {
  public HoldCounter initialValue() {
    return new HoldCounter();
  }
}

2.1.3、读写锁中AQS的STATE状态设计

AQS中的state为了能够同时记录读锁和写锁的状态,把32位变量分为了两部分:

如上图,高16位存储读状态,读锁是共享锁,这里记录持有读锁的线程数;低16位是写状态,写锁是排他锁,这里0表示没有线程持有,大于0表示持有线程对锁的重入次数。

2.1.4、关于读写锁的数据结构

虽然读写锁看起来有两把锁,但是底层用的都是同一个state,同一个等待队列。只不过是通过ReadLock和WriteLock分别提供了读锁和写锁的API,底层还是用同一个AQS。如下图:

由于读写锁是互斥的,所以线程1获取写锁,线程2获取读锁,并发执行的时候,一定有一个会失败;

如果是已经获取了读锁的线程尝试获取写锁,则会获取成功;

公平模式下,先进入等待队列的线程先被处理;非公平模式下,如果尝试获取写锁的线程节点在头节点后面,尝试获取读锁的线程要让步,进入等待队列;

线程节点获取到读锁之后,会判断下一个节点是否处于共享模式,如果是则会一直传播并唤醒后续共享模式节点;

如果有其他线程获取了写锁,那么获取写锁就会被阻塞。

公平和非公平是针对等待队列中的线程节点的处理来说的:

公平模式一般都是从队列头开始处理,并且如果等待队列还有待处理节点,新的线程全部都入等待队列;

非公平模式一般不管等待队列里面有没有待处理节点,都会先尝试竞争获取锁;特殊情况:如果等待队列中有写锁线程,那么新来的读锁线程必须排队让写锁线程先进行处理。

其实关于读写锁的原理就差不多是这么多了。

以下是详细的代码分析,可能会比较枯燥,为了避免让大家一头陷入源码中,于是在上面先把源码做的事情都给讲出来了。建议感兴趣的同学打开电脑跟踪源码一起来阅读。

1.1、ReadLock实现原理

1.1.1、LOCK

查看ReadLock的lock相关方法,调用的是AQS的acquireShared方法,该方法会以共享模式获取锁:

public final void acquireShared(int arg) {
  // 尝试获取锁
  if (tryAcquireShared(arg) < 0)
    // 如果获取锁失败了,那么会进入ASQ的等待队列,等待被唤醒后重新尝试获取锁
    doAcquireShared(arg);
}

下面看看关键获取锁的tryAcquireShared方法,该方法主要处理逻辑:

因为读写是互斥的,如果另一个线程持有写锁,则失败;

否则,此线程具备锁定write状态的条件,因此判断是否应该进入阻塞。 如果不是,请尝试CAS获取读锁许可并更新读锁计数。 请注意,该步骤不检查重入,这将推迟到最后fullTryAcquireShared方法;

如果第2步失败,或者由于线程不符合锁定条件或者CAS失败或读锁计数饱和,将会使用fullTryAcquireShared进一步重试。

下面是详细的说明:

protected final int tryAcquireShared(int unused) {
  Thread current = Thread.currentThread();
  int c = getState();
  // 如果存在写锁,并且写锁不是当前线程,则直接失败让线程进入等待队列
  if (exclusiveCount(c) != 0 &&
      getExclusiveOwnerThread() != current)
    return -1;
  int r = sharedCount(c);
  // 判断读锁是否应该被阻塞,公平模式下,先进入等待队列则先被处理;非公平模式下写锁优先级比较高,如果头节点的下一个节点不是共享模式,即是尝试获取写锁的线程,读锁需要让步
  if (!readerShouldBlock() &&
      // 读锁是否已到达获取上线
      r < MAX_COUNT &&
      // CAS修改读锁状态,+1
      compareAndSetState(c, c + SHARED_UNIT)) {
      // 获取读锁成功
      if (r == 0) {
        // 如果是第一个获取读锁的线程,也就是把读锁状态从0变到1的那个线程,那么存入firstReader中
        firstReader = current;
        // firstReader持有锁=1
        firstReaderHoldCount = 1;
      } else if (firstReader == current) {
        // firstReader已经是当前线程,则firstReaderHoldCount++
        firstReaderHoldCount++;
      } else { // 读锁数量不为0,并且第一个读线程不为当前线程
        // 获取缓存读锁计数器
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
          // 缓存读锁计数器为空或者计数器不是当前线程的,则尝试通过ThreadLocal获取当前线程对应的计数器
          cachedHoldCounter = rh = readHolds.get();
        else if (rh.count == 0)
          readHolds.set(rh);
        rh.count++;
      }
      return 1;
    }
    // 以上执行失败,则进入该逻辑
    return fullTryAcquireShared(current);
}

让我们接着看fullTryAcquireShared方法,这个方法可知,只有其他线程持有写锁,或者使用的是公平锁并且头节点后面还有其他等待的线程,或者头节点后面的节点不是共享模式,或者读锁计数器达到了上限,则阻塞,否则一直会循环尝试获取锁:

final int fullTryAcquireShared(Thread current) {
  HoldCounter rh = null;
  for (;;) {
    int c = getState();
    // 如果存在写锁,并且写锁不是当前线程,则返回false
    if (exclusiveCount(c) != 0) {
      if (getExclusiveOwnerThread() != current)
        return -1;
      // else we hold the exclusive lock; blocking here
      // would cause deadlock.
    // 不存在写锁,继续判断是否应该阻塞:如果是公平锁并且头节点后有其他等待的线程,则阻塞,如果是非公平锁,判断头节点后面的节点是否共享模式,如果不是则阻塞
    } else if (readerShouldBlock()) {
      // Make sure we're not acquiring read lock reentrantly
      // 如果当前线程是firstReader,说明是重入
      if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
      } else {
        // 进入该分支,说明没有读写锁冲突,并且不是重入,当前线程也不是firstReader
        if (rh == null) {
          rh = cachedHoldCounter;
          // 判断上一个获取到锁的线程是否当前线程,不是则进入AQS等待队列
          if (rh == null || rh.tid != getThreadId(current)) {
            rh = readHolds.get();
            if (rh.count == 0)
              readHolds.remove();
          }
        }
        // rh.count == 0 表示rh是刚新获取到的,直接返回,进入等待队列
        if (rh.count == 0)
          return -1;
      }
    }
    // 共享锁达到上限了
    if (sharedCount(c) == MAX_COUNT)
      throw new Error("Maximum lock count exceeded");
    // 读锁自增,以下代码与上一个方法中的类似
    if (compareAndSetState(c, c + SHARED_UNIT)) {
      if (sharedCount(c) == 0) {
        firstReader = current;
        firstReaderHoldCount = 1;
      } else if (firstReader == current) {
        firstReaderHoldCount++;
      } else {
        if (rh == null)
          rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
          rh = readHolds.get();
        else if (rh.count == 0)
          readHolds.set(rh);
        rh.count++;
        cachedHoldCounter = rh; // cache for release
      }
      return 1;
    }
  }
}

最后我们来看看doAcquireShared方法:

private void doAcquireShared(int arg) {
  // 添加一个共享等待节点
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // 判断新增的节点的前一个节点是否头节点
      final Node p = node.predecessor();
      if (p == head) { // 是头节点,那么在此尝试获取共享锁
        int r = tryAcquireShared(arg);
        if (r >= 0) { 
          // 获取成功,把当前节点变为新的head节点,并且检查后续节点是否可以在共享模式下等待,并且允许继续传播,则调用doReleaseShared继续唤醒下一个节点尝试获取锁
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          if (interrupted)
            selfInterrupt();
          failed = false;
          return;
        }
      }
      // 阻塞节点
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      // 取消获取锁
      cancelAcquire(node);
  }
}

1.1.2、UNLOCK

接下来我们看看释放锁的代码。

public void unlock() {
  sync.releaseShared(1);
}

AbstractQueuedSynchronizer.releaseShared()

public final boolean releaseShared(int arg) {

  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}

主要处理方法是tryReleaseShared,该方法主要是清理ThreadLocal中的锁计数器,然后CAS修改读锁个数减1:

protected final boolean tryReleaseShared(int unused) {
  Thread current = Thread.currentThread();
  if (firstReader == current) {
    // assert firstReaderHoldCount > 0;
    if (firstReaderHoldCount == 1)
      firstReader = null;
    else
      firstReaderHoldCount--;
  } else {
    HoldCounter rh = cachedHoldCounter;
    if (rh == null || rh.tid != getThreadId(current))
      rh = readHolds.get();
    int count = rh.count;
    if (count <= 1) {
      readHolds.remove();
      if (count <= 0)
        throw unmatchedUnlockException();
    }
    --rh.count;
  }
  for (;;) {
    int c = getState();
    int nextc = c - SHARED_UNIT;
    if (compareAndSetState(c, nextc))
      // Releasing the read lock has no effect on readers,
      // but it may allow waiting writers to proceed if
      // both read and write locks are now free.
      return nextc == 0;
  }
}

1.1、WriteLock实现原理

1.1.1、LOCK

查看WriteLock的lock锁相关方法,调用的是sync.acquire方法,该方法直接继承了ASQ的acquire()方法的实现:

public void lock() {
    sync.acquire(1);
}

与ReentrantLock的实现区别在具体的tryAcquire()方法的实现,我们来看看ReentrantReadWriteLock.Sync中该方法的实现,主要做了以下事情:

如果读锁数量>0,或者写锁数量>0,并且不是重入的,那么直接失败了;

如果锁数量为0,那么该线程有资格获取到写锁,进而尝试获取。

protected final boolean tryAcquire(int acquires) {
  Thread current = Thread.currentThread();
  int c = getState();
  int w = exclusiveCount(c);
  if (c != 0) { // 存在读锁或者写锁
    // 不存在写锁,或者当前线程不是写锁持有的线程,那么直接失败
    if (w == 0 || current != getExclusiveOwnerThread())
      return false;
    // 写锁超多最大数量限制,也直接失败
    if (w + exclusiveCount(acquires) > MAX_COUNT)
      throw new Error("Maximum lock count exceeded");
    // Reentrant acquire
    // 写锁持有的线程重入,直接修改state即可
    setState(c + acquires);
    return true;
  }
  // 判断是否应该阻塞:非公平模式,无需阻塞,公平模式如果前面有其他节点则需要排队阻塞
  if (writerShouldBlock() ||
      // 尝试获取写锁
      !compareAndSetState(c, c + acquires))
    return false;
  setExclusiveOwnerThread(current);
  return true;
}

1.1.2、UNLOCK

查看WriteLock的unlock相关方法,调用的是sync.release方法,该方法直接继承了AQS的release实现:

public void unlock() {
  sync.release(1);
}

以下是release方法:

public final boolean release(int arg) {
  // 尝试释放锁
  if (tryRelease(arg)) {
    // 释放锁成功,则唤醒队列中头节点后的一个线程
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

释放锁的逻辑主要在tryRelease方法,下面是详细代码:

protected final boolean tryRelease(int releases) {
  // 如果当前线程没有获取写锁,则释放直接抛异常
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  int nextc = getState() - releases;
  boolean free = exclusiveCount(nextc) == 0;
  // 如果当前线程完全释放了写锁,则去除独占标识
  if (free)
    setExclusiveOwnerThread(null);
  // 修改state
  setState(nextc);
  return free;
}

3、读写锁使用例子

下面是读写锁的使用例子,该例子实现了一个支持并发访问的ArrayList。

因为读写锁是互斥的,保证了不会因为写导致读取出现的不一致。

代码如下:

public class ReentrantReadWriteLockTest {
    static final int READER_SIZE = 10;
    static final int WRITER_SIZE = 2;
    public static void main(String[] args) {
        Integer[] initialElements = {33, 28, 86, 99};
        ReadWriteList<Integer> sharedList = new ReadWriteList<>(initialElements);
        for (int i = 0; i < WRITER_SIZE; i++) {
            new Writer(sharedList).start();
        }
        for (int i = 0; i < READER_SIZE; i++) {
            new Reader(sharedList).start();
        }
    }
}
class Reader extends Thread {
    private ReadWriteList<Integer> sharedList;
    public Reader(ReadWriteList<Integer> sharedList) {
        this.sharedList = sharedList;
    }
    public void run() {
        Random random = new Random();
        int index = random.nextInt(sharedList.size());
        Integer number = sharedList.get(index);
        System.out.println(getName() + " -> get: " + number);
        try {
            Thread.sleep(100);
        } catch (InterruptedException ie ) { ie.printStackTrace(); }
    }
}
class Writer extends Thread {
    private ReadWriteList<Integer> sharedList;
    public Writer(ReadWriteList<Integer> sharedList) {
        this.sharedList = sharedList;
    }
    public void run() {
        Random random = new Random();
        int number = random.nextInt(100);
        sharedList.add(number);
        try {
            Thread.sleep(100);
            System.out.println(getName() + " -> put: " + number);
        } catch (InterruptedException ie ) { ie.printStackTrace(); }
    }
}
/**
 * 支持并发读写的ArrayList
 */
class ReadWriteList<E> {
    private List<E> list = new ArrayList<>();
    private ReadWriteLock rwLock = new ReentrantReadWriteLock();
    public ReadWriteList(E... initialElements) {
        list.addAll(Arrays.asList(initialElements));
    }
    public void add(E element) {
        Lock writeLock = rwLock.writeLock();
        writeLock.lock();
        try {
            list.add(element);
        } finally {
            writeLock.unlock();
        }
    }
    public E get(int index) {
        Lock readLock = rwLock.readLock();
        readLock.lock();
        try {
            return list.get(index);
        } finally {
            readLock.unlock();
        }
    }
    public int size() {
        Lock readLock = rwLock.readLock();
        readLock.lock();
        try {
            return list.size();
        } finally {
            readLock.unlock();
        }
    }
}


少客联盟- 版权声明 1、本主题所有言论和图片纯属会员个人意见,与少客联盟立场无关。
2、本站所有主题由该帖子作者发表,该帖子作者信息发布员少客联盟享有帖子相关版权。
3、少客联盟管理员和版主有权不事先通知发贴者而删除本文。
4、其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者信息发布员少客联盟的同意。
5、帖子作者须承担一切因本文发表而直接或间接导致的民事或刑事法律责任。
6、本帖部分内容转载自其它媒体,但并不代表本站赞同其观点和对其真实性负责。
7、如本帖侵犯到任何版权问题,请立即告知本站,本站将及时予与删除并致以最深的歉意。
8、官方反馈邮箱:chinasuc@chinasuc.cn


上一篇:模拟发包winhttp和libcurl+openssl
下一篇:ReentrantLock中的Condition实现原理
人生的价值,并不是用时间,而是用深度去衡量的。
最新回复 (0)
    • 少客联盟
      2
        登录 注册 QQ登录(停用)
返回