lbry-sdk/lbry/dht/blob_announcer.py

81 lines
3.4 KiB
Python
Raw Permalink Normal View History

2019-01-22 12:49:43 -05:00
import asyncio
import typing
import logging
2022-01-24 11:20:48 -03:00
from prometheus_client import Counter, Gauge
2019-01-22 12:49:43 -05:00
if typing.TYPE_CHECKING:
2019-06-20 20:55:47 -04:00
from lbry.dht.node import Node
from lbry.extras.daemon.storage import SQLiteStorage
2019-01-22 12:49:43 -05:00
log = logging.getLogger(__name__)
class BlobAnnouncer:
    """Background task that announces stored blob hashes through a DHT node.

    Each round waits for the node to join, sleeps 60 seconds, pulls the hashes
    to announce from storage, announces them with ``batch_size`` concurrent
    consumers, and records the successfully announced hashes back in storage.
    """

    # Counts every announce attempt, labelled with the number of peers
    # returned by the announce call and whether the attempt raised.
    announcements_sent_metric = Counter(
        "announcements_sent", "Number of announcements sent and their respective status.", namespace="dht_node",
        labelnames=("peers", "error"),
    )
    # Size of the announce queue, sampled once per round right after it is refilled.
    announcement_queue_size_metric = Gauge(
        "announcement_queue_size", "Number of hashes waiting to be announced.", namespace="dht_node",
        labelnames=("scope",)
    )

    def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLiteStorage'):
        """Store collaborators and initialise empty announce state.

        :param loop: event loop on which the announce task is created.
        :param node: DHT node used to announce blobs.
        :param storage: storage queried for blobs to announce and updated afterwards.
        """
        self.loop = loop
        self.node = node
        self.storage = storage
        # None until start() is called.
        self.announce_task: typing.Optional[asyncio.Task] = None
        # Hashes still waiting to be announced in the current round.
        self.announce_queue: typing.List[str] = []
        # Pulsed (set, then immediately cleared) at the end of every completed round.
        self._done = asyncio.Event()
        # Hashes announced successfully since the last storage update.
        self.announced = set()

    async def _run_consumer(self):
        """Drain the shared announce queue, announcing one hash at a time.

        Several consumers run concurrently over the same queue; each one exits
        once the queue is empty. Failures are logged and counted, never raised,
        except for cancellation which is propagated.
        """
        while self.announce_queue:
            # Popped outside the ``try`` so ``blob_hash`` is always bound in the
            # handler below. No await separates the emptiness check from the
            # pop, so the pop itself cannot fail.
            blob_hash = self.announce_queue.pop()
            try:
                peers = len(await self.node.announce_blob(blob_hash))
                self.announcements_sent_metric.labels(peers=peers, error=False).inc()
                if peers > 4:
                    self.announced.add(blob_hash)
                else:
                    log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
            except Exception as err:
                self.announcements_sent_metric.labels(peers=0, error=True).inc()
                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
                    raise
                log.warning("error announcing %s: %s", blob_hash[:8], str(err))

    async def _announce(self, batch_size: typing.Optional[int] = 10):
        """Run announce rounds until cancelled.

        :param batch_size: number of concurrent consumers per round. A falsy
            value (``None`` or ``0``) makes the coroutine return immediately.
        """
        while batch_size:
            if not self.node.joined.is_set():
                await self.node.joined.wait()
            # The ``loop=`` keyword was removed from asyncio.sleep/gather in
            # Python 3.10. This coroutine already runs on ``self.loop``, so the
            # running loop is used implicitly.
            await asyncio.sleep(60)
            if not self.node.protocol.routing_table.get_peers():
                log.warning("No peers in DHT, announce round skipped")
                continue
            self.announce_queue.extend(await self.storage.get_blobs_to_announce())
            self.announcement_queue_size_metric.labels(scope="global").set(len(self.announce_queue))
            log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
            while self.announce_queue:
                log.info("%i blobs to announce", len(self.announce_queue))
                await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)])
                # Skip falsy entries (e.g. an empty hash) before updating storage.
                announced = [blob_hash for blob_hash in self.announced if blob_hash]
                if announced:
                    await self.storage.update_last_announced_blobs(announced)
                    log.info("announced %i blobs", len(announced))
                self.announced.clear()
            # Pulse the event: wake tasks currently blocked in wait(), then
            # re-arm it for the next round.
            self._done.set()
            self._done.clear()

    def start(self, batch_size: typing.Optional[int] = 10):
        """Schedule the announce loop on ``self.loop``.

        :param batch_size: forwarded to the announce loop.
        :raises AssertionError: if a previous announce task is still running.
        """
        assert not self.announce_task or self.announce_task.done(), "already running"
        self.announce_task = self.loop.create_task(self._announce(batch_size))

    def stop(self):
        """Cancel the announce task if it exists and has not finished."""
        if self.announce_task and not self.announce_task.done():
            self.announce_task.cancel()

    def wait(self):
        """Return an awaitable that completes when the next announce round finishes."""
        return self._done.wait()