一、简介

AQS是AbstractQueuedSynchronizer (抽象队列同步器)的简称,java中近一半的显示锁是基于AQS实现的。

例如:ReentrantLock(独占锁)、Semaphore(信号量)、ReentrantReadWriteLock(读写锁)、CountDownLatch(并发工具类)、ThreadPoolExecutor(线程池)

源码:

二、锁

ReentrantLock独占锁中就实现了AbstractQueuedSynchronizer。

方法:

不同的锁需要实现和调用不同的方法:

模板方法

独占式获取:acquire()、acquireInterrutpibly()、 tryAcquireNanos()

独占式释放:release()

共享式获取:acquireShared()、acquireSharedInterruptibly()、 tryAcquireSharedNanos()

共享式释放:releaseShared()

需要覆盖的方法

独占式获取:tryAcquire()

独占式释放:tryRelease()

共享式获取:tryAcquireShared()

共享式释放:tryReleaseShared()

isHeldExclusively() :该方法返回同步状态,需要自己覆盖实现

getState():获取锁状态 1有锁占用 0无锁

setState()设置锁状态

三、手写LOCK

public class MyReentrantLock implements Lock {
        //使用state做锁的状态标志,state=1表示获取到锁,state=0表示释放锁,其他线程可以竞争锁
        private static class Sync extends AbstractQueuedSynchronizer{
        /**
        * 尝试获取锁的方法
        * 需要自己实现的流程方法
        * @param arg
        * @return
        */
        @Override
        protected boolean tryAcquire(int arg) {
                //CAS比较内存中的原始值为0,则修改为传入的状态值1,当前线程获取到锁
                if(compareAndSetState(0 , arg)){
                        setExclusiveOwnerThread(Thread.currentThread());//当前线程得到了锁,则将当前得到锁的线程设置为独占线程
                        return true;
                }
                return false;
        }
        /**
        * 释放锁的方法,需要实现
        * @param arg
        * @return
        */
        @Override
        protected boolean tryRelease(int arg) {
            if(getState() == 0){//判断状态是否为0,为0则直接抛出不支持操作的异常,增强健壮性的代码
                    throw new UnsupportedOperationException();
            }
            setExclusiveOwnerThread(null);//将当前独占线程设置为null
            setState(0);//将当前标志锁状态的值设置为0,表示锁已经释放
            return true;
        }
            /**
            * 是否同步独占,true--已被独占,false--未被独占
            * @return
            */
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1 ;
            }
            Condition newCondition(){
                    return new ConditionObject();//AQS已经实现Condition,此处只需要直接实例化并使用AQS中的实现即可
            }
         }
        private Sync sync = new Sync();
        @Override
        public void lock() {
                sync.acquire(1); //获取锁
        }
        @Override
        public void lockInterruptibly() throws InterruptedException {
                sync.acquireInterruptibly(1);//获取锁,允许获取过程中有中断
        }
        @Override
        public boolean tryLock() {
                return sync.tryAcquire(1);
        }
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
                return sync.tryAcquireNanos(1,unit.toNanos(time));//获取锁,有超时机制
        }
        @Override
        public void unlock() {
                sync.release(1) ;//释放锁
        }
        @Override
        public Condition newCondition() {
                return sync.newCondition();//获取AQS中的Condition实例,用于等待、唤醒操作
        }
}

 

 

在AQS中有一个静态内部类Node,node包含了获取同步状态失败的线程引用、等待状态、前驱节点和后继节点,节点的属性类型和名称以及描述。

int waitStatus:

1、CANCELLED,值为1 。场景:当该线程等待超时或者被中断,需要从同步队列中取消等待,则该线程被置1,即被取消(这里该线程在取消之前是等待状态)。节点进入了取消状态则不再变化;

2、SIGNAL,值为-1。场景:后继的节点处于等待状态,当前节点的线程如果释放了同步状态或者被取消(当前节点状态置为-1),将会通知后继节点,使后继节点的线程得以运行;

3、CONDITION,值为-2。场景:节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中;

4、PROPAGATE,值为-3。场景:表示下一次的共享状态会被无条件的传播下去;

5、INITIAL,值为0,初始状态。

Node prev:前驱节点,当节点加入同步队列的时候被设置(尾部添加)

Node next:后继节点

Node nextWaiter:等待节点的后继节点。如果当前节点是共享的,那么这个字段是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个字段。(注:比如说当前节点A是共享的,那么它的这个字段是shared,也就是说在这个等待队列中,A节点的后继节点也是shared。如果A节点不是共享的,那么它的nextWaiter就不是一个SHARED常量,即是独占的。)

Thread thread:获取同步状态的线程

 

 

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐