宝塔服务器面板,一键全能部署及管理,送你10850元礼包,点我领取
一.Future的介绍
1.1 官方介绍
Future是JDK1.5中提供的一个接口(interface),关于Future,jdk文档中是这样介绍的:
Future表示“进行异步计算结果”,提供了方法来检测异步计算是否完成,以及获取计算后的结果。异步计算完后后,只能通过get方法来获取计算结果,并且使用get方法是会阻塞直到计算完毕(能拿到计算结果)。可以使用cancel方法来取消异步执行,除此之外,还提供了其他方法来判断异步任务正常完成或者被取消。一旦异步计算任务已经完成,那么任务就不能再被取消;
1.2 Job和Task
先说一下Job和Task,最初在做项目的时候,碰到过job和task,当时并不明白有什么区别和联系,只是笼统的都成为任务。后来明白了,当时项目是每天都有一个定时任务,这个定时任务会开启多个线程去执行多个逻辑(任务),其实那个定时任务就被称为一个Job,而开启的多个线程就称为Task。需要注意的是,并不是说进程就应该称为Job,线程就应该称为Task,而是说Job和Task是“父子关系”,一个进程Job也可以开启多个进程Task。
而本文说的Future可以理解为Task(任务)。
二. Future的使用方式
2.1 使用Callable+Future
两个注意点:
1.需要依赖实现Callable接口,注意不能是Runnable接口,因为需要拿到计算结果,所以需要实现Callable接口;
2.需要依赖于线程池,且要用submit提交task,而不能使用execute方法,因为execute方法提交task后,没有返回值,且task有异常也不能感知。
package cn.ganlixin; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.concurrent.*; /** * 描述: * 学习使用Future * * @author ganlixin * @create 2019-12-20 */ @Slf4j public class UseFuture { /** * 使用线程池的submit方式提交task,获取Future */ @Test public void testExecutorSubmit) throws ExecutionException, InterruptedException { // 创建线程池 ExecutorService executor = Executors.newFixedThreadPool5); // 提交task Future<String> future = executor.submitnew MyTask"hello")); // get方法会阻塞直到获取到结果或者捕获到异常才继续执行后面的代码 String result = future.get); log.inforesult); } private static class MyTask implements Callable<String> { private String data; public MyTaskString data) { this.data = data; } @Override public String call) throws Exception { // 模拟业务逻辑计算耗时 TimeUnit.SECONDS.sleep3); return data + " " + data; } } }
2.2 使用FutureTask+Callable
使用FutureTask+Callable实现时,最重要的就是拿不到返回值,即使是提供了get)方法来获取结果,拿到的结果也是null。
package cn.ganlixin; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.concurrent.*; /** * 描述: * 学习使用Future * * @author ganlixin * @create 2019-12-20 */ @Slf4j public class UseFuture { @Test public void testFutureTask) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool5); FutureTask<String> futureTask = new FutureTask<>new MyTask"hello")); Future<?> future = executor.submitfutureTask); // 阻塞,知道任务完成或者抛出异常 Object res = future.get); // 预期得到的结果是"hello hello",但是res的结果是null,因为FutureTask+Callable接口拿不到返回值 System.out.printlnres); // null } private static class MyTask implements Callable<String> { private String data; public MyTaskString data) { this.data = data; } @Override public String call) throws Exception { // 模拟业务逻辑计算耗时 TimeUnit.SECONDS.sleep3); log.info"MyTask.call is running, msg:{}", data); return data + " " + data; } } }
上面使用future.get)阻塞等待结果的时候,拿到的结果是null,如果需要设定默认值,可以在submit的时候,提供第二个参数(默认返回值)。
三. ListenableFuture
ListenableFuture是guava提供的一种继承自JDK Future的一个接口,提供了更多的功能,比如对Future增加回调操作(callback),在成功或者失败的时候进行什么操作。
package cn.ganlixin; import com.google.common.base.Function; import com.google.common.util.concurrent.*; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; import java.util.concurrent.*; /** * 描述: * 学习使用Guava的ListenableFuture和ListenableFutureTask * * @author ganlixin * @create 2019-12-21 */ @Slf4j public class UseListenableFuture { private static class MyCallableTask implements Callable<String> { private String data; public MyCallableTaskString data) { this.data = data; } @Override public String call) throws Exception { // 模拟业务逻辑计算耗时 // TimeUnit.SECONDS.sleep3); log.info"MyCallableTask.call is running, msg:{}", data); return data + " " + data; } } /** * 创建ListenableExecutorService、ListenableFuture、FutureCallback */ @Test public void testSimple) throws ExecutionException, InterruptedException { // 原始的线程池 ExecutorService executorService = Executors.newFixedThreadPool5); // 创建ListeningExecutorService对象,使用MoreExecutors.listeningDecorator)进行包装 ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecoratorexecutorService); // 使用方式和executorService一样进行submit ListenableFuture<String> listenableFuture = listeningExecutorService.submitnew MyCallableTask"hello")); // 创建Future的回调操作,分别是成功异常的时候 FutureCallback<String> futureCallback = new FutureCallback<String>) { @Override public void onSuccess@Nullable String result) { log.info"success to run MyCallableTask, and result is:{}", result); } @Override public void onFailureThrowable t) { log.error"fail to run MyCallableTask, e=", t); } }; // 对Future、FutureCallBack、ExecutorService进行绑定 // addCallbackListenableFuture<V>, FutureCallback,Executor) Futures.addCallbacklistenableFuture, futureCallback, listeningExecutorService); final String result = listenableFuture.get); log.info"result:{}", result); /*输出结果 INFO [pool-2-thread-1] cn.ganlixin.UseListenableFuture - MyCallableTask.call is running, msg:hello INFO [main] cn.ganlixin.UseListenableFuture - result:hello hello INFO [pool-2-thread-2] cn.ganlixin.UseListenableFuture - success to run MyCallableTask, and result is:hello hello */ } /** * 为ListenableFuture添加Listener,Listener是在Future执行完毕后执行 */ @Test public void testAddListenr) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool5); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecoratorexecutorService); ListenableFuture<String> listenableFuture = listeningExecutorService.submitnew MyCallableTask"hello")); // 可以为一个ListenableFuture添加多个Listener // void addListenerRunnable listener, Executor executor); listenableFuture.addListenernew Runnable) { @Override public void run) { log.info"this is listener1 running"); } }, listeningExecutorService); listenableFuture.addListenernew Runnable) { @Override public void run) { log.info"this is listener2 running"); } }, listeningExecutorService); String result = listenableFuture.get); log.info"result:{}", result); } /** * 可以使用Futures.transform进行多个Future的异步链式执行 * * @throws ExecutionException * @throws InterruptedException */ @Test public void testTransform) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool5); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecoratorexecutorService); ListenableFuture<String> listenableFuture = listeningExecutorService.submitnew MyCallableTask"hello")); // ListenableFuture执行完毕后,将会执行下面这个Function中定义的apply方法 ListenableFuture<String> task1 = Futures.transformlistenableFuture, new Function<String, String>) { @Nullable @Override public String apply@Nullable String input) { // input是上一个Future的结果 log.info"task1 input:{}", input); try { TimeUnit.SECONDS.sleep2); } catch InterruptedException e) { e.printStackTrace); } return "this is result from task1.apply"; } }, listeningExecutorService); // task1执行完毕后,将会执行下面这个Function中定义的apply方法 ListenableFuture<String> task2 = Futures.transformtask1, new Function<String, String>) { @Nullable @Override public String apply@Nullable String input) { // input是上一个Future的结果 log.info"task2 input:{}", input); return "this is result from task2.apply"; } }, listeningExecutorService); String result = listenableFuture.get); log.info"result:{}", result); TimeUnit.SECONDS.sleep5); /* INFO [pool-2-thread-1] cn.ganlixin.UseListenableFuture - MyCallableTask.call is running, msg:hello INFO [main] cn.ganlixin.UseListenableFuture - result:hello hello INFO [pool-2-thread-2] cn.ganlixin.UseListenableFuture - task1 input:hello hello INFO [pool-2-thread-3] cn.ganlixin.UseListenableFuture - task2 input:this is result from task1.apply */ } /** * 使用ListenableFutureTask */ @Test public void testListenableFutureTask) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool5); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecoratorexecutorService); // 传入Callable接口 ListenableFutureTask<String> listenableFutureTask = ListenableFutureTask.createnew MyCallableTask"Hello")); ListenableFuture<?> future = listeningExecutorService.submitlistenableFutureTask); // 添加Listener的方式相同 listenableFutureTask.addListenernew Runnable) { @Override public void run) { log.info"this is listener of listeneFutureTask"); } }, executorService); future.get); /* INFO [pool-2-thread-1] cn.ganlixin.UseListenableFuture - MyCallableTask.call is running, msg:Hello INFO [pool-2-thread-2] cn.ganlixin.UseListenableFuture - this is listener of listeneFutureTask */ } }
四. ComplatableFuture的用法
CompletableFuture也是guava提供的一种异步计算的使用方式,下面简单演示了用法:
package cn.ganlixin; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 描述: * 学习使用ComplateableFuture * * @author ganlixin * @create 2019-12-22 */ @Slf4j public class UseCompletableFuture { /** * 使用ComplateableFuture */ @Test public void testSupplyAsync) { ExecutorService executorService = Executors.newFixedThreadPool5); // 可以指定第二个参数(线程池),也可以忽略,一般建议添加 CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync) -> { log.info"this is complatebleFuture supplyAsync1 log out"); return "task1 return"; }, executorService); // 可以指定第二个参数(线程池),也可以忽略,一般建议添加 CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync) -> { log.info"this is complatebleFuture supplyAsync2 log out"); return "task2 return"; }, executorService); // 可以指定第二个参数(线程池),也可以忽略,一般建议添加 CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync) -> { log.info"this is complatebleFuture supplyAsync3 log out"); return "task3 return"; }, executorService); /** * 输出 * INFO [pool-2-thread-3] cn.ganlixin.UseCompletableFuture - this is complatebleFuture supplyAsync3 log out * INFO [pool-2-thread-2] cn.ganlixin.UseCompletableFuture - this is complatebleFuture supplyAsync2 log out * INFO [pool-2-thread-1] cn.ganlixin.UseCompletableFuture - this is complatebleFuture supplyAsync1 log out */ } /** * 异步执行链 */ @Test public void testThen) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool5); CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync) -> { log.info"this is complatebleFuture supplyAsync1 log out"); return "task1 return"; }, executorService); CompletableFuture<String> completableFuture1 = completableFuture.thenApplyAsyncinput -> { log.info"this is thenApplyAsync, input:{}", input); return "thenApplyAsync return"; }, executorService); String result = completableFuture.get); log.info"result:{}", result); /*输出 INFO [pool-2-thread-1] cn.ganlixin.UseCompletableFuture - this is complatebleFuture supplyAsync1 log out INFO [main] cn.ganlixin.UseCompletableFuture - result:task1 return INFO [pool-2-thread-2] cn.ganlixin.UseCompletableFuture - this is thenApplyAsync, input:task1 return */ } }