Waiting on a list of Future

160

I have a method which returns a List of futures

List<Future<O>> futures = getFutures();

Now I want to wait until either all futures are done processing successfully or any of the tasks whose output is returned by a future throws an exception. Even if one task throws an exception, there is no point in waiting for the other futures.

Simple approach would be to

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

But the problem here is if, for example, the 4th future throws an exception, then I will wait unnecessarily for the first 3 futures to be available.

How to solve this? Will count down latch help in any way? I'm unable to use Future isDone because the java doc says

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

This question is tagged with java multithreading future

~ Asked on 2013-10-13 17:42:24

The Best Answer is


133

You can use a CompletionService to receive the futures as soon as they are ready and if one of them throws an exception cancel the processing. Something like this:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

I think you can further improve to cancel any still executing tasks if one of them throws an error.

~ Answered on 2013-10-13 18:00:42


117

If you are using Java 8 then you can do this easier with CompletableFuture and CompletableFuture.allOf, which applies the callback only after all supplied CompletableFutures are done.

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}

~ Answered on 2016-03-28 11:55:50


Most Viewed Questions: