There’s something about… ThreadPoolExecutor

There’s definitely a lot to say about Executors in Java and ThreadPoolExecutor as a particular case, but what I recently ran into was a simple issue. An application gets a large stream of data from source to destination; it does so concurrently so as to speed up the process. Problem was, when I added the parallelism, the application started running out of memory very soon.

Well, what do you do when you get an OutOfMemoryError?

You profile!

Using jvisualvm and extended logging, I found the culprit. One of the data preparation stages, which runs after the data has been read from the source, was slower than the others. So the backlog of tasks waiting in the thread pool was growing quickly and eating up my precious memory.

Well, the pool I used was supposed to be fixed, and it was created like this:

ExecutorService pool = Executors.newFixedThreadPool(threadCount);

But reading the documentation again (always an exciting thing to do!), we find the following:

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.

What does it mean?
It means that the pool won’t run more than a fixed number of tasks at a time, but every task it can’t run yet goes into the wait queue, and that queue is unlimited.
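
To see the effect in isolation, here is a minimal sketch (not the original application). The class and method names are made up for illustration, and the cast to ThreadPoolExecutor relies on how the JDK currently implements newFixedThreadPool, which the API itself doesn't promise. Every task simply blocks on a latch to imitate a slow stage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class UnboundedQueueDemo {

    // Submits `tasks` blocked tasks to a fixed pool of `threads` workers and
    // returns how many of them are sitting in the work queue afterwards.
    static int queuedAfterSubmitting(int threads, int tasks) {
        // Assumption: the JDK returns a plain ThreadPoolExecutor here,
        // so the cast gives us access to getQueue().
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // imitate a slow processing stage
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Nothing was rejected: whatever the workers could not pick up is queued.
            return pool.getQueue().size();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued tasks: " + queuedAfterSubmitting(2, 1000));
    }
}
```

With 2 threads and 1000 tasks, the first two tasks occupy the workers and the other 998 wait in the queue. Replace the trivial lambdas with tasks that hold on to real data, and that queue is where the memory goes.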

Great, you say, let’s limit the queue!

        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(maxThreads);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadCount,
                maxThreads,
                0,
                TimeUnit.SECONDS,
                workQueue);
        return threadPoolExecutor;

We run the app again, and this time we get a nasty exception as soon as the queue fills up and all maxThreads threads are busy: a RejectedExecutionException.
What has just happened?
Well, the pool can’t accept new tasks because the queue has no space left, and so it rejects them.
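
The rejection is easy to reproduce outside the application. The following is a rough sketch with hypothetical names and made-up sizes (2 core threads, 4 maximum threads, a queue of 4), again using tasks that block on a latch. It counts how many tasks the pool accepts before it throws.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {

    // Keeps submitting blocked tasks until the pool rejects one,
    // then returns the number of tasks that were accepted.
    static int acceptedBeforeRejection(int coreThreads, int maxThreads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreThreads, maxThreads, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockedTask = () -> {
            try {
                release.await(); // imitate a slow processing stage
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int accepted = 0;
        try {
            while (true) {
                pool.execute(blockedTask); // throws once threads and queue are exhausted
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            return accepted;
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + acceptedBeforeRejection(2, 4, 4));
    }
}
```

The pool first starts the core threads, then fills the queue, then starts extra threads up to the maximum, and only then rejects. So with these numbers it takes 8 tasks (4 threads plus 4 queue slots), and the ninth is rejected.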
What we need is a way to keep the calling thread from submitting a task while there is no space for it, i.e. to make the method that submits to the pool a blocking one.
But how?
Turns out there’s a way to do it without complex workarounds, though maybe not a very direct one: a rejection policy.

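We can set a RejectedExecutionHandler for the thread pool. There are four standard policies:

1. ThreadPoolExecutor.AbortPolicy (the default): throws a RejectedExecutionException.
2. ThreadPoolExecutor.CallerRunsPolicy: runs the rejected task in the thread that called execute.
3. ThreadPoolExecutor.DiscardPolicy: silently drops the rejected task.
4. ThreadPoolExecutor.DiscardOldestPolicy: drops the oldest task waiting in the queue and then retries the submission.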

Seriously, I can’t imagine why anyone would want to use #3 and #4. As for #1, we have just seen it in action and didn’t like it. #2, however, looks promising. If the calling thread is running the task on its own, then it won’t try to submit anything else until it is done, right? And hopefully this will give the pool some time to work through its queue.

        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(maxThreads);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadCount,
                maxThreads,
                0,
                TimeUnit.SECONDS,
                workQueue);
        threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return threadPoolExecutor;

And it turns out that this really works as expected.
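
For the curious, here is a small sketch that shows where the overflow task actually runs. The names and the tiny pool (one thread, one queue slot) are made up for the demonstration, and the policy is passed through the six-argument constructor rather than the setter, which should be equivalent.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallerRunsDemo {

    // Saturates a one-thread pool with a one-slot queue, submits one more task,
    // and returns the name of the thread that ended up running that extra task.
    static String threadThatRunsOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> overflowThread = new AtomicReference<>();
        Runnable blockedTask = () -> {
            try {
                release.await(); // imitate a slow processing stage
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blockedTask); // occupies the only worker thread
            pool.execute(blockedTask); // fills the only queue slot
            // No room left: instead of throwing, execute() runs this task
            // synchronously in the submitting thread.
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
            return overflowThread.get();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadThatRunsOverflowTask());
    }
}
```

The returned name is that of the submitting thread, not of a pool worker. While the caller is busy with that task it cannot submit anything else, which is exactly the throttling we were after. One thing to keep in mind: if the submitting thread is also the one reading from the source, reading pauses for as long as that task takes.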

About Maryna Cherniavska

I have productively spent 10+ years in IT industry, designing, developing, building and deploying desktop and web applications, designing database structures and otherwise proving that females have a place among software developers. And this is a good place.