prioritize peers to download from

This commit is contained in:
Jack Robison 2019-01-31 12:28:59 -05:00
parent 2335243de9
commit 2e978c00b2
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 37 additions and 30 deletions

View file

@@ -79,7 +79,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
blob_response = response.get_blob_response() blob_response = response.get_blob_response()
if (not blob_response or blob_response.error) and\ if (not blob_response or blob_response.error) and\
(not availability_response or not availability_response.available_blobs): (not availability_response or not availability_response.available_blobs):
log.warning("blob not in availability response from %s:%i", self.peer_address, self.peer_port) log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address,
self.peer_port)
log.warning(response.to_dict()) log.warning(response.to_dict())
return False, False return False, False
elif availability_response.available_blobs and \ elif availability_response.available_blobs and \

View file

@@ -25,32 +25,27 @@ class BlobDownloader:
self.config = config self.config = config
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.peer_queue = peer_queue self.peer_queue = peer_queue
self.active_connections: typing.Dict['KademliaPeer', asyncio.Task] = {} # active request_blob calls self.active_connections: typing.Dict[str, asyncio.Task] = {} # active request_blob calls
self.ignored: typing.Set['KademliaPeer'] = set() self.ignored: typing.Set['KademliaPeer'] = set()
self.scores: typing.Dict['KademliaPeer', int] = {}
@property
def blob_download_timeout(self):
return self.config.blob_download_timeout
@property
def peer_connect_timeout(self):
return self.config.peer_connect_timeout
@property
def max_connections(self):
return self.config.max_connections_per_download
def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'):
async def _request_blob(): async def _request_blob():
if blob.get_is_verified(): if blob.get_is_verified():
return return
success, keep_connection = await request_blob(self.loop, blob, peer.address, peer.tcp_port, success, keep_connection = await request_blob(
self.peer_connect_timeout, self.blob_download_timeout) self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
self.config.blob_download_timeout
)
if not keep_connection and peer not in self.ignored: if not keep_connection and peer not in self.ignored:
self.ignored.add(peer) self.ignored.add(peer)
log.debug("drop peer %s:%i", peer.address, peer.tcp_port) log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
elif keep_connection: elif keep_connection:
log.debug("keep peer %s:%i", peer.address, peer.tcp_port) log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
if success:
self.scores[peer] = self.scores.get(peer, 0) + 2
else:
self.scores[peer] = self.scores.get(peer, 0) - 1
return self.loop.create_task(_request_blob()) return self.loop.create_task(_request_blob())
async def new_peer_or_finished(self, blob: 'BlobFile'): async def new_peer_or_finished(self, blob: 'BlobFile'):
@@ -73,15 +68,16 @@ class BlobDownloader:
while not self.peer_queue.empty(): while not self.peer_queue.empty():
batch.extend(await self.peer_queue.get()) batch.extend(await self.peer_queue.get())
for peer in batch: for peer in batch:
if peer not in self.active_connections and peer not in self.ignored: if len(self.active_connections) >= self.config.max_connections_per_download:
log.info("add request %s", blob_hash[:8]) break
self.active_connections[peer] = self.request_blob_from_peer(blob, peer) if peer.address not in self.active_connections and peer not in self.ignored:
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
self.active_connections[peer.address] = self.request_blob_from_peer(blob, peer)
await self.new_peer_or_finished(blob) await self.new_peer_or_finished(blob)
log.info("new peer or finished %s", blob_hash[:8])
to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch))) to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch)))
to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True)
if to_re_add: if to_re_add:
self.peer_queue.put_nowait(to_re_add) self.peer_queue.put_nowait(to_re_add)
log.info("finished %s", blob_hash[:8])
while self.active_connections: while self.active_connections:
peer, task = self.active_connections.popitem() peer, task = self.active_connections.popitem()
if task and not task.done(): if task and not task.done():
@@ -97,7 +93,7 @@ class BlobDownloader:
async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node', async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node',
blob_hash: str) -> 'BlobFile': blob_hash: str) -> 'BlobFile':
search_queue = asyncio.Queue(loop=loop) search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download)
search_queue.put_nowait(blob_hash) search_queue.put_nowait(blob_hash)
peer_queue, accumulate_task = node.accumulate_peers(search_queue) peer_queue, accumulate_task = node.accumulate_peers(search_queue)
downloader = BlobDownloader(loop, config, blob_manager, peer_queue) downloader = BlobDownloader(loop, config, blob_manager, peer_queue)

View file

@@ -2,6 +2,7 @@ import sys
import os import os
import asyncio import asyncio
import socket import socket
import ipaddress
from lbrynet.conf import Config from lbrynet.conf import Config
from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.blob.blob_manager import BlobFileManager
@@ -17,11 +18,18 @@ async def main(blob_hash: str, url: str):
conf = Config() conf = Config()
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
host_url, port = url.split(":") host_url, port = url.split(":")
host_info = await loop.getaddrinfo( try:
host_url, 'https', host = None
proto=socket.IPPROTO_TCP, if ipaddress.ip_address(host_url):
) host = host_url
host = host_info[0][4][0] except ValueError:
host = None
if not host:
host_info = await loop.getaddrinfo(
host_url, 'https',
proto=socket.IPPROTO_TCP,
)
host = host_info[0][4][0]
storage = SQLiteStorage(conf, os.path.join(conf.data_dir, "lbrynet.sqlite")) storage = SQLiteStorage(conf, os.path.join(conf.data_dir, "lbrynet.sqlite"))
blob_manager = BlobFileManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage) blob_manager = BlobFileManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage)
@@ -29,11 +37,13 @@ async def main(blob_hash: str, url: str):
await blob_manager.setup() await blob_manager.setup()
blob = blob_manager.get_blob(blob_hash) blob = blob_manager.get_blob(blob_hash)
protocol = BlobExchangeClientProtocol(loop, conf.blob_download_timeout) success, keep = await request_blob(loop, blob, host, int(port), conf.peer_connect_timeout,
success, keep = await request_blob(loop, blob, protocol, host, int(port), conf.peer_connect_timeout) conf.blob_download_timeout)
print(success, keep) print(f"{'downloaded' if success else 'failed to download'} {blob_hash} from {host}:{port}\n"
f"keep connection: {keep}")
if blob.get_is_verified(): if blob.get_is_verified():
await blob_manager.delete_blobs([blob.blob_hash]) await blob_manager.delete_blobs([blob.blob_hash])
print(f"deleted {blob_hash}")
if __name__ == "__main__": # usage: python download_blob_from_peer.py <blob_hash> [host url:port] if __name__ == "__main__": # usage: python download_blob_from_peer.py <blob_hash> [host url:port]