[TOC]
JUC04 课程任务概览 本部分包含:
CAS CAS原理简介 CAS发展: java.util.concurrent.atomic包下的类是使用CAS+自旋实现的
没有CAS之前:多线程环境不使用原子类保证线程安全++(基本数据类型)使用synchronized或lock(都是悲观锁) 使用CAS之后:多线程环境 使用原子类保证线程安全++(基本数据类型) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public volatile int value = 0 ;public int getValue () { return value; } public synchronized void setValue () { value++; } AtomicInteger atomicInteger = new AtomicInteger ();public int getValue () { return atomicInteger.get(); } public void setValue () { atomicInteger.getAndIncrement(); }
CAS是什么? compare and swap的缩写,中文翻译成比较并交换 ,实现并发算法时常用到的一种技术
它包含三个操作数一一位置内存值、预期原值及更新值。
执行CAS操作的时候,将内存位置的值与预期原值比较:
如果相匹配 ,那么处理器会自动将该位置值更新为新值, 如果不匹配 ,处理器不做任何操作,多个线程同时执行CAS操作只有一个会成功 。 CAS原理: CAS有3个操作数,位置内存值V,旧的预期值A,要修改的更新值B。
当且仅当旧的预期值A和内存值V相同时 ,将内存值V修改为B,否则什么都不做或重来
当它重来重试的这种行为称为-自旋!!
代码举例:
1 2 3 4 AtomicInteger atomicInteger = new AtomicInteger (5 );System.out.println(atomicInteger.compareAndSet(5 ,2023 ) + "\t" + atomicInteger.get()); System.out.println(atomicInteger.compareAndSet(5 ,2023 ) + "\t" + atomicInteger.get());
CAS优点: CAS是JDK提供的非阻塞 原子性操作,它通过硬件保证 了比较-更新的原子性。
它是非阻塞的且自身具有原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠 。
CAS是一条CPU的原子指令(cmpxchg指令 ),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。
执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现独占的 ,比起用synchronized重量级锁,这里的排他时间要短很多 ,所以在多线程情况下性能会比较好 。
CAS底层实现-引出Unsafe类 compareAndSet()方法底层源码:
我们知道i++线程不安全的,那atomiclnteger.getAndIncrement()是如何实现的?
AtomicInteger类主要利用CAS (compare and swap)+volatile 和native 方法来保证原子操作,从而避免synchronized的高开销,执行效率大为提升。
CAS并发原语体现在JAVA语言中就是sun.misc.Unsafe类中的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令 。这是一种完全依赖于硬件 的功能,通过它实现了原子操作。再次强调,由于CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的 ,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
==CAS自旋实现个人总结== 即上面所有内容的简洁版(面试版)
CAS是从硬件 提升效率,最底层是交给硬件和volatile来保证原子性(CPU独占)和可见性(volatile) 底层是基于Unsafe 类,实现方式是基于硬件平台的汇编指令,在intel的CPU中(X86机器上),使用的汇编指令是cmpxchg 指令 核心思想就是:比较要更新变量的值V和预期值E(compare),相等才会将V的值设为新值N(swap),如果不相等自旋再来。 原子性是一个CPU独占的实现的 ,但多个CPU的多个线程依旧会发生线程安全问题AtomicReference原子引用 java.util.concurrent.atomic包下抓门用来处理自写类 的一个原子类
案例演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Data @AllArgsConstructor class User { String name; Integer age; } public class AtomicReferenceDemo { public static void main (String[] args) { AtomicReference<User> atomicReference = new AtomicReference <>(); User z3 = new User ("z3" ,22 ); User li4 = new User ("li4" ,28 ); atomicReference.set(z3); System.out.println(atomicReference.compareAndSet(z3,li4) + "\t" + atomicReference.get().toString()); } }
==CAS自旋锁(手写)== 自旋锁(spinlock)
CAS是实现自旋锁的基础,CAS利用CPU指令保证了操作的原子性,以达到锁的效果,至于自旋呢,看字面意思也很明白,自己旋转。是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式 去尝试获取锁 ,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处 是减少线程上下文切换的消耗,缺点 是循环会消耗CPU
所以CAS类似于乐观锁,适用于读取多的场景,悲观锁适用于写多的场景
OpenJDK源码里面查看下Unsafe.java
CAS是实现自旋锁的基础,自旋翻译成人话就是循环,一般是用一个无限循环实现。这样一来,一个无限循环中,执行一个CAS操作,
当操作成功返回true时,循环结束; 当返回false时,接着执行循环,继续尝试CAS操作,直到返回true。 底层getAndAddInt方法自旋实现源码:
1 2 3 4 5 6 7 8 public final int getAndAddInt (Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
==自写自旋锁 ==
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class SpinLockDemo { AtomicReference<Thread> atomicReference = new AtomicReference <>(); public void lock () { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "------come in" ); while (!atomicReference.compareAndSet(null ,thread)){ } System.out.println(Thread.currentThread().getName() + "-------lock" ); } public void unlock () { Thread thread = Thread.currentThread(); while (!atomicReference.compareAndSet(thread,null )){ } System.out.println(Thread.currentThread().getName() + "---------task over ,unlock" ); } public static void main (String[] args) { SpinLockDemo lockDemo = new SpinLockDemo (); new Thread (() -> { lockDemo.lock(); try {TimeUnit.SECONDS.sleep(5 );} catch (InterruptedException e) {e.printStackTrace();} lockDemo.unlock(); },"A" ).start(); try {TimeUnit.MILLISECONDS.sleep(500 );} catch (InterruptedException e) {e.printStackTrace();} new Thread (() -> { lockDemo.lock(); lockDemo.unlock(); },"B" ).start(); } }
1 2 3 4 5 6 A------come in A-------lock B------come in A---------task over ,unlock B-------lock B---------task over ,unlock
CAS两大缺点: 循环时间开销很大
CAS会导致”ABA问题”
CAS算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差 类会导致数据的变化。
比如说一个线程1从内存位置V中取出A,这时候另一个线程2也从内存中取出A,并且线程2进行了一些操作将值变成了B,然后线程2又将V位置的数据变成A,这时候线程1进行CAS操作发现内存中仍然是A,预期OK,然后线程1操作成功。
尽管线程1的CAS操作成功,但是不代表这个过程就是没有问题的。
ABA 问题的案例以及解决方法:看AtomicStampedReference
AtomicStampedReference 在普通自旋的基础上再加上一个版本号的判断
若版本号也一致,修改成功 若版本号不一致,修改失败,继续自旋 单线程普通使用案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Data @AllArgsConstructor @NoArgsConstructor class Book { Integer id; String bookName; } public class AtomicStampedReferenceDemo { public static void main (String[] args) { Book javaBook = new Book (1 ,"javaBook" ); AtomicStampedReference<Book> stampedReference = new AtomicStampedReference <>(javaBook,1 ); System.out.println(stampedReference.getReference() + "\t" + stampedReference.getStamp()); Book mysqlBook = new Book (2 ,"mysqlBook" ); boolean b; b = stampedReference.compareAndSet(javaBook,mysqlBook,stampedReference.getStamp(),stampedReference.getStamp()+1 ); System.out.println(b + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp()); } }
==多线程ABA问题演示及解决案例== ABA问题案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static AtomicInteger atomicInteger = new AtomicInteger (100 );private static void abaHappen () { new Thread (() -> { atomicInteger.compareAndSet(100 ,101 ); try {TimeUnit.MILLISECONDS.sleep(10 );} catch (InterruptedException e) {e.printStackTrace();} atomicInteger.compareAndSet(101 ,100 ); },"t1" ).start(); new Thread (() -> { try {TimeUnit.MILLISECONDS.sleep(200 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println(atomicInteger.compareAndSet(100 , 2022 )+"\t" +atomicInteger.get()); },"t2" ).start(); }
解决ABA问题演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference <>(100 ,1 );public static void main (String[] args) { new Thread (() -> { int stamp = stampedReference.getStamp(); System.out.println(Thread.currentThread().getName()+"\t" +"首次版本号:" +stamp); try { TimeUnit.MILLISECONDS.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } stampedReference.compareAndSet(100 ,101 ,stampedReference.getStamp(),stampedReference.getStamp() + 1 ); System.out.println(Thread.currentThread().getName()+"\t" +"2次流水号:" +stampedReference.getStamp()); stampedReference.compareAndSet(101 ,100 ,stampedReference.getStamp(),stampedReference.getStamp() + 1 ); System.out.println(Thread.currentThread().getName()+"\t" +"3次流水号:" +stampedReference.getStamp()); },"t1" ).start(); new Thread (() -> { int stamp = stampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "\t" + stamp); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } boolean b = stampedReference.compareAndSet(100 ,2023 ,stamp,stamp + 1 ); System.out.println(b+"\t" +stampedReference.getReference()+"\t" +stampedReference.getStamp()); },"t2" ).start(); }
18大原子类 介绍位于java.util.concurrent.atomic包下的所有类
原子类使用的原理是CAS自旋
基本类型原子类 包含
AtomicInteger AtomicBoolean AtomicLong 常用API
public final int get()//获取当前的值 public final int getAndSet(int newValue)//获取当前的值,并设置新的值 public final int getAndIncrement()//获取当前的值,并自增 public final int getAndDecrement()//获取当前的值,并自减 public final int getAndAdd(int delta)//获取当前的值,并加上预期的值 boolean compareAndSet(int expect,int update)//如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update) 案例:
注意:不能直接在主线程中获取计算结果,否则可能前面线程还没计算完成就打印出结果了
需要使用辅助类CountDownLatch来阻塞线程
开发中不能使用等待几秒的方式,因为等待时间不准确
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class MyNumber { AtomicInteger atomicInteger = new AtomicInteger (); public void setPlusPlus () { atomicInteger.getAndIncrement(); } } public class AtomicIntegerDemo { public static final int SIZE = 50 ; public static void main (String[] args) throws InterruptedException { MyNumber myNumber = new MyNumber (); CountDownLatch countDownLatch = new CountDownLatch (SIZE); for (int i = 0 ; i < SIZE; i++) { new Thread (() -> { try { for (int j = 0 ; j < 1000 ; j++) { myNumber.setPlusPlus(); } } finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "\t result : " + myNumber.atomicInteger.get()); } }
数组类型原子类 包含
AtomicIntegetArray AtomicBooleanArray AtomicReferenceArray 常用API
public final int get(int offset)//获取当前的值 public final int getAndSet(int offset, int newValue)//获取当前的值,并设置新的值 public final int getAndIncrement(int offset)//获取当前的值,并自增 public final int getAndDecrement(int offset)//获取当前的值,并自减 public final int getAndAdd(int offset, int delta)//获取当前的值,并加上预期的值 boolean compareAndSet(int offset, int expect,int update)//如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update) 引用类型原子类 AtomicReference AtomicStampedReference前面CAS解决ABA问题使用过 加上的是版本号version 解决修改过几次 AtomicMarkableReference加上的是标记为mark 一次性使用,若被修改过,则版本号不对,修改失败 解决是否被修改过 AtomicMarkableReference使用案例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class AtomicMarkableReferenceDemo { public static void main (String[] args) { AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference <>(100 ,false ); new Thread (() -> { System.out.println(Thread.currentThread().getName() + "\t" + markableReference.isMarked()); try {TimeUnit.SECONDS.sleep(1 );} catch (InterruptedException e) {e.printStackTrace();} markableReference.compareAndSet(100 ,1000 ,markableReference.isMarked(),!markableReference.isMarked()); },"t1" ).start(); new Thread (() -> { System.out.println(Thread.currentThread().getName() + "\t" + markableReference.isMarked()); try {TimeUnit.SECONDS.sleep(2 );} catch (InterruptedException e) {e.printStackTrace();} boolean b = markableReference.compareAndSet(100 ,2000 ,markableReference.isMarked(),!markableReference.isMarked()); System.out.println(Thread.currentThread().getName() + "修改结果:" + b); System.out.println(Thread.currentThread().getName() + "\t" + markableReference.isMarked()); System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference()); },"t2" ).start(); } }
属性修改原子类 AtomicIntegerFieldUpdater AtomicLongFieldUpdater AtomicReferenceFieldUpdater 使用目的:以一种线程安全的方式操作非线程安全对象内的某些字段
并不是整个对象都需要原子操作,可能只是其中的某一个字段需要保证线程安全 所以采用更细粒度的方式,只对某个对象的需要线程安全的某些字段进行操作 使用要求:
更新的对象属性必须使用public volatile 修饰符。 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类的字段 volatile使用场景:AtomicReferenceFieldUpdater、双端锁
AtomicIntegerFieldUpdater使用案例 i++案例到目前一共有四种实现方式:(后面还有两种)
按照学习先后顺序:
synchronized或lock重量级锁保证符合操作的原子性 volatile写锁策略,只有写操作才加锁(只能是一写),多写时需要用上面或下面两种 AtomicInteger原子类 AtomicIntegerFieldUpdater属性修改器 这里演示第一种和第四种,第四种和第三种差不多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 class BankAccount { String bankName = "CCB" ; public volatile int money = 0 ; public synchronized void add () { money++; } AtomicIntegerFieldUpdater<BankAccount> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money" ); public void transMoney () { fieldUpdater.getAndIncrement(this ); } } public class AtomicIntegerFieldUpdaterDemo { public static void main (String[] args) throws InterruptedException { BankAccount bankAccount = new BankAccount (); CountDownLatch countDownLatch = new CountDownLatch (10 ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { for (int j = 0 ; j < 1000 ; j++) { bankAccount.transMoney(); } } finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "\tresult: " + bankAccount.money); } }
AtomicReferenceFieldUpdater使用案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class MyVar { public volatile Boolean isInit = Boolean.FALSE; AtomicReferenceFieldUpdater<MyVar,Boolean> fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class,Boolean.class,"isInit" ); public void init () { if (fieldUpdater.compareAndSet(this ,Boolean.FALSE,Boolean.TRUE)){ System.out.println(Thread.currentThread().getName() + "\t------------start init,needs 2 seconds" ); try {TimeUnit.SECONDS.sleep(2 );} catch (InterruptedException e) {e.printStackTrace();} System.out.println(Thread.currentThread().getName() + "\t------------init over" ); }else { System.out.println(Thread.currentThread().getName() + "\t----------- already have other thread is initing" ); } } } public class AtomicReferenceFieldUpdaterDemo { public sa i = 0 ; i < 5 ; i++) { new Thread (myVar::init,String.valueOf(i)).start(); } } }
==原子操作增强类== 包含两种四个
DoubleAccumulator DoubleAdder LongAccumulator LongAdder LongAdder常用API
void add(long x) void increment( void decrement() long sum()返回当前值。待别注意,在没有并发更新value的情况下,sum会返回一个精确值,在存在并发的情况下,sum不保证返回精确值 void reset()将value重置为0,可用于替代重新new一个LongAdder,但此方法只可以在没有并发更新的情况下使用。 long sumThenReset( LongAccumulator常用API
void accumulate(long x) long get()
当多个线程更新用于收集统计信息但不用于细粒度同步控制的目的的公共和时,使用LongAdder通常优于AtomicLong 。在低更新 争用下,这两个类具有相似的特征。但在高争用 的情况下,这一类的预期吞吐量明显更高,但代价是空间消耗更高
LongAdder只能用来计算加法,且从零开始计算 LongAccumulator提供了自定义的函数操作 17.【参考】volatile解决多线程内存不可见问题对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。
说明:如果是count++操作,使用如下类实现: Atomiclnteger count new Atomiclnteger(); count.addAndGet(1);
如果是JDK8,推荐使用LongAdder对象,比AtomicLong性能更好(减少乐观锁的重试次数)
——《阿里Java开发手册》
阿里题目:(即后面讲到的i++点赞案例)
1 热点商品点赞计算器,点赞数加加统计,不要求实时精确 2 一个很大的list,里面都是int类型,如何实现加加,说说思路
使用案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class LongAdderDemo { public static void main (String[] args) { LongAdder longAdder = new LongAdder (); longAdder.increment(); longAdder.increment(); longAdder.increment(); System.out.println(longAdder.sum()); System.out.println("-------------------------" ); LongAccumulator longAccumulator = new LongAccumulator ((x,y) -> x + y,0 ); longAccumulator.accumulate(1 ); longAccumulator.accumulate(3 ); System.out.println(longAccumulator.get()); } }
i++点赞累加问题解决办法汇总对比 阿里题目:
1 热点商品点赞计算器,点赞数加加统计,不要求实时精确 2 一个很大的list,里面都是int类型,如何实现加加,说说思路
解决思路:
1 synchronized或lock读写写锁策略:volatile读,synchronized写(只适合一写多读场景) 2 AtomicInteger / AtomicLong 3 AtomicIntegerFieldUpdater / AtomicLongFieldUpdate 4 LongAdder 5 LongAccumulator 高争用 的情况下,LongAdder和LongAccumulator预期吞吐量明显更高,但代价是空间消耗更高
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 class ClickNumber { long number = 0 ; public synchronized void clickBySynchronized () {number++;} AtomicLong atomicLong = new AtomicLong (0 ); public void clickByAtomicLong () {atomicLong.getAndIncrement();} public volatile long number2 = 0 ; AtomicLongFieldUpdater<ClickNumber> fieldUpdater = AtomicLongFieldUpdater.newUpdater(ClickNumber.class,"number2" ); public void clickByAtomicLongFieldUpdater () { fieldUpdater.getAndIncrement(this ); } LongAdder longAdder = new LongAdder (); public void clickByLongAdder () {longAdder.increment(); } LongAccumulator longAccumulator = new LongAccumulator ((x,y) -> x + y,0 ); public void clickByLongAccumulator () { longAccumulator.accumulate(1 );} } public class AccumulatorCompareDemo { public static final int _1W = 10000 ; public static final int threadNumber = 50 ; public static void main (String[] args) throws InterruptedException { ClickNumber clickNumber = new ClickNumber (); long startTime; long endTime; CountDownLatch countDownLatch1 = new CountDownLatch (threadNumber); CountDownLatch countDownLatch2 = new CountDownLatch (threadNumber); CountDownLatch countDownLatch3 = new CountDownLatch (threadNumber); CountDownLatch countDownLatch4 = new CountDownLatch (threadNumber); CountDownLatch countDownLatch5 = new CountDownLatch (threadNumber); startTime = System.currentTimeMillis(); for (int i = 1 ; i <=threadNumber; i++) { new Thread (() -> { try { for (int j = 1 ; j <=100 * _1W; j++) { clickNumber.clickBySynchronized(); } } finally { countDownLatch1.countDown(); } },String.valueOf(i)).start(); } countDownLatch1.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: " +(endTime - startTime) +" 毫秒" +"\t clickBySynchronized: " +clickNumber.number); startTime = System.currentTimeMillis(); for (int i = 1 ; i <= threadNumber; i++) { new Thread (() -> { try { for (int j = 1 ; j <= 100 * _1W; j++){ clickNumber.clickByAtomicLong(); } } finally { countDownLatch2.countDown(); } },String.valueOf(i)).start();; } countDownLatch2.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: " +(endTime - startTime) +" 毫秒" +"\t clickByAtomicLong: " +clickNumber.atomicLong.get()); startTime = System.currentTimeMillis(); for (int i = 1 ; i <= threadNumber; i++) { new Thread (() -> { try { for (int j = 1 ; j <= 100 * _1W; j++){ clickNumber.clickByAtomicLongFieldUpdater(); } } finally { countDownLatch3.countDown(); } },String.valueOf(i)).start();; } countDownLatch3.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: " +(endTime - startTime) +" 毫秒" +"\t clickByAtomicLongFieldUpdater: " +clickNumber.number2); startTime = System.currentTimeMillis(); for (int i = 1 ; i <= threadNumber; i++) { new Thread (() -> { try { for (int j = 1 ; j <= 100 * _1W; j++){ clickNumber.clickByLongAdder(); } } finally { countDownLatch4.countDown(); } },String.valueOf(i)).start();; } countDownLatch4.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: " +(endTime - startTime) +" 毫秒" +"\t clickByLongAdder: " +clickNumber.longAdder.sum()); startTime = System.currentTimeMillis(); for (int i = 1 ; i <= threadNumber; i++) { new Thread (() -> { try { for (int j = 1 ; j <= 100 * _1W; j++){ clickNumber.clickByLongAccumulator(); } } finally { countDownLatch5.countDown(); } },String.valueOf(i)).start();; } countDownLatch5.await(); endTime = System.currentTimeMillis(); System.out.println("----costTime: " +(endTime - startTime) +" 毫秒" +"\t clickByLongAccumulator: " +clickNumber.longAccumulator.get()); } }
1 2 3 4 5 ----costTime: 2956 毫秒 clickBySynchronized: 50000000 ----costTime: 781 毫秒 clickByAtomicLong: 50000000 ----costTime: 1253 毫秒 clickByAtomicLongFieldUpdater: 50000000 ----costTime: 183 毫秒 clickByLongAdder: 50000000 ----costTime: 88 毫秒 clickByLongAccumulator: 50000000
LongAdder为什么快?源码分析
18个原子类,上面一共介绍了16个,还有两个隐藏的就是,Number类和Striped64
LongAdder是继承于Striped64的
思路 LongAdder的基本思路就是分散热点 ,将value值分散到一个Cell数组 中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
sum()会将所有Cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点。
如图:
add()源码详解 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 LongAdder.java public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); }
add()方法总结:
1 2 3 4 5 6 7 8 9 10 public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); } }
1如果Cells表为空,尝试用CAS更新base字段,成功则退出 2如果Cells表为空,CAS更新base字段失败,出现竞争,uncontended为true,调用longAccumulate() (尝试执行CASE2,其他线程正在执行则执行CASE3)3如果Cells表非空,但当前线程映射的槽为空,uncontended为true,调用longAccumulate() (尝试执行CASE1.1)4如果Cells表非空,且前线程映射的槽非空,CAS更新Cell的值,成功则返回,否则,uncontended设为false,调用longAccumulate() (尝试循环执行CASE1.2-1.6)longAccumulate()源码详解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } ... }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell [2 ]; rs[h & 1 ] = new Cell (x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; }
CASE3:兜底操作,CASE1判断cells为空,CASE2判断时被其它线程抢先初始化cells时会尝试修改base值(多个线程尝试CAS修改失败的线程会走这个分支)(其他线程正在进行初始化cells数组时,走这个CASE3 ) 1 2 3 4 5 6 7 else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell (x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 &&rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; }
1 2 3 4 5 6 7 8 9 else if (!wasUncontended) wasUncontended = true ; ... h = advanceProbe(h);
1 2 3 4 5 6 7 else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; ... h = advanceProbe(h);
1 2 3 4 5 6 7 else if (n >= NCPU || cells != as) collide = false ; ... h = advanceProbe(h);
1 2 3 4 5 6 else if (!collide) collide = true ; ... h = advanceProbe(h);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell [n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = advanceProbe(h);
我的理解:
当cells==null时,执行CASE2 CASE1判断cells为空,到CASE2判断时被其它线程抢先初始化cells时,会执行兜底操作CASE3,对base进行CAS cells!=null时,CASE1会从上到下进行判断,线程对应的hash值只能映射到仅有的槽位 CASE1.1:若是刚刚初始化的槽位还没有线程占用放过值,那么会进行CASE1.1,new一个cell存放值,之后的线程hash到该槽位时,CASE1.1将会判断失败,wasUncontended值为false,往下走 CASE1.2:如果是有竞争的,即wasUncontended值为false,将其修改为true,修改后即可往下走,否则若是没竞争,将无法继续往下走 CASE1.3:前面两步将会连续判断失败,此时会尝试在有竞争的槽位进行CAS自旋,若自旋成功,则跳出,若失败,则继续往下走 CASE1.4:第1、2步连续判断失败,第三步本次循环失败的话,会到第4步,判断槽位是否达到最大值(CPU个数),若达到最大个数后,设置collide为false,表示不可扩容,将会止步CASE1.4,导致永远不可能到达第6步进行扩容,只能一直无限循环在for循环内,无限进行第3步尝试CAS自旋直到成功 CASE1.5:扩容前的最后一步,修改扩容意向collide为true,下次循环将会不再进入CASE1.5,将会到达下一步进行扩容 CASE1.6:前面两步将会连续判断失败后,并且该次循环CAS失败,并且没达到最大槽位,执行扩容操作 sum()源码详解 1 2 3 4 5 6 7 8 9 10 11 12 13 public long sum () { Cell[] as = cells; Cell a; long sum = base; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
为啥高并发下sum的值不精确?
sum执行时,并没有限制对base和cells的更新(一句要命的话)。所以LongAdder不是强一致性,它是最终一致性的 首先,最终返回的sum局部变量,初始被赋值为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致 其次,这里对cell的读取也无法保证是最后一次写入的值 。所以,sum方法在没有并发的情况下,可以获得正确的结果 一句话:调用sum方法时,累加操作还在进行,所以返回时累加值已经变了,不是强一致性,而是最终一致性的
LongAdder源码解析总结 总结:LongAdder的基本思路就是分散热点 ,内部有一个base变量,一个Cell数组。
base变量:低并发,直接累加到该变量上 Cell[]数组:高并发,累加进各个线程自己的槽cell[i]中 最后:Value = Base + ∑(i=1~n) Cell[i] LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base 进行操作,当出现竞争关系时则是采用化整为零分散热点 的做法,用空间换时间 ,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标 ,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和base都加起来作为最终结果
AtomicLong和LongAdder对比总结 AtomicLong
原理:CAS+自旋 AtomicLong能保证并发情况下计数的准确性,其内部通过CAS来解决并发安全问题 AtomicLong是多个线程针对单个热点值value进行原子操作 场景:低并发下的全局计算 保证精度,性能代价 线程安全,可允许一些性能损耗,要求高精度时可使用 缺陷:高并发后性能急剧下降 why?AtomicLong的自旋会成为瓶颈N个线程CAS操作修改线程的值,每次只有一个成功过,其它N-1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子cpu就打高了。 LongAdder
原理:CAS+Base+CeIl数组分散 空间换时间并分散了热点数据 LongAdder是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行CAS操作 但最多只能存在CPU个数那么多个槽,因为需要CPU独占实现原子性 场景:高并发下的全局计算 保证性能,精度代价 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用 缺陷:sum求和后还有计算线程修改结果的话,最后结果不够准确 附件:Striped64.java和LongAdder.java源码 Striped64.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 Striped64.java abstract class Striped64 extends Number { static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; transient volatile long base; transient volatile int cellsBusy; final boolean casBase (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , BASE, cmp, val); } final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell (x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell [n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell [2 ]; rs[h & 1 ] = new Cell (x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } } static final int getProbe () { return UNSAFE.getInt(Thread.currentThread(), PROBE); } }
LongAdder.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 LongAdder.java (1 ).baseOK,直接通过casBase进行处理 (2 ).base不够用了,开始新建一个cell数组,初始值为2 (3 ).当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容 public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); } } Striped64.java abstract class Striped64 extends Number { static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; transient volatile long base; transient volatile int cellsBusy; final boolean casBase (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , BASE, cmp, val); } }