forked from LBRYCommunity/lbry-sdk
implement announcer as a consumer task on gather
This commit is contained in:
parent
ee36162b16
commit
2a698932da
1 changed files with 19 additions and 19 deletions
|
@ -28,21 +28,23 @@ class BlobAnnouncer:
|
||||||
self.announce_task: asyncio.Task = None
|
self.announce_task: asyncio.Task = None
|
||||||
self.announce_queue: typing.List[str] = []
|
self.announce_queue: typing.List[str] = []
|
||||||
self._done = asyncio.Event()
|
self._done = asyncio.Event()
|
||||||
|
self.announced = set()
|
||||||
|
|
||||||
async def _submit_announcement(self, blob_hash):
|
async def _run_consumer(self):
|
||||||
try:
|
while self.announce_queue:
|
||||||
|
try:
|
||||||
peers = len(await self.node.announce_blob(blob_hash))
|
blob_hash = self.announce_queue.pop()
|
||||||
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
|
peers = len(await self.node.announce_blob(blob_hash))
|
||||||
if peers > 4:
|
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
|
||||||
return blob_hash
|
if peers > 4:
|
||||||
else:
|
self.announced.add(blob_hash)
|
||||||
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
else:
|
||||||
except Exception as err:
|
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
||||||
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
except Exception as err:
|
||||||
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
||||||
raise err
|
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
||||||
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
raise err
|
||||||
|
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
||||||
|
|
||||||
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
||||||
while batch_size:
|
while batch_size:
|
||||||
|
@ -57,14 +59,12 @@ class BlobAnnouncer:
|
||||||
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
||||||
while len(self.announce_queue) > 0:
|
while len(self.announce_queue) > 0:
|
||||||
log.info("%i blobs to announce", len(self.announce_queue))
|
log.info("%i blobs to announce", len(self.announce_queue))
|
||||||
announced = await asyncio.gather(*[
|
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
|
||||||
self._submit_announcement(
|
announced = list(filter(None, self.announced))
|
||||||
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
|
|
||||||
], loop=self.loop)
|
|
||||||
announced = list(filter(None, announced))
|
|
||||||
if announced:
|
if announced:
|
||||||
await self.storage.update_last_announced_blobs(announced)
|
await self.storage.update_last_announced_blobs(announced)
|
||||||
log.info("announced %i blobs", len(announced))
|
log.info("announced %i blobs", len(announced))
|
||||||
|
self.announced.clear()
|
||||||
self._done.set()
|
self._done.set()
|
||||||
self._done.clear()
|
self._done.clear()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue