0%

Java|Java多线程案例

Multithreading | threading-memes, multithreading-memes | ProgrammerHumor.io

卷!

线程的创建及状态

1.线程的生命周期

Java 线程的状态

2.创建一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.TimeUnit;

public class ThreadStateTest {
public static void main(String[] args) throws InterruptedException {
// 创建一个线程对象
Thread thread = new Thread();
System.out.println("1- " + thread.getState());
// 启动线程
thread.start();
System.out.println("2- " + thread.getState());

// 主线程睡眠等待线程执行结束
TimeUnit.SECONDS.sleep(1);

System.out.println("3- " + thread.getState());


}

}

3.创建有任务线程

创建有任务线程有以下三种方式:

  • 重写Thread的run()方法
  • 实现Runnable接口,在创建Thread对象时传入
  • 使用FutureTask,在创建Thread对象的时候传入
重写run()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CreateThreadRun {
public static void main(String[] args) {
Thread thread = new Thread(){
// 重写run方法
@Override
public void run(){
System.out.println("一个子线程任务");
}

};

thread.start();
System.out.println("main结束");

}

}

Runnable实现

Runnable是一个函数式接口,可以作为Thread构造函数的参数

image-20220226091533448

image-20220226091616762

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CreateThreadRunnable {
public static void main(String[] args) {
Thread thread = new Thread(
// lambda表达式可以为函数式接口创建匿名对象(类型是Runnable)
() -> System.out.println("一个子线程任务")
);

thread.start();

System.out.println("main结束");

}

}

FutureTask实现

上面两者方式中,子线程没有返回值也没有抛出异常(这是由其中的run方法决定的),所以主线程对子线程的状况一无所知,而接下来的FutureTask可以解决这两个问题

image-20220226092833201

image-20220226092625296

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CreateThreadFutureTask {
public static void main(String[] args) {
// 实现Callable接口,写入任务,放入FutureTask中
Callable<String> callable = () ->{
System.out.println("一个子线程任务");
return "sub task done";
};

// 构造FutureTask类(其实质是Runnable的一个实现)
FutureTask<String> task = new FutureTask<>(callable);
Thread thread = new Thread(task);
thread.start();
System.out.println("子线程启动");

// 主线程获得子线程返回值并捕捉异常
try{
// get()获得子线程返回值
String subResult = task.get();
System.out.println("子线程返回值:" + subResult);
}
catch (InterruptedException e){
e.printStackTrace();
}
catch (ExecutionException e){
// getCause获得子线程发生的异常
Throwable cause = e.getCause();
e.printStackTrace();
}

System.out.println("main结束");
}

}

主线程调用get方法后,会不断询问子线程的情况,直到子线程执行完毕。FutureTask适用于多个任务需要同时完成的情况(主线程做完自己的任务后,一直等待子线程直到它完成任务,然后一起结束)

CompleteFuture

CompleteFuture和FutureTask都是Future接口的实现类,但是CompleteFuture的使用姿势更加优雅。建议及时抛弃FutureTask,拥抱CompleteFuture🥰

1.必备工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.StringJoiner;

// 线程小工具
public class SmallTool {
// 让当前线程睡眠n毫秒
public static void sleepMillis(long millis){
try{
Thread.sleep(millis);
}
catch (InterruptedException e){
e.printStackTrace();
}
}

// 打印当前时间戳+线程信息+消息
public static void printTimeAndThread(String tag){
String res = new StringJoiner("\t|\t")
.add(String.valueOf(System.currentTimeMillis()))
.add(String.valueOf(Thread.currentThread().getId()))
.add(Thread.currentThread().getName())
.add(tag)
.toString();

System.out.println(res);

}

}

2.CompletableFuture入门

image-20220226112153973

  • 开启:supplyAsync
  • 连接:thenCompose
  • 合并:thenCombine
异步任务的开启

supplyAsync方法开启一个异步任务

场景模拟

  • 顾客进入餐厅
  • 顾客点菜
  • 顾客刷手机等待
    • (线程2)厨师炒菜
    • (线程2)厨师打饭
  • 顾客开始干饭

这里的等待队列是 顾客任务 -> 厨师任务

代码案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.CompletableFuture;

public class CompleteFutureSupplyAsync {
public static void main(String[] args) {
SmallTool.printTimeAndThread("顾客进入餐厅");
SmallTool.printTimeAndThread("顾客点菜");

// supplyAsync方法需要一个supplier作为参数
// 为函数式接口,无入参,这里只有一个String类型的返回值(作为CompletableFuture的泛型)
// 调用supplyAsync,supplier中的代码会跑到子线程中执行
CompletableFuture<String> sub = CompletableFuture.supplyAsync(
() -> {
SmallTool.printTimeAndThread("厨师炒菜");
// 炒菜耗时
SmallTool.sleepMillis(200);
SmallTool.printTimeAndThread("厨师打饭");
// 打饭耗时
SmallTool.sleepMillis(100);
return "饭菜已备好";
}

);

SmallTool.printTimeAndThread("顾客正在刷手机等待");
// join方法返回值类型为CompletableFuture泛型的类型
// join方法返回值即为supplier的返回值
// join方法会等待子线程执行结束,并获得其返回值
SmallTool.printTimeAndThread(String.format("%s,顾客开始干饭", sub.join()));

}


}

运行结果

image-20220226103120810

异步任务的连接

thenCompose连接两个任务当做一个线程提交到线程池,thenComposeAsync连接两个任务分别单独提交到线程池处理

场景模拟

  • 顾客进入餐厅
  • 顾客点菜
  • 顾客刷手机等待
    • (线程2)厨师炒菜,将菜交给服务员
    • (线程3)服务员拿到菜并去打饭
  • 顾客开始干饭

这里的等待队列是 顾客任务 -> 厨师任务 -> 服务员任务

代码案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.CompletableFuture;

public class CompleteFutureThenCompose {
public static void main(String[] args) {
SmallTool.printTimeAndThread("顾客进入餐厅");
SmallTool.printTimeAndThread("顾客点菜");

// supplyAsync方法需要一个supplier作为参数
// 为函数式接口,无入参,这里只有一个String类型的返回值(作为CompletableFuture的泛型)
// 调用supplyAsync,supplier中的代码会跑到子线程中执行
CompletableFuture<String> sub = CompletableFuture.supplyAsync(
// 厨师任务
() -> {
SmallTool.printTimeAndThread("厨师炒菜");
// 炒菜耗时
SmallTool.sleepMillis(300);
return "鱼香肉丝";

}

// dish即传入厨师任务的返回值
// 菜做好后,开启服务员线程拿得饭菜并打饭
// thenCompose会在前一个线程完成后,开启另一个线程
).thenCompose(dish -> CompletableFuture.supplyAsync(

// 服务员任务
() -> {
SmallTool.printTimeAndThread("服务员打饭");
// 打饭耗时
SmallTool.sleepMillis(100);
return dish + " 米饭已备好";
}


));

SmallTool.printTimeAndThread("顾客正在刷手机等待");
// join方法返回值类型为CompletableFuture泛型的类型
// join方法返回值即为supplier的返回值
// join方法会等待子线程执行结束,并获得其返回值
SmallTool.printTimeAndThread(String.format("%s,顾客开始干饭", sub.join()));

}

}

运行结果

image-20220226113453404

异步任务的结合

thenCombine合并两个任务,同时执行他们

场景模拟

  • 顾客进入餐厅
  • 顾客点菜
  • 顾客刷手机等待
    • (线程2)厨师炒菜
    • (线程3)服务员蒸饭
    • (线程3)菜和饭都备好了,服务员打饭上菜
  • 顾客开始干饭

代码案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.CompletableFuture;

public class CompleteFutureThenCombine {
public static void main(String[] args) {
SmallTool.printTimeAndThread("顾客进入餐厅");
SmallTool.printTimeAndThread("顾客点菜");

// supplyAsync方法需要一个supplier作为参数
// 为函数式接口,无入参,这里只有一个String类型的返回值(作为CompletableFuture的泛型)
// 调用supplyAsync,supplier中的代码会跑到子线程中执行
CompletableFuture<String> sub = CompletableFuture.supplyAsync(
// 厨师任务
() -> {
SmallTool.printTimeAndThread("厨师炒菜");
// 炒菜耗时
SmallTool.sleepMillis(200);
return "鱼香肉丝";

}

// thenCombine将两个线程一起执行
).thenCombine(CompletableFuture.supplyAsync(

// 服务员任务
() -> {
SmallTool.printTimeAndThread("服务员蒸饭");
// 蒸饭耗时
SmallTool.sleepMillis(300);
return "米饭";
}


), (dish, rice) -> {
// 两个并行的线程都完成任务后,合并两个结果后结束两个任务
SmallTool.printTimeAndThread("服务员打饭");
SmallTool.sleepMillis(100);
return String.format("%s + %s 已备好", dish, rice);
});

SmallTool.printTimeAndThread("顾客正在刷手机等待");
// join方法返回值类型为CompletableFuture泛型的类型
// join方法返回值即为supplier的返回值
// join方法会等待子线程执行结束,并获得其返回值
SmallTool.printTimeAndThread(String.format("%s,顾客开始干饭", sub.join()));

}

}

运行结果

image-20220226111744105

3.CompletableFuture进阶

任务的后置处理

场景模拟

  • 顾客就餐完毕
  • 顾客结账并要求开发票
  • 顾客刷手机等待
    • (线程2)服务员收款500元
    • (线程3)服务员开发票
  • 顾客拿到发票

thenApply的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.CompletableFuture;

public class CompleteFutureThenApply {
public static void main(String[] args) {
SmallTool.printTimeAndThread("顾客就餐完毕");
SmallTool.printTimeAndThread("顾客结账并要求开发票");

CompletableFuture<String> sub = CompletableFuture.supplyAsync(
() -> {
SmallTool.printTimeAndThread("服务员收款500元");
SmallTool.sleepMillis(100);
return "500";
}

// thenApply将前面异步任务的结果交给后面的Function
// 前后两个任务都是一个服务员完成的(在同一线程中)
).thenApply(money -> {

SmallTool.printTimeAndThread(String.format("服务员开发票,面额%s元", money));
SmallTool.sleepMillis(200);
return String.format("%s元发票", money);
});

SmallTool.printTimeAndThread("顾客正在刷手机等待");
SmallTool.printTimeAndThread(String.format("顾客拿到%s", sub.join()));

}

}

运行结果

image-20220226115245885

可以看到使用thenApply,收款和开发票都在同一个线程不符合要求,我们只需要将thenApply更改为thenApplyAsync即可实现开启另一个线程运行

thenApplyAsync的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.CompletableFuture;

public class CompleteFutureThenApply {
public static void main(String[] args) {
SmallTool.printTimeAndThread("顾客就餐完毕");
SmallTool.printTimeAndThread("顾客结账并要求开发票");

CompletableFuture<String> sub = CompletableFuture.supplyAsync(
() -> {
SmallTool.printTimeAndThread("服务员收款500元");
SmallTool.sleepMillis(100);
return "500";
}

// thenApplyAsync将前面异步任务的结果交给后面的Function,并开启一个新的线程
).thenApplyAsync(money -> {

SmallTool.printTimeAndThread(String.format("服务员开发票,面额%s元", money));
SmallTool.sleepMillis(200);
return String.format("%s元发票", money);
});

SmallTool.printTimeAndThread("顾客正在刷手机等待");
SmallTool.printTimeAndThread(String.format("顾客拿到%s", sub.join()));

}

}

获得最先完成的任务

场景模拟

  • 老王走出餐厅,来到公交车站
  • 等待700路或800路公交到来
    • (线程2)700路公交正在赶来
    • (线程3)800路公交正在赶来
  • 老王坐最先到的公交回家

代码案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.CompletableFuture;

public class CompleteFutureToEither {
public static void main(String[] args) {
SmallTool.printTimeAndThread("老王走出餐厅,来到公交车站");
// 700路公交和800路公交哪一个先到,就先上哪一辆车
SmallTool.printTimeAndThread("等待700路或800路公交到来");

CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("700路公交正在赶来");
SmallTool.sleepMillis(100);
return "700路公交已到站";
// applyToEither连接两个任务,哪个任务先完成就传到Function(即firstComeBus中)
// 返回输出到bus
}).applyToEither(CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("800路公交正在赶来");
SmallTool.sleepMillis(200);
return "800路公交已到站";
}), firstComeBus -> firstComeBus);

SmallTool.printTimeAndThread(String.format("%s,老王坐车回家", bus.join()));

}


}

运行结果

image-20220226133508092

处理发生的异常情况

场景模拟

  • 老王走出餐厅,来到公交车站

  • 等待700路或800路公交到来

    • (线程2)700路公交正在赶来
    • (线程3)800路公交正在赶来
  • 老王坐最先到的公交回家

  • 700路公交出现故障,老王叫出租车回家

代码案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.concurrent.CompletableFuture;

public class CompleteFutureToEither {
public static void main(String[] args) {
SmallTool.printTimeAndThread("老王走出餐厅,来到公交车站");
// 700路公交和800路公交哪一个先到,就先上哪一辆车
SmallTool.printTimeAndThread("等待700路或800路公交到来");

CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("700路公交正在赶来");
SmallTool.sleepMillis(100);
return "700路公交已到站";
// applyToEither连接两个任务,哪个任务先完成就传到Function(即firstComeBus中)
// 返回输出到bus
}).applyToEither(CompletableFuture.supplyAsync(() -> {
SmallTool.printTimeAndThread("800路公交正在赶来");
SmallTool.sleepMillis(200);
return "800路公交已到站";

// 先到的车如果是700路,其中途会抛出异常
}), firstComeBus -> {
SmallTool.printTimeAndThread(firstComeBus);
if(firstComeBus.startsWith("700"))
{
throw new RuntimeException("公交车发生故障");
}

return firstComeBus;

// 上面任任意的链式调用中出现异常都会触发这里的事件
}).exceptionally(e -> {
SmallTool.printTimeAndThread(e.getMessage());
SmallTool.printTimeAndThread("老王叫出租车");
return "出租车到了";

});

SmallTool.printTimeAndThread(String.format("%s,老王坐车回家", bus.join()));

}

}

运行结果

image-20220226134731888

线程池

1.线程池的创建

使用构造函数创建

我们可以使用ThreadPoolExecutor的构造函数创建一个线程池,其构造函数如下

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(
int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
){...}

拒绝策略如下:

ThreadPoolExecutor.AbortPolicy :抛出异常

ThreadPoolExecutor.CallerRunsPolicy :谁提交谁执行

ThreadPoolExecutor.DiscardPolicy :直接丢弃

ThreadPoolExecutor.DiscardOldestPolicy : 替换掉工作队列中最后一个

使用工具类创建

一般不推荐使用,因为它无法适应多变的业务需求,而且容易导致OOM

通过Executor 框架的工具类Executors来实现

2.线程中断

在一个线程中可以通过中断唤醒另一个在睡眠中的线程,达到线程间通信的目的。不建议使用中断

场景模拟

两车过独木桥场景

代码案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import java.util.Random;
import java.util.TreeMap;

public class TwoCarCrossBridge {
public static void main(String[] args) {
Thread carTwo = new Thread(()-> {

SmallTool.printTimeAndThread("二号车辆准备过桥");
SmallTool.printTimeAndThread("发现一号车正在过桥,开始等待");
try {
// 二号车线程进入TIMED_WAITING状态
Thread.sleep(3000);
}
// 一号车发出中断通知,可以通过捕捉IE异常接收
// sleep和wait方法会抛出InterruptedException异常

catch (InterruptedException e){
SmallTool.printTimeAndThread("二号车开始过桥");
}
SmallTool.sleepMillis(200);
SmallTool.printTimeAndThread("二号车过桥完毕");

});

Thread carOne = new Thread(() -> {
SmallTool.printTimeAndThread("一号车开始过桥");
int timeSpend = new Random().nextInt(500) + 1000;
SmallTool.sleepMillis(timeSpend);
SmallTool.printTimeAndThread("二号车过桥完毕,耗时:" + timeSpend);

// 一号车过桥完毕后,利用中断通知二号车
carTwo.interrupt();

});

// 一号车,二号车同时起步
carOne.start();
carTwo.start();

}

}

但线程处于blocked,waiting,timed_waiting状态,或者从上述三个状态转为runnable状态的过程中,如果被中断就会收到InterruptedException异常

运行结果

image-20220226171200852

参考资料

Java并发编程合集

Java 线程池详解

l-summary/)