前言:

本文内容:Stream流式计算、ForkJoin详解、异步回调

推荐免费JUC并发编程视频:【狂神说Java】JUC并发编程最新版通俗易懂_哔哩哔哩_bilibili

Stream流式计算

概述

将要处理的元素看做一种流,流在管道中传输,并且可以在管道的节点上处理,包括过滤筛选、去重、排序、聚合等,并把结果发送到下一计算节点。

练习

题目要求:一分钟完成,只能用一行代码实现!

现在有5个用户!筛选;

  1. ID必须是偶数
  2. 年龄必须大于23岁
  3. 用户名转为大写字母
  4. 用户名字母倒着排序
  5. 只输出一个用户

代码实现

User

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.jokerdig.stream;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author Joker大雄
* @data 2022/8/25 - 10:37
**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private int id;
private String name;
private int age;
}

Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.jokerdig.stream;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/**
* @author Joker大雄
* @data 2022/8/25 - 10:37
**/
public class Test {
public static void main(String[] args) {
User user1 = new User(1,"a",21);
User user2 = new User(2,"b",27);
User user3 = new User(3,"c",25);
User user4 = new User(4,"d",24);
User user5 = new User(7,"e",20);
// 集合就是存储
List<User> list = Arrays.asList(user1, user2, user3, user4, user5);

// 计算交给流
list.stream().
// ID 为偶数
filter(u->u.getId()%2==0).
// 年龄大于 23
filter(u->u.getAge()>23).
// 用户名转换为大写字母
map(u->{u.setName(u.getName().toUpperCase()); return u;}).
//用户名倒着排序
sorted((u1,u2)->u2.getName().compareTo(u1.getName())).
// 只输出一个用户
limit(1).
forEach(System.out::println);
}
}

运行结果

1
2
3
User(id=4, name=D, age=24)

Process finished with exit code 0

ForkJoin详解

什么是ForkJoin

ForkJoin框架是java的JUC包里提供的,用于处理一些比较繁重的(并发)任务,会将这个大任务分为多个小任务,多个小任务处理完成后会将结果汇总给Result,体现的是一种“分而治之”的思想。

18

ForkJoin特点

特点:工作窃取(两个线程正在执行,一个线程执行完毕后,回去窃取另一个线程的内容来执行)

执行原理:双端队列从上面和下面都可以去拿到任务进行执行

简单练习

ForkJoinDemo计算类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.jokerdig.forkjoin;

import java.util.concurrent.RecursiveTask;

/**
* @author Joker大雄
* @data 2022/8/25 - 11:29
**/
// 求和计算任务
public class ForkJoinDemo extends RecursiveTask<Long> {
/*
如何使用ForkJoin
1. ForkJoinPool 通过它来执行
2. 计算任务 ForkJoinPool.execute(ForkJoinTask task)
3. 计算类继承ForkJoinTask ->RecursiveTask、RecursiveAction
*/
private Long start;
private Long end;
// 临界值
private Long temp = 10000L;
// 构造器
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
// 计算方法
@Override
protected Long compute() {
if((end-start)<temp){
Long sum=0L;
for (Long i = start; i <= end; i++) {
sum+=i;
}
return sum;
}else{
// 分支合并计算 ForkJoin
long middle = (start+end)/2; // 中间值
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork(); // 拆分任务,把任务放入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end);
task2.fork();
return task1.join()+task2.join(); // 返回计算结果
}
}
}

Test测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.jokerdig.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

/**
* @author Joker大雄
* @data 2022/8/25 - 11:47
**/
// 计算10亿之和
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1(); // sum=500000000500000000 花费时间=7643
// test2(); // sum=500000000500000000 花费时间=7650
// test3(); // sum=500000000500000000 花费时间=227
}

// 方法一 直接使用for循环计算
public static void test1(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum+=i;
}
long end = System.currentTimeMillis();
System.out.println("sum="+sum+"花费时间="+(end-start));
}

// 方法二 使用ForkJoin计算
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
// ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
// 新建任务
ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
// 提交
ForkJoinTask<Long> submit = pool.submit(task);
// 计算结果
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum="+sum+"花费时间="+(end-start));
}

// 方法三 使用并行流计算
public static void test3(){
long start = System.currentTimeMillis();
// 使用Stream并行流
Long sum = LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum="+sum+"花费时间="+(end-start));
}
}

异步回调

Future

CompletableFutureJava里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。

简单练习

没有返回值的异步回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.jokerdig.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* @author Joker大雄
* @data 2022/8/25 - 12:23
**/
/*
异步调用:CompletableFuture
异步执行
成功回调
失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 发起请求 没有返回值的异步回调
CompletableFuture<Void> completableFuture =CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"runAsync");
});
System.out.println("测试阻塞!");
// 阻塞获取执行结果
completableFuture.get();
}
}

运行结果

1
2
3
4
测试阻塞!
ForkJoinPool.commonPool-worker-1runAsync

Process finished with exit code 0

有返回值的异步回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.jokerdig.future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* @author Joker大雄
* @data 2022/8/25 - 12:23
**/
/*
异步调用:CompletableFuture
异步执行
成功回调
失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 有返回值的异步回调
CompletableFuture<Integer> completableFuture =CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync->Integer");
// 添加异常,查看返回结果
// int i =10/0;
return 1024;
});
System.out.println(// 编译成功 whenComplete有两个参数,没有返回值
completableFuture.whenComplete((t,u)->{
// 打印参数
System.out.println("t->"+t); // 正常的返回结果
System.out.println("u->"+u); // 异常信息
// 编译失败 exceptionally 有参数和返回值
}).exceptionally((e)->{
// 打印消息
System.out.println(e.getMessage());
// 异常返回-1
return -1;
}).get());
}
}

运行结果(正常)

1
2
3
4
5
6
ForkJoinPool.commonPool-worker-1supplyAsync->Integer
t->1024
u->null
1024

Process finished with exit code 0

运行结果(异常)

1
2
3
4
5
6
7
ForkJoinPool.commonPool-worker-1supplyAsync->Integer
t->null
u->java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero
-1

Process finished with exit code 0