Getty Images

Tip

How to use parallel streams in Java with virtual threads

Virtual threads in Java currently lack integration with the stream API, particularly for parallel streams. Here's how a JDK 22 preview feature addresses the problem.

Modern operating systems can support extraordinarily large volumes of users, but run into limitations with threads to support them due to CPU and memory constraints. Java historically has dealt with this problem partly through a powerful concurrency API and thread pooling.

Now Java developers are abuzz about virtual threads, a feature introduced in Java 21. Java virtual threads are much lighter and more efficient than traditional operating system threads, and excel at handling blocking operations.

Unfortunately, despite their advantages, virtual threads currently lack support in a critical area: direct integration into the stream API, especially in parallel streams that are a go-to tool for parallelizing CPU-bound tasks.

So, the natural question arises: Can we combine the strengths of virtual threads and parallel streams for maximum efficiency?

Consider a scenario that requires resizing a directory full of images. One must traverse the directory, identify all JPEG files and resize them to desired dimensions. The Java code to achieve this, shown below, is quite straightforward:

package ca.bazlur;

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class ImageProcessor {
    public void processImage(String path) {
        try (Stream<Path> paths = Files.walk(Path.of(path))) {
            paths.filter(Files::isRegularFile)
                .filter(p -> p.toString().endsWith(".jpeg") || p.toString().endsWith(".jpg"))
                .forEach(this::resizeAndSaveImage);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void resizeAndSaveImage(Path p) {
        try {
            BufferedImage image = ImageIO.read(p.toFile());
            BufferedImage resizedImage = resize(image);
            ImageIO.write(resizedImage, "jpeg", p.toFile());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public BufferedImage resize(BufferedImage originalImage) {
        // Implement your resize code here
        // Currently, it just returns the original image unchanged
        return originalImage;
    }
}

While this works well in simple cases, it's inherently sequential. If there is a large volume of images, processing takes considerable time. An obvious solution might be to use parallel streams, like so:

paths.parallel()
    .filter(Files::isRegularFile)

Parallel streams utilize the fork/join pool which defaults to the number of available cores. This can help improve performance, but it's limited by the pool size and the fact that this is I/O-bound work. Threads can spend significant time blocked, waiting for file operations to complete.

Available starting with JDK 21, virtual threads are a game changer for handling blocking I/O calls. They're lightweight and excel at concurrent tasks such as described above. Unfortunately, the stream API doesn't yet support virtual threads directly.

Fortunately, JDK 22 released in March 2024 includes a preview feature, JEP 473, that introduces a solution to this problem. The new mapConcurrent method enables concurrent processing with virtual threads, as the following code shows:

public static <T, R> Gatherer<T, ?, R> mapConcurrent(int maxConcurrency, 
            Function<? super T, ? extends R> mapper)

According to the documentation, this API takes advantage of virtual threads. Using this method, we can write the following:

paths.gather(Gatherers.mapConcurrent(100, p -> p))
    .filter(Files::isRegularFile)
    .filter(p -> p.toString().endsWith(".jpeg") || p.toString().endsWith(".jpg"))
    .forEach(this::resizeAndSaveImage);

By adjusting maxConcurrency, we control the number of concurrent tasks based on the system's capacity. With this new feature, processing large volumes of images is much easier and more efficient.

The Java ecosystem is constantly evolving, and the inability to directly use virtual threads with parallel streams is likely a temporary limitation. As virtual threads mature, we can expect better integration with existing APIs, which will open up new possibilities for optimization and performance.

A N M Bazlur Rahman is a Java Champion and staff software developer at DNAstack. He is also founder and moderator of the Java User Group in Bangladesh.

Dig Deeper on Core Java APIs and programming techniques

App Architecture
Software Quality
Cloud Computing
Security
SearchAWS
Close