近况
现在是22年2月底,有很多重要的事情需要处理,论文开题报告,中期检查,毕设,实习工作等
3月份的任务:
- 复习面试题,去杭州找份工作
自己基础还是很差,很多之前学习过的内容,没有使用导致忘记了不少,所以赶紧在找工作之前复习一下
重新学习之前的内容,找工作是必要的,目前在家效率不是特别高,不过找工作意味着租房,头痛。 - 搭建好完整的毕设项目
既然选择了分布式项目,那就要努力学好,虽然之前学习过vue,分布式,但是实战还是差点意思
还有很多问题等着去解决,3月份,我希望能搭建好项目整体框架,学习完尚医通的项目。
上面两件事是整个3月份要去解决的,4,5月份的重点放在工作内容和毕设上,5月份之前务必搞定毕设!!!
本篇博客是学习多线程的最后一篇,基础多线程的学习就先暂告一段落,我知道还有很多多线程的内容没有学习,
多线程的书”并发编程之美”我也没有看,但是不急,终究有一天我会再次学习,只不过现在这个时候,
学习不宜过多深究,很浪费时间,还有操作系统,计网等内容,我都会全部捡起来!
下面是关于本篇博客的内容:
不可变对象以及final的认识
不可变对象定义
不可变对象就是那些一旦被创建,它们的状态就不能被改变的对象,每次对它们的改变都是产生了新的对象
可变对象就是那些创建后,状态依然可以被改变的对象,这里的状态其实指的就是实例变量,静态变量。
一个不可变对象案例:public class Test { public static void main(String[] args) { String str = "I love java"; String str1 = str; System.out.println("after replace str:" + str.replace("java", "Java")); System.out.println("after replace str1:" + str1); } } //输出 after replace str:I love Java after replace str1:I love java //从输出结果可以看出,在对str进行了字符串替换替换之后,str1指向的字符串对象仍然没有发生变化。 //replace虽然看上去修改了字符串,但其实是返回了一个新的字符串,内部使用到了保护性拷贝方式 //保护性拷贝指的是:当你修改一个类的成员变量时,并不是真正修改这个成员变量而是创建一个新的成员变量返回
不可变类存在的意义
让并发编程变得更简单
控制共享资源,使各个线程达到互斥在并发编程中并不容易,大多数情况下,对于资源互斥访问的场景,
都是采用加锁的方式来实现对资源的串行访问,来保证并发安全,但是这种串行方式访问共享变量并不容易
有没有其他方法可以实现并发编程呢?事实上,引起线程安全问题的根本原因在于:多个线程需要同时访问同一个共享资源。
假如没有共享资源,那么多线程安全问题就自然解决了,Java中提供的ThreadLocal机制就是采取的这种思想。
然而大多数时候,线程间是需要使用共享资源互通信息的,如果共享资源在创建之后就完全不再变更,
如同一个常量,而多个线程间并发读取该共享资源是不会存在线上安全问题的,因为所有线程无论何时读取该共享资源,
总是能获取到一致的、完整的资源状态。不可变对象就是这样一种在创建之后就不再变更的对象,
这种特性使得它们天生支持线程安全,让并发编程变得更简单。下面是一个在多线程中使用不可变类的例子//SimpleDateFormat并不是一个线程安全的时间工具类 public class DateFormatMutable { public static void main(String[] args) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); for (int i = 0; i < 10; i++) { new Thread(() -> { try { log.debug("{}",sdf.parse("2021-10-01")); } catch (ParseException e) { log.error("{}", e); } }).start(); } } } //结果 由于 SimpleDateFormat 不是线程安全的,有很大几率出现 `java.lang.NumberFormatException` 或者出现不正确的日期解析结果。
解决办法一般是加锁,但是加锁又比较影响性能,所以可以考虑使用不可变类:DateTimeFormatter
public class DateFormatImmutable { public static void main(String[] args) { DateTimeFormatter sdf = DateTimeFormatter.ofPattern("yyyy-MM-dd"); for (int i = 0; i < 10; i++) { new Thread(() -> { log.debug("{}",sdf.parse("2021-10-01")); }).start(); } } }
减少容器使用过程出错的概率
我们在使用HashSet时,如果HashSet中元素对象的状态可变,就会出现元素丢失的情况,比如下面这个例子:class Person { private int age; // 年龄 private String identityCardID; // 身份证号码 //省略age,identityCardID的getter,setter方法 @Override public boolean equals(Object obj) { if (obj == null) { return false; } if (!(obj instanceof Person)) { return false; } Person personObj = (Person) obj; return this.age == personObj.getAge() && this.identityCardID.equals(personObj.getIdentityCardID()); } @Override public int hashCode() { return age * 37 + identityCardID.hashCode(); } } public class Test { public static void main(String[] args) { Person jack = new Person(); jack.setAge(10); jack.setIdentityCardID("42118220090315234X"); Set<Person> personSet = new HashSet<Person>(); personSet.add(jack); jack.setAge(11); System.out.println(personSet.contains(jack)); } } //输出结果 false
所以在Java中,对于String、包装器这些不可变类,我们经常会用他们来作为HashMap的key,
试想一下如果这些类是可变的,将会发生什么?后果不可预知,这将会大大增加Java代码编写的难度。
如何创建不可变对象
通常来说,创建不可变类原则有以下几条:1)所有成员变量必须是private 2)最好同时用final修饰(非必须) 3)不提供能够修改原有对象状态的方法 3.1)最常见的方式是不提供setter方法 3.2)如果提供修改方法,需要新创建一个对象,并在新创建的对象上进行修改(保护性拷贝,每次返回新的对象) 4)通过构造器初始化所有成员变量,引用类型的成员变量必须进行深拷贝(deep copy) 5)getter方法不能对外泄露this引用以及成员变量的引用 6)最好不允许类被继承(非必须)
JDK中提供了一系列方法方便我们创建不可变集合,如:
Collections.unmodifiableList(List<extendsT> list)
另外,在Google的Guava包中也提供了一系列方法来创建不可变集合,如:ImmutableList.copyOf(list)
这2种方式虽然都能创建不可变list,但是两者是有区别的,JDK自带提供的方式实际上创建出来的不是真正意义上的不可变集合
实际上UnmodifiableList是将入参list的引用复制了一份,同时将所有的修改方法抛出UnsupportedOperationException。
因此如果在外部修改了入参list,实际上会影响到UnmodifiableList,而Guava包提供的ImmutableList
是真正意义上的不可变集合,它实际上是对入参list进行了深拷贝。我们可以看下面一段代码:public class Test { public static void main(String[] args) { List<Integer> list = new ArrayList<Integer>(); list.add(1); System.out.println(list); List unmodifiableList = Collections.unmodifiableList(list); ImmutableList immutableList = ImmutableList.copyOf(list); list.add(2); System.out.println(unmodifiableList); System.out.println(immutableList); } } //输出 [1] [1,2] [1]
不可变对象真的”完全不可改变”吗?
不可变对象虽然具备不可变性,但是不是”完全不可变”的,这里打上引号是因为通过反射的手段是可以改变不可变对象的状态的。
大家看到这里可能有疑惑了,为什么既然能改变,为何还叫不可变对象?这里面大家不要误会不可变的本意,
从不可变对象的意义分析能看出来对象的不可变性只是用来辅助帮助大家更简单地去编写代码,
减少程序编写过程中出错的概率,这是不可变对象的初衷。如果真要靠通过反射来改变一个对象的状态,
此时编写代码的人也应该会意识到此类在设计的时候就不希望其状态被更改,从而引起编写代码的人的注意。
下面是通过反射方式改变不可变对象的例子:public class Test { public static void main(String[] args) throws Exception { String s = "Hello World"; System.out.println("s = " + s); Field valueFieldOfString = String.class.getDeclaredField("value"); valueFieldOfString.setAccessible(true); char[] value = (char[]) valueFieldOfString.get(s); value[5] = '_'; System.out.println("s = " + s); } } //输出 s = Hello World s = Hello_World
Final关键字的认识
final修饰的类不能被继承,final类中的成员变量可以根据需要设为final,
但是要注意final类中的所有成员方法都会被隐式地指定为final方法。final修饰的方法不能被重写,此处需要注意的一点是:因为重写的前提是子类可以从父类中继承此方法,
如果父类中final修饰的方法同时访问控制权限为private,将会导致子类中不能直接继承到此方法,
因此,此时可以在子类中定义相同的方法名和参数,此时不再产生重写与final的矛盾,
而是在子类中重新定义了新的方法。(注:类的private方法会隐式地被指定为final方法)public class B extends A { public static void main(String[] args) { } public void getName() { } } class A { /** * 因为private修饰,子类中不能继承到此方法,因此,子类中的getName方法是重新定义的、 * 属于子类本身的方法,编译正常 */ private final void getName() { } /* 因为pblic修饰,子类可以继承到此方法,导致重写了父类的final方法,编译出错 public final void getName() { } */ }
final修饰的变量为常量,当final修饰一个基本数据类型时,表示该基本数据类型的值一旦在初始化后便不能发生变化;
如果final修饰一个引用类型时,则在对其初始化之后便不能再让其指向其他对象了,但该引用所指向的对象的内容是可以发生变化
本质上是一回事,因为引用的值是一个地址,final要求值,即地址的值不发生变化。
final修饰一个成员变量(属性),必须要显示初始化。这里有两种初始化方式:1)一种是在变量声明的时候初始化; 2)第二种方法是在声明变量的时候不赋初值,但是要在这个变量所在的类的所有的构造函数中对这个变量赋初值。
当函数的参数类型声明为final时,说明该参数是只读型的。即你可以读取使用该参数,但是无法改变该参数的值。
深入理解final关键字
类的final变量和普通变量有什么区别?
先来看下面一个案例:public class Test { public static void main(String[] args) { String a = "hello2"; final String b = "hello"; String d = "hello"; String c = b + 2; String e = d + 2; System.out.println((a == c)); System.out.println((a == e)); } } //输出结果:true、false
为什么第一个比较结果为true,而第二个比较结果为fasle。这里面就是final变量和普通变量的区别了,
当final变量是基本数据类型以及String类型时,如果在编译期间能知道它的确切值,则编译器会把它当做编译期常量使用。
也就是说在用到该final变量的地方,相当于直接访问的这个常量,不需要在运行时确定。因此在上面的一段代码中,
由于变量b被final修饰,因此会被当做编译器常量,所以在使用到b的地方会直接将变量b替换为它的值。
而对于变量d的访问却需要在运行时通过链接来进行。要注意,只有在编译期间能确切知道final变量值的情况下,
编译器才会进行这样的优化,比如下面的这段代码就不会进行优化:public class Test { public static void main(String[] args) { String a = "hello2"; final String b = getHello(); String c = b + 2; System.out.println((a == c)); } public static String getHello() { return "hello"; } } //输出结果为false
这里要注意一点就是:不要以为某些数据是final就可以在编译期知道其值,通过变量b我们就知道了,
在这里是使用getHello()方法对其进行初始化,他要在运行期才能知道其值。
被final修饰的引用变量指向的对象内容可变吗?
在上面提到被final修饰的引用变量一旦初始化赋值之后就不能再指向其他的对象,那么该引用变量指向的对象的内容可变吗?
看下面这个例子:public class Test { public static void main(String[] args) { final MyClass myClass = new MyClass(); System.out.println(++myClass.i); } } class MyClass { public int i = 0; }
这段代码可以顺利编译通过并且有输出结果,输出结果为1。这说明引用变量被final修饰之后,
虽然不能再指向其他对象,但是它指向的对象的内容是可变的。
享元模式
不可变类在尝试修改对象时,当前对象会创建一个新对象,保证线程安全,但是这样带来一个问题就是对象创建过多
会消耗资源,性能变低。如何解决呢?不可变类一般会通过关联设计模式,享元模式来解决
享元模式,是对象池的一种实现。 类似于线程池,线程池可以避免不停的创建和销毁多个对象,消耗性能。
享元模式也是为了减少内存的使用,避免出现大量重复的创建销毁对象的场景。该模式属于结构性模式中的一种
享元模式在很多地方都使用到,例如String,基本类型包装类,BigDecimal BigInteger,线程池等
以往进行数据库操作时,都是来一个请求就创建一个connection对象,连接完数据库再删掉,这种每次请求都要
重新创建和关闭数据库连接,性能都会受到极大影响。这时,预先创建好一批连接,放入连接池。
一次请求到达后,从连接池获取连接;使用完毕后,归还连接池中。这样既节约了时间和资源,实现了重用,
能及时响应客户端请求,不至于给数据库造成过大的负担。下面我们使用享元模式尝试自定义连接池:
public final class Pool {
/** 连接池大小 */
private final int poolSize;
/** 连接数组 */
private final Connection[] conns;
/** 连接是否被占用标记,0-未占用,1-已占用 */
private final AtomicIntegerArray busies;
/**
* 构造方法:初始化连接池
* @param poolSize 初始连接池大小
*/
public Pool(int poolSize) {
this.poolSize = poolSize;
conns = new Connection[poolSize];
busies = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
conns[i] = new MockConnection("conn" + i);
}
}
/**
* 从连接池获取连接
* @return 连接
*/
public Connection getConnection() {
// 检查是否有空闲连接
while (true) {
for (int i = 0; i < poolSize; i++) {
// 有空闲连接返回连接
if (busies.get(i) == 0) {
// 空闲,修改标记,返回连接
if (busies.compareAndSet(i, 0, 1)) {
log.debug("get {}", conns[i]);
return conns[i];
}
}
}
// 没有空闲连接等待
synchronized (this) {
try {
log.debug("wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 归还连接
* @param conn 要归还的连接
*/
public void close(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (conns[i] == conn) {
// 归还连接不会发生竞争
busies.set(i, 0);
synchronized (this) {
log.debug("close {}", conn);
this.notifyAll();
}
break;
}
}
}
}
/**
* 连接
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MockConnection implements Connection {
private String name;
...
}
/**
* 测试连接池
*/
public class PoolTest {
public static void main(String[] args) {
Pool p = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection connection = p.getConnection();
try {
// 模拟连接使用耗时
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
p.close(connection);
}
}).start();
}
}
}
// 测试结果
2022-02-27 11:33:32.443 DEBUG [Thread-3] c.Pool - wait...
2022-02-27 11:33:32.443 DEBUG [Thread-1] c.Pool - get MockConnection(name=conn0)
2022-02-27 11:33:32.443 DEBUG [Thread-2] c.Pool - get MockConnection(name=conn1)
2022-02-27 11:33:32.445 DEBUG [Thread-5] c.Pool - wait...
2022-02-27 11:33:32.445 DEBUG [Thread-4] c.Pool - wait...
2022-02-27 11:33:33.445 DEBUG [Thread-2] c.Pool - close MockConnection(name=conn1)
2022-02-27 11:33:33.445 DEBUG [Thread-4] c.Pool - get MockConnection(name=conn0)
2022-02-27 11:33:33.445 DEBUG [Thread-5] c.Pool - get MockConnection(name=conn1)
2022-02-27 11:33:33.445 DEBUG [Thread-3] c.Pool - wait...
2022-02-27 11:33:33.445 DEBUG [Thread-1] c.Pool - close MockConnection(name=conn0)
2022-02-27 11:33:33.445 DEBUG [Thread-3] c.Pool - wait...
2022-02-27 11:33:34.445 DEBUG [Thread-4] c.Pool - close MockConnection(name=conn0)
2022-02-27 11:33:34.445 DEBUG [Thread-3] c.Pool - get MockConnection(name=conn0)
2022-02-27 11:33:34.445 DEBUG [Thread-5] c.Pool - close MockConnection(name=conn1)
2022-02-27 11:33:35.445 DEBUG [Thread-3] c.Pool - close MockConnection(name=conn0)
线程池
池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。
池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
自定义线程池
我们将尝试看看如何创建自己的线程池,并且看看线程池需要哪些组件/** * 步骤4:自定义线程池测试 */ @Slf4j(topic = "Test18") public class Test18{ public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{ //拒绝策略交给使用者完成 // 1. 死等 // queue.put(task); // 2) 带超时等待 // queue.offer(task, 1500, TimeUnit.MILLISECONDS); // 3) 让调用者放弃任务执行 log.debug("放弃{}", task); // 4) 让调用者抛出异常 // throw new RuntimeException("任务执行失败 " + task); // 5) 让调用者自己执行任务 task.run(); }); for (int i = 0; i < 4; i++) { int j = i; threadPool.execute(() -> { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}", j); }); } } } /** * 步骤1:自定义任务队列 * @param <T> */ @Slf4j(topic = "BlockingQueue") class BlockingQueue<T> { // 1. 任务队列 private Deque<T> queue = new ArrayDeque<>(); // 2. 锁 private ReentrantLock lock = new ReentrantLock(); // 3. 生产者条件变量 private Condition fullWaitSet = lock.newCondition(); // 4. 消费者条件变量 private Condition emptyWaitSet = lock.newCondition(); // 5. 容量 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } // 带超时时间的获取 public T poll(long timeout, TimeUnit unit){ lock.lock(); try{ // 将 timeout 统一转换为 纳秒 long nanos = unit.toNanos(timeout); while (queue.isEmpty()){ try { if (nanos<=0){ return null; } // 返回的是剩余的等待时间,更改navos的值,使虚假唤醒的时候可以继续等待 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return queue.getFirst(); }finally { lock.unlock(); } } // 阻塞获取 public T Take(){ lock.lock(); try{ while (queue.isEmpty()){ try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return queue.getFirst(); }finally { lock.unlock(); } } // 阻塞增加 public void put (T task){ lock.lock(); try{ while (queue.size() == capcity){ try { log.debug("等待加入任务队列 {} ...", task); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}", task); queue.addLast(task); emptyWaitSet.signal(); }finally { lock.unlock(); } } // 带超时时间的增加 public boolean offer(T task , long timeout , TimeUnit unit){ lock.lock(); try{ // 将 timeout 统一转换为 纳秒 long nanos = unit.toNanos(timeout); while (queue.size() == capcity){ try { if (nanos<=0){ return false; } nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}", task); queue.addLast(task); emptyWaitSet.signal(); return true; }finally { lock.unlock(); } } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try{ // 不空闲时怎么办?由rejectPolicy决定 if (queue.size()> capcity){ rejectPolicy.reject(this, task); }else { log.debug("加入任务队列 {}", task); queue.addLast(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } public int size(){ lock.lock(); try{ return queue.size(); }finally { lock.unlock(); } } } /** * 步骤2:自定义线程池 */ @Slf4j(topic = "ThreadPool") class ThreadPool{ // 任务队列 private BlockingQueue<Runnable> taskQueue; // 线程集合 private HashSet<Worker> workers = new HashSet<>(); // 核心线程数 int coreSize; private long timeOut; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int capcity ,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeOut = timeOut; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(capcity); this.rejectPolicy = rejectPolicy; } // 执行任务 public void execute(Runnable task){ // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行 // 如果任务数超过 coreSize 时,加入任务队列暂存 synchronized (workers){ if (workers.size()<coreSize){ Worker worker = new Worker(task); workers.add(worker); worker.start(); }else { // 1) 死等 //taskQueue.put(task); // 2) 带超时等待 //taskQueue.tryPut(rejectPolicy, task); // 3) 让调用者放弃任务执行 // 4) 让调用者抛出异常 // 5) 让调用者自己执行任务 //或者将以上的这些选项封装起来,由调用者调用时自己设计操作逻辑 taskQueue.tryPut(rejectPolicy,task); } } } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // 1) 当 task 不为空,执行任务 // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行 while(task!=null || (task = taskQueue.poll(timeOut,timeUnit)) != null) { try{ log.debug("正在执行...{}", task); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } } synchronized (workers){ log.debug("worker 被移除{}", this); workers.remove(this); } } } } /** * 步骤3:自定义拒绝策略接口 * @param <T> */ @FunctionalInterface // 拒绝策略 @FunctionalInterface的意思是这是一个函数式编程接口 interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); }
从自定义一个简单的线程池我们可以看到线程池的基本要素:
- 等待队列
等待队列中有几个必要状态(属性):具体队列,锁,生产者条件变量,消费者条件变量,队列容量
再加上阻塞获取,阻塞添加,拒绝策略方法,超时获取,超时添加等方法 - 线程池
线程池中几个必要必要状态(属性):等待队列,线程集合,线程核心数,拒绝策略,超时时间和时间单位倒是其次
线程池主要方法是execute方法用来执行任务,线程池中当然还有一个内部类,继承线程,包装任务,重写run方法 - 拒绝策略
单纯的函数式接口,该接口方法用于将线程池中拒绝策略方式交给使用者去实现
大致的流程就是使用者提交任务,线程池创建线程处理任务,如果任务太多则放入等待队列。 以上是一个简单的线程池实现,下面让我们看看JDK中如何实现线程池。
- 等待队列
ThreadPoolExecutor
下面是JDK中线程池的基本结构和继承体系:线程池状态
线程池的状态说的是在不同条件下线程池的行为。ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
从数字上比较(第一位是符号位),TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
当我们调用线程池shutdown方法时,此时线程池处于shutdown状态,即不会接受新线程,但会处理阻塞队列剩余任务线程池的构造方法
再来看下JDK中线程池如何被创建,创建时需要哪些参数,下面是一个线程池构造方法public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){ }
1,corePoolSize 核心线程数目 (最多保留的线程数)
2,maximumPoolSize 最大线程数目(核心线程数加上救急线程数)
JDK中的线程池与我们自定义的线程池有一个比较大的区别在于,JDK线程池中的线程分为核心线程和救急线程 核心线程一旦创建是一直运行的,而救急线程只有在核心线程满了,等待队列也满了后才会创建救急线程 并且救急线程在一段时间内不使用后会自动销毁,我们自定义线程池中线程则没有细分全都是核心线程。 不过要注意,一定要最大线程数目大于核心线程数目,才会创建救急线程,不然直接返回拒绝策略了
3,keepAliveTime 救急线程的生存时间(核心线程没有生存时间这个东西,核心线程会一直运行)
4,unit 时间单位 - 针对救急线程,是keepAliveTime 的时间单位
5,workQueue 阻塞队列
不同的线程池实现使用不同的阻塞队列,JDK线程池中阻塞队列主要分为以下几个: 1,ArrayBlockingQueue,基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾, 有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来, 则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程, 如果线程数量已经达到maxPoolSize,则会执行拒绝策略。 2,LinkedBlockingQuene,基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。 由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列, 而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。 newFixedThreadPool,newSingleThreadExecutor线程池使用该阻塞队列 3,SynchronousQuene,一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。 也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程, 如果线程数量达到maxPoolSize,则执行拒绝策略。 newCachedThreadPool使用该阻塞队列 4,PriorityBlockingQueue,具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
6,threadFactory 创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等
7,handler 拒绝策略
不同的框架实现的线程池有不同的拒绝策略,JDK中的拒绝策略是前4个 1. ThreadPoolExecutor.AbortPolicy让调用者抛出 RejectedExecutionException 异常,这是默认策略 2. ThreadPoolExecutor.CallerRunsPolicy 让调用者运行任务 3. ThreadPoolExecutor.DiscardPolicy 放弃本次任务 4. ThreadPoolExecutor.DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之 5. Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并dump线程栈信息,方便定位问题 6. Netty 的实现,是创建一个新线程来执行任务 7. ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略 8. PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
线程池如何提交任务
ThreadPoolExecutor中主要有下面几种提交方式//1,提交任务,没有返回值 void execute(Runnable command); //2,提交任务 task,用返回值 Future 获得任务执行结果,Future的原理就是利用我们之前讲到的 //保护性暂停模式来接受返回结果的,主线程可以执行 FutureTask.get()阻塞方法来等待任务执行完成 <T> Future<T> submit(Callable<T> task); 案例如下: private static void test2() throws InterruptedException, ExecutionException { /** * 提交任务,除了execute方法,还有submit方法 */ ExecutorService pool = Executors.newFixedThreadPool(2); Future<String> future = pool.submit(new Callable<String>() { /* new Callable<String>,泛型表示返回值类型 */ @Override public String call() throws Exception { return "fuck"; } }); System.out.println(future.get()); /* Future,该类使用保护性暂停模式,用来传递两个线程之间结果的。 */ pool.shutdown(); } //3,提交 tasks 中所有任务,批量执行任务 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; 案例如下: private static void test3() throws InterruptedException { /* 测试invokeAll方法,执行所有任务 */ ExecutorService pool = Executors.newFixedThreadPool(2); List<Future<String>> futures = pool.invokeAll(Arrays.asList( () -> { Thread.sleep(1000); return "a"; }, () -> { Thread.sleep(500); return "b"; }, () -> { Thread.sleep(1500); return "c"; } )); futures.forEach(f->{ try { System.out.println(f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } //4,提交 tasks 中所有任务,带超时时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; //5,提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; //6,提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
除了提交方法外,还有一些其他方法如下:
//2种关闭线程方法 void shutdown(); /* 线程池状态变为 SHUTDOWN,不会接收新任务,但已提交任务会执行完,包括等待队列里面的 此方法不会阻塞调用线程的执行,线程池处于shutdown状态 */ List<Runnable> shutdownNow(); /* 线程池状态变为 STOP,不会接收新任务,会将队列中的任务返回,并用interrupt的方式中断正在执行的任务 */ // 不在 RUNNING 状态的线程池,此方法就返回 true boolean isShutdown(); // 线程池状态是否是 TERMINATED boolean isTerminated(); // 调用shutdown方法后,由于调用使线程结束线程的方法是异步的并不会等待所有任务运行结束就返回, //因此如果想在线程池 TERMINATED 后做些其它事情,可以利用此方法等待 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
线程池中如何处理异常
如果线程池中的线程执行任务时,如果任务抛出了异常,默认是中断执行该任务而不是抛出异常或者打印异常信息。
那么我们如何处理异常,如何获取线程池中异常呢?一般有下面两种方式://1,直接任务中try,catch异常 ExecutorService pool = Executors.newFixedThreadPool(1); pool.submit(() -> { try { log.debug("task1"); int i = 1 / 0; }catch (Exception e) { log.error("error:", e); } }); //2,使用 Future,错误信息都被封装进submit方法的返回方法中! ExecutorService pool = Executors.newFixedThreadPool(1); Future<Boolean> f = pool.submit(() -> { log.debug("task1"); int i = 1 / 0; return true; }); log.debug("result:{}", f.get()); //如果任务正常执行,future接收返回值,出现异常则接收异常信息
线程池具体执行流程
1)当池子大小(线程池中线程数)小于corePoolSize就新建线程,并处理请求 2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理 3)当workQueue放不下新入的任务时,新建救急线程入池,并处理请求, 如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理 4)另外,当池子的线程数大于corePoolSize的时候,多余的线程(救急线程)会等待keepAliveTime长的时间, 如果无请求可处理就自行销毁,其会优先创建 CorePoolSiz 线程, 当继续增加线程时,先放入Queue中, 当 CorePoolSiz 和 Queue 都满的时候,就增加创建新线程(救急线程),当线程达到MaxPoolSize的时候, 就会抛出错误 org.springframework.core.task.TaskRejectedException(默认拒绝策略) 另外MaxPoolSize的设定如果比系统支持的线程数还要大时, 会抛出java.lang.OutOfMemoryError: unable to create new native thread异常。
使用Executors创建的线程池
Executors是Executor的工具类,用于创建不同的线程池newFixedThreadPool
newFixedThreadPool是固定大小的线程池,最大线程数等于核心线程数,即没有救急线程public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } /* 该线程池特点: 1. 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间 2. 阻塞队列是无界的,可以放任意数量的任务 3. 适用于任务量已知,相对耗时的任务 */
下面是该线程池创建及使用:
ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() { private AtomicInteger t = new AtomicInteger(1); /** * 设置线程工厂如何新建线程 */ @Override public Thread newThread(Runnable r) { return new Thread(r,"Mythread_T" + t.getAndIncrement()); } }); pool.execute(()->{ log.info("1"); }); pool.execute(()->{ log.info("2"); }); pool.execute(()->{ log.info("3"); });
newCachedThreadPool
newCachedThreadPool是可变尺寸的线程池,因为没有线程池中没有核心线程数,全部都是救急线程
即有生存时间的线程,该线程池的等待队列使用SynchronousQueue实现,它是一个不存储元素的阻塞队列。
每个插入操作必须等到另一个线程调用移除操作。该线程池构造参数及特点如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } /* 特点: 1. 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s, 意味着全部都是救急线程(60s 后可以回收)且救急线程可以无限创建。 2. 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的 3. 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后 释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况 */
使用案例如下:
//测试newCachedThreadPool内部队列的put,offer方法的区别 public class Test { public static void main(String[] args) throws InterruptedException { SynchronousQueue<Integer> integers = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println("putting...1"); integers.put(1); // 对比一下两种方法的区别 //区别在于put方法会等take方法执行,而offer方法不会 // boolean offer = integers.offer(1); // System.out.println(offer); System.out.println("1 putted..."); System.out.println("putting...2"); integers.put(2); System.out.println("2 putted..."); } catch (InterruptedException e) { e.printStackTrace(); } },"t1").start(); TimeUnit.SECONDS.sleep(2); new Thread(() -> { try { System.out.println("taking 1"); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } },"t2").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { try { System.out.println("taking 2"); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } },"t3").start(); } }
newSingleThreadExecutor
newSingleThreadExecutor,单任务线程池,池中只有一个线程工作,阻塞队列无界,
它能保证按照任务提交的顺序来执行任务。
该线程池构造参数及特点如下:public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } /* 特点:希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。 任务执行完毕,这唯一的线程也不会被释放。 */
那么该线程池与我们自己创建的单线程有什么区别呢?区别如下:
1. 和自己创建单线程执行任务的区别:自己创建一个单线程串行执行任务,如果任务执行失败而终止 那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作 2. Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改 FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用threadPoolExecutor中特有的方法 3. 和Executors.newFixedThreadPool(1) 初始时为1时的区别: Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改,对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
线程池使用如下:
public class MySingleThreadExecutor { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor(); for(int i = 0; i < 10; i++){ final int index = i; executor.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "执行 >>> " + index); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } } //输出就是按顺序输出,一个任务一个任务串行执行
newScheduledThreadPool
任务调度线程池,当你想让你的任务延迟执行,或者固定每隔一段时间执行一次,你可以使用任务调度线程池
当任务调度线程池还没有出来执行,使用的是Timer来实现上面两个功能。
Timer的特点是:它只能串行执行任务,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
Timer的使用如下:Timer timer = new Timer(); TimerTask task = new TimerTask(){ @Override public void run() { System.out.println("a"); } }; TimerTask task2 = new TimerTask(){ @Override public void run() { System.out.println("b"); } }; System.out.println("start..."); timer.schedule(task,1000); timer.schedule(task2,1000); //可以想象如果task1执行时间过长,则task已被拖到超过1秒再执行任务,因为任务是串行
使用ScheduledThreadPoolExecutor就不会有上述问题,该线程池表现为:线程数固定,
任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务。
下面是两个使用案例:private static void test3() { //定时执行任务 ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); //每过1秒执行一次任务,如果任务执行时间都要2秒,那么会等待这2秒执行完后再执行 pool.scheduleAtFixedRate(()->{ System.out.println("我最帅!"); },1,1,TimeUnit.SECONDS); //上个任务执行完,等待1秒再次执行任务,如果任务执行时间要2秒,则会等这2秒过后,再等1秒时间后才执行任务 /*pool.scheduleWithFixedDelay(()->{ System.out.println("你最丑"); },1,1,TimeUnit.SECONDS);*/ } private static void test5() { //如何让每周五,早上8点定时执行任务 /* 1,算出现在到周五时间差,如果现在比周五小,就取这周五时间差,如果现在已经过了周五则取下周五的时间差 2,计算一周有多少毫秒 2,创建线程池执行定时任务 */ LocalDateTime now = LocalDateTime.now(); System.out.println(now); LocalDateTime time = now.withHour(8).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.FRIDAY); System.out.println(time); if (now.compareTo(time)>0){ time=time.plusWeeks(1); } //计算出现在距离周五早上8点时间差 long initialDelay = Duration.between(now, time).toMillis(); //2,计算一周的毫秒值 long period = 1000*60*60*24*7; ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); pool.scheduleAtFixedRate(()->{ System.out.println("现在是周五早上8点时间"); }, initialDelay,period,TimeUnit.MILLISECONDS); }
以上就是使用Executors创建的四种线程池,那么在生产中我们常使用哪种线程池呢?
答案:上面四种线程池我们都不应该使用!如果需要使用线程池最好自己new一个ThreadPoolExecutor
而不是通过Executors方法创建,原因如下:FixedThreadPool和SingleThreadExecutor: 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。 CachedThreadPool和ScheduledThreadPool: 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致OOM。
说白了就是:使用有界队列,控制线程创建数量。
除了避免 OOM 的原因之外,不推荐使用Executors
提供的两种快捷的线程池的原因还有:1,实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。 2,我们应该显示地给我们的线程池命名,这样有助于我们定位问题。
ForkJoinPool
采用分治思想,一步步将任务拆分计算,是stream的底层实现,由于内容众多,有机会接触到了再学习。
AQS基本认识
AQS,全称AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
(就是一套规范,实现它就能自定义锁,不同的实现就是不同的锁)
有很多基于AQS实现的锁,下面是实现AQS机制的体系:
AQS主要构成:
state属性,该属性用来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
//相关api 1. getState - 获取 state 状态 2. setState - 设置 state 状态 3. compareAndSetState - cas 机制设置 state 状态 4. 独占模式是只有一个线程能够访问资源,而共享 模式可以允许多个线程访问资源
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
上面基于AQS机制实现的锁,大多都是继承Lock类,然后锁的内部类实现AQS抽象类,然后调用内部类的方法
这个锁的内部类,一般叫做同步器,是AQS的具体实现,下面是一个自定义的不可重入锁:
public class Test {
/*
AQS,全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
ReentrantLock底层实现就是AQS,另外,阻塞队列中阻塞的线程,是通过park,unpark阻塞的,
synchronized锁通过调用wait方法
*/
public static void main(String[] args) {
//测试自定义不可重入锁
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
utils.sleep(1);
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t2").start();
}
}
/*
下面实现一个不可重入的阻塞式锁:使用AbstractQueuedSynchronizer自定义一个同步器来实现自定义锁!
外部类实现Lock接口,内部类继承 AbstractQueuedSynchronizer 抽象类
*/
class MyLock implements Lock{
//独占锁,同步器类
class MySync extends AbstractQueuedSynchronizer{
@Override //获取锁
protected boolean tryAcquire(int arg) {
/*
因为本次实现的是不可重入锁,所以没有使用到arg参数进行计数
*/
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override //释放锁
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0); //state被volatile修饰,这两个语句顺序不能更改,写屏障保证写屏障之前的代码更新到主存中
return true;
}
@Override //判断当前线程是否拥有锁
protected boolean isHeldExclusively() {
return getState()==1;
}
//创建条件变量
public Condition newCondition(){
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override //加锁,不成功会进入等待队列
public void lock() {
sync.acquire(1);
}
@Override //加锁,不成功进入等待队列后,这个线程可被打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override //尝试加锁,只会尝试一次
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override //尝试加锁,带超时时间
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override //解锁
public void unlock() {
sync.release(0);
}
@Override //创建条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
以上便是AQS基本认识
AQS原理
上面我们说过,像什么ReentrantLock锁,读写锁都是基于AQS的,并且这些类都有一个内部类
继承AbstractQueuedSynchronizer抽象类,这个类被叫做同步器,各个锁都是调用自身同步器中的方法。
我们只需要研究各个基于AQS实现的锁的同步器,就能明白AQS原理,这里我们使用ReentrantLock来举例说明
加锁过程
//1,开始 ReentrantLock lock = new ReentrantLock(); lock.lock(); //2,ReentrantLock,调用的是同步器中lock方法 public void lock() { sync.lock(); } //3,NonfairSync,默认是非公平锁,该类继承自Sync,Sync继承AQS,这里是重写AQS的lock方法 final void lock() { if (compareAndSetState(0, 1)) //第一次尝试获取锁 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } //4,AQS,if中3个方法很重要,是整个AQS加锁流程 public final void acquire(int arg) { if (!tryAcquire(arg) && //1)再尝试加锁 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //加锁失败创建waiter节点,放入队列 selfInterrupt(); } //5,NonfairSync,子类重写AQS方法,nonfairTryAcquire调用的是父类的方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } //6,Sync,该方法总结为:第二次尝试获取锁,判断是否是重入锁,获取不到锁返回false final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { //尝试加锁 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //判断是否为重入锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; //这里表明ReentrantLock是可重入锁 } return false; } //7,AQS,第6步如果返回false,则该线程就要被包装成waiter节点并加入队列了 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { //如果队列已经被初始化,则将该节点放在队尾 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); //如果队列还没其他节点,则初始化队列 return node; } //8,AQS,当阻塞队列还未初始化时,进入enq初始化队列,并添加节点 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //注意:我们的节点是第二位节点,并不是头结点 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } //9,AQS final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //如果现在的节点是第二个节点,则再尝试获取锁 setHead(node); p.next = null; // help GC //获取锁以后将头节点剔除队列 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //10,AQS,该方法第一次调用,用于设置现在节点的前一个节点等待状态为-1,第二次再调用,返回true private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { ws>0是节点已经被取消 do { node.prev = pred = pred.prev; //遍历之前的节点,剔除取消的节点 } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //11,AQS private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //节点被阻塞,如果线程打断状态是true,是阻塞不了的 return Thread.interrupted(); } //自此,该线程被阻塞住,它需要等待前一个线程唤醒
解锁过程
//1,开始 lock.unlock(); //2,ReentrantLock public void unlock() { sync.release(1); } //3,AQS public final boolean release(int arg) { if (tryRelease(arg)) { //尝试释放锁 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); //释放成功后,通知队列中第二个节点 return true; } return false; } //4,Sync protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //等于0,则释放锁成功,不等于0,是因为锁重入原因 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } //5,AQS private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //获得头节点的下一个节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //使第二个正在暂停的节点继续运行 LockSupport.unpark(s.thread); }
条件变量过程
await 流程
//1,开始 Condition condition = lock.newCondition();//创建ConditionObject对象 condition.await(); //2,AQS,主要逻辑方法 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //创建一个节点并关联此线程 int savedState = fullyRelease(node);//释放掉此线程的锁 int interruptMode = 0; while (!isOnSyncQueue(node)) { // LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } //3,AQS private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } //4,AQS final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } //5,AQS final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); } //自此线程已经被阻塞park住了
signal流程
//1,开始 condition.signal(); //2,AQS public final void signal() { if (!isHeldExclusively()) //判断当前线程是不是持有锁的线程,不是就报错 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } //3,Sync protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } //4,AQS,唤醒条件变量中第一个节点 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } //5,AQS final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); //将条件变量的头结点放入阻塞队列中 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); //唤醒节点 return true; }
公平锁与非公平锁区别
区别在于 tryAcquire 方法的实现,如下所示://非公平锁的获取锁 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } //公平锁获取锁 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && //公平锁在获取锁时会检查等待队列中有没有节点线程 compareAndSetState(0, acquires)) { //让已经等待的线程获取锁,很公平 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
AQS的打断模式
AQS有不可打断模式和可打断模式,这里的打断指的是等待获取锁的线程,即阻塞队列中的线程是否能打断
通常使用lock方法加锁,这种属于不可打断模式,而使用lockInterruptibly方法加锁,则阻塞的线程可被打断在不可打断模式下,如果线程在等待的时候被打断了,该线程依然会留在AQS队列中,一直要等到获得锁后方能得知自己被打断了
在可打断模式下,如果线程在等待的时候被打断了,该线程会立即抛出InterruptedException异常AQS的waitState
阻塞队列中的节点状态默认是0,处于队尾的节点在阻塞之前会将前一个节点的状态改成-1,所以处于队尾的节点状态一直都是0
如果线程在阻塞时被打断了,则该节点的状态大于0,另外处于条件变量中的节点状态全部都是-2
当条件变量中的节点被唤醒后,会被加入到阻塞队列队尾,并将节点状态由-2改成0
以上便是AQS原理的全部内容,在本博客末尾链接中有关于AQS的图解原理。
其他一些锁的使用
这里我们主要介绍一些常用锁的使用,下面前4个都是基于AQS实现的锁:
ReentrantReadWriteLock
读写锁,当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。读-写,写-写都是相互互斥的!
注意:1,读写锁中的读锁不支持条件变量 2,重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待,下面这种方式是不行的 r.lock(); try { // ... w.lock(); try { // ... } finally{ w.unlock(); } } finally{ r.unlock(); } 3,重入时降级支持:即持有写锁的情况下去获取读锁 class CachedData { Object data; // 是否有效,如果失效,需要重新计算 data volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // 获取写锁前必须释放读锁 rwl.readLock().unlock(); rwl.writeLock().lock(); try { // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新 if (!cacheValid) { data = ... cacheValid = true; } // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } // 自己用完数据, 释放读锁 try { use(data); } finally { rwl.readLock().unlock(); } } }
读写锁理解:
读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个,读锁占据state高16位,写锁占据低16位
读写锁与ReentrantLock的主要区别在于,如果是读锁加锁,则节点是共享模式而不是独占模式
ReentrantLock的节点全是独占模式,在唤醒节点时,如果被唤醒的节点是共享模式,并且其下一个节点也是共享模式节点
则会唤醒此节点,以此类推,直到之后的节点不是共享模式就停止了,这就形成了多个共享节点同时运行
也就是多个线程读读并发,只有等这些共享节点全部释放完毕,才能唤醒独占节点,因为读写不能并发。有人可能会问?既然读写锁中读读并发,那为什么还要加读写?
其实看源码就知道,读取线程如果不被阻塞队列管理起来,很可能导致脏读和不可重复读问题。更多图解原理请看最后链接中文件。
Semaphore
信号量,用来限制能同时访问共享资源的线程上限。下面是一个简单的案例:
private static void test1() { /* Semaphore,信号量,用来限制能同时访问共享资源的线程上限 ReentrantLock使用在。同一时刻只允许一个线程访问共享资源 Semaphore使用在,共享资源有多个,我允许多个线程访问共享资源,Semaphore只是希望对访问线程上限做限制 */ //实现效果:每次最多只能有三个线程能访问共享资源 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { int j=i; new Thread(()->{ try { //获取许可 semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try{ System.out.println("running"); System.out.println(j); TimeUnit.MILLISECONDS.sleep(500); System.out.println("end"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放许可 semaphore.release(); } }).start(); } }
CountdownLatch
CountDownLatch允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,
其实使用了tryReleaseShared
方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。
当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经
调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state == 0,直至最后一个线程调用了countDown
使得state == 0,于是阻塞的线程便判断成功,全部往下执行。该工具类用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
下面是两个简单的案例:private static void test2() { /* CountdownLatch:用来进行线程同步协作,等待所以线程完成倒计时 其中构造参数用来初始化等待计数值,await()用来等待计数归零,countDown()用来让计数减一 */ CountDownLatch countDownLatch = new CountDownLatch(3); new Thread(()->{ System.out.println("线程做自己的操作,t1"); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); //计数减1 },"t1").start(); new Thread(()->{ System.out.println("线程做自己的操作,t2"); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); //计数减1 },"t2").start(); new Thread(()->{ System.out.println("线程做自己的操作,t3"); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } countDownLatch.countDown(); //计数减1 },"t3").start(); try { countDownLatch.await(); //主线程会一直等到计数为0时才会继续运行 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end..."); } private static void test3() { //线程池和CountdownLatch结合使用 ExecutorService pool = Executors.newFixedThreadPool(3); CountDownLatch countDownLatch = new CountDownLatch(3); AtomicInteger sum = new AtomicInteger(); for (int i = 0; i < 3; i++) { int j = i; pool.submit(()->{ sum.addAndGet(j); countDownLatch.countDown(); }); } try { countDownLatch.await(); pool.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(sum.get()); }
CyclicBarri
CyclicBarri,循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,
每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,
继续执行。跟CountdownLatch一样,但这个可以重用
下面是一个简单的案例:private static void test4() { /* CyclicBarrier,循环栅栏,用来进行线程协作,等待线程满足某个协作, 与CountdownLatch类似,不同的是,CountdownLatch中计数减完后,不能再重新使用,得重新创建对象并指定计数 而CyclicBarrier是,调用了await方法的线程会阻塞,当阻塞线程数量等于构造参数数量时,就会继续向下运行, 而且累加器可以循环使用 */ ExecutorService pool = Executors.newFixedThreadPool(2); CyclicBarrier cyclicBarrier = new CyclicBarrier(2); for (int i = 0; i < 3; i++) { pool.submit(()->{ try { System.out.println("开始任务1"); Thread.sleep(200); cyclicBarrier.await(); //2-1=1,计数减一 System.out.println("完成任务1"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); pool.submit(()->{ try { System.out.println("开始任务2"); Thread.sleep(200); cyclicBarrier.await(); //2-1=1,计数减一 System.out.println("完成任务2"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } pool.shutdown(); }
StampedLock
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁long stamp = lock.readLock(); lock.unlockRead(stamp);
加解写锁
long stamp = lock.writeLock(); lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验
如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。long stamp = lock.tryOptimisticRead(); // 验戳 if(!lock.validate(stamp)){ // 锁升级 }
下面是一个简单的案例:
public class Test { //提供一个数据容器类,内部分别使用读锁保护数据的 read()方法,写锁保护数据的 write()方法 public static void main(String[] args) { DataContainerStamped dataContainer = new DataContainerStamped(1); /** * 可以看到实际没有加读锁 */ new Thread(() -> { dataContainer.read(2); }, "t1").start(); utils.sleep(1); new Thread(() -> { dataContainer.read(0); }, "t2").start(); } } @Slf4j class DataContainerStamped { private int data; final StampedLock lock = new StampedLock(); public DataContainerStamped(int data){ this.data = data; } public int read(int readTime){ long stamp = lock.tryOptimisticRead(); utils.sleep(readTime); log.debug("尝试使用乐观读...{}", stamp); if (lock.validate(stamp)){ log.debug("成功使用了乐观读{},数据 {}", stamp, data); return this.data; } try { // 锁升级 - 读锁 log.debug("乐观读锁升级到写锁加锁 {}", stamp); stamp = lock.readLock(); log.debug("乐观读锁升级到写锁完成 {}", stamp); utils.sleep(readTime); return data; } finally { log.debug("乐观读锁升级到写锁解锁 {}", stamp); lock.unlock(stamp); } } public void write(int data){ long stamp = lock.writeLock(); log.info(" 加上写锁"); try { utils.sleep(1); this.data = data; } finally { lock.unlock(stamp); } } }
注意:StampedLock 不支持条件变量,并且不支持可重入。
CompletableFuture异步编排
现学现卖了,刚开始看到这个异步编排的名词还不知道是什么东西,后来看到代码,我突然意识到,这不就是JS中的Promise嘛
CompletableFuture类实现了CompletionStage和Future接口。Future是Java 5添加的类,用来描述一个异步计算的结果
但是获取一个结果时方法较少,要么通过轮询isDone,确认完成后,调用get()获取值,要么调用get()设置一个超时时间。
但是这个get()方法会阻塞着调用线程,这种阻塞的方式显然和我们的异步编程的初衷相违背。
为了解决这个问题,JDK吸收了guava的设计思想,加入了Future的诸多扩展功能形成了CompletableFuture。
注意:CompletableFuture的命名规则是有规律的
xxx():表示该方法将继续在已有的线程中执行;
xxxAsync():表示将异步在线程池中执行。
如果指定了线程池,会在指定的线程池中执行,如果没有指定,默认会在ForkJoinPool.commonPool()中执行
以下是CompletableFuture的部分API
创建异步任务
//runAsync方法不支持返回值 public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) //supplyAsync可以支持返回值 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) //举个案例 public class CompletableFutureTest { public static void main(String[] args) throws Exception{ ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 2L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3)); CompletableFuture<Integer> future4 = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "\t" + "future4带有返回值"); return 1025; }, executor); System.out.println(future4.get()); //关闭线程池 executor.shutdown(); } }
主动获得结果和触发计算
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); //一直会等待下去 //System.out.println(future.get()); //2s后如果没有返回结果就报错 //System.out.println(future.get(2,TimeUnit.SECONDS)); //没有计算完成的情况下,给我一个替代结果,如果主程序执行到getNow,之前的异步还没执行完,则直接返回参数值 //Integer now = future.getNow(3); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //这里停顿了3s,而我2s后就有结果了,所以可以正常拿到值,输出:false获取到的值是1 //如果这里停顿1s,而我2s后才有结果,那么就不可以正常拿到值,输出:true获取到的值是444 boolean flag = future.complete(444); System.out.println(flag+"获取到的值是"+future.get());
对计算结果进行处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }).thenApply(s -> { System.out.println("-----1"); //如果加上int error=1/0; 由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停 int error=1/0; return s + 1; }).thenApply(s -> { //出现异常时根本不执行此处代码 System.out.println("-----2"); System.out.println("A:"+s); return s + 2; }).whenComplete((v, e) -> { System.out.println("B:"+v); //用来检测出现异常时是否执行此代码,并且v是多少,答案执行,v是null if (e == null) { System.out.println("result-----" + v); } }).exceptionally(e -> { e.printStackTrace(); return null; }); System.out.println(Thread.currentThread().getName() + "\t" + "over...."); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //如果程序正常,则输出 main over.... -----1 -----2 A:2 B:4 result-----4 //如果程序出现异常,则输出 main over.... -----1 B:null C java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
对计算结果进行消费
CompletableFuture.supplyAsync(() -> { return 1; }).thenApply(f -> { return f+2; }).thenApply(f -> { return f+3; }).thenAccept(r -> System.out.println(r)); // 任务A执行完执行B,并且B不需要A的结果 System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join()); // 任务A执行完成执行B,B需要A的结果,但是任务B无返回值 System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join()); // 任务A执行完成执行B,B需要A的结果,同时任务B有返回值 System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
对计算速度进行选用(applyToEither)
//下面这个在第一个中停留1s,在第二种停留2s,返回的结果是1 System.out.println(CompletableFuture.supplyAsync(() -> { //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }).applyToEither(CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return 2; }), r -> { return r; }).join()); //暂停几秒钟线程 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
对计算结果进行合并
//两个CompletionStage任务都完成后,最终把两个任务的结果一起交给thenCombine来处理 //先完成的先等着,等待其他分支任务 System.out.println(CompletableFuture.supplyAsync(() -> { return 10; }).thenCombine(CompletableFuture.supplyAsync(() -> { return 20; }), (r1, r2) -> { return r1 + r2; }).thenCombine(CompletableFuture.supplyAsync(() -> { return 30; }), (r3, r4) -> { return r3 + r4; }).join()); System.out.println(CompletableFuture.supplyAsync(() -> { return 10; }).thenCombine(CompletableFuture.supplyAsync(() -> { return 20; }), (r1, r2) -> { return r1 + r2; }).join());
注意:如果执行XXXAsync方法,任务会交给默认线程池去执行,默认线程池中的线程都是守护线程
主线程一旦结束,线程池中线程都需要结束,所以可能导致一种情况:主线程执行完后,线程池中的任务还未执行完就退出了。
最后是别人总结的API图片:
ThreadLocal
ThreadLocal的认识
什么是ThreadLocal,ThreadLocal是线程安全的另一种思路,想想看,线程安全产生的原因是因为多个线程
共享一个共享变量,那如果我们每个线程都使用自己的变量,也就没有共享变量了,那么线程也就不存在安全问题了
具体定义如下:从Java官方文档中的描述:ThreadLocal类用来提供线程内部的局部变量。这种变量在多线程环境下访问 (通过get和set方法访问)时能保证各个线程的变量相对独立于其他线程内的变量。 ThreadLocal实例通常来说都是private static类型的,用于关联线程和线程上下文。 说直白点:ThreadLocal就是提供给每个线程操作变量的工具类,做到了线程之间的变量隔离目的 ThreadLocal,也叫线程变量
ThreadLocal与锁实现线程安全的角度是不一样的,它们适用的场景不同,下面是两者的区别
synchronized 原理:同步机制采用’以时间换空间’的方式, 只提供了一份变量,让不同的线程排队访问 侧重点:多个线程之间访问资源的同步性 ThreadLocal 原理:ThreadLocal采用’以空间换时间’的方式, 为每一个线程都提供了一份变量的副本,从而实现同时访问而相不干扰 侧重点:多线程中让每个线程之间的数据相互隔离
ThreadLocal的基本使用
//没有ThreadLocal的场景 public class Test1 { public static void main(String[] args) { test1("xxx"); } private static void test1(String userId){ test2(userId); } private static void test2(String userId){ test3(userId); } private static void test3(String userId){ System.out.println(userId); } } /* 上面我们可以看到,在一个线程生命周期内调用了好几个方法,当我们想传递一个参数在其他地方使用时 不得不将每个方法都添加参数,并且逐层传递,例如我只需要在test3拿到userid,因为调用关系 不得不给test1,test2也添加参数进行传递,那ThreadLocal就可以解决这种问题, 因为这些方法调用全部是在一个线程生命周期内,如果我把值保存在线程内部,那么只要我线程还没结束 我就可以在任意地方获取这个值,而不需要像这种逐层传递参数方式,下面是使用ThreadLocal的方式 */ public class Test1 { private static final ThreadLocal<String> local = new ThreadLocal<>(); public static void main(String[] args) { local.set("username"); test1("xxx"); } private static void test1(String userId){ test2(userId); } private static void test2(String userId){ test3(userId); } private static void test3(String userId){ System.out.println(userId); System.out.println(local.get()); local.remove(); } } /* 我们只需要把值存到线程中,然后在需要的地方取出来就行,那么你能有两个疑问 1,我看到明明是把值存到ThreadLocal中,为什么说存到了线程中呢? 2,为什么在最后还需要加上local.remove();语句呢?这么有什么作用吗? 不急,我们接下来慢慢讲 */
ThreadLocal原理
让我们回到上面的问题,我们明明把值存到Thread中,为什么说是存到线程中,为什么可以在线程生命周期内获取到值呢?
这不得不说下线程了,Thread类中有两个属性,一个是threadLocals,一个是inheritableThreadLocals
这两个都是变量名,而他们的变量类型是ThreadLocalMap,但是和Map结构又不相同
ThreadLocalMap有一个内部类,名为Entry,它是Map结构,key存的是ThreadLocal,Value存的是我们的值而ThreadLocalMap本身结构是Entry数组,可以想象,一个ThreadLocalMap中可以有多个Entry,
而每个entry中存放不同的ThreadLocal和值,我们创建的ThreadLocal对象调用set后,其内部拿到本线程的
ThreadLocalMap,然后将ThreadLocal对象和我们存的值保存成entry,放入ThreadLocalMap中
当我们需要值得时候,直接获取当前线程的ThreadLocalMap,根据ThreadLocal对象获取值就行了
下面是ThreadLocal的方法解析:/** * 返回当前线程中保存ThreadLocal的值 * 如果当前线程没有此ThreadLocal变量, * 则它会通过调用{@link #initialValue} 方法进行初始化值 * * @return 返回当前线程对应此ThreadLocal的值 */ public T get() { // 获取当前线程对象 Thread t = Thread.currentThread(); // 获取此线程对象中维护的ThreadLocalMap对象 ThreadLocalMap map = getMap(t); // 如果此map存在 if (map != null) { // 以当前的ThreadLocal为key,调用getEntry获取对应的存储实体e ThreadLocalMap.Entry e = map.getEntry(this); // 找到对应的存储实体 e if (e != null) { @SuppressWarnings("unchecked") // 获取存储实体 e 对应的 value值 // 即为我们想要的当前线程对应此ThreadLocal的值 T result = (T)e.value; return result; } } // 如果map不存在,则证明此线程没有维护的ThreadLocalMap对象 // 调用setInitialValue进行初始化 return setInitialValue(); } /** * set的变样实现,用于初始化值initialValue, * 用于代替防止用户重写set()方法 * * @return the initial value 初始化后的值 */ private T setInitialValue() { // 调用initialValue获取初始化的值 T value = initialValue(); // 获取当前线程对象 Thread t = Thread.currentThread(); // 获取此线程对象中维护的ThreadLocalMap对象 ThreadLocalMap map = getMap(t); // 如果此map存在 if (map != null) // 存在则调用map.set设置此实体entry map.set(this, value); else // 1)当前线程Thread 不存在ThreadLocalMap对象 // 2)则调用createMap进行ThreadLocalMap对象的初始化 // 3)并将此实体entry作为第一个值存放至ThreadLocalMap中 createMap(t, value); // 返回设置的值value return value; } /** * 获取当前线程Thread对应维护的ThreadLocalMap * * @param t the current thread 当前线程 * @return the map 对应维护的ThreadLocalMap */ ThreadLocalMap getMap(Thread t) { return t.threadLocals; } /** *创建当前线程Thread对应维护的ThreadLocalMap * * @param t 当前线程 * @param firstValue 存放到map中第一个entry的值 */ void createMap(Thread t, T firstValue) { //这里的this是调用此方法的threadLocal t.threadLocals = new ThreadLocalMap(this, firstValue); }
下面是ThreadLocal,Thread,ThreadLocalMap关系图
看到上面的结构图,你可能已经意识到了,我们如果想保存一个值就需要创建一个ThreadLocal对象 如果想保存多个值,就需要创建多个ThreadLocal对象,你一个ThreadLocal调用多次set方法是没用的,最终会被覆盖掉。 OK,现在你弄清楚了ThreadLocal的原理了,但是你可能还有疑问上面图中为什么Entry对象的key是弱引用,什么是弱引用?
Java中四大引用类型
Java中的引用类型分为强软弱虚四种引用
强引用,就是我们平时使用Object obj = new Object(); //obj指向堆中对象,只要obj还指向,那么gc线程就不能回收这个对象
软引用,GC线程发现堆内存不够时才会回收软引用指向的对象
public class R2_SoftReference { public static void main(String[] args) { SoftReference<byte[]> soft = new SoftReference<>(new byte[1024 * 1024 * 10]);//10M System.out.println(soft.get()); //gc回收 System.gc(); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soft.get()); //再分配一个数组,好heap(堆)放不下, 这个时候系统会回收一次, 如果不够,会把软引用回收 byte[] bytes = new byte[1024 * 1024 * 15]; System.out.println(soft.get()); } } //结果: [B@1540e19d [B@1540e19d null //测试之前要设置虚拟机参数 -Xmx30M 堆内存最大30M 用于测试
弱引用,遇到GC线程就会回收对象
public class R3_WeakReference { public static void main(String[] args) { WeakReference<C> weak = new WeakReference<>(new C()); System.out.println(weak.get()); //gc回收 System.gc(); //遇到GC就会被回收 System.out.println(weak.get()); } } 结果: com.cz.reference.C@3c679bde null finalize
虚引用,看见就回收, get不到值,一直为null
public class R4_PhantomReference { private static final List<Object> LIST = new LinkedList<>(); private static final ReferenceQueue QUEUE = new ReferenceQueue(); public static void main(String[] args) { PhantomReference<C> phantomReference = new PhantomReference<>(new C(),QUEUE); new Thread(() -> { while (true){ LIST.add(new byte[1024*1024]); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } System.out.println(phantomReference.get()); } }).start(); new Thread(() -> { while (true){ Reference<? extends C> poll = QUEUE.poll(); if (poll != null){ System.out.println("-----虚引用对象被JVm回收了--------" + poll); return; } } }).start(); } } 结果: null null finalize null null
更多的引用知识点请看下面的文章,这么不再深究,我们只需要知道,ThreadLocalMap的Entry中的key是弱引用即可
ThreadLocalMap的缺陷
ThreadLocalMap中Entry的key使用强引用会怎样?ThreadLocalMap中Entry的key使用弱引用会怎样?
从上面我们可以看到,不管是使用强引用还是使用弱引用都有可能造成内存泄漏,这和强弱引用没关系,那内存泄漏跟什么有关呢?
OK,既然强弱引用都可以,为什么一定要用弱引用呢? 因为,在ThreadLocalMap中的set/getEntry方法中,会对key为null (也即是ThreadLocal为null)进行判断 如果为null的话,那么是会对value置为null的。 这就意味着使用完ThreadLocal , CurrentThread依然运行的前提下,就算忘记调用remove方法 弱引用比强引用可以多一层保障:弱引用的ThreadLocal会被回收,对应的value在下一次ThreadLocalMap 调用set,get,remove中的任一方法的时候会被清除,从而避免内存泄漏。
InheritableThreadLocal的认识
让我们先来看个案例:
public class Test1 { private static final ThreadLocal<String> local = new ThreadLocal<>(); public static void main(String[] args) { local.set("username"); new Thread(()->{ System.out.println(local.get()); local.remove(); }).start(); //主线程等待输出结果 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } /* 输出结果为null,为什么呢?其实也很好解释,你的username保存在main线程的ThreadLocalMap中 你在新创建的线程中肯定获取不了username啊,那么如何才能让我在新线程中获取到username呢? 这就要使用InheritableThreadLocal了 */ //将ThreadLocal对象替换为InheritableThreadLocal对象 private static final InheritableThreadLocal<String> local = new InheritableThreadLocal<>(); //其他都不需要更改,你就会发现就能拿到username了
那么怎么样拿到其他线程的数据的呢?我们看InheritableThreadLocal源码找找看
ThreadLocalMap getMap(Thread t) { // 仅仅是将获取map变为从线程中获取inheritableThreadLocals变量 return t.inheritableThreadLocals; } void createMap(Thread t, T firstValue) { //仅仅是将赋值改为设置到线程的inheritableThreadLocals 变量 t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } /* 这里跟ThreadLocal几乎一模一样,不过这个map获取的是线程的inheritableThreadLocals变量 而ThreadLocal获取的是线程的ThreadLocals变量而已,所以实现线程之间数据传递,不是由 inheritableThreadLocals类决定的,而是由Thread类决定的 */
让我们继续看看线程类中源码部分:
//我测试代码中使用的线程的构造方法 public Thread(Runnable target, String name) { //核心是init方法,其他构造方法也是调用init方法进行线程的初始化 init(null, target, name, 0); } private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { if (name == null) {//由此可见name是必须设置,默认是thread-内部维护的自增方法 //此处就不发散开了 throw new NullPointerException("name cannot be null"); } this.name = name; //将当前线程设置为新线程的父线程 Thread parent = currentThread(); //省略代码---- //初始时沿用父线程的守护线程属性 this.daemon = parent.isDaemon(); //初始时沿用父线程的优先级 this.priority = parent.getPriority(); //上下文类加载器的设置,这个可以写一个关于类加载器的文章来具体介绍,此处 //就不发散了 if (security == null || isCCLOverridden(parent.getClass())) this.contextClassLoader = parent.getContextClassLoader(); else this.contextClassLoader = parent.contextClassLoader; this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext(); //构造方法中传入了Runnable接口就有,否则为null,为null则调用在即的run方法 this.target = target; setPriority(priority); //#####此处是关键之处####### //默认inheritThreadLocals =true,那么就关心父线程的inheritableThreadLocals 变量了 //由InheritableThreadLocal重写的两个方法可以看出,父线程如果使用其设置了上下文变量 //那么parent.inheritableThreadLocals是有值得 if (inheritThreadLocals && parent.inheritableThreadLocals != null) // 将父线程的变量遍历到子线程的inheritableThreadLocals 变量中 //从而实现了新开线程也能获取到父线程设置的变量值了, //而且从该方法可以看出,线程的儿子可以得到,线程的孙子也能通过同样的方法获取到 //该过程是自上而下传递的 //#####此处是关键之处######## this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* Set thread ID */ tid = nextThreadID(); }
我们可以看到,新线程在创建的时候,会将原来线程的inheritableThreadLocals变量的数据拷贝到新线程中
所以新线程中可以访问到其他线程的数据。InheritableThreadLocal的缺陷
我们知道InheritableThreadLocal已经很优秀了,能在线程之间传递数据,它还存在缺陷吗?请看下面的案例:public class Test1 { private static final InheritableThreadLocal<String> local = new InheritableThreadLocal<>(); public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { new Thread(() -> { //创建10个线程,每个线程中存放 local.set(Thread.currentThread().getName()); //使用线程池中获取值 pool.execute(()->{ System.out.println(local.get()); local.remove(); }); }, "threadName-" + i).start(); } //等待上述操作执行完毕 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); } } //输出结果 threadName-0 null null null null null threadName-2 null threadName-1 null
上述代码含义:创建10个线程,每个线程中都有一个InheritableThreadLocal对象,保存着一个entry
我想在数据库连接池中访问这10个线程中的每一个数据,并输出出来,但是结果显然不是我想要的,
问题出在哪里呢?因为线程池是复用的啊,线程池3个线程创建出来后就不再创建了,当更多任务提交后
也是用这已经创建好的线程,然后InheritableThreadLocal的数据交换,只发生在线程创建的时候啊
三个线程创建完毕后,还是原来的三个线程执行local.get(),这个时候获取的数据都是旧数据(如果你没remove的话)
那么上面的问题该如何解决呢?阿里巴巴的TransmittableThreadLocal横空出世TransmittableThreadLocal
先来看下TransmittableThreadLocal如何解决上面的问题的,我们导入该maven依赖,代码更改后如下public class Test { //1,使用TransmittableThreadLocal代替InheritableThreadLocal private static final TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>(); public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { new Thread(() -> { local.set(Thread.currentThread().getName()); //2,使用TtlRunnable.get方法包装任务 pool.execute(TtlRunnable.get(()->{ System.out.println(local.get()); local.remove(); })); }, "threadName-" + i).start(); } //等待上述操作执行完毕 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); } } //输出结果 threadName-3 threadName-1 threadName-4 threadName-5 threadName-6 threadName-7 threadName-9 threadName-2 threadName-8 threadName-0
那么TransmittableThreadLocal是如何做到在复用的线程中也能获取到值呢?
首先,从使用上来看,不管是修饰Runnable还是修饰线程池,本质都是将Runnable增强为TtlRunnable。
而从实现线程变量传递的原理上来看,TTL做的实际上就是将原本与Thread绑定的线程变量,
缓存一份到TtlRunnable对象中,在执行子线程任务前,将对象中缓存的变量值设置到子线程的ThreadLocal
中以供run()方法的代码使用,然后执行完后,又恢复现场,保证不会对复用线程产生影响。更多TransmittableThreadLocal原理内容请看下面链接
文章及资料