forked from LBRYCommunity/lbry-sdk
use tracker on download
This commit is contained in:
parent
3989eef84b
commit
30e8728f7f
|
@ -1,4 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import ipaddress
|
||||||
|
import time
|
||||||
import typing
|
import typing
|
||||||
import logging
|
import logging
|
||||||
import binascii
|
import binascii
|
||||||
|
@ -8,6 +10,8 @@ from lbry.error import DownloadSDTimeoutError
|
||||||
from lbry.utils import lru_cache_concurrent
|
from lbry.utils import lru_cache_concurrent
|
||||||
from lbry.stream.descriptor import StreamDescriptor
|
from lbry.stream.descriptor import StreamDescriptor
|
||||||
from lbry.blob_exchange.downloader import BlobDownloader
|
from lbry.blob_exchange.downloader import BlobDownloader
|
||||||
|
from lbry.torrent.tracker import get_peer_list
|
||||||
|
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbry.conf import Config
|
from lbry.conf import Config
|
||||||
from lbry.dht.node import Node
|
from lbry.dht.node import Node
|
||||||
|
@ -36,6 +40,7 @@ class StreamDownloader:
|
||||||
self.added_fixed_peers = False
|
self.added_fixed_peers = False
|
||||||
self.time_to_descriptor: typing.Optional[float] = None
|
self.time_to_descriptor: typing.Optional[float] = None
|
||||||
self.time_to_first_bytes: typing.Optional[float] = None
|
self.time_to_first_bytes: typing.Optional[float] = None
|
||||||
|
self.next_tracker_announce_time = None
|
||||||
|
|
||||||
async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
|
async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
|
||||||
return await self.read_blob(blob_info, 2)
|
return await self.read_blob(blob_info, 2)
|
||||||
|
@ -62,6 +67,32 @@ class StreamDownloader:
|
||||||
fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers)
|
fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers)
|
||||||
self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers)
|
self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers)
|
||||||
|
|
||||||
|
async def refresh_from_trackers(self, save_peers=True):
|
||||||
|
if not self.config.tracker_servers:
|
||||||
|
return
|
||||||
|
node_id = self.node.protocol.node_id if self.node else None
|
||||||
|
port = self.config.tcp_port
|
||||||
|
for server in self.config.tracker_servers:
|
||||||
|
try:
|
||||||
|
announcement = await get_peer_list(
|
||||||
|
bytes.fromhex(self.sd_hash)[:20], node_id, port, server[0], server[1])
|
||||||
|
self.next_tracker_announce_time = max(self.next_tracker_announce_time or 0,
|
||||||
|
time.time() + announcement.interval)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
log.warning("Tracker timed out: %s", server)
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
log.exception("Unexpected error querying tracker %s", server)
|
||||||
|
return
|
||||||
|
if not save_peers:
|
||||||
|
return
|
||||||
|
peers = [(str(ipaddress.ip_address(peer.address)), peer.port) for peer in announcement.peers]
|
||||||
|
peers = await get_kademlia_peers_from_hosts(peers)
|
||||||
|
log.info("Found %d peers from tracker %s for %s", len(peers), server, self.sd_hash[:8])
|
||||||
|
self.peer_queue.put_nowait(peers)
|
||||||
|
|
||||||
async def load_descriptor(self, connection_id: int = 0):
|
async def load_descriptor(self, connection_id: int = 0):
|
||||||
# download or get the sd blob
|
# download or get the sd blob
|
||||||
sd_blob = self.blob_manager.get_blob(self.sd_hash)
|
sd_blob = self.blob_manager.get_blob(self.sd_hash)
|
||||||
|
@ -91,6 +122,7 @@ class StreamDownloader:
|
||||||
self.accumulate_task.cancel()
|
self.accumulate_task.cancel()
|
||||||
_, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
|
_, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
|
||||||
await self.add_fixed_peers()
|
await self.add_fixed_peers()
|
||||||
|
asyncio.ensure_future(self.refresh_from_trackers())
|
||||||
# start searching for peers for the sd hash
|
# start searching for peers for the sd hash
|
||||||
self.search_queue.put_nowait(self.sd_hash)
|
self.search_queue.put_nowait(self.sd_hash)
|
||||||
log.info("searching for peers for stream %s", self.sd_hash)
|
log.info("searching for peers for stream %s", self.sd_hash)
|
||||||
|
|
Loading…
Reference in a new issue