Volatile
可见性
解决内存可见性问题
public class Demo01VolatileVisible {
public static void main(String[] args) throws InterruptedException {
Visible runnable = new Visible();
Thread thread = new Thread(runnable);
thread.start();
Thread.sleep(1000);
Visible.flag = false;
System.out.println("已经修改flag为false");
}
static class Visible implements Runnable{
static volatile boolean flag = true;
public void run() {
System.out.println("子线程执行");
while (flag){
}
System.out.println("子线程结束");
}
}
}
Volatile是如何保证可见性的?
有volatile修饰的变量,赋值后会多一个lock操作,这个操作相当于一个内存屏障,指令重排序时不能把后面的指令重排序到内存屏障之前的位置,只有一个CPU访问内存时,不需要内存屏障,但如果有多个CPU访问同一块内存,就需要内存屏障保证一致性,关键在于lock指令,它的作用是使得本CPU的缓存写入内存,该写入动作会造成其他CPU的缓存无效。
Java内存模型对volatile变量定义的特殊规则:
- 每次使用变量前都必须从主内存中刷新最新的值,用于保证能看见其他线程对变量所做的修改
- 每次修改变量后必须立刻同步回主内存中,用于保证其他线程可以看到自己对变量所做的修改
- volatile修饰的变量不会被指令重排序优化,保证代码的执行顺序与程序的执行顺序一致
原子性
被volatile修饰的变量不能解决原子性问题
public class Demo02VolatileA {
public static void main(String[] args) {
VolatileThread volatileThread = new VolatileThread();
for (int i = 0; i < 10; i++) {
new Thread(volatileThread).start();
}
}
static class VolatileThread implements Runnable{
public volatile int count;
private void addCount(){
for (int i = 0; i < 1000; i++) {
count++;
}
System.out.println("count = "+ count);
}
public void run() {
addCount();
}
}
}
这里主要是因为count++ 不是一个原子操作,操作步骤如下:
- 从内存中取出count值
- 计算count+1
- 将count写回到内存中
JUC提供原子类来解决i++的原子性问题
public class Demo02VolatileA {
public static void main(String[] args) {
VolatileThread volatileThread = new VolatileThread();
for (int i = 0; i < 10; i++) {
new Thread(volatileThread).start();
}
}
static class VolatileThread implements Runnable{
public volatile int count;
AtomicInteger atomicInteger = new AtomicInteger();
private void addCount(){
for (int i = 0; i < 1000; i++) {
// count++;
atomicInteger.getAndIncrement();
}
// System.out.println("count = "+ count);
System.out.println("count = "+ atomicInteger);
}
public void run() {
addCount();
}
}
}
有序性
Java语言提供了volatile和synchronized两个关键字来保证线程之间操作的有序性。
volatile关键字本身就包含了禁止指令重排序的语义
synchronized是有“一个变量在同一个时刻只允许一条线程对其进行操作”这条规则来保证
CAS
CAS:Compare And Swap ,即比较再交换
CAS原理
CAS算法的过程是这样的:它包含3个参数CAS(V,E,N),V表示要更新的变量,E表示预期值,N表示新值。仅当V的值等于E时,才会将V的值设置为N值,如果V和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做,最后,CAS返回当前V的真实值。当多个线程同时使用CAS操作同一个变量时,只有一个会胜出并更新成功,其他均会更新失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
public class Demo03CAS {
static AtomicInteger atomicInteger = new AtomicInteger();
public static void main(String[] args) {
System.out.println(atomicInteger);
atomicInteger.compareAndSet(1,200);
System.out.println(atomicInteger);
atomicInteger.compareAndSet(0,100);
System.out.println(atomicInteger);
atomicInteger.getAndSet(300);
// i++ 操作
atomicInteger.getAndIncrement();
System.out.println(atomicInteger);
}
}
CAS和synchronized对比
public class Demo04CASAndSynchronized {
static AtomicInteger atomicInteger = new AtomicInteger();
static int count = 0;
static synchronized void add(){
count++;
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 开启10个线程
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
for (int i1 = 0; i1 < 1000000; i1++) {
// add();
atomicInteger.getAndIncrement();
}
}
}).start();
}
// 当前活跃的线程大于2,让主线程让出CPU资源
while (Thread.activeCount()>2){
Thread.yield();
}
long end = System.currentTimeMillis();
System.out.println("总耗时: "+(end-start)+ " 毫秒");
// System.out.println("count = "+ count);
System.out.println("count = "+ atomicInteger);
// synchronized 执行1千万次i++操作平均耗时 560ms
// 使用原子类AtomicInteger 执行1千万次i++操作平均耗时 280ms
}
}
CAS ABA 问题
调用CAS(V,E,N)过程中,此时其他线程将V的值改成了V1,然后又改回了V,CAS操作会误认为变量没有被改变过。这个漏洞就称为CAS 的ABA问题。JUC包为了解决这个问题,提供了一个带有标记的原子引用类AtomicStampedReference
来解决这个问题。大部分情况下ABA问题不会印象程序并发的正确性。
数据库中使用
在表设计的时候,会加上一个版本号字段(version),每次更新都会将版本号+1
update account set money = money + 1000,version = version+1 where account_id = 123 and version = 1;
Atomic包
基本类型
public class Demo05Atomic {
public static void main(String[] args) {
// 整形原子类
AtomicInteger atomicInteger = new AtomicInteger();
atomicInteger.compareAndSet(0,100);
// 长整型原子类
AtomicLong atomicLong = new AtomicLong();
atomicLong.compareAndSet(0,100);
// 布尔型原子类
AtomicBoolean atomicBoolean = new AtomicBoolean();
atomicBoolean.compareAndSet(false,true);
}
}
引用类型
AtomicReference
public class Demo05AtomicReference {
public static void main(String[] args) {
// 基本类型引用
AtomicReference<Integer> atomicReference = new AtomicReference<Integer>();
atomicReference.compareAndSet(0,1);
// 自定义类型引用
User user1 = new User("tom",18);
User user2 = new User("jerry",19);
// 使用user1初始化
AtomicReference<User> userAtomicReference = new AtomicReference<User>(user1);
// 修改user1的属性
user1.setAge(20);
// 将user1的地址改变
user1 = new User("bob",20);
boolean b = userAtomicReference.compareAndSet(user1, user2);
System.out.println(b);
System.out.println(userAtomicReference.get());
}
}
User
public class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
AtomicStampedReference
public class Demo06AtomicStamperReference {
public static void main(String[] args) {
User user1 = new User("tom",18);
User user2 = new User("jerry",19);
// 使用user1,版本号为1来初始化
AtomicStampedReference<User> atomicStampedReference = new AtomicStampedReference<User>(user1,1);
user1.setAge(20);
// 修改成功将版本号修改为2
boolean b = atomicStampedReference.compareAndSet(user1, user2, 1, 2);
System.out.println(b);
System.out.println(atomicStampedReference.getReference());
// 期望版本号为1
boolean b1 = atomicStampedReference.compareAndSet(user2, user1, 1, 2);
System.out.println(b1);
System.out.println(atomicStampedReference.getReference());
}
}
AtomicMarkableReference
public class Demo07AtomicMarkbaleReference {
public static void main(String[] args) {
User user1 = new User("tom",18);
User user2 = new User("jerry",19);
AtomicMarkableReference<User> atomicMarkableReference = new AtomicMarkableReference<User>(user1,true);
System.out.println(atomicMarkableReference.getReference());
atomicMarkableReference.compareAndSet(user1,user2,true,false);
System.out.println(atomicMarkableReference.getReference());
}
}
数组类型
public class Demo08AtomicArray {
public static void main(String[] args) {
int[] arr = {1,2,3,4,5};
// 整形类数组
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(arr);
atomicIntegerArray.compareAndSet(0,1,200);
System.out.println(atomicIntegerArray);
// 引用型数组
User user1 = new User("tom",18);
User user2 = new User("jerry",19);
User[] arrUser = {user1,user2};
AtomicReferenceArray<User> atomicReferenceArray = new AtomicReferenceArray<User>(arrUser);
atomicReferenceArray.compareAndSet(1,user2,user1);
System.out.println(atomicReferenceArray);
}
}
对象属性修改器
AtomicIntegerFieldUpdater
需要满足条件:
- 被修改的属性不能是private
- 被修改的属性必须使用volatile修饰
- 被修改的属性类型必须是Integer
- 操作的目标不能是static
AtomicLongFieldUpdater
需要满足条件:
- 被修改的属性不能是private
- 被修改的属性必须使用volatile修饰
- 被修改的属性类型必须是Long
- 操作的目标不能是static
AtomicReferenceFieldUpdater
需要满足条件:
- 被修改的属性不能是private
- 被修改的属性必须使用volatile修饰
- 操作的目标不能是static
public class Demo09AtomicInterFieldUpdater {
public static void main(String[] args) {
User user = new User("tom",18);
// 第一个参数是类,第二个参数是需要修改的属性
AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
atomicIntegerFieldUpdater.incrementAndGet(user);
System.out.println(user);
// 第一个参数是类,第二个参数是需要修改的字段的类,第三个参数是需要修改的字段
AtomicReferenceFieldUpdater<User,String> fieldUpdater =
AtomicReferenceFieldUpdater.newUpdater(User.class,String.class,"name");
boolean b = fieldUpdater.compareAndSet(user, "tom", "jerry");
System.out.println(b);
System.out.println(user);
}
}
LongAdder
public class Demo04CASAndSynchronized {
static AtomicInteger atomicInteger = new AtomicInteger();
static LongAdder longAdder = new LongAdder();
static int count = 0;
static synchronized void add(){
count++;
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
// 开启10个线程
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
for (int i1 = 0; i1 < 1000000; i1++) {
// add();
// atomicInteger.getAndIncrement();
longAdder.increment();
}
}
}).start();
}
// 当前活跃的线程大于2,让主线程让出CPU资源
while (Thread.activeCount()>2){
Thread.yield();
}
long end = System.currentTimeMillis();
System.out.println("总耗时: "+(end-start)+ " 毫秒");
// System.out.println("count = "+ count);
// System.out.println("count = "+ atomicInteger);
System.out.println("count = "+ longAdder);
// synchronized 执行1千万次i++操作平均耗时 560ms
// 使用原子类AtomicInteger 执行1千万次i++操作平均耗时 280ms
// 使用LongAdder 执行1千万次i++操作平均耗时 65ms
}
}
ReentrantLock
"可重入锁"的概念可以理解为,自己可以再次获取自己的内部锁。比如一个线程获取了某个对象的锁,此时这个对象锁还没有释放,当其再次想获取这个对象的锁时还是可以获取到的。
基本用法:
public class Demo10ReentrantLock {
public static void main(String[] args) {
MyService myService = new MyService();
MyThread thread = new MyThread(myService);
for (int i = 0; i < 3; i++) {
new Thread(thread).start();
}
}
static class MyService{
private ReentrantLock lock = new ReentrantLock();
public void test(){
lock.lock();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
lock.unlock();
}
}
static class MyThread implements Runnable{
private MyService myService;
public MyThread(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
myService.test();
}
}
}
synchronize也是可重入的
public class Demo11ReentrantSynchronized {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
Service service = new Service();
service.service1();
}
}).start();
}
static class Service{
private ReentrantLock lock = new ReentrantLock();
public void service1(){
lock.lock();
System.out.println("service1");
service2();
lock.unlock();
}
public void service2(){
lock.lock();
System.out.println("service2");
service3();
lock.unlock();
}
public void service3(){
lock.lock();
System.out.println("service3");
lock.unlock();
}
}
}
public class Demo11ReentrantSynchronized {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
Service service = new Service();
service.service1();
}
}).start();
}
static class Service{
// private ReentrantLock lock = new ReentrantLock();
public synchronized void service1(){
// lock.lock();
System.out.println("service1");
service2();
// lock.unlock();
}
public synchronized void service2(){
// lock.lock();
System.out.println("service2");
service3();
// lock.unlock();
}
public synchronized void service3(){
// lock.lock();
System.out.println("service3");
// lock.unlock();
}
}
}
ReentrantReadWriteLock
读读不互斥
public class Demo12ReentrantReadWriteLock {
public static void main(String[] args) {
MyService myService = new MyService();
ReadThread readThread = new ReadThread(myService);
WriteThread writeThread = new WriteThread(myService);
// 启动读线程
for (int i = 0; i < 3; i++) {
new Thread(readThread).start();
}
}
static class MyService{
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 读取方法
public void read(){
// 读锁
lock.readLock().lock();
// 业务逻辑
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" read "+i);
}
// 释放锁
lock.readLock().unlock();
}
public void write(){
// 写锁
lock.writeLock().lock();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" write "+i);
}
lock.writeLock().unlock();
}
}
// 读线程
static class ReadThread implements Runnable{
private MyService myService;
public ReadThread(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
myService.read();
}
}
// 写线程
static class WriteThread implements Runnable{
private MyService myService;
public WriteThread(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
myService.write();
}
}
}
读写互斥
public class Demo12ReentrantReadWriteLock {
public static void main(String[] args) {
MyService myService = new MyService();
ReadThread readThread = new ReadThread(myService);
WriteThread writeThread = new WriteThread(myService);
// 读读不互斥
// for (int i = 0; i < 3; i++) {
// new Thread(readThread).start();
// }
// 读写互斥
new Thread(readThread).start();
new Thread(writeThread).start();
}
static class MyService{
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 读取方法
public void read(){
// 读锁
lock.readLock().lock();
// 业务逻辑
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" read "+i);
}
// 释放锁
lock.readLock().unlock();
}
public void write(){
// 写锁
lock.writeLock().lock();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" write "+i);
}
lock.writeLock().unlock();
}
}
// 读线程
static class ReadThread implements Runnable{
private MyService myService;
public ReadThread(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
myService.read();
}
}
// 写线程
static class WriteThread implements Runnable{
private MyService myService;
public WriteThread(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
myService.write();
}
}
}
写写互斥
public class Demo12ReentrantReadWriteLock {
public static void main(String[] args) {
MyService myService = new MyService();
ReadThread readThread = new ReadThread(myService);
WriteThread writeThread = new WriteThread(myService);
// 读读不互斥
// for (int i = 0; i < 3; i++) {
// new Thread(readThread).start();
// }
// 读写互斥
// new Thread(readThread).start();
// new Thread(writeThread).start();
// 写写互斥
for (int i = 0; i < 3; i++) {
new Thread(writeThread).start();
}
}
static class MyService{
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// 读取方法
public void read(){
// 读锁
lock.readLock().lock();
// 业务逻辑
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" read "+i);
}
// 释放锁
lock.readLock().unlock();
}
public void write(){
// 写锁
lock.writeLock().lock();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" write "+i);
}
lock.writeLock().unlock();
}
}
// 读线程
static class ReadThread implements Runnable{
private MyService myService;
public ReadThread(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
myService.read();
}
}
// 写线程
static class WriteThread implements Runnable{
private MyService myService;
public WriteThread(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
myService.write();
}
}
}
Condition
在调用condition.await()方法之前需要调用lock.lock()获取同步监视器。
public class Demo13Condition {
public static void main(String[] args) {
MyService myService = new MyService();
ThreadA threadA = new ThreadA(myService);
Thread thread = new Thread(threadA);
thread.start();
// 主线程休眠3秒
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
myService.signal();
}
static class MyService{
private Lock lock = new ReentrantLock();
// 通过锁创建条件
public Condition condition = lock.newCondition();
// 等待方法
public void await(){
lock.lock();
System.out.println(Thread.currentThread().getName()+" 线程进入等待");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
// 通知方法
public void signal(){
lock.lock();
System.out.println(Thread.currentThread().getName()+" 线程唤醒");
condition.signal();
lock.unlock();
}
}
static class ThreadA implements Runnable{
private MyService myService;
public ThreadA(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
myService.await();
System.out.println(Thread.currentThread().getName()+" 继续执行");
}
}
}
使用Condition顺序唤醒线程
public class Demo14ConditionOrder {
// 下一个要执行的任务
volatile static int next = 1;
static ReentrantLock lock = new ReentrantLock();
// 创建条件
static Condition conditionA = lock.newCondition();
static Condition conditionB = lock.newCondition();
static Condition conditionC = lock.newCondition();
public static void main(String[] args) {
ThreadA threadA = new ThreadA();
ThreadB threadB = new ThreadB();
ThreadC threadC = new ThreadC();
for (int i = 0; i < 5; i++) {
Thread ta = new Thread(threadA);
Thread tb = new Thread(threadB);
Thread tc = new Thread(threadC);
ta.start();
tb.start();
tc.start();
}
}
static class ThreadA implements Runnable{
@Override
public void run() {
lock.lock();
// 如果next不为1,线程A进入等待状态
while (next!=1){
try {
conditionA.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 线程A开始执行
for (int i = 0; i < 3; i++) {
System.out.println("ThreadA "+i);
}
next = 2;
// 唤醒线程B来执行
conditionB.signal();
lock.unlock();
}
}
static class ThreadB implements Runnable{
@Override
public void run() {
lock.lock();
while (next!=2){
try {
conditionB.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 线程B开始执行
for (int i = 0; i < 3; i++) {
System.out.println("ThreadB "+i);
}
next = 3;
// 唤醒线程C来执行
conditionC.signal();
lock.unlock();
}
}
static class ThreadC implements Runnable{
@Override
public void run() {
lock.lock();
while (next!=3){
try {
conditionC.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 线程C开始执行
for (int i = 0; i < 3; i++) {
System.out.println("ThreadC "+i);
}
next = 1;
// 唤醒线程A来执行
conditionA.signal();
lock.unlock();
}
}
}