- 你有一个思想,我有一个思想,我们交换后,一个人就有两个思想
- If you can NOT explain it simply, you do NOT understand it well enough
上一篇文章 不会用Java Future,我怀疑你泡茶没我快 全面分析了 Future,通过它我们可以获取线程的执行结果,它虽然解决了 Runnable 的 “三无” 短板,但是它自身还是有短板:
不能手动完成计算
假设你使用 Future 运行子线程调用远程 API 来获取某款产品的最新价格,服务器由于洪灾宕机了,此时如果你想手动结束计算,而是想返回上次缓存中的价格,这是 Future 做不到的
调用 get() 方法会阻塞程序
Future 不会通知你它的完成,它提供了一个get()方法,程序调用该方法会阻塞直到结果可用为止,没有办法利用回调函数附加到Future,并在Future的结果可用时自动调用它
不能链式执行
烧水泡茶中,通过构造函数传参做到多个任务的链式执行,万一有更多的任务,或是任务链的执行顺序有变,对原有程序的影响都是非常大的
整合多个 Future 执行结果方式笨重
假设有多个 Future 并行执行,需要在这些任务全部执行完成之后做后续操作,Future 本身是做不到的,需要借助工具类 Executors
的方法
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
没有异常处理
Future 同样没有提供很好的异常处理方案
上一篇文章看 Future 觉得是发现了新天地,这么一说有感觉回到了解放前
对于 Java 后端的同学,在 Java1.8 之前想实现异步编程,还想避开上述这些烦恼,ReactiveX 应该是一个常见解决方案(做Android 的应该会有了解)。如果熟悉前端同学, ES6 Promise(男朋友的承诺)也解决了异步编程的烦恼
天下语言都在彼此借鉴相应优点,Java 作为老牌劲旅自然也要解决上述问题。又是那个男人,并发大师 Doug Lea 忧天下程序员之忧,解天下程序员之困扰,在 Java1.8 版本(Lambda 横空出世)中,新增了一个并发工具类 CompletableFuture,它的出现,让人在泡茶过程中,品尝到了不一样的味道......
CompletableFuture 在 Java1.8 的版本中出现,自然也得搭上 Lambda 的顺风车,为了更好的理解 CompletableFuture,这里我需要先介绍一下几个 Lambda 函数,我们只需要关注它们的以下几点就可以:
Runnable 我们已经说过无数次了,无参数,无返回值
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Function<T, R> 接受一个参数,并且有返回值
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}
Consumer接受一个参数,没有返回值
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}
Supplier没有参数,有一个返回值
@FunctionalInterface
public interface Supplier<T> {
T get();
}
BiConsumer<T, U> 接受两个参数(Bi, 英文单词词根,代表两个的意思),没有返回值
@FunctionalInterface
public interface BiConsumer<T, U> {
void accept(T t, U u);
好了,我们做个小汇总
有些同学可能有疑问,为什么要关注这几个函数式接口,因为 CompletableFuture 的函数命名以及其作用都是和这几个函数式接口高度相关的,一会你就会发现了
前戏做足,终于可以进入正题了 CompletableFuture
老规矩,先从类结构看起:
实现了 Future 接口,那就具有 Future 接口的相关特性,请脑补 Future 那少的可怜的 5 个方法,这里不再赘述,具体请查看 不会用Java Future,我怀疑你泡茶没我快
CompletionStage 这个接口还是挺陌生的,中文直译过来是【竣工阶段】,如果将烧水泡茶比喻成一项大工程,他们的竣工阶段体现是不一样的
所以,CompletionStage 接口的作用就做了这点事,所有函数都用于描述任务的时序关系,总结起来就是这个样子:
CompletableFuture 既然实现了两个接口,自然也就会实现相应的方法充分利用其接口特性,我们走进它的方法来看一看
CompletableFuture 大约有50种不同处理串行,并行,组合以及处理错误的方法。小弟屏幕不争气,方法之多,一个屏幕装不下,看到这么多方法,是不是瞬间要直接 收藏——>吃灰
2连走人?别担心,我们按照相应的命名和作用进行分类,分分钟搞定50多种方法
then
直译【然后】,也就是表示下一步,所以通常是一种串行关系体现, then 后面的单词(比如 run /apply/accept)就是上面说的函数式接口中的抽象方法名称了,它的作用和那几个函数式接口的作用是一样一样滴
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
combine... with...
和 both...and...
都是要求两者都满足,也就是 and 的关系了
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
Either...or...
表示两者中的一个,自然也就是 Or 的体现了
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(、CompletionStage<? extends T> other, Function<? super T, U> fn)
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn)
CompletableFuture<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
这个异常处理看着还挺吓人的,拿传统的 try/catch/finally 做个对比也就瞬间秒懂了
whenComplete 和 handle 的区别如果你看接受的参数函数式接口名称你也就能看出差别了,前者使用Comsumer, 自然也就不会有返回值;后者使用 Function,自然也就会有返回值
这里并没有全部列举,不过相信很多同学已经发现了规律:
CompletableFuture 提供的所有回调方法都有两个异步(Async)变体,都像这样
// thenApply() 的变体
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
另外,方法的名称也都与前戏中说的函数式接口完全匹配,按照这中规律分类之后,这 50 多个方法看起来是不是很轻松了呢?
基本方法已经罗列的差不多了,接下来我们通过一些例子来实际演示一下:
创建一个 CompletableFuture 对象并没有什么稀奇的,依旧是通过构造函数构建
CompletableFuture<String> completableFuture = new CompletableFuture<String>();
这是最简单的 CompletableFuture 对象创建方式,由于它实现了 Future 接口,所以自然就可以通过 get() 方法获取结果
String result = completableFuture.get();
文章开头已经说过,get()方法在任务结束之前将一直处在阻塞状态,由于上面创建的 Future 没有返回,所以在这里调用 get() 将会永久性的堵塞
这时就需要我们调用 complete() 方法手动的结束一个 Future
completableFuture.complete("Future's Result Here Manually");
这时,所有等待这个 Future 的 client 都会返回手动结束的指定结果
使用 runAsync
进行异步计算
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("运行在一个单独的线程当中");
});
future.get();
由于使用的是 Runnable 函数式表达式,自然也不会获取到结果
使用 runAsync
是没有返回结果的,我们想获取异步计算的返回结果需要使用 supplyAsync()
方法
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
log.info("运行在一个单独的线程当中");
return "我有返回值";
});
log.info(future.get());
由于使用的是 Supplier 函数式表达式,自然可以获得返回结果
我们已经多次说过,get() 方法在Future 计算完成之前会一直处在 blocking 状态下,对于真正的异步处理,我们希望的是可以通过传入回调函数,在Future 结束时自动调用该回调函数,这样,我们就不用等待结果
CompletableFuture<String> comboText = CompletableFuture.supplyAsync(() -> {
//可以注释掉做快速返回 start
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
log.info("");
//可以注释掉做快速返回 end
return "赞";
})
.thenApply(first -> {
log.info("在看");
return first + ", 在看";
})
.thenApply(second -> second + ", 转发");
log.info("三连有没有?");
log.info(comboText.get());
对 thenApply 的调用并没有阻塞程序打印log,也就是前面说的通过回调通知机制, 这里你看到 thenApply 使用的是supplyAsync所用的线程,如果将supplyAsync 做快速返回,我们再来看一下运行结果:
thenApply 此时使用的是主线程,所以:
串行的后续操作并不一定会和前序操作使用同一个线程
如果你不想从回调函数中返回任何结果,那可以使用 thenAccept
final CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(
// 模拟远端API调用,这里只返回了一个构造的对象
() -> Product.builder().id(12345L).name("颈椎/腰椎治疗仪").build())
.thenAccept(product -> {
log.info("获取到远程API产品名称 " + product.getName());
});
voidCompletableFuture.get();
thenAccept
可以从回调函数中获取前序执行的结果,但thenRun 却不可以,因为它的回调函数式表达式定义中没有任何参数
CompletableFuture.supplyAsync(() -> {
//前序操作
}).thenRun(() -> {
//串行的后需操作,无参数也无返回值
});
我们前面同样说过了,每个提供回调方法的函数都有两个异步(Async)变体,异步就是另外起一个线程
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
log.info("前序操作");
return "前需操作结果";
}).thenApplyAsync(result -> {
log.info("后续操作");
return "后续操作结果";
});
到这里,相信你串行的操作你已经非常熟练了
日常的任务中,通常定义的方法都会返回 CompletableFuture 类型,这样会给后续操作留有更多的余地,假如有这样的业务(X呗是不是都有这样的业务呢?):
//获取用户信息详情
CompletableFuture<User> getUsersDetail(String userId) {
return CompletableFuture.supplyAsync(() -> User.builder().id(12345L).name("日拱一兵").build());
}
//获取用户信用评级
CompletableFuture<Double> getCreditRating(User user) {
return CompletableFuture.supplyAsync(() -> CreditRating.builder().rating(7.5).build().getRating());
}
这时,如果我们还是使用 thenApply() 方法来描述串行关系,返回的结果就会发生 CompletableFuture 的嵌套
CompletableFuture<CompletableFuture<Double>> result = completableFutureCompose.getUsersDetail(12345L)
.thenApply(user -> completableFutureCompose.getCreditRating(user));
显然这不是我们想要的,如果想“拍平” 返回结果,thenCompose 方法就派上用场了
CompletableFuture<Double> result = completableFutureCompose.getUsersDetail(12345L)
.thenCompose(user -> completableFutureCompose.getCreditRating(user));
这个和 Lambda 的map 和 flatMap 的道理是一样一样滴
如果要聚合两个独立 Future 的结果,那么 thenCombine 就会派上用场了
CompletableFuture<Double> weightFuture = CompletableFuture.supplyAsync(() -> 65.0);
CompletableFuture<Double> heightFuture = CompletableFuture.supplyAsync(() -> 183.8);
CompletableFuture<Double> combinedFuture = weightFuture
.thenCombine(heightFuture, (weight, height) -> {
Double heightInMeter = height/100;
return weight/(heightInMeter*heightInMeter);
});
log.info("身体BMI指标 - " + combinedFuture.get());
当然这里多数时处理两个 Future 的关系,如果超过两个Future,如何处理他们的一些聚合关系呢?
相信你看到方法的签名,你已经明白他的用处了,这里就不再介绍了
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
接下来就是异常的处理了
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
if( age < 0 ) {
throw new IllegalArgumentException("何方神圣?");
}
if(age > 18) {
return "大家都是成年人";
} else {
return "未成年禁止入内";
}
}).thenApply((str) -> {
log.info("游戏开始");
return str;
}).exceptionally(ex -> {
log.info("必有蹊跷,来者" + ex.getMessage());
return "Unknown!";
});
log.info(maturityFuture.get());
exceptionally 就相当于 catch,出现异常,将会跳过 thenApply 的后续操作,直接捕获异常,进行一场处理
用多线程,良好的习惯是使用 try/finally 范式,handle 就可以起到 finally 的作用,对上述程序做一个小小的更改, handle 接受两个参数,一个是正常返回值,一个是异常
注意:handle的写法也算是范式的一种
Integer age = -1;
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
if( age < 0 ) {
throw new IllegalArgumentException("何方神圣?");
}
if(age > 18) {
return "大家都是成年人";
} else {
return "未成年禁止入内";
}
}).thenApply((str) -> {
log.info("游戏开始");
return str;
}).handle((res, ex) -> {
if(ex != null) {
log.info("必有蹊跷,来者" + ex.getMessage());
return "Unknown!";
}
return res;
});
log.info(maturityFuture.get());
到这里,关于 CompletableFuture
的基本使用你已经了解的差不多了,不知道你是否注意,我们前面说的带有 Sync 的方法是单独起一个线程来执行,但是我们并没有创建线程,这是怎么实现的呢?
细心的朋友如果仔细看每个变种函数的第三个方法也许会发现里面都有一个 Executor 类型的参数,用于指定线程池,因为实际业务中我们是严谨手动创建线程的,这在 我会手动创建线程,为什么要使用线程池?文章中明确说明过;如果没有指定线程池,那自然就会有一个默认的线程池,也就是 ForkJoinPool
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool 的线程数默认是 CPU 的核心数。但是,在前序文章中明确说明过:
不要所有业务共用一个线程池,因为,一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能
CompletableFuture
的方法并没有全部介绍完全,也没必要全部介绍,相信大家按照这个思路来理解 CompletableFuture
也不会有什么大问题了,剩下的就交给实践/时间
以及自己的体会了
你以为 JDK1.8 CompletableFuture 已经很完美了是不是,但追去完美的道路上永无止境,Java 9 对CompletableFuture 又做了部分升级和改造
orTimeout()
completeOnTimeout()
详情可以查看:Java 9 CompletableFuture API Improvements. 怎样快速的切换不同 Java 版本来尝鲜?SDKMAN 统一灵活管理多版本Java 这篇文章的方法送给你
最后咱们再泡一壶茶,感受一下新变化吧
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/QvStRoJNEhOz_4qR5H8iIA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。