享学笔记:并发编程之AQS
大数据学习路线java(Java se,javaweb)Linux(shell,高并发架构,lucene,solr)Hadoop(Hadoop,HDFS,Mapreduce,yarn,hive,hbase,sqoop,zookeeper,flume)机器学习(R,mahout)Storm(Storm,kafka,redis)Spark(scala,spark,spark core,spar...
一、简介
AQS是AbstractQueuedSynchronizer (抽象队列同步器)的简称,java中近一半的显示锁是基于AQS实现的。
例如:ReentrantLock(独占锁)、Semaphore(信号量)、ReentrantReadWriteLock(读写锁)、CountDownLatch(并发工具类)、ThreadPoolExecutor(线程池)
源码:
二、锁
ReentrantLock独占锁中就实现了AbstractQueuedSynchronizer。
方法:
不同的锁需要实现和调用不同的方法:
模板方法:
独占式获取:acquire()、acquireInterrutpibly()、 tryAcquireNanos()
独占式释放:release()
共享式获取:acquireShared()、acquireSharedInterruptibly()、 tryAcquireSharedNanos()
共享式释放:releaseShared()
需要覆盖的方法:
独占式获取:tryAcquire()
独占式释放:tryRelease()
共享式获取:tryAcquireShared()
共享式释放:tryReleaseShared()
isHeldExclusively() :该方法返回同步状态,需要自己覆盖实现
getState():获取锁状态 1有锁占用 0无锁
setState()设置锁状态
三、手写LOCK
public class MyReentrantLock implements Lock {
//使用state做锁的状态标志,state=1表示获取到锁,state=0表示释放锁,其他线程可以竞争锁
private static class Sync extends AbstractQueuedSynchronizer{
/**
* 尝试获取锁的方法
* 需要自己实现的流程方法
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
//CAS比较内存中的原始值为0,则修改为传入的状态值1,当前线程获取到锁
if(compareAndSetState(0 , arg)){
setExclusiveOwnerThread(Thread.currentThread());//当前线程得到了锁,则将当前得到锁的线程设置为独占线程
return true;
}
return false;
}
/**
* 释放锁的方法,需要实现
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0){//判断状态是否为0,为0则直接抛出不支持操作的异常,增强健壮性的代码
throw new UnsupportedOperationException();
}
setExclusiveOwnerThread(null);//将当前独占线程设置为null
setState(0);//将当前标志锁状态的值设置为0,表示锁已经释放
return true;
}
/**
* 是否同步独占,true--已被独占,false--未被独占
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1 ;
}
Condition newCondition(){
return new ConditionObject();//AQS已经实现Condition,此处只需要直接实例化并使用AQS中的实现即可
}
}
private Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1); //获取锁
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);//获取锁,允许获取过程中有中断
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));//获取锁,有超时机制
}
@Override
public void unlock() {
sync.release(1) ;//释放锁
}
@Override
public Condition newCondition() {
return sync.newCondition();//获取AQS中的Condition实例,用于等待、唤醒操作
}
}
在AQS中有一个静态内部类Node,node包含了获取同步状态失败的线程引用、等待状态、前驱节点和后继节点,节点的属性类型和名称以及描述。
int waitStatus:
1、CANCELLED,值为1 。场景:当该线程等待超时或者被中断,需要从同步队列中取消等待,则该线程被置1,即被取消(这里该线程在取消之前是等待状态)。节点进入了取消状态则不再变化;
2、SIGNAL,值为-1。场景:后继的节点处于等待状态,当前节点的线程如果释放了同步状态或者被取消(当前节点状态置为-1),将会通知后继节点,使后继节点的线程得以运行;
3、CONDITION,值为-2。场景:节点处于等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点从等待队列中转移到同步队列中,加入到对同步状态的获取中;
4、PROPAGATE,值为-3。场景:表示下一次的共享状态会被无条件的传播下去;
5、INITIAL,值为0,初始状态。
Node prev:前驱节点,当节点加入同步队列的时候被设置(尾部添加)
Node next:后继节点
Node nextWaiter:等待节点的后继节点。如果当前节点是共享的,那么这个字段是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点共用一个字段。(注:比如说当前节点A是共享的,那么它的这个字段是shared,也就是说在这个等待队列中,A节点的后继节点也是shared。如果A节点不是共享的,那么它的nextWaiter就不是一个SHARED常量,即是独占的。)
Thread thread:获取同步状态的线程
更多推荐
所有评论(0)