前言:
本文内容:Stream流式计算、ForkJoin详解、异步回调
推荐免费JUC并发编程视频:【狂神说Java】JUC并发编程最新版通俗易懂_哔哩哔哩_bilibili
Stream流式计算
概述
将要处理的元素看做一种流,流在管道中传输,并且可以在管道的节点上处理,包括过滤筛选、去重、排序、聚合等,并把结果发送到下一计算 节点。
练习
题目要求:一分钟完成,只能用一行代码实现!
现在有5个用户!筛选;
ID必须是偶数
年龄必须大于23岁
用户名转为大写字母
用户名字母倒着排序
只输出一个用户
代码实现
User
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package com.jokerdig.stream;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class User { private int id; private String name; private int age; }
Test
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.jokerdig.stream;import java.util.Arrays;import java.util.List;import java.util.Locale;public class Test { public static void main (String[] args) { User user1 = new User (1 ,"a" ,21 ); User user2 = new User (2 ,"b" ,27 ); User user3 = new User (3 ,"c" ,25 ); User user4 = new User (4 ,"d" ,24 ); User user5 = new User (7 ,"e" ,20 ); List<User> list = Arrays.asList(user1, user2, user3, user4, user5); list.stream(). filter(u->u.getId()%2 ==0 ). filter(u->u.getAge()>23 ). map(u->{u.setName(u.getName().toUpperCase()); return u;}). sorted((u1,u2)->u2.getName().compareTo(u1.getName())). limit(1 ). forEach(System.out::println); } }
运行结果
1 2 3 User(id=4 , name=D, age=24 ) Process finished with exit code 0
ForkJoin详解
什么是ForkJoin
ForkJoin
框架是java的JUC包里提供的,用于处理一些比较繁重的(并发)任务,会将这个大任务分为多个小任务,多个小任务处理完成后会将结果汇总给Result,体现的是一种“分而治之”的思想。
ForkJoin特点
特点:工作窃取 (两个线程正在执行,一个线程执行完毕后,回去窃取另一个线程的内容来执行)
执行原理:双端队列 从上面和下面都可以去拿到任务进行执行
简单练习
ForkJoinDemo
计算类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package com.jokerdig.forkjoin;import java.util.concurrent.RecursiveTask;public class ForkJoinDemo extends RecursiveTask <Long> { private Long start; private Long end; private Long temp = 10000L ; public ForkJoinDemo (Long start, Long end) { this .start = start; this .end = end; } @Override protected Long compute () { if ((end-start)<temp){ Long sum=0L ; for (Long i = start; i <= end; i++) { sum+=i; } return sum; }else { long middle = (start+end)/2 ; ForkJoinDemo task1 = new ForkJoinDemo (start, middle); task1.fork(); ForkJoinDemo task2 = new ForkJoinDemo (middle+1 , end); task2.fork(); return task1.join()+task2.join(); } } }
Test
测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package com.jokerdig.forkjoin;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.stream.LongStream;public class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { } public static void test1 () { Long sum = 0L ; long start = System.currentTimeMillis(); for (Long i = 1L ; i <= 10_0000_0000 ; i++) { sum+=i; } long end = System.currentTimeMillis(); System.out.println("sum=" +sum+"花费时间=" +(end-start)); } public static void test2 () throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool pool = new ForkJoinPool (); ForkJoinTask<Long> task = new ForkJoinDemo (0L , 10_0000_0000L ); ForkJoinTask<Long> submit = pool.submit(task); Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum=" +sum+"花费时间=" +(end-start)); } public static void test3 () { long start = System.currentTimeMillis(); Long sum = LongStream.rangeClosed(0L ,10_0000_0000L ).parallel().reduce(0 ,Long::sum); long end = System.currentTimeMillis(); System.out.println("sum=" +sum+"花费时间=" +(end-start)); } }
异步回调
Future
CompletableFuture
在Java
里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture
实现了Future
, CompletionStage
接口,实现了Future
接口就可以兼容现在有线程池框架,而CompletionStage
接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture
类。
简单练习
没有返回值的异步回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package com.jokerdig.future;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;public class Demo01 { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> completableFuture =CompletableFuture.runAsync(()->{ try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"runAsync" ); }); System.out.println("测试阻塞!" ); completableFuture.get(); } }
运行结果
1 2 3 4 测试阻塞! ForkJoinPool.commonPool-worker-1runAsync Process finished with exit code 0
有返回值的异步回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package com.jokerdig.future;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;public class Demo01 { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture =CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync->Integer" ); return 1024 ; }); System.out.println( completableFuture.whenComplete((t,u)->{ System.out.println("t->" +t); System.out.println("u->" +u); }).exceptionally((e)->{ System.out.println(e.getMessage()); return -1 ; }).get()); } }
运行结果(正常)
1 2 3 4 5 6 ForkJoinPool.commonPool-worker-1supplyAsync->Integer t->1024 u->null 1024 Process finished with exit code 0
运行结果(异常)
1 2 3 4 5 6 7 ForkJoinPool.commonPool-worker-1supplyAsync->Integer t->null u->java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero -1 Process finished with exit code 0