多线程高并发--2

分享 123456789987654321 ⋅ 于 2021-02-05 15:39:49 ⋅ 1021 阅读

Volatile

可见性

解决内存可见性问题

public class Demo01VolatileVisible {

    public static void main(String[] args) throws InterruptedException {
        Visible runnable = new Visible();
        Thread thread = new Thread(runnable);
        thread.start();
        Thread.sleep(1000);
        Visible.flag = false;
        System.out.println("已经修改flag为false");
    }
    static class Visible implements Runnable{
        static volatile boolean flag = true;
        public void run() {
            System.out.println("子线程执行");
            while (flag){

            }
            System.out.println("子线程结束");
        }
    }
}

Volatile是如何保证可见性的?

有volatile修饰的变量,赋值后会多一个lock操作,这个操作相当于一个内存屏障,指令重排序时不能把后面的指令重排序到内存屏障之前的位置,只有一个CPU访问内存时,不需要内存屏障,但如果有多个CPU访问同一块内存,就需要内存屏障保证一致性,关键在于lock指令,它的作用是使得本CPU的缓存写入内存,该写入动作会造成其他CPU的缓存无效。

Java内存模型对volatile变量定义的特殊规则:

  • 每次使用变量前都必须从主内存中刷新最新的值,用于保证能看见其他线程对变量所做的修改
  • 每次修改变量后必须立刻同步回主内存中,用于保证其他线程可以看到自己对变量所做的修改
  • volatile修饰的变量不会被指令重排序优化,保证代码的执行顺序与程序的执行顺序一致

原子性

被volatile修饰的变量不能解决原子性问题

public class Demo02VolatileA {
    public static void main(String[] args) {
        VolatileThread volatileThread = new VolatileThread();
        for (int i = 0; i < 10; i++) {
            new Thread(volatileThread).start();
        }
    }

    static class VolatileThread implements Runnable{

        public volatile int count;

        private void addCount(){
            for (int i = 0; i < 1000; i++) {
                count++;
            }
            System.out.println("count = "+ count);
        }

        public void run() {
            addCount();
        }
    }
}

这里主要是因为count++ 不是一个原子操作,操作步骤如下:

  1. 从内存中取出count值
  2. 计算count+1
  3. 将count写回到内存中

JUC提供原子类来解决i++的原子性问题

public class Demo02VolatileA {
    public static void main(String[] args) {
        VolatileThread volatileThread = new VolatileThread();
        for (int i = 0; i < 10; i++) {
            new Thread(volatileThread).start();
        }
    }

    static class VolatileThread implements Runnable{

        public volatile int count;
        AtomicInteger atomicInteger = new AtomicInteger();
        private  void addCount(){
            for (int i = 0; i < 1000; i++) {
//                count++;
                atomicInteger.getAndIncrement();
            }
//            System.out.println("count = "+ count);
            System.out.println("count = "+ atomicInteger);
        }

        public void run() {
            addCount();
        }
    }
}

有序性

Java语言提供了volatile和synchronized两个关键字来保证线程之间操作的有序性。

volatile关键字本身就包含了禁止指令重排序的语义

synchronized是有“一个变量在同一个时刻只允许一条线程对其进行操作”这条规则来保证

CAS

CAS:Compare And Swap ,即比较再交换

CAS原理

CAS算法的过程是这样的:它包含3个参数CAS(V,E,N),V表示要更新的变量,E表示预期值,N表示新值。仅当V的值等于E时,才会将V的值设置为N值,如果V和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做,最后,CAS返回当前V的真实值。当多个线程同时使用CAS操作同一个变量时,只有一个会胜出并更新成功,其他均会更新失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。

image-20200409100647499

public class Demo03CAS {
    static AtomicInteger atomicInteger = new AtomicInteger();
    public static void main(String[] args) {
        System.out.println(atomicInteger);
        atomicInteger.compareAndSet(1,200);
        System.out.println(atomicInteger);
        atomicInteger.compareAndSet(0,100);
        System.out.println(atomicInteger);
        atomicInteger.getAndSet(300);
        // i++ 操作
        atomicInteger.getAndIncrement();
        System.out.println(atomicInteger);
    }
}

CAS和synchronized对比

public class Demo04CASAndSynchronized {
    static AtomicInteger atomicInteger = new AtomicInteger();
    static int count = 0;
    static synchronized void add(){
        count++;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // 开启10个线程
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                public void run() {
                    for (int i1 = 0; i1 < 1000000; i1++) {
//                        add();
                        atomicInteger.getAndIncrement();
                    }
                }
            }).start();
        }
        // 当前活跃的线程大于2,让主线程让出CPU资源
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        long end = System.currentTimeMillis();
        System.out.println("总耗时: "+(end-start)+ " 毫秒");
//        System.out.println("count = "+ count);
        System.out.println("count = "+ atomicInteger);
        // synchronized 执行1千万次i++操作平均耗时 560ms
        // 使用原子类AtomicInteger 执行1千万次i++操作平均耗时 280ms
    }
}

CAS ABA 问题

调用CAS(V,E,N)过程中,此时其他线程将V的值改成了V1,然后又改回了V,CAS操作会误认为变量没有被改变过。这个漏洞就称为CAS 的ABA问题。JUC包为了解决这个问题,提供了一个带有标记的原子引用类AtomicStampedReference来解决这个问题。大部分情况下ABA问题不会印象程序并发的正确性。

数据库中使用

在表设计的时候,会加上一个版本号字段(version),每次更新都会将版本号+1

update account set money = money + 1000,version = version+1 where account_id = 123 and version = 1;

Atomic包

基本类型

public class Demo05Atomic {
    public static void main(String[] args) {
        // 整形原子类
        AtomicInteger atomicInteger = new AtomicInteger();
        atomicInteger.compareAndSet(0,100);
        // 长整型原子类
        AtomicLong atomicLong = new AtomicLong();
        atomicLong.compareAndSet(0,100);
        // 布尔型原子类
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        atomicBoolean.compareAndSet(false,true);
    }
}

引用类型

AtomicReference

public class Demo05AtomicReference {
    public static void main(String[] args) {
        // 基本类型引用
        AtomicReference<Integer> atomicReference = new AtomicReference<Integer>();
        atomicReference.compareAndSet(0,1);
        // 自定义类型引用
        User user1 = new User("tom",18);
        User user2 = new User("jerry",19);
        // 使用user1初始化
        AtomicReference<User> userAtomicReference = new AtomicReference<User>(user1);
        // 修改user1的属性
        user1.setAge(20);
        // 将user1的地址改变
        user1 = new User("bob",20);
        boolean b = userAtomicReference.compareAndSet(user1, user2);
        System.out.println(b);
        System.out.println(userAtomicReference.get());
    }
}

User

public class User {
    private String name;
    private int age;

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

AtomicStampedReference

public class Demo06AtomicStamperReference {
    public static void main(String[] args) {
        User user1 = new User("tom",18);
        User user2 = new User("jerry",19);
        // 使用user1,版本号为1来初始化
        AtomicStampedReference<User> atomicStampedReference = new AtomicStampedReference<User>(user1,1);
        user1.setAge(20);
        // 修改成功将版本号修改为2
        boolean b = atomicStampedReference.compareAndSet(user1, user2, 1, 2);
        System.out.println(b);
        System.out.println(atomicStampedReference.getReference());
        // 期望版本号为1
        boolean b1 = atomicStampedReference.compareAndSet(user2, user1, 1, 2);
        System.out.println(b1);
        System.out.println(atomicStampedReference.getReference());
    }
}

AtomicMarkableReference

public class Demo07AtomicMarkbaleReference {
    public static void main(String[] args) {
        User user1 = new User("tom",18);
        User user2 = new User("jerry",19);
        AtomicMarkableReference<User> atomicMarkableReference = new AtomicMarkableReference<User>(user1,true);
        System.out.println(atomicMarkableReference.getReference());
        atomicMarkableReference.compareAndSet(user1,user2,true,false);
        System.out.println(atomicMarkableReference.getReference());
    }
}

数组类型

public class Demo08AtomicArray {
    public static void main(String[] args) {
        int[] arr = {1,2,3,4,5};
        // 整形类数组
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(arr);
        atomicIntegerArray.compareAndSet(0,1,200);
        System.out.println(atomicIntegerArray);

        // 引用型数组
        User user1 = new User("tom",18);
        User user2 = new User("jerry",19);
        User[] arrUser = {user1,user2};
        AtomicReferenceArray<User> atomicReferenceArray = new AtomicReferenceArray<User>(arrUser);
        atomicReferenceArray.compareAndSet(1,user2,user1);
        System.out.println(atomicReferenceArray);
    }
}

对象属性修改器

AtomicIntegerFieldUpdater

需要满足条件:

  • 被修改的属性不能是private
  • 被修改的属性必须使用volatile修饰
  • 被修改的属性类型必须是Integer
  • 操作的目标不能是static

AtomicLongFieldUpdater

需要满足条件:

  • 被修改的属性不能是private
  • 被修改的属性必须使用volatile修饰
  • 被修改的属性类型必须是Long
  • 操作的目标不能是static

AtomicReferenceFieldUpdater

需要满足条件:

  • 被修改的属性不能是private
  • 被修改的属性必须使用volatile修饰
  • 操作的目标不能是static
public class Demo09AtomicInterFieldUpdater {
    public static void main(String[] args) {
        User user = new User("tom",18);
        // 第一个参数是类,第二个参数是需要修改的属性
        AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdater =
                AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
        atomicIntegerFieldUpdater.incrementAndGet(user);
        System.out.println(user);

        // 第一个参数是类,第二个参数是需要修改的字段的类,第三个参数是需要修改的字段
        AtomicReferenceFieldUpdater<User,String>  fieldUpdater =
                AtomicReferenceFieldUpdater.newUpdater(User.class,String.class,"name");
        boolean b = fieldUpdater.compareAndSet(user, "tom", "jerry");
        System.out.println(b);
        System.out.println(user);
    }
}

LongAdder

public class Demo04CASAndSynchronized {
    static AtomicInteger atomicInteger = new AtomicInteger();
    static LongAdder longAdder = new LongAdder();
    static int count = 0;
    static synchronized void add(){
        count++;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // 开启10个线程
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                public void run() {
                    for (int i1 = 0; i1 < 1000000; i1++) {
//                        add();
//                        atomicInteger.getAndIncrement();
                        longAdder.increment();
                    }
                }
            }).start();
        }
        // 当前活跃的线程大于2,让主线程让出CPU资源
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        long end = System.currentTimeMillis();
        System.out.println("总耗时: "+(end-start)+ " 毫秒");
//        System.out.println("count = "+ count);
//        System.out.println("count = "+ atomicInteger);
        System.out.println("count = "+ longAdder);
        // synchronized 执行1千万次i++操作平均耗时 560ms
        // 使用原子类AtomicInteger 执行1千万次i++操作平均耗时 280ms
        // 使用LongAdder 执行1千万次i++操作平均耗时 65ms
    }
}

ReentrantLock

"可重入锁"的概念可以理解为,自己可以再次获取自己的内部锁。比如一个线程获取了某个对象的锁,此时这个对象锁还没有释放,当其再次想获取这个对象的锁时还是可以获取到的。

基本用法:

public class Demo10ReentrantLock {
    public static void main(String[] args) {
        MyService myService = new MyService();
        MyThread thread = new MyThread(myService);
        for (int i = 0; i < 3; i++) {
            new Thread(thread).start();
        }
    }

    static class MyService{
        private ReentrantLock lock = new ReentrantLock();
        public void test(){
            lock.lock();
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" "+i);
            }
            lock.unlock();
        }
    }
    static  class  MyThread implements Runnable{
        private MyService myService;

        public MyThread(MyService myService) {
            this.myService = myService;
        }

        @Override
        public void run() {
            myService.test();
        }
    }
}

synchronize也是可重入的

public class Demo11ReentrantSynchronized {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                Service service = new Service();
                service.service1();
            }
        }).start();
    }
    static class Service{
        private ReentrantLock lock = new ReentrantLock();
        public void service1(){
            lock.lock();
            System.out.println("service1");
            service2();
            lock.unlock();
        }
        public void service2(){
            lock.lock();
            System.out.println("service2");
            service3();
            lock.unlock();
        }
        public void service3(){
            lock.lock();
            System.out.println("service3");
            lock.unlock();
        }
    }
}
public class Demo11ReentrantSynchronized {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                Service service = new Service();
                service.service1();
            }
        }).start();
    }
    static class Service{
//        private ReentrantLock lock = new ReentrantLock();
        public synchronized void service1(){
//            lock.lock();
            System.out.println("service1");
            service2();
//            lock.unlock();
        }
        public synchronized void service2(){
//            lock.lock();
            System.out.println("service2");
            service3();
//            lock.unlock();
        }
        public synchronized void service3(){
//            lock.lock();
            System.out.println("service3");
//            lock.unlock();
        }
    }
}

ReentrantReadWriteLock

读读不互斥

public class Demo12ReentrantReadWriteLock {
    public static void main(String[] args) {
        MyService myService = new MyService();
        ReadThread readThread = new ReadThread(myService);
        WriteThread writeThread = new WriteThread(myService);
        // 启动读线程
        for (int i = 0; i < 3; i++) {
            new Thread(readThread).start();
        }

    }

    static class MyService{
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        // 读取方法
        public void read(){
            // 读锁
            lock.readLock().lock();
            // 业务逻辑
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" read "+i);
            }
            // 释放锁
            lock.readLock().unlock();
        }
        public void write(){
            // 写锁
            lock.writeLock().lock();
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" write "+i);
            }
            lock.writeLock().unlock();
        }
    }

    // 读线程
    static class ReadThread implements Runnable{
        private MyService myService;

        public ReadThread(MyService myService) {
            this.myService = myService;
        }

        @Override
        public void run() {
            myService.read();
        }
    }
    // 写线程
    static class WriteThread implements Runnable{

        private MyService myService;

        public WriteThread(MyService myService) {
            this.myService = myService;
        }

        @Override
        public void run() {
            myService.write();
        }
    }
}

读写互斥

public class Demo12ReentrantReadWriteLock {
    public static void main(String[] args) {
        MyService myService = new MyService();
        ReadThread readThread = new ReadThread(myService);
        WriteThread writeThread = new WriteThread(myService);
        // 读读不互斥
//        for (int i = 0; i < 3; i++) {
//            new Thread(readThread).start();
//        }
        // 读写互斥
        new Thread(readThread).start();
        new Thread(writeThread).start();
    }

    static class MyService{
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        // 读取方法
        public void read(){
            // 读锁
            lock.readLock().lock();
            // 业务逻辑
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" read "+i);
            }
            // 释放锁
            lock.readLock().unlock();
        }
        public void write(){
            // 写锁
            lock.writeLock().lock();
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" write "+i);
            }
            lock.writeLock().unlock();
        }
    }

    // 读线程
    static class ReadThread implements Runnable{
        private MyService myService;

        public ReadThread(MyService myService) {
            this.myService = myService;
        }

        @Override
        public void run() {
            myService.read();
        }
    }
    // 写线程
    static class WriteThread implements Runnable{

        private MyService myService;

        public WriteThread(MyService myService) {
            this.myService = myService;
        }

        @Override
        public void run() {
            myService.write();
        }
    }
}

写写互斥

public class Demo12ReentrantReadWriteLock {
    public static void main(String[] args) {
        MyService myService = new MyService();
        ReadThread readThread = new ReadThread(myService);
        WriteThread writeThread = new WriteThread(myService);
        // 读读不互斥
//        for (int i = 0; i < 3; i++) {
//            new Thread(readThread).start();
//        }
        // 读写互斥
//        new Thread(readThread).start();
//        new Thread(writeThread).start();
        // 写写互斥
        for (int i = 0; i < 3; i++) {
            new Thread(writeThread).start();
        }
    }

    static class MyService{
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        // 读取方法
        public void read(){
            // 读锁
            lock.readLock().lock();
            // 业务逻辑
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" read "+i);
            }
            // 释放锁
            lock.readLock().unlock();
        }
        public void write(){
            // 写锁
            lock.writeLock().lock();
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName()+" write "+i);
            }
            lock.writeLock().unlock();
        }
    }

    // 读线程
    static class ReadThread implements Runnable{
        private MyService myService;

        public ReadThread(MyService myService) {
            this.myService = myService;
        }

        @Override
        public void run() {
            myService.read();
        }
    }
    // 写线程
    static class WriteThread implements Runnable{

        private MyService myService;

        public WriteThread(MyService myService) {
            this.myService = myService;
        }

        @Override
        public void run() {
            myService.write();
        }
    }
}

Condition

在调用condition.await()方法之前需要调用lock.lock()获取同步监视器。

public class Demo13Condition {
    public static void main(String[] args) {
        MyService myService = new MyService();
        ThreadA threadA = new ThreadA(myService);
        Thread thread = new Thread(threadA);
        thread.start();
        // 主线程休眠3秒
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        myService.signal();
    }

    static class MyService{
        private Lock lock = new ReentrantLock();
        // 通过锁创建条件
        public Condition condition = lock.newCondition();

        // 等待方法
        public void await(){
            lock.lock();
            System.out.println(Thread.currentThread().getName()+" 线程进入等待");
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
        // 通知方法
        public void signal(){
            lock.lock();
            System.out.println(Thread.currentThread().getName()+" 线程唤醒");
            condition.signal();
            lock.unlock();
        }
    }

    static class ThreadA implements Runnable{
        private MyService myService;

        public ThreadA(MyService myService) {
            this.myService = myService;
        }

        @Override
        public void run() {
            myService.await();
            System.out.println(Thread.currentThread().getName()+" 继续执行");
        }
    }
}

使用Condition顺序唤醒线程

public class Demo14ConditionOrder {
    // 下一个要执行的任务
    volatile static int next = 1;
    static ReentrantLock lock = new ReentrantLock();
    // 创建条件
    static Condition conditionA = lock.newCondition();
    static Condition conditionB = lock.newCondition();
    static Condition conditionC = lock.newCondition();
    public static void main(String[] args) {
        ThreadA threadA = new ThreadA();
        ThreadB threadB = new ThreadB();
        ThreadC threadC = new ThreadC();
        for (int i = 0; i < 5; i++) {
            Thread ta = new Thread(threadA);
            Thread tb = new Thread(threadB);
            Thread tc = new Thread(threadC);
            ta.start();
            tb.start();
            tc.start();
        }
    }

    static class ThreadA implements Runnable{

        @Override
        public void run() {
            lock.lock();
            // 如果next不为1,线程A进入等待状态
            while (next!=1){
                try {
                    conditionA.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 线程A开始执行
            for (int i = 0; i < 3; i++) {
                System.out.println("ThreadA "+i);
            }
            next = 2;
            // 唤醒线程B来执行
            conditionB.signal();
            lock.unlock();
        }
    }
    static class ThreadB implements Runnable{

        @Override
        public void run() {
            lock.lock();
            while (next!=2){
                try {
                    conditionB.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 线程B开始执行
            for (int i = 0; i < 3; i++) {
                System.out.println("ThreadB "+i);
            }
            next = 3;
            // 唤醒线程C来执行
            conditionC.signal();
            lock.unlock();
        }
    }
    static class ThreadC implements Runnable{

        @Override
        public void run() {
            lock.lock();
            while (next!=3){
                try {
                    conditionC.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 线程C开始执行
            for (int i = 0; i < 3; i++) {
                System.out.println("ThreadC "+i);
            }
            next = 1;
            // 唤醒线程A来执行
            conditionA.signal();
            lock.unlock();
        }
    }
}
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-123456789987654321,http://hainiubl.com/topics/75389
点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter