近况

过个年感觉把自己过没了,明明很多东西都没学好,但总是提不起精神来,毕设是我上半年的心病,
但时间非常紧张,我3月份结束多线程网课,准备去杭州找份工作,与此同时,还要准备毕设,
偏偏毕设要做一个分布式的系统,怪我自己太自信了,以为学习完了前后端分布式就能搞定毕设,
实际上,知道和会使用中间有巨大的鸿沟,我还是太嫩了点,现在就想看完多线程网课,然后看尚硅谷类似的项目
把尚硅谷的分布式项目搞出来再修改这个项目,把若依的项目作为毕设的后台管理系统,前端再弄个网页。
算是拼凑一下吧,时间很紧迫,还不知道要不要去找工作,找工作感觉也浪费时间了,到时候工作上肯定要学习其他东西。
总之,加油吧!!!

本篇博客内容如下

wait&notify

我们都知道线程的wait和notify方法是用来同步线程的,线程调用wait后,释放锁,进入monitor的waitSet队列等待
此时线程状态是waiting,此时线程不能被打断,而是需要等待其他线程唤醒,这与后面讲的ReentrantLock不同
ReentrantLock是可以被打断的。

那么既然wait和notify是用来同步线程的,那么就有以下几个实际应用,或者说设计模式吧。

  • 同步模式之保护性暂停
    同步说的是,一个线程一直等待另一个线程的结果才会继续运行
    保护性暂停被用来:一个线程实时等待另一个线程的执行结果
    下面是保护性暂停的一个案例:

    public class Test {
        public static void main(String[] args) {
            //案例,t1线程等待t2线程
            GuardedObject guardedObject = new GuardedObject();
            new Thread(()->{
                System.out.println("正在准备获取数据");
                String response = guardedObject.getResponse();
                System.out.println("t2发送结果为:"+response);
            },"t1").start();
    
            new Thread(()->{
                System.out.println("正在准备发送数据");
                guardedObject.setResponse("t1是大傻逼");
            },"t2").start();
        }
    }
    
    class GuardedObject{
        //用来交换线程结果
        private String response;
    
        public String getResponse(){
            synchronized (this){
                //while防止虚假唤醒
                while (response==null){
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            return response;
        }
    
        public void setResponse(String response) {
            synchronized (this){
                this.response=response;
                this.notifyAll();
            }
        }
    }

    上面是同步性暂停的一个简单的案例,那么上面代码还是有点不足,不足在于t1线程获取结果会一直等待
    直到t2线程设置好值,那么我们可以在上述代码上进行改进,增加超时使得t1在等待一段时间后,
    如果在超时时间内还没有返回消息,就直接返回不等了,下面是改进代码:

    public class Test {
        public static void main(String[] args) {
            //案例,t1线程等待t2线程
            GuardedObject2 guardedObject = new GuardedObject2();
            new Thread(()->{
                System.out.println("正在准备获取数据");
                String response = guardedObject.getResponse(3000);
                System.out.println("t2发送结果为:"+response);
            },"t1").start();
    
            new Thread(()->{
                System.out.println("正在准备发送数据");
                guardedObject.setResponse("t1是大傻逼",500);
            },"t2").start();
        }
    }
    
    class GuardedObject2 {
        //保护性暂停进阶版,增加超时
        private String response;
    
        public String getResponse(long timeout){
            synchronized (this){
                //开始时间
                long begin = System.currentTimeMillis();
                //已经等待时间
                long passedTime =0;
                while (response==null){
                    //还要等待时间
                    long waitedTime = timeout-passedTime;
                    if (waitedTime<=0){
                        break;
                    }
                    try {
                        this.wait(waitedTime);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    passedTime=System.currentTimeMillis()-begin;
                }
            }
            return response;
        }
    
        public void setResponse(String response,Integer timeout) {
            synchronized (this){
                try {
                    this.wait(timeout);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.response=response;
                this.notifyAll();
            }
        }
    }

    除了增加超时外,其实上述代码实际上还存在一个缺陷,那就是:如果是多对线程之间通信,
    那么你就要创建多个GuardedObject对象,因为一个response只能为一对线程通信嘛,
    下面是一种改进,允许多对线程传递数据。

    多任务版 GuardedObject图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号) 左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。 如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。 和生产者消费者模式的区别就是:这个生产者和消费者之间是一一对应的关系, 但是生产者消费者模式并不是。rpc框架的调用中就使用到了这种模式。

    案例代码如下:

    • People&Postman

      class People extends Thread{
      
          public People(String name) {
              super(name);
          }
      
          @Override
          public void run() {
              GuardedObject2 guardedObject = MailBoxes.createGuardedObject();
              System.out.println("收信人:"+Thread.currentThread().getName()+",正在等信来,关联id:"+guardedObject.getId());
              String response = guardedObject.getResponse(3000);
              System.out.println("收信人:"+Thread.currentThread().getName()+",信的内容是:"+response);
          }
      }
      
      class Postman extends Thread{
          private Integer id;
          private String mail;
      
          public Postman(String name,Integer id, String mail) {
              super(name);
              this.id = id;
              this.mail = mail;
          }
      
          @Override
          public void run() {
              GuardedObject2 guardedObject2 = MailBoxes.getGuardedObject2(id);
              System.out.println("当前线程:"+Thread.currentThread().getName()+",正在送信,关联id:"+guardedObject2.getId());
              guardedObject2.setResponse(mail);
          }
      }
    • MailBoxes

      class MailBoxes{
          private static Map<Integer, GuardedObject2> map = new Hashtable<>();
          private static Integer id=1;
      
          //生成id
          public static synchronized Integer getId(){
              return id++;
          }
      
          public static GuardedObject2 getGuardedObject2(Integer id){
              return map.remove(id);
          }
      
          public static GuardedObject2 createGuardedObject(){
              GuardedObject2 guardedObject2 = new GuardedObject2(getId());
              map.put(guardedObject2.getId(),guardedObject2);
              return guardedObject2;
          }
      
          //获取map中所有id
          public static Set<Integer> getIds(){
              return map.keySet();
          }
      }
    • GuardedObject3

      class GuardedObject3 {
          //增加id属性
          private Integer id;
      
          public Integer getId() {
              return id;
          }
      
          public void setId(Integer id) {
              this.id = id;
          }
      
          public GuardedObject3(Integer id) {
              this.id = id;
          }
      
          public GuardedObject3() {
          }
      
          //保护性暂停进阶版,增加超时
          private String response;
      
          public String getResponse(long timeout){
              synchronized (this){
                  //开始时间
                  long begin = System.currentTimeMillis();
                  //已经等待时间
                  long passedTime =0;
                  while (response==null){
                      //还要等待时间
                      long waitedTime = timeout-passedTime;
                      if (waitedTime<=0){
                          break;
                      }
                      try {
                          this.wait(waitedTime);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      passedTime=System.currentTimeMillis()-begin;
                  }
              }
              return response;
          }
      
          public void setResponse(String response) {
              synchronized (this){
                  this.response=response;
                  this.notifyAll();
              }
          }
      }
    • Test,测试类

      public class Test {
          public static void main(String[] args) {
              /*
                  小知识:内存泄漏是存在未释放的对象,内存溢出是堆中对象太多占满堆内存
               */
      
              /*
                  本次案例实现,多个线程通信,借助中间类MailBoxes解耦收件人和邮递员
               */
              for (int i = 0; i < 3; i++) {
                  new People("people"+i).start();
              }
              try {
                  TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
              for (Integer id : MailBoxes.getIds()) {
                  new Postman("postman"+id,id,"postman"+id+"送的信").start();
              }
          }
      }
      
      //测试结果
      收信人:people1,正在等信来,关联id:1
      收信人:people0,正在等信来,关联id:3
      收信人:people2,正在等信来,关联id:2
      当前线程:postman3,正在送信,关联id:3
      当前线程:postman2,正在送信,关联id:2
      收信人:people0,信的内容是:postman3送的信
      收信人:people2,信的内容是:postman2送的信
      当前线程:postman1,正在送信,关联id:1
      收信人:people1,信的内容是:postman1送的信
  • 异步模式之生产者/消费者

    生产者消费者模式与保护性暂停的区别在于,生产者消费者模式中消费者线程与生产者线程并不是一一对应的
    JDK中各种阻塞队列,采用的就是这种模式。
    “异步”的意思就是生产者产生消息之后消息没有被立刻消费,而“同步模式”中,消息在产生之后被立刻消费了。
    下面是一个线程间通信的消息队列,要注意的是,像rabbitmq等消息框架是进程间通信的。

    public class Test2 {
        public static void main(String[] args) {
            MessageQueue messageQueue = new MessageQueue(2);
    
            //生产者线程
            for (int i = 0; i < 3; i++) {
                int id = i;
                new Thread(() -> {
                    //注意,lambda中引用外部变量,外部变量必须为不可变,这里messageQueue类被final修饰,id是基本类型,是不可变类
                    messageQueue.send(new Message(id, "消息" + id));
                }, "生产者" + i).start();
            }
    
            //消费者线程
            new Thread(() -> {
                while (true) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    messageQueue.take();
                }
            }, "消费者").start();
        }
    }
    
    final class MessageQueue {
    
        //存放消息的容器
        private LinkedList<Message> list = new LinkedList<>();
    
        //消息队列容量
        private Integer capacity;
    
        public MessageQueue(Integer capacity) {
            this.capacity = capacity;
        }
    
        public Message take() {
            //消费者
            synchronized (list) {
                while (list.isEmpty()) {
                    try {
                        System.out.println("当前无消息,消费者阻塞");
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Message message = list.removeFirst();
                System.out.println("消费者已消费一条消息,消息:" + message);
                list.notifyAll();
                return message;
            }
        }
    
        public void send(Message msg) {
            //生产者
            synchronized (list) {
                while (list.size() == capacity) {
                    try {
                        System.out.println("消息队列已满," + Thread.currentThread().getName() + "正在阻塞");
                        list.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.addLast(msg);
                System.out.println(Thread.currentThread().getName() + "已存入一条消息:" + msg);
                list.notifyAll();
            }
        }
    }
    
    class Message {
        private Integer id;
        private String value;
    
        public Message(Integer id, String value) {
            this.id = id;
            this.value = value;
        }
    
        public Integer getId() {
            return id;
        }
    
        public String getValue() {
            return value;
        }
    
        @Override
        public String toString() {
            return "Message{" +
                    "id=" + id +
                    ", value='" + value + '\'' +
                    '}';
        }
    }

park&unpark

park和unpark是LockSupport的两个方法,作用同wait,notify类似,用来同步线程
这两个方法底层都是调用的Unsafe的park,unpark方法。

// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(Thread thread);

park&unpark原理

  • park和unpark概念

    每个线程都有自己的一个Parker对象,由三部分组成 _counter, _cond和 _mutex
    1. 打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量 _ cond就好比背包中的帐篷。
       _counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
    2. 调用park就是要看需不需要停下来歇息
       1. 如果备用干粮耗尽,那么钻进帐篷歇息
       2. 如果备用干粮充足,那么不需停留,继续前进
    3. 调用unpark,就好比令干粮充足
       1. 如果这时线程还在帐篷,就唤醒让他继续前进		
       2. 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
          1. 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
  • 线程先调用park方法

    1. 当前线程调用Unsafe.park()方法
    2. 检查 _counter,本情况为 0,这时,获得 _mutex 互斥锁(mutex对象有个等待队列 _cond)
    3. 线程进入 _cond 条件变量阻塞
    4. 设置 _counter = 0
  • 线程后调用unpark方法

    1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
    2. 唤醒 _cond 条件变量中的 Thread_0
    3. Thread_0 恢复运行
    4. 设置 _counter 为 0
  • 线程先调用unpark方法,后调用park方法

    1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
    2. 当前线程调用 Unsafe.park() 方法
    3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
    4. 设置 _counter 为 0

线程状态转换

1. RUNNABLE <--> WAITING
   线程用synchronized(obj)获取了对象锁后
      1. 调用obj.wait()方法时,t 线程从RUNNABLE --> WAITING
      2. 调用obj.notify(),obj.notifyAll(),t.interrupt()时
         1. 竞争锁成功,t 线程从WAITING --> RUNNABLE
         2. 竞争锁失败,t 线程从WAITING --> BLOCKED

2. RUNNABLE <--> WAITING
   1. 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING
   2. 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING -->
      RUNNABLE

3. RUNNABLE <--> WAITING
   1. 当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING
      注意是当前线程在t 线程对象的监视器上等待
   2. t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE

4. RUNNABLE <--> TIMED_WAITING
   t 线程用 synchronized(obj) 获取了对象锁后
     1. 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
     2. t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
        1. 竞争锁成功,t 线程从 TIMED_WAITING --> RUNNABLE
        2. 竞争锁失败,t 线程从 TIMED_WAITING --> BLOCKED

5. RUNNABLE <--> TIMED_WAITING
   1. 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING
      注意是当前线程在t 线程对象的监视器上等待
   2. 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从
      TIMED_WAITING --> RUNNABLE

6. RUNNABLE <--> TIMED_WAITING
   1. 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING
   2. 当前线程等待时间超过了 n 毫秒或调用了线程 的 interrupt() ,当前线程从 TIMED_WAITING --> RUNNABLE

7. RUNNABLE <--> TIMED_WAITING
   1. 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 
      时,当前线程从 RUNNABLE --> TIMED_WAITING
   2. 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从
      TIMED_WAITING--> RUNNABLE

线程活跃性问题

线程活跃性问题指的是线程没有按预期结束或执行不下去的情况,
活跃性问题主要有三个:死锁,活锁,饥饿。

  • 死锁表现为:多个线程持有各自的锁,既等待对方的锁又不释放自己的锁,从而周而复始的运行下去。
    产生死锁的条件在操作系统中总结为四个:互斥,不可剥夺,请求和保持,循环等待
    下面是一个死锁小案例

    public class Test{
    
        public static void main(String[] args) {
            Object o = new Object();
            Object o1 = new Object();
    
            new Thread(() -> {
                synchronized (o) {
                    log.info("线程一占有对象o的锁");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    synchronized (o1) {
                        log.info("线程一占有对象o1的锁");
                    }
    
                }
            }, "线程1").start();
    
            new Thread(() -> {
                synchronized (o1) {
                    log.info("线程二占有对象o1的锁");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                    synchronized (o) {
                        log.info("线程一占有对象o的锁");
                    }
                }
            }, "线程二").start();
        }
    }

    检测程序是否死锁可以使用jconsole工具,或者使用jps定位java程序id,再用jstack命令定位死锁
    jps,jstack工具都是jdk安装的时候自带的,直接命令行就行。

  • 活锁指的是:多个线程互相更改了其他线程的结束条件,导致这些线程都结束不了。

    与死锁的区别在于,死锁中的线程都处于阻塞状态,而活锁中线程都处于运行状态

  • 饥饿指的是:线程因为优先级太低始终得不到cpu时间片,从而无法执行,类似于操作系统中进程饥饿。

以上三个线程活跃性问题都可以通过ReentrantLock来解决

ReentrantLock

除了使用synchronized保证线程安全,ReentrantLock锁也能保证线程安全,如果说synchronized是jvm底层实现
线程安全,那么ReentrantLock就纯使用java代码实现线程安全,当然两者还有其他差别我们待会说.

  • ReentrantLock的特点

    • 可打断
      ReentrantLock是可打断的,线程被阻塞进入阻塞状态(blocked)后,可以通过其他对象进行打断,
      使得阻塞中的线程退出阻塞状态,而synchronized锁和ReentrantLock的lock方法并不能打断阻塞中的线程
      注意:这里说的可打断是线程处于阻塞状态,线程还未获得锁,像synchronized锁的线程处于阻塞状态就不可被打断
      之前有提到线程的interrupted方法可中断线程是线程获得锁之后,两者时间上不同。
    • 可以设置超时时间
      上面提到,ReentrantLock锁是可打断的,这需要其他线程主动打断处于阻塞中的线程,然而你也可以
      设置超时时间,使得阻塞线程在阻塞一段时间后,自己退出阻塞状态。
    • 可以设置为公平锁
      公平锁指的是在monitor的阻塞队列中,先阻塞的先获得锁,而不是随机获取锁,synchronized锁就是
      随机在阻塞队列中选择一个线程获得锁。
    • 支持多个条件变量
      即对与不满足条件的线程可以放到不同的集合中等待
      类似于,我们都知道,使用synchronized锁时,当线程不满足某个条件时会调用wait方法,线程释放锁
      进入等待队列中,需要其他线程去唤醒这个线程,但是notify方法只能随机唤醒一个,而notifyAll方法
      唤醒等待队列中所有线程,这就可能导致,不该被唤醒的线程被唤醒了,这就是虚假唤醒。
      我们可以把这个唯一的等待队列看做一个条件变量,所有不满足运行的线程全部放在这里,
      而ReentrantLock支持多个条件变量,则可以不同线程不满足运行时放入不同的条件变量,唤醒时
      指定某个条件变量中线程唤醒,这样就避免了虚假唤醒的问题。
  • ReentrantLock基本语法

    // 获取锁
    reentrantLock.lock();
    try {
     // 临界区
    } finally {
     // 释放锁
     reentrantLock.unlock();
    }
  • 同步模式之顺序控制

    • 固定运行顺序,比如,必须先 2 后 1 打印
      wait,notify实现

      public class Test {
          /**
           * 设计模式。固定运行顺序之wait,notify实现
           *     该模式作用:使得两个线程按照固定顺序运行,例如t2必须等待t1线程运行完毕才能运行
           */
          static Object lock = new Object();
          static boolean flag = false;
          public static void main(String[] args) {
      
              Thread t1 = new Thread(() -> {
                  synchronized (lock) {
                      System.out.println("t1");
                      flag=true;
                      lock.notify();
                  }
              }, "t1");
              t1.start();
      
              new Thread(()->{
                  synchronized (lock){
                      while (!flag){
                          try {
                              lock.wait();
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      System.out.println("t2");
                  }
              },"t2").start();
          }
      }

      park,unpark实现

      public class Test {
          public static void main(String[] args) {
              Thread t2 = new Thread(() -> {
                  LockSupport.park();
                  System.out.println("t2");
              }, "t2");
              t2.start();
      
              new Thread(()->{
                  System.out.println("t1");
                  LockSupport.unpark(t2);
              },"t1").start();
          }
      }
    • 交替输出,线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出abcabcabcabcabc 怎么实现
      wait,notify版

      //测试
      WaitNotify waitNotify = new WaitNotify(1, 5);
      new Thread(()->{
          waitNotify.print("a",1,2);
      },"t1").start();
      new Thread(()->{
          waitNotify.print("b",2,3);
      },"t2").start();
      new Thread(()->{
          waitNotify.print("c",3,1);
      },"t3").start();
      //******************************************
      class WaitNotify{
          private Integer waitFlag;
          private final Integer loopNum;
      
          public WaitNotify(Integer waitFlag, Integer loopNum) {
              this.waitFlag = waitFlag;
              this.loopNum = loopNum;
          }
      
          public void print(String str,Integer waitFlag,Integer nextFlag){
              for (int i = 0; i <loopNum; i++) {
                  synchronized (this){
                      while (!Objects.equals(waitFlag, this.waitFlag)){
                          try {
                              this.wait();
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                      System.out.print(str);
                      this.waitFlag=nextFlag;
                      this.notifyAll();
                  }
              }
          }
      }

      ReentrantLock条件变量版

      //测试
      AwaitSignal awaitSignal = new AwaitSignal(5);
      Condition c1 = awaitSignal.newCondition();
      Condition c2 = awaitSignal.newCondition();
      Condition c3 = awaitSignal.newCondition();
      new Thread(()->{
          awaitSignal.print("a",c1,c2);
      },"t1").start();
      new Thread(()->{
          awaitSignal.print("b",c2,c3);
      },"t2").start();
      new Thread(()->{
          awaitSignal.print("c",c3,c1);
      },"t3").start();
      //main主线程先唤醒t1
      try {
          Thread.sleep(1000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      awaitSignal.lock();
      try {
          System.out.println("开始");
          c1.signal();
      }finally {
          awaitSignal.unlock();
      }
      //********************************************
      class AwaitSignal extends ReentrantLock{
          private final int loopNum;
      
          public AwaitSignal(int loopNum){
              this.loopNum=loopNum;
          }
      
          /**
           * 参数1:当前线程打印的内容
           * 参数2:当前线程所处的等待队列
           * 参数3:下一个需要唤醒的等待队列
           */
          public void print(String str, Condition current,Condition next){
              for (int i = 0; i < loopNum; i++) {
                  lock();
                  try {
                      current.await();    //让当前线程进入自己的等待队列
                      System.out.print(str);
                      next.signal();      //唤醒其他线程的等待队列
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } finally {
                      unlock();
                  }
              }
          }
      }

      Park Unpark 版

      ParkUnpark parkUnpark = new ParkUnpark(5);
      t1=new Thread(()->{
          parkUnpark.print("a",t2);
      },"t1");
      t2=new Thread(()->{
          parkUnpark.print("b",t3);
      },"t2");
      t3=new Thread(()->{
          parkUnpark.print("c",t1);
      },"t3");
      t1.start();
      t2.start();
      t3.start();
      //main主线程先唤醒t1
      try {
          Thread.sleep(1000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      LockSupport.unpark(t1);

Java内存模型

JMM 即 Java Memory Model,它从java层面定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。JMM 体现在以下几个方面

  • 原子性,保证指令不会受到线程上下文切换的影响
  • 可见性,保证指令不会受 cpu 缓存的影响
  • 有序性,保证指令不会受 cpu 指令并行优化的影响

1,可见性
可见性准确的描述应该是,一个线程对共享变量的修改对于另一个线程是否可见,有时候因为对共享变量的修改
其他线程不可见,会出现一些问题,例如下面代码:

public class Test1 {
    static boolean run = true;
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(()->{
            while(run){
                // ....
//                System.out.println(2323);  如果加上这个代码就会停下来
            }
        });
        t.start();
        utils.sleep(1);
        System.out.println(3434);   
        run = false; // 线程t不会如预想的停下来
    }
}

那么为什么线程不能够停下来呢?我们主线程明明修改了变量,这是因为cpu缓存的原因,分析如下:

  • 初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存。
  • 因为t1线程频繁地从主存中读取run的值,jit即时编译器会将run的值缓存至自己工作内存中的高速缓存中,
    减少对主存中run的访问以提高效率
  • 1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量
    的值,结果永远是旧值

解决办法也很简单,给变量run加上volatile关键字,它可以用来修饰成员变量和静态成员变量,
他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存
使用synchronized关键字也有相同的效果!在Java内存模型中,synchronized规定,线程在加锁时, 先清空工作内存→在主内存中拷贝最新变量的副本到工作内存 →执行完代码→将更改后的共享变量的值刷新到主内存中→释放互斥锁。
想想为什么在上面代码中输出一下就能解决可见性问题?因为输出语句是synchronized修饰的!

volatile能解决可见性问题,但并不能解决原子性,单独的它解决不了线程安全问题哦
synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。
但缺点是synchronized 是属于重量级操作,性能相对更低。

1.1,使用volatile改进两阶段终止模式
两阶段终止模式上一篇讲过,是用来一个线程通知停止另一个线程的模式,使用volatile改进后代码如下:

public class Test1 {
    public static void main(String[] args) {
        TwoPhaseTermination2 twoPhaseTermination2 = new TwoPhaseTermination2();
        twoPhaseTermination2.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        twoPhaseTermination2.stop();
    }
}

class TwoPhaseTermination2{
    private Thread thread;
    private volatile Boolean flag = false;
    public void start(){
        System.out.println("正在开启线程");
        thread = new Thread(){
            @Override
            public void run() {
                while (true){
                    if (flag){
                        System.out.println("线程正在停止,处理后事");
                        break;
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(200);
                        System.out.println("执行业务逻辑");
                    } catch (InterruptedException e) {
                        System.out.println("异常处理");
//                        thread.interrupt();
//                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();
    }

    //停止线程
    public void stop(){
        System.out.println("正在停止线程");
        flag=true;
        thread.interrupt();
    }
}

1.2,两阶段终止增加balking模式
上述TwoPhaseTermination2存在一个问题:如果当我多次调用start,就会创建多个线程,
实际上,我只需要一个线程,哪怕你多次调用start方法也没用,这就需要使用设计模式之犹豫模式了,
即Balking模式。该模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回

public class Test1 {
    public static void main(String[] args) {
        TwoPhaseTermination3 twoPhaseTermination3 = new TwoPhaseTermination3();
        twoPhaseTermination3.start();
        twoPhaseTermination3.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        twoPhaseTermination3.stop();
    }
}
class TwoPhaseTermination3{
    private Thread thread;
    private volatile Boolean flag = false;  //结束线程标志
    private volatile Boolean starting = false;  //是否已经开启线程标志
    public void start(){
        synchronized (this){
            if (starting){
                return;
            }
            starting=true;
        }
        System.out.println("正在开启线程");
        thread = new Thread(){
            @Override
            public void run() {
                while (true){
                    if (flag){
                        System.out.println("线程正在停止,处理后事");
                        break;
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(200);
                        System.out.println("执行业务逻辑");
                    } catch (InterruptedException e) {
                        System.out.println("异常处理");
//                        thread.interrupt();
//                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();
    }

    //停止线程
    public void stop(){
        System.out.println("正在停止线程");
        flag=true;
        thread.interrupt();
    }
}

2,有序性
有序性指的是:因为JIT编译器在运行时做了一些优化,即指令重排,导致程序没有出现预期结果,
指令重排序在单线程下可以优化程序,但在多线程下就会造成一些问题,下面是一个出现有序性问题的例子:

int num = 0;

// volatile 修饰的变量,可以禁用指令重排 volatile boolean ready = false; 可以防止变量之前的代码被重排序
boolean ready = false; 
// 线程1 执行此方法,
public void actor1(I_Result r) {
 if(ready) {
 	r.r1 = num + num;
 } 
 else {
 	r.r1 = 1;
 }
}
// 线程2 执行此方法
public void actor2(I_Result r) {
 num = 2;
 ready = true;
}

I_Result 是一个对象,有一个属性 r1 用来保存结果,问可能的结果有几种?
有同学这么分析
情况1:线程1 先执行,这时 ready = false,所以进入 else 分支结果为 1
情况2:线程2 先执行 num = 2,但没来得及执行 ready = true,线程1 执行,还是进入 else 分支,结果为1
情况3:线程2 执行到 ready = true,线程1 执行,这回进入 if 分支,结果为 4(因为 num 已经执行过了)
但我告诉你,结果还有可能是 0 ,信不信吧!这种情况下是:线程2 执行 ready = true,切换到线程1,
进入 if 分支,相加为 0,再切回线程2 执行 num = 2。即因为指令重排,ready=true比num=2先执行。

指令重排序也需要遵守一定规则

  • 重排序操作不会对存在数据依赖关系的操作进行重排序。比如:a=1;b=a; 这个指令序列,由于第二个操作依赖于第一个操作,所以在编译时和处理器运行时这两个操作不会被重排序。
  • 重排序是为了优化性能,但是不管怎么重排序,单线程下程序的执行结果不能被改变。比如:a=1;b=2;c=a+b这三个操作,第一步(a=1)和第二步(b=2)由于不存在数据依赖关系,所以可能会发生重排序,但是c=a+b这个操作是不会被重排序的,因为需要保证最终的结果一定是c=a+b=3。

解决办法就是使用volatile关键字修饰共享变量

总结
volatile能解决可见性,有序性,synchronized能解决原子性,可见性问题,但不能保证有序性,
因为synchronized并不能保证有序性问题,所以要想synchronized实现线程安全就要求共享变量完全被
synchronized管理起来,外面不能访问到共享变量,不然多线程下还是会出现线程不安全的情况。
只要共享变量完全被synchronized包裹住,就不会出现有序性问题。

Volatile原理

volatile底层原理是内存屏障

  • 对volatile变量的读指令之前加入读屏障,保证读屏障之后的代码不能重排到读屏障之前
    保证读屏障之后的代码读取的变量都是最新值(即都是从主存读取)
  • 对volatile变量的写指令之后加入写屏障,保证写屏障之前的代码不能重排到写屏障之后
    保证写屏障之前的代码全部都写入主存中。

内存屏障底层原理是cpu缓存一致性,即MESI,没研究过,以后有用到再看。
我们结合一下上面代码:

int num = 0;

volatile boolean ready = false;  //防止执行重排序
// 线程1 执行此方法,
public void actor1(I_Result r) {
 //存在读屏障  保证之后读的数据都是最新值,并且防止后面指令重排到读屏障之前
 if(ready) {
 	r.r1 = num + num;
 } 
 else {
 	r.r1 = 1;
 }
}
// 线程2 执行此方法
public void actor2(I_Result r) {
 num = 2;
 ready = true;
 //存在写屏障,保证之前的代码不会重排到写屏障之后,即num=2不能在ready=true之后
 //也保证写屏障之前对变量的修改能同步到主存中
}

Double-Checked Locking,双重检查锁
双重检查锁是普通单例模式的改进型,也是volatile最常用的地方

//普通的单例模式
public final class Singleton {
    private Singleton() { }
    private static Singleton INSTANCE = null;
    public static Singleton getInstance() {
        // 这里有缺点,首次访问会同步,而之后的使用不用进入synchronized
        synchronized(Singleton.class) {
            if (INSTANCE == null) { 
                INSTANCE = new Singleton();
            }
        }
        return INSTANCE;
    }
}

/*
    可以看到,只有第一次创建实例才需要加锁,之后调用getInstance方法并不需要加锁
    上述代码效率还是很有问题的,改进如下
*/
public final class Singleton {
    private Singleton() { }
    private static Singleton INSTANCE = null;
    public static Singleton getInstance() {
        if(INSTANCE == null) { 
            // 首次访问会同步,而之后的使用没有 synchronized
            synchronized(Singleton.class) {
                if (INSTANCE == null) { 
                    INSTANCE = new Singleton();
                }
            }
        }
        return INSTANCE;
    }
}
/*
    如果你仔细分析,就会发现,上面代码其实在多线程环境下是有问题的,因为共享变量INSTANCE
    并未完全被synchronized管理到,又因为synchronized并不能保证有序性,从而导致代码并不安全
    结合上述代码,INSTANCE = new Singleton();可以被拆分为初始化对象中属性,返回给INSTANCE
    但因为有序性问题,导致先返回给INSTANCE,再初始化对象属性,这样调用方就会得到一个未初始化的实例
    这就是问题所在,解决办法自然是给INSTANCE变量加上volatile,这样就不会发生重排序,
    保证返回INSTANCE实例操作在调用构造器之后,简直完美
*/
public final class Singleton {
    private Singleton() { }
    private static volatile Singleton INSTANCE = null;
    public static Singleton getInstance() {
        // 实例没创建,才会进入内部的 synchronized代码块
        if (INSTANCE == null) {
            synchronized (Singleton.class) { // t2
                // 也许有其它线程已经创建实例,所以再判断一次
                if (INSTANCE == null) { // t1
                    INSTANCE = new Singleton();
                }
            }
        }
        return INSTANCE;
    }
}
/*
    你可能有疑问了,为什么要加两个if判断呢?我们想象一下,假设INSTANCE为null,两个线程同时都经过
    外层if判断(这是有可能的,外层if又没有锁),然后只有一个线程获得锁,如果没有里面的if,直接创建对象
    那么第二个线程获取锁后,又创建了一次对象,所以里层if判断解决的是第一次加锁时线程竞争状况。
*/

大佬文章:Java中的双重检查锁(double checked locking)

Happens-Before规则
因为jvm会对代码进行编译优化,指令会出现重排序的情况,为了避免编译优化对并发编程安全性的影响,需要happens-before规则定义一些禁止编译优化的场景,保证并发编程的正确性。

关于happens-before规则可以看这篇文章,写的非常好:深入理解happens-before规则

总结
volatile主要用在一个线程改多个线程读时的来保证可见性,
和double-checked locking模式中保证synchronized代码块外的共享变量的重排序问题

CAS的认识

synchronized和ReentrantLock实现线程安全,都属于阻塞式锁,是悲观锁的思想
即时刻提防其他线程修改共享变量,我拿到锁,只有等我改完你们才能改

CAS+volatile方式实现线程安全,被叫做非阻塞式锁,是一种并不需要加锁就能实现线程安全的途径,
是乐观锁的思想,即不怕其他线程修改共享变量,大不了我再重试一次(while(true))

让我们从实际问题出发来认识些CAS


public class Test {
    
    public static void main(String[] args) {
        Account account = new AccountUnsafe(10000);
        Account.demo(account);
    }
}

class AccountUnsafe implements Account {
    private Integer balance;
    ReentrantLock lock = new ReentrantLock();

    public AccountUnsafe(Integer balance) {
        this.balance = balance;
    }

    @Override
    public Integer getBalance() {
        /*synchronized (this){
            return balance;
        }*/
        lock.lock();
        try {
            return balance;
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void  withdraw(Integer amount) {
        // 通过这里加锁就可以实现线程安全,不加就会导致结果异常
        /*synchronized (this){
            balance -= amount;
        }*/

        //reentrantLock锁对象创建应该作为实例变量,让所有线程共享
        lock.lock();
        try {
            balance -= amount;
        }finally {
            lock.unlock();
        }
    }
}

interface Account {
    // 获取余额
    Integer getBalance();
    // 取款
    void withdraw(Integer amount);
    /**
     * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
     * 如果初始余额为 10000 那么正确的结果应当是 0
     */
    static void demo(Account account) {
        List<Thread> ts = new ArrayList<>();
        long start = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            ts.add(new Thread(() -> {
                account.withdraw(10);
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(account.getBalance()
                + " cost: " + (end-start)/1000_000 + " ms");
    }
}

上面代码是使用synchronized和ReentrantLock保证线程安全,但是加锁方式实现线程安全太耗费资源,
这里我们使用无锁来解决上面问题:

class AccountSafe implements Account {
    private AtomicInteger balance;

    public AccountSafe(Integer balance) {
        this.balance = new AtomicInteger(balance);
    }

    @Override
    public Integer getBalance() {
        //悲观锁,阻塞方式解决线程安全
        /*synchronized (this){
            return balance;
        }*/

        /*lock.lock();
        try {
            return balance.get();
        }finally {
            lock.unlock();
        }*/

        //乐观锁,非阻塞方式解决线程安全
        return balance.get();
    }

    @Override
    public void  withdraw(Integer amount) {
        // 通过这里加锁就可以实现线程安全,不加就会导致结果异常
        /*synchronized (this){
            balance -= amount;
        }*/

        //reentrantLock锁对象创建应该作为实例变量,让所有线程共享
        /*lock.lock();
        try {
            balance -= amount;
        }finally {
            lock.unlock();
        }*/
        while (true){
            //线程的工作内存中保存的最新值
            int prev = balance.get();
            //要修改的余额
            int next = prev -amount;
            if (balance.compareAndSet(prev,next)){
                break;
            }
        }
//        balance.addAndGet(-amount);  //直接使用AtomicInteger包装后的方法
    }
}

上述代码中使用了无锁的方式实现线程安全,主要是取款操作这里,withdraw方法中while循环,
这里原子整数balance调用了compareAndSet方法,底层调用了Unsafe的compareAndSet方法,
这种方式简称为CAS方式,这个方法的具体含义是根据现在的值,即prev比较主存中的值,如果一致则
设置新值,即next值,如果不一致则不设置直接进行下一轮,另外CAS方法如果设置值成功返回true,否则返回false

CAS操作是原子操作,即balance.compareAndSet(prev,next),另外CAS的使用必须要配合volatile才能实现
线程安全,AtomicInteger类有个value属性,这个属性是存放我们变量的值,这个属性就是被volatile修饰的。

为什么无锁效率高?

  • 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,
    发生上下文切换,进入阻塞。打个比喻:线程就好像高速跑道上的赛车,高速运行时,速度超快,
    一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
  • 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,
    线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

CAS 的特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,
    我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
    因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响

JUC包下的类

java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:

  • 使用原子的方式更新基本类型
    • AtomicInteger:整型原子类
    • AtomicLong:长整型原子类
    • AtomicBoolean :布尔型原子类
  • 原子引用
    • AtomicReference:引用类型原子类
    • AtomicStampedReference
      原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,
      可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
    • AtomicMarkableReference
      原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,AtomicStampedReference类似
      只不过用boolean代替整型
  • 原子数组
    • AtomicIntegerArray,整形数组原子类
    • AtomicLongArray,长整形数组原子类
    • AtomicReferenceArray,引用类型数组原子类
  • 字段更新器
    • AtomicReferenceFieldUpdater
      利用字段更新器,可以针对对象的某个域(属性)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
    • AtomicIntegerFieldUpdater
    • AtomicLongFieldUpdater
  • 原子累加器,LongAdder

原子整型
让我们先来了解一下原子整数类,以AtomicInteger为例讨论它的API接口

public static void main(String[] args) {
    AtomicInteger i = new AtomicInteger(0);
    // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
    System.out.println(i.getAndIncrement());
    // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
    System.out.println(i.incrementAndGet());
    // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
    System.out.println(i.decrementAndGet());
    // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
    System.out.println(i.getAndDecrement());
    // 获取并加值(i = 0, 结果 i = 5, 返回 0)
    System.out.println(i.getAndAdd(5));
    // 加值并获取(i = 5, 结果 i = 0, 返回 0)
    System.out.println(i.addAndGet(-5));
    // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
    // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
    System.out.println(i.getAndUpdate(p -> p - 2));
    // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
    // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
    System.out.println(i.updateAndGet(p -> p + 2));
    // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
    // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
    // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
    // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
    System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
    // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)
    // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
    System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
}

原子引用
为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

在之前取款案例中,使用原子整型,如果是小数呢,那就需要更改成如下案例:

class DecimalAccountCas implements DecimalAccount{
    //private BigDecimal balance;
    private AtomicReference<BigDecimal> balance ;

    public DecimalAccountCas(BigDecimal balance) {
        this.balance = new AtomicReference<>(balance);
    }

    @Override
    public BigDecimal getBalance() {
        return balance.get();
    }

    @Override
    public void withdraw(BigDecimal amount) {
        while(true){
            BigDecimal pre = balance.get();
            // 注意:这里的balance返回的是一个新的对象,即 pre!=next
            BigDecimal next = pre.subtract(amount);
            if (balance.compareAndSet(pre,next)){
                break;
            }
        }
    }
}

ABA问题及解决
现在面试问到CAS就必会问ABA问题,那么什么是ABA问题呢?
线程进行CAS操作,比较是主存中值和之前取出来的值(上面的pre局部变量),如果一致就设置新值(上面的next局部变量),
你想想如果一致就真的一定代表这个值有没有被改过吗?有没有这种可能,你取出内存中值,值假设是A,
然后你进行CAS操作时,发现主存中仍然是A,于是你觉得在我取之前到比较这段时间,没有人动过我的值
但实际上另一个线程将主存中值A改成B,再将B改成A,实际上主存中值已经被动过,但你并不知道,还以为没人
动过我的值,直接设置新值了,这就是ABA问题

CAS只能确保取出的共享变量的值在比较时与主存中的值是一致的,但不能确保共享变量的值是否被其他线程更改过

那么如果解决ABA问题呢?
答案是增加一个版本号,线程每次更改共享变量值就更改一次版本号(当然版本号肯定要随机的)。
在CAS时,不仅比较共享变量值是否一致,还要比较版本号是否一致,
这就需要使用到AtomicStampedReference类或者AtomicMarkableReference 类,
这两个类内部都维护着版本号,两者不同的是AtomicStampedReference维护的是一个整型,可以知道
线程更改多少次,如果有时候我们并不想知道在比较时其他线程更改共享变量多少次,而是仅仅想知道
其他线程有没有更改过,就只需要使用AtomicMarkableReference,该类内部维护的是boolean值。

原子数组

有时候我们在cas时,并不想设置一个新的值,而是修改的是原来对象中内部的属性值,
例如修改的是数组中元素而不是重新设置一个新数组,原子数组保护的是数组中元素的线程安全

//这是上面一段代码,假设我们的共享变量是一个数组而不是整型,我们要保证的是数组中元素的线程安全
@Override
public void withdraw(BigDecimal amount) {
    while(true){
        BigDecimal pre = balance.get();
        // 注意:这里的balance返回的是一个新的对象,即 pre!=next
        //有时候我们并不想设置新数组,而是更改原来数组中某个元素的值,这就需要原子数组了
        BigDecimal next = pre.subtract(amount);
        if (balance.compareAndSet(pre,next)){
            break;
        }
    }
}

在介绍更深层次的原子数组应用之前,我们先来学习一些函数式接口,这些接口都可以作为方法参数上使用

  • Supplier
    不要求每次调用供应商时都返回一个新的或不同的结果。这是一个函数接口,其函数方法是get

    public interface Supplier<T> {
        /**
         * Gets a result.
         * @return a result
         */
        T get();
    }
  • Function&BiFunction
    表示接受一个参数或两个参数并返回结果的函数。

    public interface Function<T, R> {
        /**
        * Applies this function to the given argument.
        * @param t the function argument
        * @return the function result
        */	
      R apply(T t);
      //....
    }
  • Consumer&BiConsumer
    表示接受一个或两个输入参数且不返回结果的函数

    public interface BiConsumer<T, U> {
       void accept(T t, U u);
         //....
    }
    public interface Consumer<T> {
        void accept(T t);
         //....
    }

下面是一个使用原子数组的案例

public class Test2<T> {
    
    public static void main(String[] args) {
        //普通数组,在多线程情况下,出现线程安全问题
        demo(()-> new int[10],
                array-> array.length,
                (array,index)-> array[index]++,
                array-> System.out.println(Arrays.toString(array)));

        //原子数组
        demo(()-> new AtomicIntegerArray(10),
                array-> array.length(),
                (array,index)-> array.getAndIncrement(index),   //表示对某个元素进行自增,而不是atomicInteger中对变量自增
                array-> System.out.println(array));

    }

    public static <T> void demo(Supplier<T> arr, Function<T,Integer> arrLength,
                    BiConsumer<T,Integer> operation, Consumer<T> printOperation){
        ArrayList<Thread> ts = new ArrayList<>();
        T array = arr.get();
        Integer length = arrLength.apply(array);  //获得数组长度
        for (int i = 0; i < length; i++) {
            //创建10个线程,每个线程运行1万次自增,将自增的数平均存到数组中
            //正确情况下,数组每个元素的值应该是1万,但是如果不使用原子数组就会有问题
            //而函数式编程,就把数组的定义抽离了出去,只要调用者将普通数组更改为原子数组即可
            ts.add(new Thread(()->{
                for (int j = 0; j < 10000; j++) {
                    operation.accept(array,j%length);
                }
            }));
        }
        //开启所有线程
        ts.forEach((x)->{x.start();});
        //调用所有线程的join,让主线程等这些线程结束
        ts.forEach(thread -> {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        printOperation.accept(array);
    }
}

class Student{
    volatile String name;
    volatile int age;

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

字段更新器
字段更新器保护的是某个对象中的属性(成员变量)是线程安全的,保证多个线程访问同一个对象的成员变量的安全性
下面是一个字段更新器案例

Student student = new Student();
AtomicIntegerFieldUpdater<Student> updater = AtomicIntegerFieldUpdater.newUpdater(Student.class,"age");
AtomicReferenceFieldUpdater updater1 = AtomicReferenceFieldUpdater.newUpdater(Student.class,String.class,"name");

System.out.println(updater.compareAndSet(student, 0, 520));
System.out.println(updater1.compareAndSet(student, null, "万一"));
System.out.println(student);	//已被更改为“万一”,520了

原子累加器
使用CAS对共享变量自增,你可能会想到使用AtomicInteger的getAndIncrement方法,
其实,也可以使用原子累加器LongAdder,这个类也是用来做多线程下共享变量的累加的,不同的是这个累加器效率更高
下面是一个LongAdder使用案例

public static void main(String[] args) {
    for (int i = 0; i < 5; i++) {
        demo(() -> new LongAdder(), adder -> adder.increment());
    }
    for (int i = 0; i < 5; i++) {
        demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
    }

}

private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
    T adder = adderSupplier.get();
    long start = System.nanoTime();
    List<Thread> ts = new ArrayList<>();
    // 4 个线程,每人累加 50 万
    for (int i = 0; i < 40; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 500000; j++) {
                action.accept(adder);
            }
        }));
    }
    ts.forEach(t -> t.start());
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    long end = System.nanoTime();
    System.out.println(adder + " cost:" + (end - start)/1000_000);
}

性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],
而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

Unsafe

Unsafe类是JVM本地方法,是由C++编写的,也是CAS底层实现,原子整型等等都是在这个类基础上包装的
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。
LockSupport的park方法,cas相关的方法底层都是通过Unsafe类来实现的。
下面举个例子,来拿到unsafe对象

public class GetUnsafe{
	static Unsafe unsafe;
    static {
        try {
            // Unsafe 使用了单例模式,unsafe对象是类中的一个私有的变量
            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
            theUnsafe.setAccessible(true);
            unsafe = (Unsafe) theUnsafe.get(null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new Error(e);
        }
    }
    static Unsafe getUnsafe() {
        return unsafe;
    }
}

让我们来实现一个自定义的线程安全的原子整型,并用之前的取款实例来进行验证

public class AtomicData implements Account{

    public static void main(String[] args) {
        
        Account.demo(new AtomicData(10000));
    }
    
    private volatile int value;

    private static final Unsafe unsafe;
    
    private static long valueOffset;

    public AtomicData(int value) {
        this.value = value;
    }

    static {
        unsafe = GetUnsafe.getUnsafe();
        try {
            valueOffset = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("value"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        }
    }
    
    public int getValue(){
        return this.value;
    }
    
    public void decrement(int amount){
        while (true){
            int pre = getValue();
            int next = pre - amount;
            if (unsafe.compareAndSwapInt(this,valueOffset,pre,next)){
                break;
            }
        }
    }

    @Override
    public Integer getBalance() {
        return getValue();
    }

    @Override
    public void withdraw(Integer amount) {
        decrement(amount);
    }
}