Tuesday 2 May 2017

Implement Thread pool in Java

What is ThreadPool?
ThreadPool is a pool of threads which reuses a fixed number of threads to execute tasks.

At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
ThreadPool implementation internally uses LinkedBlockingQueue for adding and removing tasks.



How ThreadPool works?
We will instantiate ThreadPool, in ThreadPool’s constructor nThreads number of threads are created and started.

/* Create ThreadPool of Size#4. */
ThreadPool pool = new ThreadPool(4);
Here 4 threads will be created and started in ThreadPool. Then, threads will enter run() method of PoolWorker class and will call poll() method on queue.

If tasks are available thread will execute task by entering run() method of task else waits for tasks to become available. (As tasks executed always implements Runnable).

public void run() {
       Runnable task;
       while(true) {
              synchronized (queue) {
                     while (queue.isEmpty()) {
                           try {
                                  queue.wait();
                           } catch (InterruptedException e) {
                                  System.out.println("An error while queue is
                                  waiting: " + e.getMessage());
                           }
                     }
                     task = queue.poll();
              }

              // If we don't catch RuntimeException, the pool could leak threads
              try {
                     task.run();
              } catch (RuntimeException e) {
                     System.out.println("Thread pool is interrupted: "
                            + e.getMessage());
              }
       }
}

When tasks are added?
When execute() method of ThreadPool is called, it internally calls add() method on queue to add tasks.

      public void execute(Task task) {
            synchronized (queue) {
                  queue.add(task);
                  queue.notify();
            }
      }
Once tasks are available all waiting threads are notified that task is available.

In the above code, we used notify() instead of notifyAll(). Because notify() has more desirable performance characteristics than notifyAll(); in particular, notify() causes many fewer context switches, which is important in a server application. But it is important to make sure when using notify() in other situation as there are subtle risks associated with using notify(), and it is only appropriate to use it under certain specific conditions.

How threads in ThreadPool can be stopped?
shutDown() method can be used to stop threads executing in ThreadPool, once shutdown of ThreadPool is initiated, previously submitted tasks are executed, but no new tasks could be accepted.

After thread has executed task
Check whether pool shutDown has been initiated or not, if pool shutDown has been initiated and
queue does not contain any unExecuted task (i.e. queue size is 0) than interrupt() the thread.

public void run() {
       Runnable task;
       while(true) {
              …………………………………………………………………
              …………………………………………………………………
              …………………………………………………………………
              …………………………………………………………………
              /*
               * 1) Check whether pool shutDown has been initiated or not,
               * If pool shutDown has been initiated
               * AND
               * 2) queue does not contain any unExecuted task(i.e. queue's size is 0)
               *            than  interrupt() the thread.
               */
              if(this.threadPool.isPoolShutDownInitiated()
                           &&  this.threadPool.queue.size()==0){
                     this.interrupt();
                     /*
                      *  Interrupting basically sends a message to the thread
                      *  indicating it has been interrupted but it doesn't cause
                      *  a thread to stop immediately,
                      *  If sleep is called, thread immediately throws
                      *  InterruptedException
                      */
                     try {
                           Thread.sleep(1);
                     } catch (InterruptedException e) {
                           System.out.println("InterruptedException while calling sleep.");
                     } 
              }
       }
}

Effective use of ThreadPools
Thread pool is a powerful mechanism for structuring multithreaded applications, but it is not without risk. Applications built with thread pools could have all the same concurrency risks as any other multithreaded applications, such as deadlock, resource thrashing, synchronization or concurrency errors, thread leakage and request overload.
Some important points:
1. Do not queue tasks which wait synchronously for other tasks as this can cause a deadlock.
2. If the task requires waiting for a resource such as I/O, specify a maximum wait time and then fail or requeue the task execution. This guarantees that some progress will be made by freeing the thread for another task that might complete successfully.
3. Tune the thread pool size effectively, and understand that having too few threads or too many threads both can cause problems. The optimum size of a thread pool depends on the number of available processors and the nature of the tasks on the work queue.

ThreadPool implemention:
ThreadPool.java
package com.thread;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPool {

      private final int nThreads;
      private final PoolWorker[] threads;
      private final LinkedBlockingQueue<Task> queue;
      private boolean poolShutDownInitiated;

      public ThreadPool(int nThreads) {
            this.nThreads = nThreads;
            this.threads = new PoolWorker[this.nThreads];
            this.queue = new LinkedBlockingQueue<Task>();

            for (int i = 0; i < nThreads; i++) {
                  threads[i] = new PoolWorker(this);
                  threads[i].start();
            }
      }

      public void execute(Task task) {
            synchronized (queue) {
                  queue.add(task);
                  queue.notify();
            }
      }

      public boolean isPoolShutDownInitiated() {
            return poolShutDownInitiated;
      }

      /**
       * Initiates shutdown of ThreadPool, previously submitted tasks
       * are executed, but no new tasks will be accepted.
       */
      public synchronized void shutdown(){
            this.poolShutDownInitiated = true;
            System.out.println("ThreadPool SHUTDOWN initiated.");
      }



      private class PoolWorker extends Thread {
            private ThreadPool threadPool;
            public PoolWorker(ThreadPool threadPool) {
                  this.threadPool = threadPool;
            }

            public void run() {
                  Runnable task;
                  while(true) {
                        synchronized (queue) {
                              while (queue.isEmpty()) {
                                    try {
                                          queue.wait();
                                    } catch (InterruptedException e) {
                                          System.out.println("Error while queue is waiting:" + e.getMessage());
                                    }
                              }
                              task = queue.poll();
                        }

                        /* If we don't catch RuntimeException, the pool could leak threads*/
                        try {
                              task.start();
                        } catch (RuntimeException e) {
                              System.out.println("Thread pool is interrupted: " + e.getMessage());
                        }

                        /*
                         * 1) Check whether pool shutDown has been initiated or not,
                         * if pool shutDown has been initiated
                         * AND
                         * 2) queue does not contain any unExecuted task (i.e. queue's size is 0)
                         *          than  interrupt() the thread.
                         */
                        if(this.threadPool.isPoolShutDownInitiated()
                                    &&  this.threadPool.queue.size()==0){
                              this.interrupt();
                              /*
                               *  Interrupting basically sends a message to the thread
                               *  indicating it has been interrupted but it doesn't cause
                               *  a thread to stop immediately,
                               *
                               *  if sleep is called, thread immediately throws
                               *  InterruptedException
                               */
                              try {
                                    Thread.sleep(1);
                              } catch (InterruptedException e) {
                                    System.out.println("InterruptedException while calling sleep.");
                              } 
                        }
                  }
            }
      }
}

Task.java
package com.thread;
public class Task implements Runnable {

    private int num;

    public Task(int n) {
        num = n;
    }

    public void run() {
        System.out.println("Task " + num + " is running.");
    }
}

ThreadPoolTest.java
package com.thread;

/**
 * Test Thread Pool Scheduler
 * @author rajesh.dixit
 */
public class ThreadPoolTest {

    public static void main(String[] args) {
     
      /* Create ThreadPool of Size#4. */
        ThreadPool pool = new ThreadPool(4);

        for (int i = 0; i < 5; i++) {
            Task task = new Task(i);
            pool.execute(task);
        }
    }
}

No comments:

Post a Comment

Related Posts Plugin for WordPress, Blogger...