A CompletionService accepts your Callables through its .submit() method and hands back the finished futures through its .take() method, which blocks until the next task has completed.
One thing you must not forget is to terminate the ExecutorService by calling its .shutdown() method. You can only do that if you kept a reference to the executor service, so make sure to hold on to one.
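To illustrate the shutdown advice, here is a minimal sketch (the class name ShutdownSketch, the pool size and the five-second timeout are placeholders of mine, not part of the original answer). It keeps the reference to the executor, calls shutdown() in a finally block so the pool is released even if submitting fails, and then optionally waits for the already submitted work with awaitTermination():

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    // Runs one piece of work on a small pool and reports whether the pool
    // terminated within the timeout. Pool size and timeout are arbitrary.
    static boolean runAndShutdown(Runnable work) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            service.submit(work);
        } finally {
            // shutdown() stops accepting new tasks; tasks that were
            // already submitted still run to completion.
            service.shutdown();
        }
        // shutdown() does not block, so wait explicitly if you need to
        // know that the submitted work is done.
        return service.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        boolean terminated = runAndShutdown(() -> System.out.println("working"));
        System.out.println("terminated: " + terminated);
    }
}
```

If the pool is never shut down, its worker threads are non-daemon by default and can keep the JVM from exiting.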
Example code - For a fixed number of work items to be worked on in parallel:
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<YourCallableImplementor> completionService =
        new ExecutorCompletionService<YourCallableImplementor>(service);
ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();
for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
// Now retrieve the futures in completion order; take() blocks until the next one is ready
int received = 0;
while (received < elementsToCompute.size()) {
    // take() can throw InterruptedException, get() can also throw ExecutionException
    Future<YourCallableImplementor> resultFuture = completionService.take();
    YourCallableImplementor result = resultFuture.get();
    received++;
}
// Important: shut down your ExecutorService
service.shutdown();
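For reference, a self-contained version of the fixed-workload pattern that compiles and runs on its own might look like the sketch below. SquareTask and sumOfSquares are hypothetical stand-ins for your own Callable and result handling; they are not part of the original snippet.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedWorkloadSketch {

    // Hypothetical work item: squares its input.
    static class SquareTask implements Callable<Integer> {
        private final int input;

        SquareTask(int input) {
            this.input = input;
        }

        @Override
        public Integer call() {
            return input * input;
        }
    }

    static int sumOfSquares(List<Integer> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService service =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            CompletionService<Integer> completionService =
                    new ExecutorCompletionService<>(service);
            for (int input : inputs) {
                completionService.submit(new SquareTask(input));
            }
            int sum = 0;
            // Exactly one take() per submitted task; results arrive in
            // completion order, not submission order.
            for (int received = 0; received < inputs.size(); received++) {
                sum += completionService.take().get();
            }
            return sum;
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfSquares(Arrays.asList(1, 2, 3, 4))); // prints 30
    }
}
```

Because the sum does not depend on the order in which results arrive, the output is the same no matter which task finishes first.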
Example code - For a dynamic number of work items to be worked on in parallel:
public void runIt() {
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();
    // Initial workload is 8 work packages
    for (int i = 0; i < 8; i++) {
        futures.add(completionService.submit(new CallableImplementor()));
    }
    boolean finished = false;
    while (!finished) {
        try {
            // take() blocks until the next work package has completed
            Future<CallableImplementor> resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            // Drop the completed future so its result can be garbage collected
            futures.remove(resultFuture);
            if (!finished) {
                // After a work package has finished, create a new one and add it to futures
                futures.add(completionService.submit(new CallableImplementor()));
            }
        } catch (InterruptedException | ExecutionException e) {
            // Handle the failure and check the work package count: no replacement
            // was submitted for the package that failed
        }
    }
    // Important: shut down your ExecutorService
    service.shutdown();
}
public class CallableImplementor implements Callable<CallableImplementor> {
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        // business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}
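If you want to try the dynamic pattern without your own business logic, the following sketch keeps a fixed number of work packages in flight until one result satisfies a stop condition. Everything here is made up for illustration: the class name DynamicWorkloadSketch, the shared counter that stands in for real work, and the choice of four packages in flight. It also uses shutdownNow() instead of shutdown(), on the assumption that packages still running after the stop condition are no longer needed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DynamicWorkloadSketch {

    // Keeps submitting work packages until one returns a value >= target,
    // then returns that value.
    static int runUntil(int target) throws InterruptedException, ExecutionException {
        int inFlight = 4; // hypothetical number of packages kept in flight
        ExecutorService service = Executors.newFixedThreadPool(inFlight);
        CompletionService<Integer> completionService =
                new ExecutorCompletionService<>(service);
        AtomicInteger counter = new AtomicInteger();
        // Stand-in for real work: each package returns the next counter value.
        Callable<Integer> workPackage = counter::incrementAndGet;
        try {
            for (int i = 0; i < inFlight; i++) {
                completionService.submit(workPackage);
            }
            while (true) {
                int value = completionService.take().get();
                if (value >= target) {
                    return value; // stop condition met, submit nothing more
                }
                // One package finished, so submit a replacement.
                completionService.submit(workPackage);
            }
        } finally {
            // Asks packages that are still running to stop (via interruption)
            // and discards packages that have not started yet.
            service.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("stopped at " + runUntil(20));
    }
}
```

The returned value is at least the target, but it can be slightly larger, because results are taken in completion order and several packages are running at once.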