JUC
自旋锁->
自旋N次(N自适应, 取决于先前历史,当前负载等)->
升级为重量级锁
重量级, mutex 本质上的syscall, 轻量级, CAS去尝试拿到对象头中 的锁标识字节MarkWord
更新成功说明没人抢
偏向锁: 当某个线程第一次获取锁时, 接下来都没有其他线程拿, 那这个线程后续拿锁就连CAS也不需要
无锁->
偏向锁->
自旋锁->
重量级锁
JMM
所有可能出现竞争的变量(成员, 静态等)均在主内存
局部变量线程私有, 工作内存相互隔离, 只能通过主内存同步
volatile
需要立即看到修改的值, 每一次读取都从主线程读, 每一次写都把工作内存的值刷新到主线程
加入内存屏障禁止指令重排
只保证可见性, 不保证原子性
轻到重, volatie->
原子类->
锁
可重入锁, 多次加锁需要多次解锁
(信号量PV, bushi)
getHoldCount()
获取当前线程加锁次数
锁公平性?
可重入锁默认是非公平的(不提供公平性保证)
否则按照拿锁顺序排队 ReentrantLock(boolean fair=false)
读写锁
一种读写双信号量的高级抽象(逃)
https://stackoverflow.com/questions/17683575/binary-semaphore-vs-a-reentrantlock
从应用层上值得注意的就是可重入锁进行进一步封装之后, 保证了
- 强制要求拿锁的线程放锁,牺牲了一些自由度换取编程安全性
Binary semaphores provide a non-ownership release mechanism. Therefore, any thread can release the permit for a deadlock recovery of a binary semaphore. 二进制信号量提供了非所有权释放机制。因此,任何线程都可以释放二进制信号量死锁恢复的许可。
On the contrary, deadlock recovery is difficult to achieve in the case of a reentrant lock. For instance, if the owner thread of a reentrant lock goes into sleep or infinite wait, it won’t be possible to release the resource, and a deadlock situation will result. 相反,在可重入锁的情况下,死锁恢复是很难实现的。例如,如果可重入锁的所有者线程进入睡眠或无限等待,则无法释放资源,从而导致死锁情况。
读锁: 写锁没占, 多个线程可加读锁
写锁: 没有读锁的情况下, 才可以加写锁(看起来是读者优先的实现)
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
并且读写锁保留了可重入 实际上CLHLock, 将一个工作线程抽象成一个节点, 队列本身使用无锁CAS维护, 通过CAS更新节点state来标识状态实现线程内的可重入
公平性和非公平性的实现
公平锁
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
非公平锁
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
// 这里对排队不做判断, 直接拿走
if (c == 0) {
if (compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
一个链表, 通过每次拿锁原子性加(AtomicRefence.getAndSet, 实际就是CAS)节点在尾部, 解锁需要等待前一个节点解锁来得到fairness
核心CAS, 用currentThread()创建一个新节点node, 之后设置node.prev为tail, CAS尝试设置tail为node
如果成功, acquire成功; 如果失败, 自旋重复
官方示例
class RWDictionary {
private final Map<String, Data> m = new TreeMap<>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
r.lock();
try {
return m.get(key);
} finally {
r.unlock();
}
}
public List<String> allKeys() {
r.lock();
try {
return new ArrayList<>(m.keySet());
} finally {
r.unlock();
}
}
public Data put(String key, Data value) {
w.lock();
try {
return m.put(key, value);
} finally {
w.unlock();
}
}
public void clear() {
w.lock();
try {
m.clear();
} finally {
w.unlock();
}
}
}
}
支持锁降级: 先拿写锁再拿读锁
不支持锁升级: 废话, 不能先拿读锁再拿写锁, 哎名词发明家
java原子类
AtomicXXX
除了基本类以外还有AtomicIntergerArray, AtomicLongArray, AtomicReferenceArray
就是个槽, 实际上不是什么新鲜的东西, 对给定内存地址CAS而已, 由于java不能直接非Unsafe操作指针导致的
还有一个有趣的是LongAdder类
像是ostep里面的lazy时钟, 抢不到原子加的时候先加本地, 最后同步
注意引用交换的实用性: 例如Object的CAS
甚至可以原子更新字段, AtomicIntegerUpdater
避免ABA问题, 给了个工具类AtomicStampedReference
核心思 路是在CAS成功换到值的时候也换一个stamp过去, 就可以在读取的时候根据stamp来判断是不是想要的值(有没有被其他线程修改过)了
常用并发容器
ArrayList ->
CopyOnWriteArrayList
public boolean add(E e) {
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
es = Arrays.copyOf(es, len + 1);
es[len] = e;
setArray(es); // array = es
return true;
}
}
This is ordinarily too costly, but may be more efficient than alternatives when traversal operations vastly outnumber mutations, and is useful when you cannot or don't want to synchronize traversals, yet need to preclude interference among concurrent threads
开销很大, 但确保了不会出现迭代器失效, 适用于读多写少的情况
(迭代器读是不堵塞的)
如果使用加大锁需要自己同步迭代器
List list = Collections.synchronizedList(new ArrayList());\
// 这玩意就是所有操作都加synchronized(mutex), 除了iterator()
...
synchronized (list) {
Iterator i = list.iterator(); // Must be in synchronized block
while (i.hasNext())
foo(i.next());
}
ConcurrentLinkedQueue 经典并发队列
ConcurrentSkipListMap 并发跳表
ConCurrentHashMap
锁链表头(红黑树根, 长度大于8升级)
阻塞队列 BlockingQueue
- ArrayBlockingQueue
<->
chan(int) - SynchronousQueue
<->
chan(0) - LinkedBlockingQueue
<->
chan(inf)
经典生产消费者()
chan方便理解, 但和chan还是有点不 一样, 可以多消费者
PriorityBlockingQueue 优先级阻塞队列
DelayQueue 延时队列, 在达到延时之前出不了队
用于限流?
线程池
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
)
FixedThreadPool
:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
SingleThreadExecutor
: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
CachedThreadPool
: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
ScheduledThreadPool
:给定的延迟后运行任务或者定期执行任务的线程池
execute() 执行
submit() 需要返回值, 得到Future
CompletableFuture
CompletableFuture<Void> task1 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> task6 =
CompletableFuture.supplyAsync(()->{
//自定义业务操作
});
......
CompletableFuture<Void> headerFuture=CompletableFuture.allOf(task1,.....,task6);
try {
headerFuture.join();
} catch (Exception ex) {
......
}
System.out.println("all done. ");
CompletableFuture
默认使用全局共享的ForkJoinPool.commonPool()
作为执行器,所有未指定执行器的异步任务都会使用该线程池。这意味着应用程序、多个库或框架(如 Spring、第三方库)若都依赖CompletableFuture
,默认情况下它们都会共享同一个线程池
自定义线程池就是RunAsync加一个参数ThreadPoolExecutor
正确使用
thenCompose()
、thenCombine()
、acceptEither()
、allOf()
、anyOf()
等方法来组合多个异步任务,以满足实际业务的需求,提高程序执行效率
还有
thenApply()
thenAccept()
thenRun()
whenComplete()
并发工具类
CountDownLatch
CyclicBarrier
Semaphore
Fork/Join框架