@@ -225,66 +225,97 @@ def __init__(self):
225225 # end by this time in any case
226226 self .deadline = 0
227227 self .maxLen = 0
228+ # during shutdown, wait up to 20 seconds to finish uploading
229+ self .shutdownWait = 20
230+ # forget tracking objects after 60 seconds
231+ self .objectWait = 60
232+ # wait 10 seconds between clears
233+ self .clearDelay = 10
234+ self .lastCleared = time .time ()
228235
229236 def add (self , objectHash = None ):
230237 with self .lock :
231238 # add a new object into existing thread lists
232239 if objectHash :
233240 if objectHash not in self .hashes :
234- self .hashes [objectHash ] = []
241+ self .hashes [objectHash ] = { 'created' : time . time (), 'sendCount' : 0 , 'peers' : []}
235242 for thread in threadingEnumerate ():
236243 if thread .isAlive () and hasattr (thread , 'peer' ) and \
237- thread .peer not in self .hashes [objectHash ]:
238- self .hashes [objectHash ].append (thread .peer )
244+ thread .peer not in self .hashes [objectHash ][ 'peers' ] :
245+ self .hashes [objectHash ][ 'peers' ] .append (thread .peer )
239246 # add all objects into the current thread
240247 else :
241248 for objectHash in self .hashes :
242- if current_thread ().peer not in self .hashes [objectHash ]:
243- self .hashes [objectHash ].append (current_thread ().peer )
249+ if current_thread ().peer not in self .hashes [objectHash ][ 'peers' ] :
250+ self .hashes [objectHash ][ 'peers' ] .append (current_thread ().peer )
244251
245252 def len (self ):
253+ self .clearHashes ()
246254 with self .lock :
247- return sum (len (self .hashes [x ]) > 0 for x in self .hashes )
255+ return sum (1
256+ for x in self .hashes if (self .hashes [x ]['created' ] + self .objectWait < time .time () or
257+ self .hashes [x ]['sendCount' ] == 0 ))
248258
249259 def _progress (self ):
250260 with self .lock :
251- return float (sum (len (self .hashes [x ]) for x in self .hashes ))
261+ return float (sum (len (self .hashes [x ]['peers' ])
262+ for x in self .hashes if (self .hashes [x ]['created' ] + self .objectWait < time .time ()) or
263+ self .hashes [x ]['sendCount' ] == 0 ))
252264
253- def progress (self , throwDeadline = True ):
265+ def progress (self , raiseDeadline = True ):
254266 if self .maxLen < self ._progress ():
255267 self .maxLen = self ._progress ()
256268 if self .deadline < time .time ():
257- if self .deadline > 0 and throwDeadline :
269+ if self .deadline > 0 and raiseDeadline :
258270 raise PendingUploadDeadlineException
259271 self .deadline = time .time () + 20
260272 try :
261273 return 1.0 - self ._progress () / self .maxLen
262274 except ZeroDivisionError :
263275 return 1.0
264276
265- def delete (self , objectHash ):
277+ def clearHashes (self , objectHash = None ):
278+ if objectHash is None :
279+ if self .lastCleared > time .time () - self .clearDelay :
280+ return
281+ objects = self .hashes .keys ()
282+ else :
283+ objects = objectHash ,
284+ with self .lock :
285+ for i in objects :
286+ try :
287+ if self .hashes [i ]['sendCount' ] > 0 and (
288+ len (self .hashes [i ]['peers' ]) == 0 or
289+ self .hashes [i ]['created' ] + self .objectWait < time .time ()):
290+ del self .hashes [i ]
291+ except KeyError :
292+ pass
293+ self .lastCleared = time .time ()
294+
295+ def delete (self , objectHash = None ):
266296 if not hasattr (current_thread (), 'peer' ):
267297 return
298+ if objectHash is None :
299+ return
268300 with self .lock :
269- if objectHash in self .hashes and current_thread ().peer in self .hashes [objectHash ]:
270- self .hashes [objectHash ].remove (current_thread ().peer )
271- if len (self .hashes [objectHash ]) == 0 :
272- del self .hashes [objectHash ]
301+ try :
302+ if objectHash in self .hashes and current_thread ().peer in self .hashes [objectHash ]['peers' ]:
303+ self .hashes [objectHash ]['sendCount' ] += 1
304+ self .hashes [objectHash ]['peers' ].remove (current_thread ().peer )
305+ except KeyError :
306+ pass
307+ self .clearHashes (objectHash )
273308
274309 def stop (self ):
275310 with self .lock :
276311 self .hashes = {}
277312
278313 def threadEnd (self ):
279- while True :
280- try :
281- with self .lock :
282- for objectHash in self .hashes :
283- if current_thread ().peer in self .hashes [objectHash ]:
284- self .hashes [objectHash ].remove (current_thread ().peer )
285- if len (self .hashes [objectHash ]) == 0 :
286- del self .hashes [objectHash ]
287- except (KeyError , RuntimeError ):
288- pass
289- else :
290- break
314+ with self .lock :
315+ for objectHash in self .hashes :
316+ try :
317+ if current_thread ().peer in self .hashes [objectHash ]['peers' ]:
318+ self .hashes [objectHash ]['peers' ].remove (current_thread ().peer )
319+ except KeyError :
320+ pass
321+ self .clearHashes ()
0 commit comments