Skip to content

Commit 0c0e15c

Browse files
ExecutorService Thread Pool Impl
1 parent 78fccc0 commit 0c0e15c

File tree

3 files changed

+178
-0
lines changed

3 files changed

+178
-0
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.github.bordertech.taskmaster.pool;
2+
3+
import com.github.bordertech.taskmaster.RejectedTaskException;
4+
import com.github.bordertech.taskmaster.TaskFuture;
5+
import com.github.bordertech.taskmaster.TaskMaster;
6+
import com.github.bordertech.taskmaster.impl.TaskFutureWrapper;
7+
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Future;
9+
import java.util.concurrent.RejectedExecutionException;
10+
import javax.inject.Singleton;
11+
12+
/**
13+
* TaskMaster implementation that allows for Thread Pools.
14+
*/
15+
@Singleton
16+
public class TaskMasterPoolImpl implements TaskMaster {
17+
18+
@Override
19+
public void shutdown() {
20+
shutdownNow();
21+
}
22+
23+
/**
24+
* TODO This needs to be put in TaskMaster.
25+
*/
26+
public void shutdownNow() {
27+
// Provide an immediate shutdown of threads without running the waiting threads.
28+
TaskMasterPoolUtil.shutdown();
29+
}
30+
31+
@Override
32+
public <T> TaskFuture<T> submit(final Runnable task, final T result) throws RejectedTaskException {
33+
return submit(task, result, TaskMasterPoolUtil.DEFAULT_POOL);
34+
}
35+
36+
@Override
37+
public <T> TaskFuture<T> submit(final Runnable task, final T result, final String pool) throws RejectedTaskException {
38+
ExecutorService exec = TaskMasterPoolUtil.getPool(pool);
39+
try {
40+
Future<T> future = exec.submit(task, result);
41+
return new TaskFutureWrapper<>(future);
42+
} catch (RejectedExecutionException e) {
43+
throw new RejectedTaskException("Unable to start task in pool [" + pool + "].", e);
44+
}
45+
}
46+
47+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package com.github.bordertech.taskmaster.pool;
2+
3+
import com.github.bordertech.config.Config;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import java.util.concurrent.ArrayBlockingQueue;
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.LinkedBlockingQueue;
11+
import java.util.concurrent.SynchronousQueue;
12+
import java.util.concurrent.ThreadPoolExecutor;
13+
import java.util.concurrent.TimeUnit;
14+
15+
/**
16+
* TaskMaster ExecutorService thread pool utility.
17+
*/
18+
public final class TaskMasterPoolUtil {
19+
20+
private static final Map<String, ExecutorService> THREAD_POOLS = new HashMap<>();
21+
private static final String TP_PARAM_PREFIX = "bordertech.taskmaster.pool.";
22+
private static final int DEFAULT_MAX_THREADS = 20;
23+
private static final int DEFAULT_QUEUE_LENGTH = 0;
24+
25+
/**
26+
* Default thread pool name.
27+
*/
28+
public static final String DEFAULT_POOL = Config.getInstance().getString(TP_PARAM_PREFIX + "default", "default");
29+
30+
static {
31+
// Load thread pools
32+
String[] pools = Config.getInstance().getStringArray(TaskMasterPoolUtil.TP_PARAM_PREFIX + "names");
33+
for (String pool : pools) {
34+
THREAD_POOLS.put(pool, TaskMasterPoolUtil.buildPool(pool));
35+
}
36+
// Check if default pool needs to be created
37+
if (!THREAD_POOLS.containsKey(TaskMasterPoolUtil.DEFAULT_POOL)) {
38+
THREAD_POOLS.put(TaskMasterPoolUtil.DEFAULT_POOL, TaskMasterPoolUtil.buildPool(TaskMasterPoolUtil.DEFAULT_POOL));
39+
}
40+
}
41+
42+
/**
43+
* Private constructor to prevent instantiation.
44+
*/
45+
private TaskMasterPoolUtil() {
46+
// Do nothing
47+
}
48+
49+
/**
50+
* Build a thread pool with the given name.
51+
*
52+
* @param pool the pool name to create
53+
* @return the executor service
54+
*/
55+
public static ExecutorService buildPool(final String pool) {
56+
// TODO Logging and performance parameters
57+
// http://www.nurkiewicz.com/2014/11/executorservice-10-tips-and-tricks.html
58+
59+
// Get the pool type - defaults to cached
60+
String type = Config.getInstance().getString(TP_PARAM_PREFIX + pool + ".type", "cached");
61+
switch (type.toLowerCase()) {
62+
case "single":
63+
return Executors.newSingleThreadExecutor();
64+
case "fixed":
65+
// Number of fixed threads
66+
int max = Config.getInstance().getInt(TP_PARAM_PREFIX + pool + ".max", DEFAULT_MAX_THREADS);
67+
if (max < 1) {
68+
max = DEFAULT_MAX_THREADS;
69+
}
70+
// Length of pending queue
71+
int queue = Config.getInstance().getInt(TP_PARAM_PREFIX + pool + ".queue", DEFAULT_QUEUE_LENGTH);
72+
// Create executable with the appropriate queue type
73+
BlockingQueue<Runnable> blkQueue;
74+
if (queue < 0) {
75+
// Unlimited
76+
blkQueue = new LinkedBlockingQueue<>();
77+
} else if (queue == 0) {
78+
// No queue
79+
blkQueue = new SynchronousQueue<>();
80+
} else {
81+
// Fixed queue length
82+
blkQueue = new ArrayBlockingQueue<>(queue);
83+
}
84+
return new ThreadPoolExecutor(max, max, 0L, TimeUnit.MILLISECONDS, blkQueue);
85+
default:
86+
// Default - Unlimited Threads and No Queue
87+
return Executors.newCachedThreadPool();
88+
}
89+
}
90+
91+
/**
92+
* Shutdown the thread pools.
93+
*/
94+
public static void shutdown() {
95+
for (ExecutorService exec : THREAD_POOLS.values()) {
96+
// TODO This logic needs to be put into TaskMaster
97+
exec.shutdownNow();
98+
}
99+
}
100+
101+
/**
102+
* Retrieve the thread pool for the given name.
103+
*
104+
* @param poolName the thread pool name
105+
* @return the thread pool for the given name.
106+
*/
107+
public static synchronized ExecutorService getPool(final String poolName) {
108+
String name = poolName == null ? TaskMasterPoolUtil.DEFAULT_POOL : poolName;
109+
ExecutorService pool = THREAD_POOLS.get(name);
110+
if (pool == null) {
111+
throw new IllegalStateException("Pool [" + name + "] has not been defined.");
112+
}
113+
// TODO This logic needs to be put into TaskMaster
114+
// Check if terminated (reactivate)
115+
if (pool.isTerminated()) {
116+
// TODO LOG Warning and maybe control this via flag
117+
// Check not interrupted for some reason (maybe server shutting down)
118+
if (Thread.currentThread().isInterrupted()) {
119+
throw new IllegalStateException("Pool [" + name + "] has terminated and thread is interrupted.");
120+
}
121+
pool = TaskMasterPoolUtil.buildPool(name);
122+
THREAD_POOLS.put(name, pool);
123+
}
124+
return pool;
125+
}
126+
127+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* TaskMaster using ExecutorService and Thread Pools.
3+
*/
4+
package com.github.bordertech.taskmaster.pool;

0 commit comments

Comments
 (0)