Check whether this works for you:
/**
 * Runs {@code someTask} once for every element of {@code collection} on the supplied executor,
 * keeping at most {@code parallelismLevel} tasks submitted at any one time.
 *
 * <p>Results are collected in the iteration order of {@code collection}. A task that throws, or
 * whose result is not available within the timeout, is cancelled and contributes no entry to the
 * returned collection. The timeout is measured from the moment this method starts waiting on a
 * given future (the argument passed to {@code Future.get}), not from the moment the task starts
 * executing.
 *
 * <p>A new task is submitted only after the oldest outstanding future has been resolved, so the
 * number of in-flight tasks never exceeds {@code parallelismLevel} but may temporarily be lower.
 *
 * <p>If the calling thread is interrupted while waiting, every outstanding future is cancelled,
 * no further tasks are submitted, the thread's interrupt flag is restored, and the results
 * gathered so far are returned.
 *
 * <p>NOTE(review): {@code threadPoolExecutor.submit} can reject work (for example when the
 * executor has been shut down); such a rejection is not handled here and propagates to the caller.
 *
 * @param threadPoolExecutor     executor that runs the tasks; must not be {@code null}
 * @param parallelismLevel       maximum number of tasks submitted at once; must be positive
 * @param timeUnit               unit of {@code timeToCompleteEachTask}; must not be {@code null}
 * @param timeToCompleteEachTask how long to wait for each individual result
 * @param collection             input values, one task per element; must not be empty
 * @param context                passed unchanged as the second argument of every {@code perform} call
 * @param someTask               the work to perform for each element; must not be {@code null}
 * @return a response whose data holds one {@code ResponseObject} per successfully completed
 *         task, or a response with error code {@code "500"} when an argument is invalid
 */
public <T,S,K,V> ResponseObject<Collection<ResponseObject<T>>> runOnScheduler(ThreadPoolExecutor threadPoolExecutor,
        int parallelismLevel, TimeUnit timeUnit, int timeToCompleteEachTask, Collection<S> collection,
        Map<K,V> context, Task<T,S,K,V> someTask){
    if(threadPoolExecutor==null){
        return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("threadPoolExecutor can not be null").build();
    }
    if(someTask==null){
        return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("Task can not be null").build();
    }
    if(CollectionUtils.isEmpty(collection)){
        return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("input collection can not be empty").build();
    }
    // A non-positive level would submit nothing and still report success, so reject it up front.
    if(parallelismLevel<=0){
        return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("parallelismLevel must be greater than zero").build();
    }
    // Future.get(timeout, null) throws NullPointerException; report it like the other bad arguments.
    if(timeUnit==null){
        return ResponseObject.<Collection<ResponseObject<T>>>builder().errorCode("500").errorMessage("timeUnit can not be null").build();
    }

    // One callable per input element, queued in iteration order until a slot is free.
    LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue = new LinkedBlockingQueue<>(collection.size());
    collection.forEach(value -> callableLinkedBlockingQueue.offer(() -> someTask.perform(value, context)));

    // Fix the size of the first batch BEFORE polling: the queue shrinks with every poll, so
    // comparing a running counter against its live size would stop roughly half-way.
    LinkedBlockingQueue<Future<T>> futures = new LinkedBlockingQueue<>();
    int initialBatch = Math.min(parallelismLevel, callableLinkedBlockingQueue.size());
    for(int submitted = 0; submitted < initialBatch; submitted++){
        futures.offer(threadPoolExecutor.submit(callableLinkedBlockingQueue.poll()));
    }

    Collection<ResponseObject<T>> responseCollection = new ArrayList<>();
    while(!futures.isEmpty()){
        // poll() already removes the future from the in-flight queue.
        Future<T> future = futures.poll();
        try {
            T response = future.get(timeToCompleteEachTask, timeUnit);
            responseCollection.add(ResponseObject.<T>builder().data(response).build());
        } catch (InterruptedException e) {
            // The caller asked us to stop: cancel everything still running, drop the work that
            // was never submitted, and restore the interrupt flag for code further up the stack.
            future.cancel(true);
            for(Future<T> outstanding : futures){
                outstanding.cancel(true);
            }
            futures.clear();
            callableLinkedBlockingQueue.clear();
            Thread.currentThread().interrupt();
            break;
        } catch (ExecutionException | TimeoutException e) {
            // Failed or too slow: cancel it and leave it out of the results.
            future.cancel(true);
        }

        // A slot has been freed, so hand the next pending callable (if any) to the executor.
        Callable<T> callable = getRemainingCallables(callableLinkedBlockingQueue);
        if(null!=callable){
            futures.add(threadPoolExecutor.submit(callable));
        }
    }
    return ResponseObject.<Collection<ResponseObject<T>>>builder().data(responseCollection).build();
}
/**
 * Takes the next callable that has not been submitted yet off the given queue.
 *
 * @param callableLinkedBlockingQueue queue of callables still waiting to be submitted
 * @return the head of the queue, or {@code null} when the queue is empty
 */
private <T> Callable<T> getRemainingCallables(LinkedBlockingQueue<Callable<T>> callableLinkedBlockingQueue){
    // poll() is non-blocking and already yields null for an empty queue,
    // so no separate size check is required.
    return callableLinkedBlockingQueue.poll();
}
This way you can restrict the number of threads used from the executor, as well as put a timeout on each task.