Java Concurrency – Multi Threading with ExecutorService

Multi Threading with ExecutorService

In this post we’ll look at how the ExecutorService can be used to run multi-threaded asynchronous tasks. We’ll start by creating threads directly and then move on to explore the ExecutorService and how it can be used to simplify things.

Creating Threads Directly

Before the Executor API came along, developers were responsible for instantiating and managing threads directly. Let’s look at a simple example below.

/**
 * Call an expensive method on a separate thread
 *    
 * @throws InterruptedException 
 */
public void doMultiThreadedWork() throws InterruptedException {
  
  /* create Runnable using anonymous inner class */
  Thread t1 = new Thread(new Runnable() { 
    public void run() {
      System.out.println("starting expensive task thread t1");
      doSomethingExpensive(); 
      System.out.println("finished expensive task thread t1");
    }
  });

  /* start processing on the new thread */
  t1.start();

  /* block current thread until t1 has finished */
  t1.join();
}

In the method above we create a new Thread t1 and pass a Runnable to its constructor. An anonymous inner class implements Runnable where the run() method contains the logic that will be executed by the Thread when it is started. Note that if the code inside run() throws a checked Exception it must be caught and handled inside the method.

Thread t1 is started by calling its start() method. The JVM creates a new thread of execution (not a new process) and runs the run() method on it. Be careful not to call the run() method directly, as this will cause run() to execute on the current thread.

The join() method is called to stop execution of the main thread until Thread t1 has terminated. Calling join() is only necessary if you want the main thread to wait for the spawned thread to terminate. Often this isn’t necessary, but in the example above we want t1 to complete before continuing.
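The difference between start() and run() is easy to see by recording which thread actually executes the task. The following is a minimal sketch rather than code from the original example: the class name StartVsRunDemo, the helper methods and the thread name "worker-t1" are all made up for illustration.

```java
class StartVsRunDemo {

    /** Creates a named thread whose task records the name of the thread it runs on. */
    private static Thread recordingThread(String[] executedOn) {
        return new Thread(() -> executedOn[0] = Thread.currentThread().getName(), "worker-t1");
    }

    /** start() asks the JVM to run the task on the newly created thread. */
    static String nameWhenStarted() {
        String[] executedOn = new String[1];
        Thread t1 = recordingThread(executedOn);
        t1.start();
        try {
            t1.join(); // wait for t1; join() also makes its write visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return executedOn[0];
    }

    /** run() is an ordinary method call, so the task runs on the calling thread. */
    static String nameWhenRunDirectly() {
        String[] executedOn = new String[1];
        recordingThread(executedOn).run();
        return executedOn[0];
    }

    public static void main(String[] args) {
        System.out.println("start(): " + nameWhenStarted());
        System.out.println("run():   " + nameWhenRunDirectly());
    }
}
```

With start() the task reports the worker thread’s name, whereas with run() it reports the name of whichever thread made the call.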

Introducing the Executor Service

Dealing with threads directly can be cumbersome, so Java 5 simplified things by providing a layer of abstraction via the Executor API in the java.util.concurrent package. An Executor allows you to process tasks asynchronously without having to deal with threads directly.

Creating an Executor

The Executors factory class is used to create an instance of an Executor, either an ExecutorService or a ScheduledExecutorService. Some of the most common types of Executor are described below.

  • Executors.newCachedThreadPool() – An ExecutorService with a thread pool that creates new threads as required but reuses previously created threads as they become available.
  • Executors.newFixedThreadPool(int numThreads) – An ExecutorService that has a thread pool with a fixed number of threads. The numThreads parameter is the maximum number of threads that can be active in the ExecutorService at any one time. If the number of requests submitted to the pool exceeds the pool size, requests are queued until a thread becomes available.
  • Executors.newScheduledThreadPool(int numThreads) – A ScheduledExecutorService with a thread pool that is used to run tasks periodically or after a specified delay.
  • Executors.newSingleThreadExecutor() – An ExecutorService with a single thread. Tasks submitted will be executed one at a time and in the order submitted.
  • Executors.newSingleThreadScheduledExecutor() – A ScheduledExecutorService that uses a single thread to execute tasks periodically or after a specified delay.

The snippet below creates a fixed thread pool ExecutorService with a pool size of 2. I’ll use this ExecutorService in the sections that follow.

ExecutorService executorService = Executors.newFixedThreadPool(2);
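To illustrate the queueing behaviour of a fixed thread pool, the sketch below submits more tasks than there are threads and records which pool threads ran them. The class FixedPoolDemo and its threadsUsed method are hypothetical names used only for this illustration. It also calls awaitTermination, which isn’t covered in this post, purely so the method can wait for the tasks before returning.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolDemo {

    /** Runs taskCount tasks on a pool of poolSize threads and returns the thread names used. */
    static Set<String> threadsUsed(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                // tasks beyond the pool size wait in the queue for a free thread
                pool.execute(() -> names.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown(); // already queued tasks still run
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait for queued tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("threads used: " + threadsUsed(2, 6));
    }
}
```

However many tasks are submitted, the returned set should never contain more names than the pool size.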

In the following sections we’ll look at how ExecutorService can be used to create and manage asynchronous tasks.

execute(Runnable)

The execute method takes a Runnable and is useful when you want to run a task and are not concerned about checking its status or obtaining a result. Think of it as a fire-and-forget asynchronous task.

executorService.execute(()->{
  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  doSomethingExpensive();    
});

Unlike the first Thread example, which used an anonymous inner class, the example above creates a Runnable using a lambda expression. The Runnable will be executed as soon as a thread is available from the ExecutorService thread pool.

Future<?> submit(Runnable)

Like execute(), the submit() method also takes a Runnable but differs from execute() because it returns a Future. A Future is an object that represents the pending response from an asynchronous task. Think of it as a handle that can be used to check the status of the task or retrieve its result when the task completes. Futures use generics to allow you to specify the return type of the task. However, given that the Runnable run() method has the return type void, the Future holds the status of the task rather than a pending result (its get() method simply returns null once the task has completed successfully). This is represented as Future<?> in the example below.

Future<?> taskStatus = executorService.submit(()->{
  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  doSomethingExpensive();    
});

The submit(Runnable) method is useful when you want to run a task that doesn’t return a value but you’d like to check the status of the task after it’s been submitted to the ExecutorService.
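As a rough illustration of what the Future from submit(Runnable) gives you, the sketch below waits for the task and inspects the result. SubmitRunnableDemo and resultOfRunnable are hypothetical names, and a single-thread executor is used only to keep the example small.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitRunnableDemo {

    /** Submits a Runnable, waits for it to finish and returns whatever get() yields. */
    static Object resultOfRunnable() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<?> taskStatus = executorService.submit(
                () -> System.out.println("running on " + Thread.currentThread().getName()));
            Object result = taskStatus.get(); // blocks until the task has finished
            System.out.println("isDone: " + taskStatus.isDone());
            return result; // a Runnable has no result, so this is null
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + resultOfRunnable());
    }
}
```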

Checking the Status of a Task

Future has a few useful methods for checking the status of a task that’s been submitted to the ExecutorService.

  • isCancelled() checks if the submitted task has already been cancelled.
  • isDone() checks if the submitted task has already completed. When a task has finished isDone will return true whether the task completed successfully, unsuccessfully or was cancelled.
  • cancel() attempts to cancel the submitted task. A boolean parameter specifies whether or not the thread running the task should be interrupted if the task has already started.

/* check if both tasks have completed - if not sleep current thread 
 * for 1 second and check again
 */
while(!task1Future.isDone() || !task2Future.isDone()){
  System.out.println("Task 1 and Task 2 are not yet complete....sleeping");
  Thread.sleep(1000);
}
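The polling loop above only uses isDone(). To show cancel() and isCancelled() as well, here is a small sketch that cancels a long-running task and reports the three flags. CancelDemo and cancelLongTask are made-up names, and the 60 second sleep simply stands in for slow work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelDemo {

    /** Returns { value returned by cancel(true), isCancelled(), isDone() }. */
    static boolean[] cancelLongTask() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executorService.submit(() -> {
                try {
                    Thread.sleep(60_000); // stands in for a slow task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cancelled: stop promptly
                }
            });
            // true = interrupt the worker thread if the task is already running
            boolean cancelled = future.cancel(true);
            return new boolean[] { cancelled, future.isCancelled(), future.isDone() };
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean[] flags = cancelLongTask();
        System.out.println("cancel() returned: " + flags[0]);
        System.out.println("isCancelled():     " + flags[1]);
        System.out.println("isDone():          " + flags[2]);
    }
}
```

Note that isDone() reports true for a cancelled task, which matches the description of isDone() above.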

Future<T> submit(Callable)

The submit method is overloaded to take a Callable as well as a Runnable. Like a Runnable, a Callable represents a task that is executed on another thread. A Callable differs from a Runnable because it returns a value and can throw a checked Exception. The Callable interface has a single abstract method public T call() throws Exception and, like Runnable, can be implemented with an anonymous inner class or lambda. The return type of the call() method is used to type the Future returned by the ExecutorService. The two code snippets below show how a Callable can be created via an anonymous inner class and a lambda expression.

Future<Double> task1Future = executorService.submit(new Callable<Double>() {
  
  public Double call() throws Exception {

    System.out.println(String.format("starting expensive task thread %s", 
        Thread.currentThread().getName()));
    Double returnedValue = someExpensiveRemoteCall();

    return returnedValue;
  } 
});
Future<Double> task2Future = executorService.submit(()->{
  
  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  Double returnedValue = someExpensiveRemoteCall();

  return returnedValue;
});

Both examples create a Callable and pass it to the submit method. The Callable is executed as soon as a thread is available.
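Because call() can throw a checked Exception, it’s worth seeing where that exception ends up. In the sketch below the failure surfaces when get() is called, wrapped in an ExecutionException. The class CallableFailureDemo, the method causeOfFailure and the IOException message are all invented for illustration.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableFailureDemo {

    /** Submits a Callable that always fails and describes the failure seen by the caller. */
    static String causeOfFailure() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            // call() may throw a checked exception; no try/catch needed inside the task
            Callable<Double> failing = () -> {
                throw new IOException("remote call failed");
            };
            Future<Double> future = executorService.submit(failing);
            future.get(); // the task's exception is rethrown here, wrapped
            return "no failure";
        } catch (ExecutionException e) {
            Throwable cause = e.getCause(); // the original exception from call()
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFailure());
    }
}
```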

Getting a Result from a Future

When a Callable is submitted to the ExecutorService we receive a Future typed with the return type of the call() method. In the example above, call() returns a Double so we get a Future<Double>. One way of retrieving the result from a Future is by calling its get() method. get() will block indefinitely waiting on the submitted task to complete. If the task doesn’t complete or takes a long time to complete, the calling thread will remain blocked.

Waiting indefinitely for a result is usually not ideal. We’d rather have more control over how we retrieve the result and take some action if a task doesn’t complete within a certain amount of time. Luckily there’s an overloaded get(long timeout, TimeUnit unit) method that waits for the specified period of time and if the task hasn’t finished (result not available), throws a TimeoutException.

Double value1 = task1Future.get();
Double value2 = task2Future.get(4, TimeUnit.SECONDS); // throws TimeoutException if no result within 4 seconds
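One possible way of taking action when a task is too slow is to catch the TimeoutException, cancel the task and fall back to a default value. The sketch below assumes that approach. TimeoutDemo, getOrFallback, the value 42.0 and the sleep that stands in for a slow remote call are all hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {

    /** Waits up to timeoutMillis for a task that takes taskMillis, else returns fallback. */
    static double getOrFallback(long taskMillis, long timeoutMillis, double fallback) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Double> future = executorService.submit(() -> {
            Thread.sleep(taskMillis); // stands in for a slow remote call
            return 42.0;
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up on the task and interrupt it
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the task itself failed
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast task: " + getOrFallback(10, 2_000, -1.0));
        System.out.println("slow task: " + getOrFallback(2_000, 100, -1.0));
    }
}
```

Whether a fallback value, a retry or an error is the right response depends on the application; the point is only that the timeout gives the caller a chance to decide.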

Submitting Multiple Callables

As well as allowing you to submit a single Callable, the ExecutorService allows you to submit a Collection of Callables using the invokeAll method. As you might expect, instead of returning a single Future, invokeAll returns a List of Futures, one for each submitted task. Unlike submit, invokeAll blocks until all of the tasks have completed, so every Future in the returned List is already done.

Collection<Callable<Double>> callables = new ArrayList<>();
IntStream.rangeClosed(1, 8).forEach(i-> {
  callables.add(createCallable());
});

/* invoke all supplied Callables */ 
List<Future<Double>> taskFutureList = executorService.invokeAll(callables);

/* invokeAll has already waited for every task to complete, so each
 * Future is done and get returns its result without blocking. The
 * timeout is kept only as a safeguard.
 */
for (Future<Double> future : taskFutureList) {

  /* get Double result from Future when it becomes available */
  Double value = future.get(4, TimeUnit.SECONDS);
  System.out.println(String.format("TaskFuture returned value %s", value)); 
}

The code snippet above submits 8 Callables to the ExecutorService and retrieves a List containing 8 Futures. The Futures are returned in the same order as the Callables were submitted. Note that submitting multiple Callables may require the size of the thread pool to be tweaked if we want most or all of the submitted tasks to be executed in parallel. In the example above we’d need a thread pool with 8 threads to run all tasks in parallel.
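The snippet above relies on a createCallable() helper that isn’t shown. As a self-contained alternative, the sketch below uses invokeAll with trivial Callables that square a number, which makes the ordering of the returned Futures easy to check. InvokeAllDemo, squares and the pool size of 4 are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {

    /** Squares 1..count on a thread pool and returns the results in submission order. */
    static List<Integer> squares(int count) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> callables = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                final int n = i; // lambdas can only capture effectively final variables
                callables.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task has completed
            for (Future<Integer> future : executorService.invokeAll(callables)) {
                results.add(future.get()); // already done, so this does not block
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(8));
    }
}
```

Even though the tasks may run on different threads and finish in any order, the results line up with the order in which the Callables were added.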

Shutting Down the ExecutorService

After all the tasks have completed it’s important to shut down the ExecutorService gracefully so that the resources it uses can be reclaimed. There are 2 methods available, shutdown() and shutdownNow(). shutdown() triggers an orderly shutdown of the ExecutorService, allowing previously submitted tasks (including those still queued) to finish but rejecting newly submitted tasks.

shutdownNow() also triggers a shutdown of the ExecutorService, but does not allow currently executing tasks to complete and attempts to stop them immediately, typically by interrupting their threads. shutdownNow() returns a list of the tasks that were queued for execution when the shutdown was initiated. To ensure the ExecutorService is shut down in all cases and to avoid potential resource leaks, it’s important that shutdown() or shutdownNow() is called inside a finally block.

/* create the pool before the try block so the finally block never sees null */
ExecutorService executorService = Executors.newFixedThreadPool(2);

try{ 

  executorService.execute(()->{
    System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
    doSomethingExpensive(); 
  });

}
finally{
  executorService.shutdown(); 
}
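shutdown() returns immediately and does not wait for running tasks to finish. If you need to wait, one option is to combine it with awaitTermination, which isn’t covered in this post. The sketch below is adapted from the two-phase shutdown pattern shown in the ExecutorService Javadoc. ShutdownDemo and shutdownAndAwait are hypothetical names, and the timeouts are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    /** Shuts the pool down gracefully, then forcefully; returns true if it terminated. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown(); // stop accepting new tasks, let submitted ones finish
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt tasks that are still running
                return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(() -> System.out.println("working"));
        System.out.println("terminated: " + shutdownAndAwait(executorService, 5));
    }
}
```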

Wrapping Up

In this post, we looked at the ExecutorService and how it can be used to simplify the creation and management of asynchronous tasks. The source code that accompanies this post is available on GitHub, so why not pull the code and have a play around. As always, feel free to post comments or questions below.

March 28th, 2017 | Core Java
