CompletableFuture is used for asynchronous programming in Java. Asynchronous programming means writing non-blocking code: a task runs on a thread separate from the main application thread and notifies the main thread of its progress, completion, or failure.
This way, the main thread does not block waiting for the task to complete and can execute other tasks in parallel.
This kind of parallelism can significantly improve the throughput and responsiveness of your programs, especially when tasks spend much of their time waiting on I/O.
CompletableFuture, added in Java 8, extends Java's Future API, which was introduced in Java 5.
A Future is used as a reference to the result of an asynchronous computation. It provides an isDone() method to check whether the computation is done or not, and a get() method to retrieve the result of the computation when it is done.
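As a rough illustration of the isDone() and get() calls described above, here is a minimal plain-Java sketch. The class name FutureBasicsSketch and the helper slowGreeting are hypothetical, invented for this example, and the 200 ms sleep merely stands in for real work.

```java
import java.util.concurrent.CompletableFuture;

class FutureBasicsSketch {

    // Hypothetical stand-in for a slow call (for example, a remote lookup).
    static String slowGreeting(String name) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Hello, " + name;
    }

    // Starts the task on another thread and returns a future immediately.
    static CompletableFuture<String> greetAsync(String name) {
        return CompletableFuture.supplyAsync(() -> slowGreeting(name));
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = greetAsync("Ada");

        // The main thread is free to do other work while the task runs.
        System.out.println("Done yet? " + future.isDone());

        // get() blocks until the result is available.
        System.out.println(future.get());
        System.out.println("Done now? " + future.isDone());
    }
}
```

Because CompletableFuture implements Future, both isDone() and get() are available on it unchanged.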
Spring Boot APIs can use concurrency in their business logic through the @Async annotation and the java.util.concurrent.CompletableFuture class. Annotated methods then run asynchronously on a separate thread pool whenever they are called, so the calling thread is not blocked by each call. How do we enable this and use it in a sample API? Read on.
The key steps for enabling async methods in Spring are:
1. Enable async processing in Spring Boot by annotating a configuration class (or the Spring Boot application class) with @EnableAsync.
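AsyncConfiguration.java
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfiguration {

    // Named executor; referenced later as @Async("asyncExecutor")
    @Bean(name = "asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsynchThread-");
        executor.initialize();
        return executor;
    }
}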
2. Create a ThreadPoolTaskExecutor to run async methods in separate threads.
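import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncMethodConfigurer implements AsyncConfigurer {

    // Executor used by @Async methods that do not name an executor
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncGetUserInfo-");
        executor.initialize();
        return executor;
    }
}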
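To make the sizing settings more concrete, the sketch below builds a plain JDK ThreadPoolExecutor with the same numbers (4 core threads, at most 5 threads, a queue of 500) and runs one task on it. It is only an approximation of what the Spring executor does, not Spring's implementation; the class name PoolSizingSketch, the helper methods, and the 60-second keep-alive are assumptions made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizingSketch {

    // Plain-JDK pool sized like the configuration above:
    // 4 core threads, at most 5 threads, up to 500 queued tasks.
    static ThreadPoolExecutor newPool() {
        AtomicInteger counter = new AtomicInteger();
        return new ThreadPoolExecutor(
                4, 5,
                60, TimeUnit.SECONDS,            // keep-alive is an assumed value
                new LinkedBlockingQueue<>(500),
                r -> new Thread(r, "AsyncGetUserInfo-" + counter.incrementAndGet()));
    }

    // Runs one task on the given pool and reports which thread executed it.
    static String runOn(ThreadPoolExecutor pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        try {
            System.out.println("Ran on " + runOn(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With a JDK ThreadPoolExecutor, a thread beyond the core size is started only once the queue is full, so with a queue of 500 the fifth thread appears only under heavy load.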
3. Annotate the methods that need to be run in separate threads with @Async.
@Async("asyncExecutor")
public CompletableFuture<EmployeeNames> methodOne() throws InterruptedException {
    // code
}
4. Wrap the method's return type in CompletableFuture<T>.
@Async
public CompletableFuture<User> findUser(String user) {
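5. Return the result wrapped in a CompletableFuture and attach the exception handling to it. A try/catch around the call in the caller will not work here, because the method runs on another thread and any failure surfaces only through the returned future.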
// The fallback below is used only when the future completes exceptionally
return CompletableFuture.completedFuture(results).exceptionally(throwable -> {
    logger.error("Async call failed", throwable);
    return null;
});
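The effect of exceptionally() is easier to see on a future that can actually fail. The following is a minimal plain-Java sketch without Spring; the class ExceptionallySketch, the helper loadUser, and the "unknown" fallback value are hypothetical and chosen only for illustration (the snippet above falls back to null instead).

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {

    // Hypothetical lookup that fails for an empty user name.
    static String loadUser(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("user name is required");
        }
        return "user:" + name;
    }

    // Runs the lookup asynchronously; on failure, logs and returns a fallback.
    static CompletableFuture<String> findUser(String name) {
        return CompletableFuture
                .supplyAsync(() -> loadUser(name))
                .exceptionally(throwable -> {
                    System.err.println("Lookup failed: " + throwable);
                    return "unknown";
                });
    }

    public static void main(String[] args) {
        System.out.println(findUser("ada").join());
        System.out.println(findUser("").join());
    }
}
```

Here the exception is thrown on the worker thread, so a try/catch around the call to findUser would never see it; the exceptionally() callback is where it becomes visible.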
6. Sample handler:
@RequestMapping(value = "/async-poc/userinfo", method = RequestMethod.GET)
@ResponseBody
public List<User> getAsyncUserInfo(@RequestBody List<User> userList) throws InterruptedException, ExecutionException {
    List<User> responseList = new LinkedList<>();
    // Step 1: Collect the input list for the async methods
    List<String> userNameList = userList.stream()
            .map(User::getName)
            .collect(Collectors.toList());
    // Step 2: Invoke the async methods; each call returns a future immediately
    List<CompletableFuture<User>> futuresList = userNameList.stream()
            .map(asyncUserInfoService::findUser)
            .collect(Collectors.toList());
    // Step 3: Wait for every result and combine them into the HTTP response
    for (CompletableFuture<User> completableFuture : futuresList) {
        try {
            User user = completableFuture.get();
            log.info("-- > " + user);
            responseList.add(user);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error(e.getMessage());
        } catch (ExecutionException e) {
            log.error(e.getMessage());
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
    return responseList;
}
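Stripped of the Spring parts, the handler follows a fan-out/fan-in pattern: start every call first, then wait for the results. The sketch below shows that pattern in plain Java. The class FanOutSketch and its findUser method are hypothetical stand-ins for asyncUserInfoService, and upper-casing the name takes the place of a real lookup.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FanOutSketch {

    // Hypothetical async lookup; a real service would call a database or API.
    static CompletableFuture<String> findUser(String name) {
        return CompletableFuture.supplyAsync(() -> name.toUpperCase());
    }

    static List<String> findAll(List<String> names) {
        // Fan out: start every lookup before waiting on any of them.
        List<CompletableFuture<String>> futures = names.stream()
                .map(FanOutSketch::findUser)
                .collect(Collectors.toList());

        // Fan in: join() each future; results keep the input order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(findAll(Arrays.asList("ann", "bob", "cy")));
    }
}
```

Collecting the futures into a list before joining matters: joining inside the same stream pipeline that creates them would wait for each call before starting the next one.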
The key classes and methods used from the java.util.concurrent package:
- CompletableFuture – The main class used here.
- exceptionally(), handle(), whenComplete() – Called after your method completes. Keep any error handling here.
- Note: handle() and whenComplete() receive both the CompletableFuture's success result (T) and its failure (Throwable) as input arguments. exceptionally(), on the other hand, receives only the failure.
- allOf(...) – Returns a new CompletableFuture that completes when all of the given CompletableFutures complete. The main thread can use it to wait until all the async tasks passed in have finished before moving on to other processing.
- join() – Returns the result of each method call, that is, the actual return object that was wrapped in the CompletableFuture. Unlike get(), it throws an unchecked CompletionException on failure.
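The methods listed above can be tried out in a few lines of plain Java. This is only a sketch; the class CompletionCallbacksSketch and its helper methods are hypothetical names introduced for the example.

```java
import java.util.concurrent.CompletableFuture;

class CompletionCallbacksSketch {

    // handle() sees both the result and the failure and maps them to a new value.
    static String describe(CompletableFuture<Integer> future) {
        return future
                .handle((value, error) -> error == null ? "ok:" + value : "failed")
                .join();
    }

    // allOf() completes once every given future has completed;
    // the individual results are then read with join().
    static int sumAll(CompletableFuture<Integer> a, CompletableFuture<Integer> b) {
        return CompletableFuture.allOf(a, b)
                .thenApply(ignored -> a.join() + b.join())
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> good = CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Integer> bad = new CompletableFuture<>();
        bad.completeExceptionally(new IllegalStateException("boom"));

        // whenComplete() observes the outcome without changing it.
        good.whenComplete((value, error) ->
                System.out.println("value=" + value + ", error=" + error));

        System.out.println(describe(good));
        System.out.println(describe(bad));
        System.out.println(sumAll(good, CompletableFuture.completedFuture(22)));
    }
}
```

Note that if any future passed to allOf() fails, the combined future fails as well, so the final join() throws a CompletionException unless a handler such as handle() or exceptionally() is attached.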