2019-01-22 18:49:43 +01:00
|
|
|
import asyncio
|
|
|
|
import typing
|
|
|
|
import logging
|
2022-01-24 14:54:19 +01:00
|
|
|
|
|
|
|
from prometheus_client import Counter
|
|
|
|
|
2019-01-22 18:49:43 +01:00
|
|
|
if typing.TYPE_CHECKING:
|
2019-06-21 02:55:47 +02:00
|
|
|
from lbry.dht.node import Node
|
|
|
|
from lbry.extras.daemon.storage import SQLiteStorage
|
2019-01-22 18:49:43 +01:00
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class BlobAnnouncer:
|
2022-01-24 14:54:19 +01:00
|
|
|
announcements_sent_metric = Counter(
|
|
|
|
"announcements_sent", "Number of announcements sent and their respective status.", namespace="dht_node",
|
|
|
|
labelnames=("peers", "error"),
|
|
|
|
)
|
|
|
|
|
2019-08-02 19:14:41 +02:00
|
|
|
def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLiteStorage'):
|
2019-01-22 18:49:43 +01:00
|
|
|
self.loop = loop
|
|
|
|
self.node = node
|
|
|
|
self.storage = storage
|
|
|
|
self.announce_task: asyncio.Task = None
|
|
|
|
self.announce_queue: typing.List[str] = []
|
|
|
|
|
2019-05-07 06:33:15 +02:00
|
|
|
async def _submit_announcement(self, blob_hash):
|
|
|
|
try:
|
2022-01-24 14:54:19 +01:00
|
|
|
|
2019-05-07 06:33:15 +02:00
|
|
|
peers = len(await self.node.announce_blob(blob_hash))
|
2022-01-24 14:54:19 +01:00
|
|
|
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
|
2019-05-07 06:33:15 +02:00
|
|
|
if peers > 4:
|
|
|
|
return blob_hash
|
|
|
|
else:
|
2019-08-21 21:58:34 +02:00
|
|
|
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
|
2019-05-07 06:33:15 +02:00
|
|
|
except Exception as err:
|
2022-01-24 14:54:19 +01:00
|
|
|
self.announcements_sent_metric.labels(peers=0, error=True).inc()
|
2020-01-10 18:27:56 +01:00
|
|
|
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
|
2019-05-07 15:59:33 +02:00
|
|
|
raise err
|
2019-05-07 06:33:15 +02:00
|
|
|
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
|
2019-01-22 18:49:43 +01:00
|
|
|
|
2019-05-07 06:33:15 +02:00
|
|
|
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
|
|
|
while batch_size:
|
|
|
|
if not self.node.joined.is_set():
|
|
|
|
await self.node.joined.wait()
|
2019-05-12 08:39:11 +02:00
|
|
|
await asyncio.sleep(60, loop=self.loop)
|
2019-05-12 05:52:46 +02:00
|
|
|
if not self.node.protocol.routing_table.get_peers():
|
|
|
|
log.warning("No peers in DHT, announce round skipped")
|
|
|
|
continue
|
2019-05-07 06:33:15 +02:00
|
|
|
self.announce_queue.extend(await self.storage.get_blobs_to_announce())
|
|
|
|
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
|
2020-01-03 05:31:28 +01:00
|
|
|
while len(self.announce_queue) > 0:
|
2019-05-07 06:56:13 +02:00
|
|
|
log.info("%i blobs to announce", len(self.announce_queue))
|
2019-05-07 06:33:15 +02:00
|
|
|
announced = await asyncio.gather(*[
|
|
|
|
self._submit_announcement(
|
|
|
|
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
|
|
|
|
], loop=self.loop)
|
|
|
|
announced = list(filter(None, announced))
|
|
|
|
if announced:
|
|
|
|
await self.storage.update_last_announced_blobs(announced)
|
|
|
|
log.info("announced %i blobs", len(announced))
|
2019-01-22 18:49:43 +01:00
|
|
|
|
|
|
|
def start(self, batch_size: typing.Optional[int] = 10):
|
2019-05-07 06:33:15 +02:00
|
|
|
assert not self.announce_task or self.announce_task.done(), "already running"
|
|
|
|
self.announce_task = self.loop.create_task(self._announce(batch_size))
|
2019-01-22 18:49:43 +01:00
|
|
|
|
|
|
|
def stop(self):
|
2019-05-07 06:33:15 +02:00
|
|
|
if self.announce_task and not self.announce_task.done():
|
|
|
|
self.announce_task.cancel()
|