ZB-029-03线程池和Callable和Future

线程池与Callable/Future

什么是线程池

  • 线程是昂贵的(Java线程模型的缺陷)
    • 一个线程就是一个工人,每开一个线程意味着 你的JVM里就一个工人开始执行你的代码
    • 生活里公司里人员流动的成本是很高的,你要重新培养它。线程也是一样
    • java的线程和操作系统的线程相绑定,java的线程调度依赖于操作系统的线程调度,因此很昂贵,随意你不能随心所欲的在操作系统里开线程
  • 线程池是预先定义好的若⼲个线程
  • Java中的线程池

    Callable/Future

  • 类⽐Runnable,Callable可以返回值,抛出异常

  • Future代表⼀个“未来才会返回的结果”
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.concurrent.*;

public class WordCount {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建固定数量的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);

//new Callable<T>(){...} T代表返回值

// 返回数字
Future<Integer> future1 = threadPool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 0;
}
});

// 返回字符串
Future<String> future2 = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "abc";
}
});

// 抛出异常
Future<Object> future3 = threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new RuntimeException();
}
});

/*
submit 和 new Thread().start(); 一样会立刻执行,是异步的 不会阻塞当前线程的执行
submit 的返回值是个 Future
threadPool.submit( ... );


Future 代表一个未来
*/

// 它会等这个结果回来打印
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());

// 你等不及了,取消一个
// future2.cancel(100);

}
}

实战:多线程的WordCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.github.hcsp.multithread;

import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class WordCount {
private final int threadNum;
private ExecutorService threadPool;
public WordCount(int threadNum) {
threadPool = Executors.newFixedThreadPool(threadNum);
this.threadNum = threadNum;
}

public static void main(String[] args) {

}

// 统计文件中各单词的数量
public Map<String, Integer> count(File file) throws FileNotFoundException, ExecutionException, InterruptedException {
BufferedReader reader = new BufferedReader(new FileReader(file));

List<Future<Map<String,Integer>>> futures = new ArrayList<>();

// 开多个线程,每个线程读取文件的一行内容,并将其中的单词统计结果返回
// 最后,主线程将 工作线程返回的结果汇总在一起
for (int i = 0; i < threadNum; i++) {
futures.add(threadPool.submit(new WorkerJob(reader)));
}

// 合并
Map<String,Integer> finalResult = new HashMap<>();
for (Future<Map<String,Integer>> future:futures) {
Map<String,Integer> resultFromWorker = future.get();
mergeWorkerResultIntoFinalResult(resultFromWorker,finalResult);
}


return finalResult;
}

private void mergeWorkerResultIntoFinalResult(Map<String, Integer> resultFromWorker,
Map<String, Integer> finalResult) {
for (Map.Entry<String,Integer> entry: resultFromWorker.entrySet()) {
String word = entry.getKey();
int mergedResult = finalResult.getOrDefault(word,0) + entry.getValue();
finalResult.put(word,mergedResult);
}
}

static class WorkerJob implements Callable<Map<String ,Integer>> {
private BufferedReader reader;

public WorkerJob(BufferedReader reader) {
this.reader = reader;
}

@Override
public Map<String, Integer> call() throws Exception {
String line = null;
Map<String,Integer> result = new HashMap<>();
while ((line=reader.readLine())!= null){
String[] words = line.split(" ");
for (String word: words) {
result.put(word,result.getOrDefault(word,0) + 1);
}
}
return result;
}
}
}