The capability of concurrent processing is no longer a nice-to-have quality, but a must-have one in today’s software applications. While almost all modern programming languages provide different semantics to achieve this capability, in Java concurrency is built primarily around JVM threads.
Therefore, Java provides many utilities to handle such concurrency related matters under its java.util.concurrent package. One of the most important of these utilities is an ExecutorService called ThreadPoolExecutor, which can accept asynchronous concurrent tasks and carry them out using a pool of threads that it maintains underneath. Most importantly, this ExecutorService takes care of the maintenance of the thread pool, such as spawning new threads and evicting idle ones, while developers only have to provide the parameters that control the size and scalability of the pool.
The three most important of these parameters are the core pool size, the maximum pool size and the work queue. According to the Java documentation, they are defined as follows.
- corePoolSize — the number of threads to keep in the pool, even if they are idle
- maximumPoolSize — the maximum number of threads to allow in the pool
- workQueue — the queue to use for holding tasks before they are executed
So let’s take an example scenario and see how the default Java thread pool executor behaves under a load of concurrent tasks.
If you are already aware of how the default thread pool executor works and you are here just to know whether scale-first is a myth or a reality, you can skip the next section and head on to the one after that.
How does it work?
First of all, let’s configure an instance of Java’s ThreadPoolExecutor (referred to as the thread pool from here onwards) and observe how it behaves when a set of concurrent tasks is submitted. Assume that this thread pool is configured with a core size of 20, a max size of 100, a bounded work queue of size 50 and all other parameters kept at their default values.
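A minimal sketch of such a configuration (the variable name is just for illustration) could look like this:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// core size 20, max size 100, a bounded work queue of size 50;
// 60 seconds is the keep-alive time for threads above the core size
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        20, 100,
        60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(50));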
When the thread pool is started, by default it starts with no threads. When we submit the first task, the first worker thread is spawned and the task is handed over to that thread. As long as the current worker thread count is less than the configured core pool size, a new worker thread is spawned for each newly submitted task, even if some of the previously created core threads are idle.
What happens when the worker thread count reaches the core pool size? As soon as the number of worker threads reaches the core pool size (20 in our example), the thread pool stops creating new worker threads. As we submit more and more tasks, one of the following behaviours can be observed.
Behaviour 1
- As long as there are any idle core threads (worker threads that have been created earlier, but already finished their assigned tasks), they will take up the new tasks and execute them.
Behaviour 2
- If there are no idle core threads available, each new task will be enqueued into the defined work queue until a core thread becomes available to accept it.
- In case the work queue becomes full while there are still not enough idle core threads to handle the tasks, the thread pool resumes creating new worker threads and the new tasks are carried out by them.
- Once the worker thread count reaches the max pool size, the thread pool again stops creating new worker threads and any task submitted after this point is rejected.
In case you got lost in the above details, the following flow chart should help you grab the idea 🙂
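To see this queue-first behaviour in action, here is a small, self-contained experiment (the class name and task durations are illustrative assumptions):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueFirstDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                20, 100, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));

        // submit 60 tasks that each run long enough to keep a thread busy
        for (int i = 0; i < 60; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        Thread.sleep(200); // give the pool a moment to settle
        // expected: 20 worker threads (the core size) and 40 queued tasks,
        // i.e. the pool queues new tasks instead of scaling beyond core size
        System.out.println("pool size : " + pool.getPoolSize());
        System.out.println("queue size: " + pool.getQueue().size());
        pool.shutdown();
    }
}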
The Problem
Looking at those behaviours, the 1st behaviour is completely acceptable, but the 2nd one is not. While it might be perfectly sensible for some use cases, it is not the ideal behaviour for a system that expects sudden peaks and needs to scale as soon as a peak hits, without waiting for the system to stabilize. One could argue that such a system should simply be given more core threads, but the number of threads under usual (non-peak) operation should remain moderate, due to the obvious contention issues caused by higher thread counts.
We at Adroitlogic also encountered this same problem when we were designing Project-X, the core framework of our integration product stack. Since all our integration products are performance critical, we needed to get rid of this queue-first behaviour of the thread pool executor and make it scale-first. After trying out several alternative approaches, we came up with the following implementation, which brought the expected results.
The Solution
Before going into the implementation details, let’s see what the expected behaviour is: when there are no idle worker threads, the pool should scale up by creating new threads first, and start queueing tasks only once it reaches the maximum pool size.
Understanding the implementation of the current behaviour
As the first step, let’s go through Java’s ThreadPoolExecutor class to find out how this queue-first behaviour is implemented, and the following code block from its execute() method looks interesting.
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);
The first if block is responsible for creating a new core thread for the newly submitted task if the current worker thread count is less than the core pool size. Since we are fine with that behaviour, let’s move on to the next if block.
Simply put (ignoring some rechecks performed to preserve the consistency of the thread pool), what the second if block does is invoke the offer() method of the work queue with the task to be executed.
By default, when the offer() method of a BlockingQueue is invoked, it tries to enqueue the provided element (the task in this case) and returns true if successful. If the queue does not have enough space left to store the element, it simply returns false.
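As a quick illustration of this contract (the class name is just for the example):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OfferDemo {

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        System.out.println(queue.offer(() -> {})); // true : element enqueued
        System.out.println(queue.offer(() -> {})); // false: queue is full, no exception thrown
    }
}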
So in this case, if the queue has enough space, the task will be queued. If there are any idle worker threads available, they will be looking at this queue and one of them will quickly grab this task.
Otherwise, the execution moves on to the final else if block. That code block first tries to create a new worker thread to handle the task, and if that fails (because the max pool size has been reached), it simply rejects the task.
Modifying the behaviour of the offer() method
From what we saw here, we can understand that by modifying the default behaviour of this offer() method of the work queue, we can solve a major part of this issue. Conceptually, what we have to do is first check whether there are any idle worker threads in the pool. If there are, we can enqueue the task so that one of those threads can pick it up from there. Otherwise, we should return false from the offer() method, which will make the thread pool create a new worker thread for the task.
The following 2 diagrams show the default behaviour of the offer() method and our modified behaviour.
So, assuming that an AtomicInteger named currentIdleThreadCount contains the number of currently idle threads, our new offer() method looks like below.
@Override
public boolean offer(Runnable e) {
    return currentIdleThreadCount.get() > 0 && super.offer(e);
}
But unfortunately, there is no straightforward way for the work queue to get the current idle worker thread count of the thread pool without introducing a performance bottleneck. So now we need to implement a way to keep track of that ourselves.
Keeping track of idle worker thread count
To implement this, we basically need to identify the points where the status of a worker thread changes from idle to busy or vice versa. If we go back to the code of the ThreadPoolExecutor class, we can see that the getTask() method is exactly such a point. Each idle thread executes this method to acquire a new task to execute, and the following code block plays an important part in that.
Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take();
Just to be clear, the timed variable here indicates whether the thread pool is currently in a state where it can evict some of its idle threads. This is true either if the thread pool has more worker threads than the core pool size, or if the thread pool is configured (via allowCoreThreadTimeOut) to allow evicting idle core threads.
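In the ThreadPoolExecutor source, this flag is computed as follows (wc being the current worker count):

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;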
First of all, let’s consider the scenario where timed is false. In this case, the thread calls the take() method of the work queue. It is obvious that any thread that enters the take() method is currently idle, so we can override the implementation of this method in our work queue to increment currentIdleThreadCount at the beginning. Then we can call the actual take() method, which results in one of the following two scenarios.
- If there are no tasks in the queue, the thread will block at this call until a task becomes available. So it is still in the idle state and our incremented counter value is correct.
- If there is any task, then it will be returned by the method call. So now this thread is no longer idle and we can decrement our counter at this point.
So our overridden take() method looks like below.
@Override
public Runnable take() throws InterruptedException {
    currentIdleThreadCount.incrementAndGet();
    Runnable take = super.take();
    currentIdleThreadCount.decrementAndGet();
    return take;
}
Then let’s consider the other scenario, where timed is true. In this case, the thread calls the poll(long timeout, TimeUnit unit) method of the work queue with a timeout value. Here too, it is obvious that any thread that enters the poll() method is currently idle, so we can override the implementation of this method in our work queue to increment currentIdleThreadCount at the beginning. Then we can call the actual poll() method, which results in one of the following two scenarios.
- If there are no tasks in the queue, the thread will wait at this call for the provided timeout and then return null. By this time, the thread has timed out and will soon be evicted from the pool, reducing the number of idle threads by 1. So we can decrement our counter at this point.
- If there is any task, then it will be returned by the method call. So now this thread is no longer idle and we can decrement our counter at this point too.
So our overridden poll(long timeout, TimeUnit unit) method looks like below.
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
    currentIdleThreadCount.incrementAndGet();
    Runnable poll = super.poll(timeout, unit);
    currentIdleThreadCount.decrementAndGet();
    return poll;
}
Therefore, with these new implementations of the offer(), take() and poll(long timeout, TimeUnit unit) methods, our thread pool now scales up whenever it has no idle worker threads to take up new tasks.
Introducing queueing after the scaling
So, have we now implemented our scale-first executor service? Unfortunately, not yet.
Our current implementation is a scale-only ExecutorService: it will only try to scale up, and once it reaches the max pool size, it will reject new tasks instead of trying to queue them. So let’s fix that as well.
Fixing that is quite easy. The thread pool executor provides the flexibility to configure a RejectedExecutionHandler, which is called whenever a task is rejected by the thread pool. So we can implement a custom rejection handler which first tries to put the rejected task back into the work queue. If the work queue cannot accept the task (i.e. if the queue is full), it calls the original rejection handler, which will either throw a RejectedExecutionException or handle the rejection according to user-defined logic.
public class ReEnqueuePolicy implements RejectedExecutionHandler {

    private final RejectedExecutionHandler rejectedExecutionHandler;

    public ReEnqueuePolicy(RejectedExecutionHandler originalHandler) {
        this.rejectedExecutionHandler = originalHandler;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // try to re-enqueue the rejected task; if the queue is full,
        // fall back to the original rejection handler
        if (!executor.getQueue().add(r)) {
            rejectedExecutionHandler.rejectedExecution(r, executor);
        }
    }
}
Please note that the rejectedExecutionHandler variable holds the original rejection handler of the thread pool (supplied here via the constructor). In order to guarantee the correctness of this implementation, we also need to override the add() method of the work queue as below: the default add() of a BlockingQueue throws an exception when the queue is full, and it would also go through our modified offer() logic, refusing to enqueue whenever there are no idle threads.
@Override
public boolean add(Runnable runnable) {
    // bypass the idle-thread check in our offer() and enqueue directly,
    // returning false (instead of throwing) when the queue is full
    return super.offer(runnable);
}
Now the implementation is all done and dusted, and we have a very much real scale-first executor service on top of Java’s ThreadPoolExecutor.
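For completeness, here is a minimal sketch of how all of the above pieces could fit together. The class name ScaleFirstWorkQueue and the choice of LinkedBlockingQueue as the base class are illustrative assumptions, not details of the original Project-X implementation:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// a bounded work queue that reports itself as full whenever there are
// no idle worker threads, forcing the thread pool to scale up first
public class ScaleFirstWorkQueue extends LinkedBlockingQueue<Runnable> {

    private final AtomicInteger currentIdleThreadCount = new AtomicInteger(0);

    public ScaleFirstWorkQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(Runnable e) {
        // refuse to enqueue unless an idle worker is available to pick it up
        return currentIdleThreadCount.get() > 0 && super.offer(e);
    }

    @Override
    public Runnable take() throws InterruptedException {
        currentIdleThreadCount.incrementAndGet();
        Runnable take = super.take();
        currentIdleThreadCount.decrementAndGet();
        return take;
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        currentIdleThreadCount.incrementAndGet();
        Runnable poll = super.poll(timeout, unit);
        currentIdleThreadCount.decrementAndGet();
        return poll;
    }

    @Override
    public boolean add(Runnable runnable) {
        // used by ReEnqueuePolicy: enqueue directly, returning false when full
        return super.offer(runnable);
    }
}

A thread pool using these pieces could then be wired up as follows:

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        20, 100, 60, TimeUnit.SECONDS, new ScaleFirstWorkQueue(50));
pool.setRejectedExecutionHandler(
        new ReEnqueuePolicy(pool.getRejectedExecutionHandler()));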