Skip to content

Commit e49452e

Browse files
Logical Thread Pool.
1 parent 0c0e15c commit e49452e

3 files changed

Lines changed: 198 additions & 0 deletions

File tree

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package com.github.bordertech.taskmaster.logical;
2+
3+
import com.github.bordertech.config.Config;
4+
import java.io.Serializable;
5+
import java.util.concurrent.Semaphore;
6+
import java.util.concurrent.TimeUnit;
7+
import org.apache.commons.logging.Log;
8+
import org.apache.commons.logging.LogFactory;
9+
10+
/**
11+
* User session logical thread pool details.
12+
* <p>
13+
* Can be used to control a logical pool of threads per user session.
14+
* </p>
15+
*/
16+
public class LogicalThreadPool implements Serializable {
17+
18+
private static final int DEFAULT_WAIT_INTERVAL_MILLISECONDS = Config.getInstance().getInt("bordertech.tm.logicalthreadpool.wait.interval", 300);
19+
private static final int DEFAULT_MAX_WAIT_INTERVALS = Config.getInstance().getInt("bordertech.tm.logicalthreadpool.wait.max.intervals", 200);
20+
21+
private static final Log LOGGER = LogFactory.getLog(LogicalThreadPool.class);
22+
private final String name;
23+
private final int max;
24+
private final Semaphore semaphore;
25+
private boolean shutdown;
26+
27+
/**
28+
* @param name thread pool name
29+
* @param max the maximum threads. Zero means no limit.
30+
*/
31+
public LogicalThreadPool(final String name, final int max) {
32+
this.name = name == null ? "default" : name;
33+
this.max = max > 0 ? max : 0;
34+
this.semaphore = new Semaphore(this.max, true);
35+
}
36+
37+
/**
38+
* @return the thread pool name
39+
*/
40+
public final String getName() {
41+
return name;
42+
}
43+
44+
/**
45+
* @return the maximum threads
46+
*/
47+
public final int getMax() {
48+
return max;
49+
}
50+
51+
/**
52+
* @return the current threads in use
53+
*/
54+
public int getCurrent() {
55+
return max - semaphore.availablePermits();
56+
}
57+
58+
/**
59+
* @return true if thread pool is shutdown
60+
*/
61+
public synchronized boolean isShutdown() {
62+
return shutdown;
63+
}
64+
65+
/**
66+
* Shutdown the thread pool.
67+
*/
68+
public void shutdownPool() {
69+
setShutdown(true);
70+
}
71+
72+
/**
73+
* Start the thread pool.
74+
*/
75+
public void startPool() {
76+
setShutdown(false);
77+
}
78+
79+
/**
80+
* Called when a thread finishes processing.
81+
*/
82+
public void finished() {
83+
if (getCurrent() > 0) {
84+
semaphore.release();
85+
}
86+
}
87+
88+
/**
89+
* @return true if thread pool has available threads
90+
*/
91+
public boolean getAccess() {
92+
checkPoolStatus();
93+
if (max <= 0) {
94+
return true;
95+
}
96+
// No block
97+
return semaphore.tryAcquire();
98+
}
99+
100+
/**
101+
* @param timeout the wait interval
102+
* @param unit the interval unit
103+
* @return true if the thread pool has available threads
104+
*/
105+
public boolean getAccess(final long timeout, final TimeUnit unit) {
106+
checkPoolStatus();
107+
if (max <= 0) {
108+
return true;
109+
}
110+
boolean result = false;
111+
try {
112+
result = semaphore.tryAcquire(timeout, unit);
113+
} catch (InterruptedException e) {
114+
// Restore interrupted state...
115+
Thread.currentThread().interrupt();
116+
throw new IllegalStateException("Interrupted while trying to gain access to thread pool ["
117+
+ getName() + "].", e);
118+
}
119+
return result;
120+
}
121+
122+
/**
123+
* Wait till a thread is available.
124+
* <p>
125+
* Must call finished on pool to release the thread in the pool.
126+
* <p>
127+
*/
128+
public void waitAccess() {
129+
waitAccess(DEFAULT_WAIT_INTERVAL_MILLISECONDS, DEFAULT_MAX_WAIT_INTERVALS);
130+
}
131+
132+
/**
133+
* Wait till a thread is available.
134+
* <p>
135+
* Must call finished on pool to release the thread in the pool.
136+
* <p>
137+
*
138+
* @param waitInterval the wait interval in milliseconds
139+
* @param maxWaitIntervals the max number of waits
140+
*/
141+
public void waitAccess(final int waitInterval, final int maxWaitIntervals) {
142+
checkPoolStatus();
143+
int counts = 0;
144+
while (!getAccess(waitInterval, TimeUnit.MILLISECONDS)) {
145+
if (LOGGER.isDebugEnabled()) {
146+
LOGGER.debug("Waiting for thread pool [" + getName() + "]. Max: "
147+
+ getMax() + " Current: " + getCurrent() + ".");
148+
}
149+
if (counts++ > maxWaitIntervals) {
150+
throw new IllegalStateException("Maximum attempts to get a thread in pool ["
151+
+ getName() + "] exceeded.");
152+
}
153+
}
154+
if (LOGGER.isDebugEnabled()) {
155+
LOGGER.debug("Thread is available in pool " + getName() + ".");
156+
}
157+
}
158+
159+
/**
160+
* Check if the pool is OK for processing.
161+
*/
162+
protected void checkPoolStatus() {
163+
if (isShutdown()) {
164+
throw new IllegalStateException("Thread pool is shutdown for processing.");
165+
}
166+
}
167+
168+
/**
169+
* @param shutdown true if pool shutdown for processing
170+
*/
171+
protected synchronized void setShutdown(final boolean shutdown) {
172+
this.shutdown = shutdown;
173+
}
174+
175+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.github.bordertech.taskmaster.logical;
2+
3+
/**
4+
* Component that controls a logical thread pool.
5+
*/
6+
public interface LogicalThreadPoolController {
7+
8+
/**
9+
*
10+
* @return true if thread available
11+
*/
12+
boolean acquireThread();
13+
14+
/**
15+
* Release the thread back to the pool.
16+
*/
17+
void releaseThread();
18+
19+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Logical thread pool that can be used per user session.
3+
*/
4+
package com.github.bordertech.taskmaster.logical;

0 commit comments

Comments
 (0)