[java] Custom thread pool in Java 8 parallel stream

Is it possible to specify a custom thread pool for Java 8 parallel stream? I can not find it anywhere.

Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded so I want to compartmentalize it. I do not want a slow running task in one module of the applicationblock tasks from another module.

If I can not use different thread pools for different modules, it means I can not safely use parallel streams in most of real world situations.

Try the following example. There are some CPU intensive tasks executed in separate threads. The tasks leverage parallel streams. The first task is broken, so each step takes 1 second (simulated by thread sleep). The issue is that other threads get stuck and wait for the broken task to finish. This is contrived example, but imagine a servlet app and someone submitting a long running task to the shared fork join pool.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

This question is related to java concurrency parallel-processing java-8 java-stream

The answer is


Note: There appears to be a fix implemented in JDK 10 that ensures the Custom Thread Pool uses the expected number of threads.

Parallel stream execution within a custom ForkJoinPool should obey the parallelism https://bugs.openjdk.java.net/browse/JDK-8190974


The parallel streams use the default ForkJoinPool.commonPool which by default has one less threads as you have processors, as returned by Runtime.getRuntime().availableProcessors() (This means that parallel streams leave one processor for the calling thread).

For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)

To change the way parallel streams are executed, you can either

  • submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); or
  • you can change the size of the common pool using system properties: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") for a target parallelism of 20 threads. However, this no longer works after the backported patch https://bugs.openjdk.java.net/browse/JDK-8190974.

Example of the latter on my machine which has 8 processors. If I run the following program:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

The output is:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.


To measure the actual number of used threads, you can check Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

This can produce on a 4-core CPU an output like:

5 // common pool
23 // custom pool

Without .parallel() it gives:

3 // common pool
4 // custom pool

Alternatively to the trick of triggering the parallel computation inside your own forkJoinPool you can also pass that pool to the CompletableFuture.supplyAsync method like in:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

I tried the custom ForkJoinPool as follows to adjust the pool size:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Here is the output saying the pool is using more threads than the default 4.

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

But actually there is a weirdo, when I tried to achieve the same result using ThreadPoolExecutor as follows:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

but I failed.

It will only start the parallelStream in a new thread and then everything else is just the same, which again proves that the parallelStream will use the ForkJoinPool to start its child threads.


We can change the default parallelism using the following property:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

which can set up to use more parallelism.


If you don't mind using a third-party library, with cyclops-react you can mix sequential and parallel Streams within the same pipeline and provide custom ForkJoinPools. For example

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Or if we wished to continue processing within a sequential Stream

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Disclosure I am the lead developer of cyclops-react]


Go to get AbacusUtil. Thread number can by specified for parallel stream. Here is the sample code:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Disclosure: I'm the developer of AbacusUtil.


If you don't need a custom ThreadPool but you rather want to limit the number of concurrent tasks, you can use:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
       // do your processing   
}));

(Duplicate question asking for this is locked, so please bear me here)


The original solution (setting the ForkJoinPool common parallelism property) no longer works. Looking at the links in the original answer, an update which breaks this has been back ported to Java 8. As mentioned in the linked threads, this solution was not guaranteed to work forever. Based on that, the solution is the forkjoinpool.submit with .get solution discussed in the accepted answer. I think the backport fixes the unreliability of this solution also.

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();

Here is how I set the max thread count flag mentioned above programatically and a code sniped to verify that the parameter is honored

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
  .parallel()
  .limit(100000)
  .map(i -> Thread.currentThread().getName())
  .collect(Collectors.toSet());
System.out.println(threadNames);

// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]

If you don't want to rely on implementation hacks, there's always a way to achieve the same by implementing custom collectors that will combine map and collect semantics... and you wouldn't be limited to ForkJoinPool:

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()

Luckily, it's done already here and available on Maven Central: http://github.com/pivovarit/parallel-collectors

Disclaimer: I wrote it and take responsibility for it.


you can try implementing this ForkJoinWorkerThreadFactory and inject it to Fork-Join class.

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

you can use this constructor of Fork-Join pool to do this.

notes:-- 1. if you use this, take into consideration that based on your implementation of new threads, scheduling from JVM will be affected, which generally schedules fork-join threads to different cores(treated as a computational thread). 2. task scheduling by fork-join to threads won't get affected. 3. Haven't really figured out how parallel stream is picking threads from fork-join(couldn't find proper documentation on it), so try using a different threadNaming factory so as to make sure, if threads in parallel stream are being picked from customThreadFactory that you provide. 4. commonThreadPool won't use this customThreadFactory.


Until now, I used the solutions described in the answers of this question. Now, I came up with a little library called Parallel Stream Support for that:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

But as @PabloMatiasGomez pointed out in the comments, there are drawbacks regarding the splitting mechanism of parallel streams which depends heavily on the size of the common pool. See Parallel stream from a HashSet doesn't run in parallel .

I am using this solution only to have separate pools for different types of work but I can not set the size of the common pool to 1 even if I don't use it.


Examples related to java

Under what circumstances can I call findViewById with an Options Menu / Action Bar item? How much should a function trust another function How to implement a simple scenario the OO way Two constructors How do I get some variable from another class in Java? this in equals method How to split a string in two and store it in a field How to do perspective fixing? String index out of range: 4 My eclipse won't open, i download the bundle pack it keeps saying error log

Examples related to concurrency

WAITING at sun.misc.Unsafe.park(Native Method) What is the Swift equivalent to Objective-C's "@synchronized"? Custom thread pool in Java 8 parallel stream How to check if another instance of my shell script is running How to use the CancellationToken property? What's the difference between a Future and a Promise? Why use a ReentrantLock if one can use synchronized(this)? NSOperation vs Grand Central Dispatch What's the difference between Thread start() and Runnable run() multiprocessing.Pool: When to use apply, apply_async or map?

Examples related to parallel-processing

Custom thread pool in Java 8 parallel stream How to do parallel programming in Python? Should I always use a parallel stream when possible? How to create threads in nodejs Shared-memory objects in multiprocessing How do I parallelize a simple Python loop? No ConcurrentList<T> in .Net 4.0? What are the differences between stateless and stateful systems, and how do they impact parallelism? How to abort a Task like aborting a Thread (Thread.Abort method)? Can Powershell Run Commands in Parallel?

Examples related to java-8

Default interface methods are only supported starting with Android N Class has been compiled by a more recent version of the Java Environment Why is ZoneOffset.UTC != ZoneId.of("UTC")? Modify property value of the objects in list using Java 8 streams How to use if-else logic in Java 8 stream forEach Android Studio Error: Error:CreateProcess error=216, This version of %1 is not compatible with the version of Windows you're running Error:could not create the Java Virtual Machine Error:A fatal exception has occured.Program will exit What are functional interfaces used for in Java 8? java.time.format.DateTimeParseException: Text could not be parsed at index 21 Java 8 lambda get and remove element from list

Examples related to java-stream

Sorting a list with stream.sorted() in Java Modify property value of the objects in list using Java 8 streams How to use if-else logic in Java 8 stream forEach Java 8 lambda get and remove element from list Create list of object from another using Java 8 Streams Java 8 Stream API to find Unique Object matching a property value Reverse a comparator in Java 8 Ignore duplicates when producing map using streams Modifying Objects within stream in Java8 while iterating How can I get a List from some class properties with Java 8 Stream?