There’s definitely a lot to say about Executors in Java and ThreadPoolExecutor as a particular case, but what I recently ran into was a simple issue. An application gets a large stream of data from source to destination; it does so concurrently so as to speed up the process. Problem was, when I added the parallelism, the application started running out of memory very soon.
Well, what do you do when you have an OutOfMemory exception?
You profile!
Using jvisualvm and extended logging I found out the culprit. One of the stages of the data preparation process, after it’s been read from the source, was slower than others. So, the thread pool with tasks that I had was quickly growing and eating up my precious memory.
Well, the pool I used was supposed to be fixed, and it was created like this:
But reading the documentation again (always an exciting thing to do!), we find the following:
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
What does it mean?
It means that the pool won’t process more than a fixed number of threads at a time, but what it won’t process it will keep submitting into the wait queue, and the queue is unlimited.
Great, you say, let’s limit the queue!
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadCount,
maxThreads,
0,
TimeUnit.SECONDS,
workQueue);
return threadPoolExecutor;
We run the app again and this time, we have a nasty exception as soon as the pool reaches its limit: a RejectedExecutionException.
What has just happened?
Well, the pool can’t accept new tasks because the queue has no space left, and so it rejects them.
What we need here is basically to have a way for the calling thread to not submit the task while there’s no space for it, i.e. make the method, submitting to the pool, to be a blocking one.
But how?
Turns out there’s a way to do it without complex workarounds, though maybe not a very direct one: a rejection policy.
We can set a RejectedExecutionHandler for the thread pool. There are four standard policies:
- ThreadPoolExecutor.AbortPolicy – this one aborts the task and throws an exception we just saw;
- ThreadPoolExecutor.CallerRunsPolicy – this one makes the calling thread to run the task instead of creating a new thread for it;
- ThreadPoolExecutor.DiscardOldestPolicy – this one discards the oldest task in the pool and replaces it with ours;
- ThreadPoolExecutor.DiscardPolicy – and this one just silently rejects a discarded task and noone is the wiser.
Seriously, I can’t imagine why anyone would want to use #3 and #4. As to the #1, we have just seen it in action and didn’t like it. #2, however, looks promising. If the calling thread is running the task on its own, then it won’t try to submit anything else until it is done, right? And hopefully this will give the pool some time to offload the tasks.
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadCount,
maxThreads,
0,
TimeUnit.SECONDS,
workQueue);
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolExecutor;
And it turns out that this really works as expected.