51 lines
1.5 KiB
Python
51 lines
1.5 KiB
Python
|
import itertools
|
||
|
import logging
|
||
|
|
||
|
from twisted.internet import defer
|
||
|
|
||
|
|
||
|
log = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class DeferredPool(defer.Deferred):
|
||
|
def __init__(self, deferred_iter, pool_size):
|
||
|
self.deferred_iter = deferred_iter
|
||
|
self.pool_size = pool_size
|
||
|
# results are stored unordered
|
||
|
self.result_list = []
|
||
|
self.started_count = 0
|
||
|
self.total_count = None
|
||
|
defer.Deferred.__init__(self)
|
||
|
|
||
|
for deferred in itertools.islice(deferred_iter, pool_size):
|
||
|
self._start_one(deferred)
|
||
|
|
||
|
def _start_one(self, deferred):
|
||
|
deferred.addCallbacks(self._callback, self._callback,
|
||
|
callbackArgs=(self.started_count, defer.SUCCESS),
|
||
|
errbackArgs=(self.started_count, defer.FAILURE))
|
||
|
self.started_count += 1
|
||
|
|
||
|
def _callback(self, result, index, success):
|
||
|
self.result_list.append((index, success, result))
|
||
|
if self._done():
|
||
|
self._finish()
|
||
|
else:
|
||
|
self._process_next()
|
||
|
return result
|
||
|
|
||
|
def _done(self):
|
||
|
return self.total_count == len(self.result_list)
|
||
|
|
||
|
def _finish(self):
|
||
|
result_list = [(s, r) for i, s, r in sorted(self.result_list)]
|
||
|
self.callback(result_list)
|
||
|
|
||
|
def _process_next(self):
|
||
|
try:
|
||
|
deferred = next(self.deferred_iter)
|
||
|
except StopIteration:
|
||
|
self.total_count = self.started_count
|
||
|
else:
|
||
|
self._start_one(deferred)
|