2222import java .lang .Long ;
2323import java .util .AbstractMap ;
2424import java .util .HashMap ;
25+ import java .util .Iterator ;
2526import java .util .Map ;
2627import java .util .concurrent .Future ;
2728import java .util .concurrent .TimeUnit ;
@@ -52,7 +53,25 @@ public class ClientImpl<T extends ServiceDefinition> implements Client<T> {
5253 private final WeakReference <Node > nodeReference ;
5354 private long handle ;
5455 private final String serviceName ;
55- private Map <Long , Map .Entry <Consumer , ResponseFuture >> pendingRequests ;
56+
57+ private class PendingRequest
58+ {
59+ public Consumer callback ;
60+ public ResponseFuture future ;
61+ public long requestTimestamp ;
62+
63+ public PendingRequest (
64+ final Consumer callback ,
65+ final ResponseFuture future ,
66+ final long requestTimestamp )
67+ {
68+ this .callback = callback ;
69+ this .future = future ;
70+ this .requestTimestamp = requestTimestamp ;
71+ }
72+ }
73+
74+ private Map <Long , PendingRequest > pendingRequests ;
5675
5776 private final ServiceDefinition serviceDefinition ;
5877
@@ -66,7 +85,7 @@ public ClientImpl(
6685 this .handle = handle ;
6786 this .serviceName = serviceName ;
6887 this .serviceDefinition = serviceDefinition ;
69- this .pendingRequests = new HashMap <Long , Map . Entry < Consumer , ResponseFuture > >();
88+ this .pendingRequests = new HashMap <Long , PendingRequest >();
7089 }
7190
7291 public ServiceDefinition getServiceDefinition () {
@@ -88,8 +107,7 @@ public void accept(Future<V> input) {}
88107 request .getDestructorInstance (), request );
89108 ResponseFuture <V > future = new ResponseFuture <V >(sequenceNumber );
90109
91- Map .Entry <Consumer , ResponseFuture > entry =
92- new AbstractMap .SimpleEntry <Consumer , ResponseFuture >(callback , future );
110+ PendingRequest entry = new PendingRequest (callback , future , System .nanoTime ());
93111 pendingRequests .put (sequenceNumber , entry );
94112 return future ;
95113 }
@@ -98,20 +116,44 @@ public void accept(Future<V> input) {}
98116 public final <V extends MessageDefinition > boolean
99117 removePendingRequest (ResponseFuture <V > future ) {
100118 synchronized (pendingRequests ) {
101- Map . Entry < Consumer , ResponseFuture > entry = pendingRequests .remove (
119+ PendingRequest entry = pendingRequests .remove (
102120 future .getRequestSequenceNumber ());
103121 return entry != null ;
104122 }
105123 }
106124
125+ public final long
126+ prunePendingRequests () {
127+ synchronized (pendingRequests ) {
128+ long size = pendingRequests .size ();
129+ pendingRequests .clear ();
130+ return size ;
131+ }
132+ }
133+
134+ public final long
135+ prunePendingRequestsOlderThan (long nanoTime ) {
136+ synchronized (pendingRequests ) {
137+ Iterator <Map .Entry <Long , PendingRequest >> iter = pendingRequests .entrySet ().iterator ();
138+ long removed = 0 ;
139+ while (iter .hasNext ()) {
140+ if (iter .next ().getValue ().requestTimestamp < nanoTime ) {
141+ iter .remove ();
142+ ++removed ;
143+ }
144+ }
145+ return removed ;
146+ }
147+ }
148+
107149 public final <U extends MessageDefinition > void handleResponse (
108150 final RMWRequestId header , final U response ) {
109151 synchronized (pendingRequests ) {
110152 long sequenceNumber = header .sequenceNumber ;
111- Map . Entry < Consumer , ResponseFuture > entry = pendingRequests .remove (sequenceNumber );
153+ PendingRequest entry = pendingRequests .remove (sequenceNumber );
112154 if (entry != null ) {
113- Consumer <Future > callback = entry .getKey () ;
114- ResponseFuture <U > future = entry .getValue () ;
155+ Consumer <Future > callback = entry .callback ;
156+ ResponseFuture <U > future = entry .future ;
115157 future .set (response );
116158 callback .accept (future );
117159 return ;
0 commit comments