Java 并发编程的艺术第2版学习笔记
原书:《Java 并发编程的艺术第2版》 | 作者:方腾飞 魏鹏 程晓明 | 2023 年 9 月 | 机械工业出版社
1.4.3 线程池技术及其示例 | 27 页
核心组件
任务队列「jobs」
存放执行任务对象
工作队列「workers」
纯线程池:存放工作线程对象,创建好的线程都放在这里。
常用参数
- 线程池默认的数量
- 线程池最大限制数
- 线程池最小的数量
- 工作队列「workers」存放线程的数量
工作原理
- 工作线程「Worker」在
while
循环中获取任务队列「jobs」中的 job,获取不到时使用 jobs.wait()
方法等待 - 当任务队列「jobs」中添加新任务后,会调用
jobs.notify()
方法唤醒工作线程「Worker」 - 工作线程「Worker」从任务队列「jobs」中获取到任务后执行任务
- 任务执行完毕后,开始新一轮循环:从第 1 步开始
示例代码
ThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
| public interface ThreadPool<Job extends Runnable> {
void execute(Job job);
void shutdown();
void addWorkers(int num);
void removeWorkers(int num);
int getJobSize();
}
|
DefaultThreadPool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
| public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
// 线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 10;
// 线程池默认的数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
// 线程池最小的数量
private static final int MIN_WORKER_NUMBERS = 1;
// 这是一个工作列表,将会向里面插入工作
private final LinkedList<Job> jobs = new LinkedList<>();
// 工作者列表
private final LinkedList<Worker> workers = new LinkedList<>();
// 线程编号生成
private final AtomicLong threadNum = new AtomicLong();
// 工作者线程的数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
public DefaultThreadPool() {
initializeWorkers(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int num) {
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
initializeWorkers(workerNum);
}
@Override
public void execute(Job job) {
if (job != null) {
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
}
@Override
public void shutdown() {
removeWorkers(workerNum);
}
@Override
public synchronized void addWorkers(int num) {
// 限制新增的 Worker 数量不能超过最大值
if (num + this.workerNum > MAX_WORKER_NUMBERS) {
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initializeWorkers(num);
this.workerNum += num;
}
@Override
public synchronized void removeWorkers(int num) {
if (num >= this.workerNum) {
throw new IllegalArgumentException("beyond workNum");
}
// 按照给定的数量停止 Worker
int count = 0;
while (count < num) {
workers.removeFirst().shutdown();
count++;
}
this.workerNum -= count;
}
@Override
public int getJobSize() {
return jobs.size();
}
private void initializeWorkers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
thread.start();
}
}
class Worker implements Runnable {
private volatile boolean running = true;
@Override
public void run() {
while (running) {
Job job = null;
synchronized (jobs) {
// 如果工作列表是空的,那么就 wait
while (jobs.isEmpty()) {
try {
jobs.wait();
} catch (InterruptedException e) {
// 感知到外部对 WorkerThread 的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
// 取出一个 Job
job = jobs.removeFirst();
}
if (job != null) {
try {
job.run();
} catch (Exception e) {
// 忽略 Job 执行中的 Exception
e.printStackTrace();
}
}
}
}
public void shutdown() {
running = false;
}
}
}
|