refactor announcer

This commit is contained in:
Victor Shyba 2019-05-07 01:33:15 -03:00
parent 84b471d486
commit dc4560cc9a

View file

@ -13,55 +13,42 @@ class BlobAnnouncer:
self.loop = loop self.loop = loop
self.node = node self.node = node
self.storage = storage self.storage = storage
self.pending_call: asyncio.Handle = None
self.announce_task: asyncio.Task = None self.announce_task: asyncio.Task = None
self.running = False
self.announce_queue: typing.List[str] = [] self.announce_queue: typing.List[str] = []
async def _submit_announcement(self, blob_hash):
try:
peers = len(await self.node.announce_blob(blob_hash))
if peers > 4:
return blob_hash
else:
log.warning("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def _announce(self, batch_size: typing.Optional[int] = 10): async def _announce(self, batch_size: typing.Optional[int] = 10):
if not batch_size: while batch_size:
return
if not self.node.joined.is_set(): if not self.node.joined.is_set():
await self.node.joined.wait() await self.node.joined.wait()
blob_hashes = await self.storage.get_blobs_to_announce() self.announce_queue.extend(await self.storage.get_blobs_to_announce())
if blob_hashes: log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
self.announce_queue.extend(blob_hashes)
log.info("%i blobs to announce", len(blob_hashes))
batch = []
while len(self.announce_queue): while len(self.announce_queue):
cnt = 0 log.warning("%i blobs to announce", len(self.announce_queue))
announced = [] announced = await asyncio.gather(*[
while self.announce_queue and cnt < batch_size: self._submit_announcement(
blob_hash = self.announce_queue.pop() self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
announced.append(blob_hash) ], loop=self.loop)
batch.append(self.node.announce_blob(blob_hash)) announced = list(filter(None, announced))
cnt += 1 if announced:
to_await = []
while batch:
to_await.append(batch.pop())
if to_await:
await asyncio.gather(*tuple(to_await), loop=self.loop)
await self.storage.update_last_announced_blobs(announced) await self.storage.update_last_announced_blobs(announced)
log.info("announced %i blobs", len(announced)) log.info("announced %i blobs", len(announced))
if self.running: await asyncio.sleep(60)
self.pending_call = self.loop.call_later(60, self.announce, batch_size)
def announce(self, batch_size: typing.Optional[int] = 10):
self.announce_task = self.loop.create_task(self._announce(batch_size))
def start(self, batch_size: typing.Optional[int] = 10): def start(self, batch_size: typing.Optional[int] = 10):
if self.running: assert not self.announce_task or self.announce_task.done(), "already running"
raise Exception("already running") self.announce_task = self.loop.create_task(self._announce(batch_size))
self.running = True
self.announce(batch_size)
def stop(self): def stop(self):
self.running = False if self.announce_task and not self.announce_task.done():
if self.pending_call:
if not self.pending_call.cancelled():
self.pending_call.cancel()
self.pending_call = None
if self.announce_task:
if not (self.announce_task.done() or self.announce_task.cancelled()):
self.announce_task.cancel() self.announce_task.cancel()
self.announce_task = None