前言:

本文内容:同步队列SynchronousQueue、池化技术及线程池使用、7大参数及自定义线程池

推荐免费JUC并发编程视频:【狂神说Java】JUC并发编程最新版通俗易懂_哔哩哔哩_bilibili

同步队列SynchronousQueue

没有容量

进去一个元素,必须等待取出之后,才能再放入下一个元素!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.jokerdig.bq;

import java.sql.Time;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
* @author Joker大雄
* @data 2022/8/23 - 10:55
**/
// 同步队列 SynchronousQueue不存储元素
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<String> blockQueue = new SynchronousQueue<>();

new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
blockQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
blockQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
blockQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();


new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"->"+blockQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"->"+blockQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"->"+blockQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}

运行结果

1
2
3
4
5
6
7
8
T1put 1
T2->1
T1put 2
T2->2
T1put 3
T2->3

Process finished with exit code 0

池化技术及线程池使用

线程池常考:三大方法、七大参数、四种拒绝策略

池化技术

程序的运行,本质:占用系统的资源,优化资源的使用!-> 池化技术

池化技术:把一些资源预先分配好,组织到对象池中,之后的业务使用资源从对象池中获取,使用完后放回到对象池中。

线程池的优势

  1. 降低资源的消耗
  2. 提高相应的速度
  3. 方便管理

线程复用,可以控制最大并发数,管理线程

线程池三大方法

Executors.newSingleThreadExecutor()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.jokerdig.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Joker大雄
* @data 2022/8/23 - 11:11
**/
// Executors 工具类 3大方法
// 使用了线程池之后,要使用线程池来创建线程
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadExecutor = Executors.newSingleThreadExecutor();// 单个线程

try {
for (int i = 0; i < 10; i++) {
threadExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池使用完,程序结束,关闭线程池
threadExecutor.shutdown();
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok

Process finished with exit code 0

Executors.newFixedThreadPool(线程数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.jokerdig.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Joker大雄
* @data 2022/8/23 - 11:11
**/
// Executors 工具类 3大方法
// 使用了线程池之后,要使用线程池来创建线程
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadExecutor = Executors.newFixedThreadPool(3); // 创建固定线程池大小
try {
for (int i = 0; i < 10; i++) {
threadExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池使用完,程序结束,关闭线程池
threadExecutor.shutdown();
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-1 ok

Process finished with exit code 0

Executors.newCachedThreadPool()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.jokerdig.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author Joker大雄
* @data 2022/8/23 - 11:11
**/
// Executors 工具类 3大方法
// 使用了线程池之后,要使用线程池来创建线程
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadExecutor = Executors.newCachedThreadPool(); // 可伸缩的

try {
for (int i = 0; i < 10; i++) {
threadExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池使用完,程序结束,关闭线程池
threadExecutor.shutdown();
}
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-7 ok
pool-1-thread-8 ok
pool-1-thread-2 ok
pool-1-thread-5 ok
pool-1-thread-10 ok
pool-1-thread-9 ok
pool-1-thread-6 ok

Process finished with exit code 0

7大参数及自定义线程池

源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// newFixedThreadPool()
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

开启线程池调用:ThreadPoolExecutor

ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 七大参数
public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
int maximumPoolSize,// 最大核心线程池大小
long keepAliveTime,// 超时无人调用就释放
TimeUnit unit,// 超时单位
BlockingQueue<Runnable> workQueue,// 阻塞队列
ThreadFactory threadFactory,// 线程工厂
RejectedExecutionHandler handler// 拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样能够更加明确线程池的运行规则,规避资源耗尽的风险。

Executors创建可能会堆积大量的请求,从而导致OOM

手动创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.jokerdig.pool;

import java.util.concurrent.*;

/**
* @author Joker大雄
* @data 2022/8/23 - 11:39
**/
// 手动创建线程池
public class Demo02 {
public static void main(String[] args) {
/*
模拟银行业务:
默认开启2个窗口
最大有5个窗口
候客区有3个座位
*/
ExecutorService threadExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()); // 银行满员,还有人进入,不处理这个人,抛出异常
try {
// 最大承载 队列+max值->3+8=8人 超过就会被拒绝策略接收,从而抛出异常
for (int i = 1; i <= 8; i++) {
threadExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池使用完,程序结束,关闭线程池
threadExecutor.shutdown();
}
}

}

运行结果

i <=5 只有两个线程执行

1
2
3
4
5
6
7
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-2 ok

Process finished with exit code 0

i <=6 第三个线程开始执行

1
2
3
4
5
6
7
8
pool-1-thread-1 ok
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-2 ok

Process finished with exit code 0

i <=8 五个线程都在执行

1
2
3
4
5
6
7
8
9
10
pool-1-thread-1 ok
pool-1-thread-5 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-2 ok
pool-1-thread-5 ok
pool-1-thread-1 ok

Process finished with exit code 0

四种拒绝策略

简单测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.jokerdig.pool;

import java.util.concurrent.*;

/**
* @author Joker大雄
* @data 2022/8/23 - 11:39
**/
// 手动创建线程池
public class Demo02 {
public static void main(String[] args) {
/*
模拟银行业务:
默认开启2个窗口
最大有5个窗口
候客区有3个座位
*/
ExecutorService threadExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.AbortPolicy()); // 队列满了,不处理这任务,抛出异常
// new ThreadPoolExecutor.CallerRunsPolicy()); // 哪来的去哪
// new ThreadPoolExecutor.DiscardPolicy()); // 队列满了,丢掉任务,不会抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy()); // 队列满了,尝试和最早的竞争,不会抛出异常
try {
// 最大承载 队列+max值->3+8=8人 超过就会被拒绝策略接收,从而抛出异常
for (int i = 1; i <=9; i++) {
threadExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 线程池使用完,程序结束,关闭线程池
threadExecutor.shutdown();
}
}
}

AbortPolicy() 队列满了,不处理这个任务,抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pool-1-thread-1 ok
pool-1-thread-5 ok
pool-1-thread-1 ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-5 ok
java.util.concurrent.RejectedExecutionException: Task com.jokerdig.pool.Demo02$$Lambda$1/1831932724@7699a589 rejected from java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 5, active threads = 3, queued tasks = 0, completed tasks = 5]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.jokerdig.pool.Demo02.main(Demo02.java:30)

Process finished with exit code 0

CallerRunsPolicy() 哪来的去哪

1
2
3
4
5
6
7
8
9
10
11
pool-1-thread-1 ok
pool-1-thread-4 ok
main ok
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-1 ok
pool-1-thread-5 ok

Process finished with exit code 0

DiscardPolicy() 队列满了,丢掉任务,不会抛出异常

1
2
3
4
5
6
7
8
9
10
pool-1-thread-2 ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-5 ok

Process finished with exit code 0

DiscardOldestPolicy() 队列满了,尝试和最早的竞争,不会抛出异常

1
2
3
4
5
6
7
8
9
10
pool-1-thread-2 ok
pool-1-thread-5 ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-5 ok
pool-1-thread-4 ok
pool-1-thread-2 ok

Process finished with exit code 0