目录
一.Java内存模型
1.可见性
1.1设计模式
(1.1.1)两阶段终止
(1.1.2)Balking模式
2.有序性
3.volatile原理
3.1保证可见性与有序性
3.2单例模式DCL
3.3 happens-before规则
4.线程安全单例
4.1饿汉式
二.无锁并发
1.原子整数
2.原子引用
2.1 AtomicReference
2.1.1 ABA问题
2.2 AtomicStampedReference
2.3 AtomicMarkableReference
3.原子数组
4.字段更新器
5.原子累加器
5.1 LongAdder原理
5.1.1 cas锁
5.1.2缓存行伪共享
5.1.3 关键方法方法详解
6.unsafe
三.不可变
1.不可变的设计
2.享元模式
2.1自定义连接池
一.Java内存模型
1.可见性
--例如下图代码中,主线程通过对run进行更改,t线程可以跳出循环,但是结果是t线程依然在一直运行,没有停止:
public class Test32 {// 易变static boolean run = true;public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(true){if(!run) {break;}}});t.start();sleep(1);run = false; // 线程t不会如预想的停下来}
}
原因就是因为t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中,减少对主存中 run 的访问,提高效率,此时main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值。
--解决方法就是在run上增加volatile关键字,它可以用来修饰成员变量和静态成员变量,可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存,如下图:
public class Test32 {// 易变static volatile boolean run = true;public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(true){if(!run) {break;}}});t.start();sleep(1);run = false; }
}
注:volatile只能解决可见性问题,并不能解决原子性,其主要作用于一个线程写,其他多个线程读的情况。如果多个线程同时执行i++,就会出现并发安全问题,因为其需要原子性操作才可以解决, synchronized可以保证代码块的原子性和代码内变量的可见性,但缺点是synchronized性能比较低。
1.1设计模式
(1.1.1)两阶段终止
--使用volatile对两阶段终止进行改造,不使用打断标记判断是否停止,而是对一个共享变量进行修改来判断:
class TwoPhaseTerminationTest {// 监控线程private Thread monitorThread;// 停止标记private volatile boolean stop = false;// 启动监控线程public void start() {monitorThread = new Thread(() -> {while (true) {Thread current = Thread.currentThread();// 是否被打断if (stop) {log.debug("料理后事");break;}try {Thread.sleep(1000);log.debug("执行监控记录");} catch (InterruptedException e) {}}}, "monitor");monitorThread.start();}// 停止监控线程public void stop() {stop = true;monitorThread.interrupt();}
}
(1.1.2)Balking模式
--Balking(犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回。
--上面的两阶段终止模式还有一个缺陷,就是监控线程一般只需要一个就可以,而上面的可以启动多个,所以我们要在每次启动监控线程之前判断一下是否已经存在监控线程,存在就不再创建,否则就创建一个,而这个判断过程必须要原子性,因为是对共享变量的修改:
class TwoPhaseTerminationTest {// 监控线程private Thread monitorThread;// 停止标记private volatile boolean stop = false;// 判断是否执行过 start 方法private boolean starting = false;// 启动监控线程public void start() {synchronized (this) {if (starting) { // falsereturn;}starting = true;}monitorThread = new Thread(() -> {while (true) {Thread current = Thread.currentThread();// 是否被打断if (stop) {log.debug("料理后事");break;}try {Thread.sleep(1000);log.debug("执行监控记录");} catch (InterruptedException e) {}}}, "monitor");monitorThread.start();}// 停止监控线程public void stop() {stop = true;monitorThread.interrupt();}
}
2.有序性
--Jvm在不影响正确性的前提下,可以调整语句的执行顺序,例如下图中对i和j的赋值语句,其先后顺序就可能会调整,这种特性成为指令重排:
--之所以发生指令重排,主要就是因为计算机中的资源都是有限的,当一个线程中多个指令所需资源不同,或者如果某个指令所需的资源空闲,而其他指令所需资源不空闲,在不影响结果的情况下,就可以将该指令调到前面来运行,提高并行度。
--例如下面代码中,线程2的代码就可能发生指令重排,若ready=true在num=2之前执行,那么线程1中r.r1的结果就可能为0,就发生了并发情况下的安全错误:
class Test {boolean ready = false;// 线程1 执行此方法public void actor1(I_Result r) {if(ready) {r.r1 = num + num;} else {r.r1 = 1;}}// 线程2 执行此方法public void actor2(I_Result r) {num = 2;ready = true;}}
解决方法就是在ready上机上volatile关键字,就可以防止指令重排。
3.volatile原理
--在Jmm中一共有四种内存屏障,内存屏障是一种同步机制,用于确保特定的内存操作顺序,并保证内存可见性。内存屏障通过阻止处理器和编译器对内存操作进行重排序,从而避免多线程编程中可能出现的内存可见性问题。
LoadLoad屏障:在 Load1; LoadLoad; Load2 这样的序列中,确保 Load1 数据的装载先于 Load2 及所有后续装载指令的装载。这意味着在 LoadLoad 屏障之前的读操作必须在 LoadLoad 屏障之后的读操作之前完成。
StoreStore屏障:在 Store1; StoreStore; Store2 这样的序列中,确保 Store1 的数据对其他处理器可见(刷新到主内存),先于 Store2 及所有后续存储指令的存储。即 StoreStore 屏障之前的写操作必须在 StoreStore 屏障之后的写操作之前完成并对其他处理器可见。
LoadStore屏障:在 Load1; LoadStore; Store2 这样的序列中,确保 Load1 数据的装载先于 Store2 及所有后续存储指令的存储。也就是 LoadStore 屏障之前的读操作必须在 LoadStore 屏障之后的写操作之前完成。
StoreLoad屏障:在 Store1; StoreLoad; Load2 这样的序列中,确保 Store1 的数据对其他处理器可见(刷新到主内存),先于 Load2 及所有后续装载指令的装载。这是最强大的内存屏障,因为它同时包含了写操作的完成和读操作的开始顺序保障,且会使处理器的高速缓存失效,强制从主内存读取数据。
--注:LoadLoad会强制清除本地缓存,使得后续是从主存读取。StoreLoad会强制将更改的数据刷新到主存中。
--对 volatile
变量的读操作相当于有一个 LoadLoad
和 LoadStore
屏障,而写操作相当于有一个 StoreStore
和 StoreLoad
屏障。这样就保证了可见性与有序性。
--注:synchronized会在代码块入口加上读屏障,出口加上写屏障。
3.1保证可见性与有序性
--volatile的底层实现原理是内存屏障,例如下列代码中对volatile修饰的变量,内存屏障的添加:
class Test {boolean ready = false;// 线程1 执行此方法public void actor1(I_Result r) {if(ready) {r.r1 = num + num;} else {r.r1 = 1;}}// 线程2 执行此方法public void actor2(I_Result r) {num = 2;ready = true;}}
3.2单例模式DCL
--在普通的单例模式中,可以使用synchronized来实现,第一次来时获取锁并创建实例,其他线程只能等待锁,解锁之后发现已经又实例,就直接使用:
public static synchronized Singleton getInstance() {if (singleton == null) {singleton = new Singleton();}return singleton;
}
// 或者
public static Singleton getInstance() {synchronized(Singleton.class) { if (singleton == null) {singleton = new Singleton();}}return singleton;
}
--上述代码即使对象以及存在,每次判断是否创建时都要获取锁,这种非常消耗性能,所以就推出了DCL,双重检验是否存在,减少获取锁的次数,初步代码如下:
public class Singleton {private static Singleton singleton;private Singleton(){}public static Singleton getInstance() {if (singleton == null) { // 线程A和线程B同时看到singleton = null,如果不为null,则直接返回singletonsynchronized(Singleton.class) { // 线程A或线程B获得该锁进行初始化if (singleton == null) { // 其中一个线程进入该分支,另外一个线程则不会进入该分支singleton = new Singleton();}}}return singleton;}}
--上述代码看似减少了判断对象是否存在时获取锁的次数,但是存在一个问题,就是有序性问题,我么都知道初始化对象分为三个步骤:为对象分配内存空间,初始化对象,将对象引用指向分配好的内存空间。但是如果此时第三步和第二部互换了位置,也就是对象还没有初始化完成,但是引用指向了预分配的地址,并赋值给了singleton,此时有另一个线程进来判断发现singleton不会null,就拿着对象去执行了,此时就会发生错误。
--解决方法就是在singleton上加上volatile关键字,阻止指令重排序即可,也即是让创建对象的散步操作在给singleton赋值操作前面执行:
public class Singleton {private static volatile Singleton singleton;private Singleton(){}public static Singleton getInstance() {if (singleton == null) { // 线程A和线程B同时看到singleton = null,如果不为null,则直接返回singletonsynchronized(Singleton.class) { // 线程A或线程B获得该锁进行初始化if (singleton == null) { // 其中一个线程进入该分支,另外一个线程则不会进入该分支singleton = new Singleton();}}}return singleton;}}
3.3 happens-before规则
--happens-before 规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结,抛开以下 happens-before 规则,JMM 并不能保证一个线程对共享变量的写,对于其它线程对该共享变量的读可见。
--线程解锁 m 之前对变量的写,对于接下来对 m 加锁的其它线程对该变量的读可见:
static int x;static final Object m = new Object();@Testpublic void contextLoads() {new Thread(()->{synchronized(m) {x = 10;}},"t1").start();new Thread(()->{synchronized(m) {System.out.println(x);}},"t2").start();}
--线程对 volatile 变量的写,对接下来其它线程对该变量的读可见:
volatile static int x;@Testpublic void contextLoads() {new Thread(()->{x = 10;},"t1").start();new Thread(()->{System.out.println(x);},"t2").start();}
--线程 start 前对变量的写,对该线程开始后对该变量的读可见:
static int x;@Testpublic void contextLoads() {x = 10;new Thread(()->{System.out.println(x);},"t2").start();}
--线程结束前对变量的写,对其它线程得知它结束后的读可见(比如其它线程调用 t1.isAlive() 或 t1.join()等待它结束):
static int x;@Testpublic void contextLoads() throws InterruptedException {Thread t1 = new Thread(()->{x = 10;},"t1");t1.start();t1.join();System.out.println(x);}
--线程 t1 打断 t2(interrupt)前对变量的写,对于其他线程得知 t2 被打断后对变量的读可见(通过 t2.interrupted 或 t2.isInterrupted):
static int x;public static void main(String[] args) {Thread t2 = new Thread(()->{while(true) {if(Thread.currentThread().isInterrupted()) {System.out.println(x);break;}}},"t2");t2.start();new Thread(()->{sleep(1);x = 10;t2.interrupt();},"t1").start();while(!t2.isInterrupted()) {Thread.yield();}System.out.println(x);}
--对变量默认值(0,false,null)的写,对其它线程对该变量的读可见。
--具有传递性,如果 x hb-> y 并且 y hb-> z 那么有 x hb-> z ,配合 volatile 的防指令重排:
volatile static int x;static int y;@Testpublic void contextLoads() throws InterruptedException {new Thread(()->{y = 10;x = 20;},"t1").start();new Thread(()->{// x=20 对 t2 可见, 同时 y=10 也对 t2 可见System.out.println(x);},"t2").start();}
4.线程安全单例
4.1饿汉式
--类加载就会导致该单例被实例化。
--第一种实现方式就是将构造器私有化,然后设置一个私有静态变量存储该类的一个对象,然后提高一个方法返回,这样可以在jvm加载类时再初始化静态变量。为了防止子类改写父类方法破坏单例,要给类加上final不可被继承。如果该类继承了Serializable,为了防止反序列化破坏单例,可以实现Serializable中的readResovle方法即可:
public final class Singleton implements Serializable {private Singleton() {}private static final Singleton INSTANCE = new Singleton();public static Singleton getInstance() {return INSTANCE;}public Object readResolve() {return INSTANCE;}
注:这个方法的缺点就是通过反射修改构造器的访问权限来创建对象。
--第二种方法就是枚举类实现,例如下图中的枚举类,经过编译之后如下图,我们可知,Weekday本质就是一个实现了Eunm抽象类的不可变类,而Monday就是其中的一个静态变量,所以该monday在类加载时就会被加载。
enum Weekday {Monday
}
注:枚举类不能通过反射破坏单例。枚举类虽然实现了序列化接口,但是不允许被反序列化。
4.2懒汉式
--类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建。
--其中一个实现方式就是DCL,前面已经详细讲解,这里就不再细说。
--第二种方法就是静态内部类的方式,因为静态内部类在没有被使用时是不会被加载的,所以只有当被使用时,静态内部类被加载,然后其中的静态变量也就是单例对象才被实例化。
public final class Singleton {private Singleton() { }private static class LazyHolder {static final Singleton INSTANCE = new Singleton();}public static Singleton getInstance() {return LazyHolder.INSTANCE;}}
二.无锁并发
--在之前对账户扣款的操作,为了保证原子性,是通过synchronized来控制的:
public class TestAccount {public static void main(String[] args) {Account account = new AccountSafe(10000);Account.demo(account);}
}class AccountSafe implements Account {private Integer balance;public AccountSafe(Integer balance) {this.balance = balance;}@Overridepublic Integer getBalance() {synchronized (this) {return this.balance;}}@Overridepublic void withdraw(Integer amount) {synchronized (this) {this.balance -= amount;}}
}interface Account {// 获取余额Integer getBalance();// 取款void withdraw(Integer amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(Account account) {List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(10);}));}long start = System.nanoTime();ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+ " cost: " + (end-start)/1000_000 + " ms");}
}
--还有另一种无锁保证原子性的方案,就是cas,cas在底层是通过硬件实现的,他将判断条件和修改两个操作合成一条指令,这样就实现了原子性,同时不用加锁,并且增加了空转的机制,减少线程的切换,减少了性能开销:
package cn.itcast.test;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;public class TestAccount {public static void main(String[] args) {Account account = new AccountCas(10000);Account.demo(account);}
}class AccountCas implements Account {private AtomicInteger balance;public AccountCas(int balance) {this.balance = new AtomicInteger(balance);}@Overridepublic Integer getBalance() {return balance.get();}@Overridepublic void withdraw(Integer amount) {/*while(true) {// 获取余额的最新值int prev = balance.get();// 要修改的余额int next = prev - amount;// 真正修改if(balance.compareAndSet(prev, next)) {break;}}*/balance.getAndAdd(-1 * amount);}
}interface Account {// 获取余额Integer getBalance();// 取款void withdraw(Integer amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(Account account) {List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(10);}));}long start = System.nanoTime();ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance()+ " cost: " + (end-start)/1000_000 + " ms");}
}
上述代码中,一直循环执行AtomicInteger的compareAndSet方法,这个方法会用cas操作判断值是否改变,没改变就更改AtomicInteger中volatile修饰的vlue的值,否则失败,如下图:
--总结:CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,重试即可。synchronized 是基于悲观锁的思想:最悲观的估计,要防止其它线程来修改共享变量,通过上锁让其他线程都无法修改共享变量,直到解锁。
1.原子整数
--操作原子整数的有AtomicBoolean,AtomicInteger,AtomicLong,分别是对boolean,int,long类型的cas操作,这里以AtomicInteger为例:
public static void main(String[] args) {AtomicInteger i = new AtomicInteger(5);System.out.println(i.incrementAndGet()); // ++i 6System.out.println(i.getAndIncrement()); // i++ 6System.out.println(i.getAndAdd(5)); // 7 , 12System.out.println(i.addAndGet(5)); // 12, 17i.updateAndGet(value -> value * 10); // 17,170}
我们自己实现一下updateAndGet方法:
public class Test34 {public static void main(String[] args) {AtomicInteger i = new AtomicInteger(5);System.out.println(updateAndGet(i, p -> p / 2));System.out.println(i.get()); // 2}public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator) {while (true) {int prev = i.get();int next = operator.applyAsInt(prev);if (i.compareAndSet(prev, next)) {return next;}}}
}
2.原子引用
--如果要用cas操作引用数据类型,原先的原子整性就没办法保证,此时就要使用原子引用:AtomicReference,AtomicMarkableReference,AtomicStampedReference。
2.1 AtomicReference
--使用其对原先的账户扣款进行改造,取消synchronized,使用cas操作扣款保证原子性:
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;public class Test35 {public static void main(String[] args) {DecimalAccount.demo(new DecimalAccountCas(new BigDecimal("10000")));}
}class DecimalAccountCas implements DecimalAccount {private AtomicReference<BigDecimal> balance;public DecimalAccountCas(BigDecimal balance) {
// this.balance = balance;this.balance = new AtomicReference<>(balance);}@Overridepublic BigDecimal getBalance() {return balance.get();}@Overridepublic void withdraw(BigDecimal amount) {while(true) {BigDecimal prev = balance.get();BigDecimal next = prev.subtract(amount);if (balance.compareAndSet(prev, next)) {break;}}}
}interface DecimalAccount {// 获取余额BigDecimal getBalance();// 取款void withdraw(BigDecimal amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(DecimalAccount account) {List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(BigDecimal.TEN);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(account.getBalance());}
}
2.1.1 ABA问题
--AtomicReference的compareAndSet方法中cas操作只会比较引用地址,即使引用对象的属性值改变,但是地址没改变,就依然会更新成功,例如下图代码:主线程想将A更新为C,但此时有其他线程将A更新为B,然后将B变为A,对于主线程来说是无法感知的,依然会觉得和初始值一样,所以会更新成功:
import java.util.concurrent.atomic.AtomicReference;static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 获取值 AString prev = ref.get();other();sleep(1);// 尝试改为 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}
private static void other() {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));}, "t1").start();sleep(0.5);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));}, "t2").start();
}
2.2 AtomicStampedReference
--如果只要有其他线程更改过共享变量,cas就算失败的话,仅仅比较值是不够的,还需要比较一下版本号:在每次调用compareAndSet方法时,不仅要传入先前的值,还要传入先前的版本号,若版本号发生改变,则cas失败,cas成功就要对版本号加一,以告知其他线程我更改了共享变量:
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicStampedReference;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.Test36")
public class Test36 {static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 获取值 AString prev = ref.getReference();// 获取版本号int stamp = ref.getStamp();log.debug("版本 {}", stamp);// 如果中间有其它线程干扰,发生了 ABA 现象other();sleep(1);// 尝试改为 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));}private static void other() {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t1").start();sleep(0.5);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t2").start();}
}
2.3 AtomicMarkableReference
--如果不关心ABA问题,如果只想标记一下状态,该原子引用就很适合:例如下列代码实现一个,我和清洁工发现垃圾袋满就更换垃圾袋,否则不动的代码:
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicMarkableReference;import static cn.itcast.n2.util.Sleeper.sleep;@Slf4j(topic = "c.Test38")
public class Test38 {public static void main(String[] args) throws InterruptedException {GarbageBag bag = new GarbageBag("装满了垃圾");// 参数2 mark 可以看作一个标记,表示垃圾袋满了AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);log.debug("start...");GarbageBag prev = ref.getReference();log.debug(prev.toString());new Thread(() -> {log.debug("start...");bag.setDesc("空垃圾袋");ref.compareAndSet(bag, bag, true, false);log.debug(bag.toString());},"保洁阿姨").start();sleep(1);log.debug("想换一只新垃圾袋?");boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);log.debug("换了么?" + success);log.debug(ref.getReference().toString());}
}class GarbageBag {String desc;public GarbageBag(String desc) {this.desc = desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return super.toString() + " " + desc;}
}
3.原子数组
--是为了解决多线程环境下对数组元素的原子操作问题,主要应对以下场景:
package cn.itcast.test;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;public class Test39 {public static void main(String[] args) {demo(()->new int[10],(array)->array.length,(array, index) -> array[index]++,array-> System.out.println(Arrays.toString(array)));demo(()-> new AtomicIntegerArray(10),(array) -> array.length(),(array, index) -> array.getAndIncrement(index),array -> System.out.println(array));}/**参数1,提供数组、可以是线程不安全数组或线程安全数组参数2,获取数组长度的方法参数3,自增方法,回传 array, index参数4,打印数组的方法*/// supplier 提供者 无中生有 ()->结果// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->private static <T> void demo(Supplier<T> arraySupplier,Function<T, Integer> lengthFun,BiConsumer<T, Integer> putConsumer,Consumer<T> printConsumer ) {List<Thread> ts = new ArrayList<>();T array = arraySupplier.get();int length = lengthFun.apply(array);for (int i = 0; i < length; i++) {// 每个线程对数组作 10000 次操作ts.add(new Thread(() -> {for (int j = 0; j < 10000; j++) {putConsumer.accept(array, j%length);}}));}ts.forEach(t -> t.start()); // 启动所有线程ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}); // 等所有线程结束printConsumer.accept(array);}
}
4.字段更新器
--利用字段更新器,可以针对对象的某个Field进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常:
package cn.itcast.test;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;public class Test40 {public static void main(String[] args) {Student stu = new Student();AtomicReferenceFieldUpdater updater =AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");System.out.println(updater.compareAndSet(stu, null, "张三"));System.out.println(stu);}
}class Student {volatile String name;@Overridepublic String toString() {return "Student{" +"name='" + name + '\'' +'}';}
}
5.原子累加器
--jdk8及以后版本给我们提供了专门用于类加的原子操作类:LongAdder,前面的所学的原子整数也可以实现类加,但是原子整数的类加时对同一个变量进行类加,这样就会导致高并发下,cas操作失败占比变大,而原子累加器设置多个累加单元,多个cas对不同累加单元累加,最后将多个累加单元的值加起来的得出最终结果,这样就减少了多个cas的重试次数,显著提高了运行速度:
package cn.itcast.test;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Supplier;public class Test41 {public static void main(String[] args) {for (int i = 0; i < 5; i++) {demo(() -> new AtomicLong(0),(adder) -> adder.getAndIncrement());}for (int i = 0; i < 5; i++) {demo(() -> new LongAdder(),adder -> adder.increment());}}/*() -> 结果 提供累加器对象(参数) -> 执行累加操作*/private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {T adder = adderSupplier.get();List<Thread> ts = new ArrayList<>();// 4 个线程,每人累加 50 万for (int i = 0; i < 4; i++) {ts.add(new Thread(() -> {for (int j = 0; j < 500000; j++) {action.accept(adder);}}));}long start = System.nanoTime();ts.forEach(t -> t.start());ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(adder + " cost:" + (end - start) / 1000_000);}
}
5.1 LongAdder原理
--LongAdder有几个很关键的域:
--这里加锁的实现是类似于cas锁的方式。
5.1.1 cas锁
--用cas实现一个加锁,解锁流程:但是不要自己实现这种方式,因为加锁时要不断循环获取锁,导致线程一直空转,影响性能:
import java.util.concurrent.atomic.AtomicInteger;public class LockCas {private AtomicInteger state = new AtomicInteger(0);public void lock() {while (true) {if (state.compareAndSet(0, 1)) {break;}}}public void unlock() {log.debug("unlock...");state.set(0);}
}
5.1.2缓存行伪共享
--下面是一段Cell静态内部类的代码,在该类上有一个@sun.misc.Contended注解,该注解作用是什么呢。
--CPU的运行速度是很快的,而读取内存的速度是有限的,为了防止因为读取速度影响程序征途运行速度,所以就需要在内存和cpu之间加上多级缓存,加快读取速度,而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。
--因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象,所以无论哪个CPU修改成功,都会导致对方CPU核心的缓存行失效,也就是即使只改变了一个Cell对象,但是因为都在一个缓存行,导致两个Cell对象都失效,导致其他CPU要去内存更新这两个Cell对象,效率降低:
而@sun.misc.Contended注解就是在此注解的对象或字段的前后各增加 128 字节大小的填充,导致CPU的一个缓存行只能存储一个对象,这样即使一个CPU更新了对象,也不会因为和他同处同一个缓存行的对象失效:
5.1.3 关键方法方法详解
--LongAdder的increment方法调用了add方法
--add方法源码解析,流程图如下:
public void add(long x) {// as 为累加单元数组// b 为基础值// x 为累加值Cell[] as; long b, v; int m; Cell a;// 进入 if 的两个条件// 1. as 有值, 表示已经发生过竞争, 进入 if// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 ifif ((as = cells) != null || !casBase(b = base, b + x)) {// uncontended 表示 cell 没有竞争boolean uncontended = true;if (// as 还没有创建as == null || (m = as.length - 1) < 0 ||// 当前线程对应的 cell 还没有(a = as[getProbe() & m]) == null ||// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )!(uncontended = a.cas(v = a.value, v + x))) {// 进入 cell 数组创建、cell 创建的流程longAccumulate(x, null, uncontended);}}
}
--longAccumulate方法的源码和流程图如下:
import javafx.scene.control.Cell;import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongBinaryOperator;inal void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cellif ((h = getProbe()) == 0) {// 初始化 probeThreadLocalRandom.current();// h 对应新的 probe 值, 用来对应 cellh = getProbe();wasUncontended = true;}// collide 为 true 表示需要扩容boolean collide = false;for (;;) {Cell[] as; Cell a; int n; long v;// 已经有了 cellsif ((as = cells) != null && (n = as.length) > 0) {// 还没有 cellif ((a = as[(n - 1) & h]) == null) {// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x// 成功则 break, 否则继续 continue 循环}// 有竞争, 改变线程对应的 cell 来重试 caselse if (!wasUncontended)wasUncontended = true;// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 nullelse if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 caselse if (n >= NCPU || cells != as)collide = false;// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了else if (!collide)collide = true;// 加锁else if (cellsBusy == 0 && casCellsBusy()) {// 加锁成功, 扩容continue;}// 改变线程对应的 cellh = advanceProbe(h);}// 还没有 cells, 尝试给 cellsBusy 加锁else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell// 成功则 break;}// 上两种情况失败, 尝试给 base 累加else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;}
}
--最终将cells数组中的值累加,得到最终的值即可:
import javafx.scene.control.Cell;public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;
}
6.unsafe
--因为Unsafe无法直接通过new获取,所以必须要通过反射,怎么获得简要如下:
package cn.itcast.n4;import sun.misc.Unsafe;import java.lang.reflect.Field;public class UnsafeAccessor {private static final Unsafe unsafe;static {try {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);unsafe = (Unsafe) theUnsafe.get(null);} catch (NoSuchFieldException | IllegalAccessException e) {throw new Error(e);}}public static Unsafe getUnsafe() {return unsafe;}
}
--cas操作相关的方法使用如下:
import cn.itcast.n4.UnsafeAccessor;import sun.misc.Unsafe;import java.lang.reflect.Field;public class Test{public static void main(String[] args) throws NoSuchFieldException {Unsafe unsafe = UnsafeAccessor.getUnsafe();Field id = Student.class.getDeclaredField("id");Field name =Student.class.getDeclaredField("name");// 获得成员变量的偏移量long idOffset = unsafe.objectFieldOffset(id);long nameOffset = unsafe.objectFieldOffset(name);Student student = new Student();// 使用 cas 方法替换成员变量的值unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 trueunsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 trueSystem.out.println(student);}
}class Student {volatile int id;volatile String name;}
--用Unsafe模拟一下原子整数的实现:
package cn.itcast.n4;import sun.misc.Unsafe;import java.lang.reflect.Field;public class UnsafeAccessor {private static final Unsafe unsafe;static {try {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);unsafe = (Unsafe) theUnsafe.get(null);} catch (NoSuchFieldException | IllegalAccessException e) {throw new Error(e);}}public static Unsafe getUnsafe() {return unsafe;}
}
package cn.itcast.test;import cn.itcast.n4.UnsafeAccessor;
import lombok.extern.slf4j.Slf4j;
import sun.misc.Unsafe;@Slf4j(topic = "c.Test42")
public class Test42 {public static void main(String[] args) {Account.demo(new MyAtomicInteger(10000));}
}class MyAtomicInteger implements Account {private volatile int value;private static final long valueOffset;private static final Unsafe UNSAFE;static {UNSAFE = UnsafeAccessor.getUnsafe();try {valueOffset = UNSAFE.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));} catch (NoSuchFieldException e) {e.printStackTrace();throw new RuntimeException(e);}}public int getValue() {return value;}public void decrement(int amount) {while(true) {int prev = this.value;int next = prev - amount;if (UNSAFE.compareAndSwapInt(this, valueOffset, prev, next)) {break;}}}public MyAtomicInteger(int value) {this.value = value;}@Overridepublic Integer getBalance() {return getValue();}@Overridepublic void withdraw(Integer amount) {decrement(amount);}
}
三.不可变
--下列代码运行之后会报错,是因为SimpleDateFormat没有做线程安全方面的考虑,高并发下会出现对共享变量的修改:
public static void main(String[] args) throws NoSuchFieldException {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");for (int i = 0; i < 10; i++) {new Thread(() -> {try {log.debug("{}", sdf.parse("1951-04-21"));} catch (Exception e) {log.error("{}", e);}}).start();}}
此时我们可以换用DateTimeFormatter,该类中所有的共享变量都是final修饰,不可以被改变,并且所有的修改操作都是创建一个新的对象,不存在对共享变量的修改,所以就不会存在并发问题:
public static void main(String[] args) {DateTimeFormatter stf = DateTimeFormatter.ofPattern("yyyy-MM-dd");for (int i = 0; i < 10; i++) {new Thread(() -> {TemporalAccessor parse = stf.parse("1951-04-21");log.debug("{}", parse);}).start();}}
1.不可变的设计
--以String为例,其类和共享变量上都加上了final关键字,这就导致子类无法继承,也就无法修改其线程安全的特性,而且共享变量不可被改变,其所有修改字符串的方法都是重新创建一个新的对象更加保证了安全性:
2.享元模式
--上述所讲的不可变的设计,所有的修改操作都是创建一个新的对象,这样也就导致对象创建的频繁,个数会较多,此时就需要享元模式来解决这一问题。享元模式就是重用数量有限的对象。
--在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的 valueOf 会缓存 -128~127 之间的Long对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对象:
2.1自定义连接池
--例如:一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时 预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库,代码如下:
class Pool {// 1. 连接池大小private final int poolSize;// 2. 连接对象数组private Connection[] connections;// 3. 连接状态数组 0 表示空闲, 1 表示繁忙private AtomicIntegerArray states;// 4. 构造方法初始化public Pool(int poolSize) {this.poolSize = poolSize;this.connections = new Connection[poolSize];this.states = new AtomicIntegerArray(new int[poolSize]);for (int i = 0; i < poolSize; i++) {connections[i] = new MockConnection("连接" + (i+1));}}// 5. 借连接public Connection borrow() {while(true) {for (int i = 0; i < poolSize; i++) {// 获取空闲连接if(states.get(i) == 0) {if (states.compareAndSet(i, 0, 1)) {log.debug("borrow {}", connections[i]);return connections[i];}}}// 如果没有空闲连接,当前线程进入等待synchronized (this) {try {log.debug("wait...");this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 6. 归还连接public void free(Connection conn) {for (int i = 0; i < poolSize; i++) {if (connections[i] == conn) {states.set(i, 0);synchronized (this) {log.debug("free {}", conn);this.notifyAll();}break;}}}
}class MockConnection implements Connection {private String name;public MockConnection(String name) {this.name = name;}
}
测试代码如下: