Merge pull request #1831 from lbryio/cleanup-downloader

Download and publish fixes

commit 607ccefe90
22 changed files with 386 additions and 450 deletions
lbrynet/blob_exchange/client.py

@@ -79,8 +79,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         blob_response = response.get_blob_response()
         if (not blob_response or blob_response.error) and\
                 (not availability_response or not availability_response.available_blobs):
-            log.warning("blob not in availability response from %s:%i", self.peer_address, self.peer_port)
-            return False, True
+            log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address,
+                        self.peer_port)
+            log.warning(response.to_dict())
+            return False, False
         elif availability_response.available_blobs and \
                 availability_response.available_blobs != [self.blob.blob_hash]:
             log.warning("blob availability response doesn't match our request from %s:%i",
@@ -118,8 +120,6 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
             self._response_fut.cancel()
         if self.writer and not self.writer.closed():
             self.writer.close_handle()
-        if self.blob:
-            await self.blob.close()
         self._response_fut = None
         self.writer = None
         self.blob = None
@@ -160,11 +160,13 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         self.loop.create_task(self.close())


-async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', protocol: 'BlobExchangeClientProtocol',
-                       address: str, tcp_port: int, peer_connect_timeout: float) -> typing.Tuple[bool, bool]:
+async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: str, tcp_port: int,
+                       peer_connect_timeout: float, blob_download_timeout: float) -> typing.Tuple[bool, bool]:
     """
     Returns [<downloaded blob>, <keep connection>]
     """
+    protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
     if blob.get_is_verified():
         return False, True
     try:
@@ -173,3 +175,5 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', protocol:
         return await protocol.download_blob(blob)
     except (asyncio.TimeoutError, asyncio.CancelledError, ConnectionRefusedError, ConnectionAbortedError, OSError):
         return False, False
+    finally:
+        await protocol.close()
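With these changes `request_blob` owns the protocol lifecycle: it constructs its own `BlobExchangeClientProtocol` and the new `finally` clause closes it on every exit path, so callers no longer manage protocol objects. A minimal sketch of a call site; the peer address, port, and timeout values are illustrative assumptions, not values from this commit:

import asyncio
from lbrynet.blob_exchange.client import request_blob

async def fetch_blob(blob_manager, blob_hash: str) -> bool:
    blob = blob_manager.get_blob(blob_hash)
    # request_blob returns (downloaded, keep_connection); (False, False) now also
    # covers the "blob not in availability response" case fixed above.
    downloaded, keep_connection = await request_blob(
        asyncio.get_event_loop(), blob, "127.0.0.1", 3333,    # assumed peer address/port
        peer_connect_timeout=2.0, blob_download_timeout=20.0  # assumed timeouts
    )
    return downloaded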
lbrynet/blob_exchange/downloader.py

@@ -1,10 +1,10 @@
 import asyncio
 import typing
 import logging
-from lbrynet import conf
 from lbrynet.utils import drain_tasks
-from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob
+from lbrynet.blob_exchange.client import request_blob
 if typing.TYPE_CHECKING:
+    from lbrynet.conf import Config
     from lbrynet.dht.node import Node
     from lbrynet.dht.peer import KademliaPeer
     from lbrynet.blob.blob_manager import BlobFileManager
@@ -18,115 +18,88 @@ def drain_into(a: list, b: list):
         b.append(a.pop())


-class BlobDownloader:  # TODO: refactor to be the base class used by StreamDownloader
-    """A single blob downloader"""
-    def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', config: conf.Config):
+class BlobDownloader:
+    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
+                 peer_queue: asyncio.Queue):
         self.loop = loop
+        self.config = config
         self.blob_manager = blob_manager
-        self.new_peer_event = asyncio.Event(loop=self.loop)
-        self.active_connections: typing.Dict['KademliaPeer', BlobExchangeClientProtocol] = {}
-        self.running_download_requests: typing.List[asyncio.Task] = []
-        self.requested_from: typing.Dict[str, typing.Dict['KademliaPeer', asyncio.Task]] = {}
-        self.lock = asyncio.Lock(loop=self.loop)
-        self.blob: 'BlobFile' = None
-        self.blob_queue = asyncio.Queue(loop=self.loop)
-
-        self.blob_download_timeout = config.blob_download_timeout
-        self.peer_connect_timeout = config.peer_connect_timeout
-        self.max_connections = config.max_connections_per_download
-
-    async def _request_blob(self, peer: 'KademliaPeer'):
-        if self.blob.get_is_verified():
-            log.info("already verified")
-            return
-        if peer not in self.active_connections:
-            log.warning("not active, adding: %s", str(peer))
-            self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.blob_download_timeout)
-        protocol = self.active_connections[peer]
-        success, keep_connection = await request_blob(self.loop, self.blob, protocol, peer.address, peer.tcp_port,
-                                                      self.peer_connect_timeout)
-        await protocol.close()
-        if not keep_connection:
-            log.info("drop peer %s:%i", peer.address, peer.tcp_port)
-            if peer in self.active_connections:
-                async with self.lock:
-                    del self.active_connections[peer]
-            return
-        log.info("keep peer %s:%i", peer.address, peer.tcp_port)
-
-    def _update_requests(self):
-        self.new_peer_event.clear()
-        if self.blob.blob_hash not in self.requested_from:
-            self.requested_from[self.blob.blob_hash] = {}
-        to_add = []
-        for peer in self.active_connections.keys():
-            if peer not in self.requested_from[self.blob.blob_hash] and peer not in to_add:
-                to_add.append(peer)
-        if to_add or self.running_download_requests:
-            log.info("adding download probes for %i peers to %i already active",
-                     min(len(to_add), 8 - len(self.running_download_requests)),
-                     len(self.running_download_requests))
-        else:
-            log.info("downloader idle...")
-        for peer in to_add:
-            if len(self.running_download_requests) >= 8:
-                break
-            task = self.loop.create_task(self._request_blob(peer))
-            self.requested_from[self.blob.blob_hash][peer] = task
-            self.running_download_requests.append(task)
-
-    def _add_peer_protocols(self, peers: typing.List['KademliaPeer']):
-        added = 0
-        for peer in peers:
-            if peer not in self.active_connections:
-                self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.blob_download_timeout)
-                added += 1
-        if added:
-            if not self.new_peer_event.is_set():
-                log.info("added %i new peers", len(peers))
-                self.new_peer_event.set()
-
-    async def _accumulate_connections(self, node: 'Node'):
-        try:
-            async with node.stream_peer_search_junction(self.blob_queue) as search_junction:
-                async for peers in search_junction:
-                    if not isinstance(peers, list):  # TODO: what's up with this?
-                        log.error("not a list: %s %s", peers, str(type(peers)))
-                    else:
-                        self._add_peer_protocols(peers)
-            return
-        except asyncio.CancelledError:
-            pass
-
-    async def get_blob(self, blob_hash: str, node: 'Node') -> 'BlobFile':
-        self.blob = self.blob_manager.get_blob(blob_hash)
-        if self.blob.get_is_verified():
-            return self.blob
-        accumulator = self.loop.create_task(self._accumulate_connections(node))
-        self.blob_queue.put_nowait(blob_hash)
-        try:
-            while not self.blob.get_is_verified():
-                if len(self.running_download_requests) < self.max_connections:
-                    self._update_requests()
-
-                # drain the tasks into a temporary list
-                download_tasks = []
-                drain_into(self.running_download_requests, download_tasks)
-                got_new_peer = self.loop.create_task(self.new_peer_event.wait())
-
-                # wait for a new peer to be added or for a download attempt to finish
-                await asyncio.wait([got_new_peer] + download_tasks, return_when='FIRST_COMPLETED',
-                                   loop=self.loop)
-                if got_new_peer and not got_new_peer.done():
-                    got_new_peer.cancel()
-                if self.blob.get_is_verified():
-                    if got_new_peer and not got_new_peer.done():
-                        got_new_peer.cancel()
-                    drain_tasks(download_tasks)
-                    return self.blob
-        except asyncio.CancelledError:
-            drain_tasks(self.running_download_requests)
-            raise
-        finally:
-            if accumulator and not accumulator.done():
-                accumulator.cancel()
+        self.peer_queue = peer_queue
+        self.active_connections: typing.Dict['KademliaPeer', asyncio.Task] = {}  # active request_blob calls
+        self.ignored: typing.Set['KademliaPeer'] = set()
+        self.scores: typing.Dict['KademliaPeer', int] = {}
+
+    def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'):
+        async def _request_blob():
+            if blob.get_is_verified():
+                return
+            success, keep_connection = await request_blob(
+                self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
+                self.config.blob_download_timeout
+            )
+            if not keep_connection and peer not in self.ignored:
+                self.ignored.add(peer)
+                log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
+            elif keep_connection:
+                log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
+            if success:
+                self.scores[peer] = self.scores.get(peer, 0) + 2
+            else:
+                self.scores[peer] = self.scores.get(peer, 0) - 1
+        return self.loop.create_task(_request_blob())
+
+    async def new_peer_or_finished(self, blob: 'BlobFile'):
+        async def get_and_re_add_peers():
+            new_peers = await self.peer_queue.get()
+            self.peer_queue.put_nowait(new_peers)
+        tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
+        try:
+            await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED')
+        except asyncio.CancelledError:
+            drain_tasks(tasks)
+
+    async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
+        blob = self.blob_manager.get_blob(blob_hash, length)
+        if blob.get_is_verified():
+            return blob
+        try:
+            while not blob.get_is_verified():
+                batch: typing.List['KademliaPeer'] = []
+                while not self.peer_queue.empty():
+                    batch.extend(await self.peer_queue.get())
+                for peer in batch:
+                    if len(self.active_connections) >= self.config.max_connections_per_download:
+                        break
+                    if peer not in self.active_connections and peer not in self.ignored:
+                        log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
+                        self.active_connections[peer] = self.request_blob_from_peer(blob, peer)
+                await self.new_peer_or_finished(blob)
+                to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch)))
+                to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True)
+                if to_re_add:
+                    self.peer_queue.put_nowait(to_re_add)
+            while self.active_connections:
+                peer, task = self.active_connections.popitem()
+                if task and not task.done():
+                    task.cancel()
+            await blob.close()
+            return blob
+        except asyncio.CancelledError:
+            while self.active_connections:
+                peer, task = self.active_connections.popitem()
+                if task and not task.done():
+                    task.cancel()
+            raise
+
+
+async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node',
+                        blob_hash: str) -> 'BlobFile':
+    search_queue = asyncio.Queue(loop=loop, maxsize=config.max_connections_per_download)
+    search_queue.put_nowait(blob_hash)
+    peer_queue, accumulate_task = node.accumulate_peers(search_queue)
+    downloader = BlobDownloader(loop, config, blob_manager, peer_queue)
+    try:
+        return await downloader.download_blob(blob_hash)
+    finally:
+        if accumulate_task and not accumulate_task.done():
+            accumulate_task.cancel()
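Two behaviours worth noting in the rewritten downloader: peers that refuse or break the connection land in `ignored`, while the rest are scored (+2 on success, -1 on failure) and re-queued best-first for the next pass; and the module-level `download_blob` now wires the whole pipeline together. A sketch of using it, mirroring the `blob_get` change in the Daemon hunks further down:

import asyncio
from lbrynet.blob_exchange.downloader import download_blob

async def get_verified_blob(conf, blob_manager, dht_node, blob_hash: str):
    # Seeds a search queue with the hash, accumulates DHT peers into a peer
    # queue, downloads, and cancels the accumulator task on the way out.
    return await download_blob(asyncio.get_event_loop(), conf, blob_manager, dht_node, blob_hash)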
lbrynet/blob_exchange/server.py

@@ -60,7 +60,7 @@ class BlobServerProtocol(asyncio.Protocol):
             log.info("send %s to %s:%i", blob.blob_hash[:8], peer_address, peer_port)
             try:
                 sent = await blob.sendfile(self)
-            except ConnectionResetError:
+            except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError):
                 return
             log.info("sent %s (%i bytes) to %s:%i", blob.blob_hash[:8], sent, peer_address, peer_port)
             if responses:
lbrynet/conf.py

@@ -489,9 +489,13 @@ class Config(CLIConfig):
         previous_names=['concurrent_announcers']
     )
     max_connections_per_download = Integer(
-        "Maximum number of peers to connect to while downloading a blob", 5,
+        "Maximum number of peers to connect to while downloading a blob", 8,
         previous_names=['max_connections_per_stream']
     )
+    fixed_peer_delay = Float(
+        "Amount of seconds before adding the reflector servers as potential peers to download from in case dht"
+        "peers are not found or are slow", 2.0
+    )
     max_key_fee = MaxKeyFee(
         "Don't download streams with fees exceeding this amount", {'currency': 'USD', 'amount': 50.0}
     )  # TODO: use this
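The default for `max_connections_per_download` rises from 5 to 8, and the new `fixed_peer_delay` setting controls how long `StreamDownloader.add_fixed_peers` (later in this diff) waits before adding the reflector servers as download peers. A quick sketch of reading the new values, assuming `Config` can be instantiated with defaults:

from lbrynet.conf import Config

conf = Config()
print(conf.max_connections_per_download)  # 8 after this change
print(conf.fixed_peer_delay)              # 2.0 seconds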
lbrynet/dht/node.py

@@ -1,9 +1,9 @@
 import logging
 import asyncio
 import typing
-import socket
 import binascii
 import contextlib
+from lbrynet.utils import resolve_host
 from lbrynet.dht import constants
 from lbrynet.dht.error import RemoteException
 from lbrynet.dht.protocol.async_generator_junction import AsyncGeneratorJunction
@@ -116,26 +116,22 @@ class Node:
             log.warning("Already bound to port %s", self.listening_port)

     async def join_network(self, interface: typing.Optional[str] = '',
-                           known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
-                           known_node_addresses: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
+                           known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
         if not self.listening_port:
             await self.start_listening(interface)
         self.protocol.ping_queue.start()
         self._refresh_task = self.loop.create_task(self.refresh_node())

         # resolve the known node urls
-        known_node_addresses = known_node_addresses or []
+        known_node_addresses = []
         url_to_addr = {}

         if known_node_urls:
             for host, port in known_node_urls:
-                info = await self.loop.getaddrinfo(
-                    host, 'https',
-                    proto=socket.IPPROTO_TCP,
-                )
-                if (info[0][4][0], port) not in known_node_addresses:
-                    known_node_addresses.append((info[0][4][0], port))
-                    url_to_addr[info[0][4][0]] = host
+                address = await resolve_host(host)
+                if (address, port) not in known_node_addresses:
+                    known_node_addresses.append((address, port))
+                    url_to_addr[address] = host

         if known_node_addresses:
             while not self.protocol.routing_table.get_peers():
@@ -236,3 +232,19 @@ class Node:
         distance = Distance(node_id)
         accumulated.sort(key=lambda peer: distance(peer.node_id))
         return accumulated[:count]
+
+    async def _accumulate_search_junction(self, search_queue: asyncio.Queue,
+                                          result_queue: asyncio.Queue):
+        try:
+            async with self.stream_peer_search_junction(search_queue) as search_junction:  # pylint: disable=E1701
+                async for peers in search_junction:
+                    if peers:
+                        result_queue.put_nowait(peers)
+        except asyncio.CancelledError:
+            return
+
+    def accumulate_peers(self, search_queue: asyncio.Queue,
+                         peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
+            asyncio.Queue, asyncio.Task]:
+        q = peer_queue or asyncio.Queue()
+        return q, asyncio.create_task(self._accumulate_search_junction(search_queue, q))
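`Node.accumulate_peers` replaces ad-hoc iteration of `stream_peer_search_junction` at every call site: callers push blob hashes into a search queue, read batches of `KademliaPeer` objects from the returned queue, and cancel the returned task when done. A sketch under those assumptions, also using the `resolve_host` helper that replaced the inline `getaddrinfo` calls:

import asyncio
from lbrynet.utils import resolve_host

async def first_peers(node, blob_hash: str):
    search_queue = asyncio.Queue()
    search_queue.put_nowait(blob_hash)
    peer_queue, accumulate_task = node.accumulate_peers(search_queue)
    try:
        return await asyncio.wait_for(peer_queue.get(), 30)  # a list of KademliaPeer
    finally:
        if not accumulate_task.done():
            accumulate_task.cancel()

async def reflector_address(host: str) -> str:
    return await resolve_host(host)  # one argument now; no loop or port juggling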
lbrynet/extras/cli.py

@@ -243,6 +243,8 @@ def main(argv=None):
     handler = logging.StreamHandler()
     handler.setFormatter(default_formatter)
     log.addHandler(handler)
+    logging.getLogger('torba').addHandler(handler)
+    logging.getLogger('torba').setLevel(logging.INFO)

     logging.getLogger('aioupnp').setLevel(logging.WARNING)
     logging.getLogger('aiohttp').setLevel(logging.CRITICAL)
lbrynet/extras/daemon/Components.py

@@ -5,7 +5,6 @@ import logging
 import math
 import binascii
 import typing
-import socket
 from hashlib import sha256
 from types import SimpleNamespace
 import base58
@@ -18,7 +17,6 @@ import lbrynet.schema
 from lbrynet import utils
 from lbrynet.conf import HEADERS_FILE_SHA256_CHECKSUM
 from lbrynet.dht.node import Node
-from lbrynet.dht.peer import KademliaPeer
 from lbrynet.dht.blob_announcer import BlobAnnouncer
 from lbrynet.blob.blob_manager import BlobFileManager
 from lbrynet.blob_exchange.server import BlobServer
@@ -65,14 +63,6 @@ async def get_external_ip():  # used if upnp is disabled or non-functioning
     pass


-async def resolve_host(loop: asyncio.BaseEventLoop, url: str):
-    info = await loop.getaddrinfo(
-        url, 'https',
-        proto=socket.IPPROTO_TCP,
-    )
-    return info[0][4][0]
-
-
 class DatabaseComponent(Component):
     component_name = DATABASE_COMPONENT

@@ -463,11 +453,7 @@ class StreamManagerComponent(Component):
         log.info('Starting the file manager')
         loop = asyncio.get_event_loop()
         self.stream_manager = StreamManager(
-            loop, blob_manager, wallet, storage, node, self.conf.blob_download_timeout,
-            self.conf.peer_connect_timeout, [
-                KademliaPeer(loop, address=(await resolve_host(loop, url)), tcp_port=port + 1)
-                for url, port in self.conf.reflector_servers
-            ], self.conf.reflector_servers
+            loop, self.conf, blob_manager, wallet, storage, node,
         )
         await self.stream_manager.start()
         log.info('Done setting up file manager')
lbrynet/extras/daemon/Daemon.py

@@ -6,6 +6,7 @@ import inspect
 import typing
 import aiohttp
 import base58
+import random
 from urllib.parse import urlencode, quote
 from typing import Callable, Optional, List
 from binascii import hexlify, unhexlify
@@ -18,7 +19,7 @@ from torba.client.baseaccount import SingleKey, HierarchicalDeterministic
 from lbrynet import __version__, utils
 from lbrynet.conf import Config, Setting, SLACK_WEBHOOK
 from lbrynet.blob.blob_file import is_valid_blobhash
-from lbrynet.blob_exchange.downloader import BlobDownloader
+from lbrynet.blob_exchange.downloader import download_blob
 from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted
 from lbrynet.error import NullFundsError, NegativeFundsError, ResolveError, ComponentStartConditionNotMet
 from lbrynet.extras import system_info
@@ -1582,7 +1583,7 @@ class Daemon(metaclass=JSONRPCServerType):
             stream = existing[0]
         else:
             stream = await self.stream_manager.download_stream_from_claim(
-                self.dht_node, self.conf, resolved, file_name, timeout, fee_amount, fee_address
+                self.dht_node, resolved, file_name, timeout, fee_amount, fee_address
             )
         if stream:
             return stream.as_dict()
@@ -2060,6 +2061,7 @@ class Daemon(metaclass=JSONRPCServerType):
         await self.storage.save_content_claim(
             stream_hash, tx.outputs[0].id
         )
+
         await self.analytics_manager.send_claim_action('publish')
         nout = 0
         txo = tx.outputs[nout]
@@ -2567,8 +2569,7 @@ class Daemon(metaclass=JSONRPCServerType):
             (str) Success/Fail message or (dict) decoded data
         """

-        downloader = BlobDownloader(asyncio.get_event_loop(), self.blob_manager, self.conf)
-        blob = await downloader.get_blob(blob_hash, self.dht_node)
+        blob = await download_blob(asyncio.get_event_loop(), self.conf, self.blob_manager, self.dht_node, blob_hash)
         if read:
             with open(blob.file_path, 'rb') as handle:
                 return handle.read().decode()
@@ -2789,7 +2790,19 @@ class Daemon(metaclass=JSONRPCServerType):
             (list) list of blobs reflected
         """

-        raise NotImplementedError()
+        server, port = kwargs.get('server'), kwargs.get('port')
+        if server and port:
+            port = int(port)
+        else:
+            server, port = random.choice(self.conf.reflector_servers)
+        reflected = await asyncio.gather(*[
+            stream.upload_to_reflector(server, port)
+            for stream in self.stream_manager.get_filtered_streams(**kwargs)
+        ])
+        total = []
+        for reflected_for_stream in reflected:
+            total.extend(reflected_for_stream)
+        return total

     @requires(DHT_COMPONENT)
     async def jsonrpc_peer_ping(self, node_id, address, port):
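The reimplemented `file_reflect` picks an explicit server/port or a random entry from `reflector_servers`, uploads every matching stream concurrently, and flattens the per-stream lists of reflected blob hashes. The gather-and-flatten idiom in isolation, with a stand-in coroutine so it runs on its own:

import asyncio

async def upload(stream_id: int):  # stand-in for stream.upload_to_reflector(server, port)
    return [f"{stream_id}-blob-{n}" for n in range(2)]

async def reflect_all(stream_ids):
    reflected = await asyncio.gather(*[upload(i) for i in stream_ids])
    total = []
    for reflected_for_stream in reflected:
        total.extend(reflected_for_stream)
    return total

print(asyncio.get_event_loop().run_until_complete(reflect_all([1, 2])))
# ['1-blob-0', '1-blob-1', '2-blob-0', '2-blob-1']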
lbrynet/extras/daemon/storage.py

@@ -3,6 +3,7 @@ import sqlite3
 import typing
 import asyncio
 import binascii
+import time
 from torba.client.basedatabase import SQLiteMixin
 from lbrynet.conf import Config
 from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies
@@ -399,16 +400,15 @@ class SQLiteStorage(SQLiteMixin):

     def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate):
         return self.save_published_file(
-            stream_hash, binascii.hexlify(file_name.encode()).decode(),
-            binascii.hexlify(download_directory.encode()).decode(), data_payment_rate,
-            status="running"
+            stream_hash, file_name, download_directory, data_payment_rate, status="running"
         )

     def save_published_file(self, stream_hash: str, file_name: str, download_directory: str, data_payment_rate: float,
                             status="finished"):
         return self.db.execute(
             "insert into file values (?, ?, ?, ?, ?)",
-            (stream_hash, file_name, download_directory, data_payment_rate, status)
+            (stream_hash, binascii.hexlify(file_name.encode()).decode(),
+             binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status)
         )

     async def get_all_lbry_files(self) -> typing.List[typing.Dict]:
@@ -682,7 +682,7 @@ class SQLiteStorage(SQLiteMixin):
         if success:
             return self.db.execute(
                 "insert or replace into reflected_stream values (?, ?, ?)",
-                (sd_hash, reflector_address, self.loop.time())
+                (sd_hash, reflector_address, time.time())
             )
         return self.db.execute(
             "delete from reflected_stream where sd_hash=? and reflector_address=?",
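The storage change moves the hex encoding of `file_name` and `download_directory` out of `save_downloaded_file` and into `save_published_file`, so both save paths store hex-encoded values while callers pass plain strings. The round trip, for reference:

import binascii

encoded = binascii.hexlify("video.mp4".encode()).decode()  # '766964656f2e6d7034'
decoded = binascii.unhexlify(encoded).decode()             # 'video.mp4'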
lbrynet/stream/assembler.py

@@ -48,6 +48,8 @@ class StreamAssembler:
         def _decrypt_and_write():
             if self.stream_handle.closed:
                 return False
+            if not blob:
+                return False
             self.stream_handle.seek(offset)
             _decrypted = blob.decrypt(
                 binascii.unhexlify(key), binascii.unhexlify(blob_info.iv.encode())
@@ -62,15 +64,26 @@ class StreamAssembler:
             log.debug("decrypted %s", blob.blob_hash[:8])
             return

+    async def setup(self):
+        pass
+
+    async def after_got_descriptor(self):
+        pass
+
+    async def after_finished(self):
+        pass
+
     async def assemble_decrypted_stream(self, output_dir: str, output_file_name: typing.Optional[str] = None):
         if not os.path.isdir(output_dir):
             raise OSError(f"output directory does not exist: '{output_dir}' '{output_file_name}'")
+        await self.setup()
         self.sd_blob = await self.get_blob(self.sd_hash)
         await self.blob_manager.blob_completed(self.sd_blob)
         self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
                                                                              self.sd_blob)
         if not self.got_descriptor.is_set():
             self.got_descriptor.set()
+        await self.after_got_descriptor()
         self.output_path = await get_next_available_file_name(self.loop, output_dir,
                                                               output_file_name or self.descriptor.suggested_file_name)
@@ -85,17 +98,20 @@ class StreamAssembler:
                         blob = await self.get_blob(blob_info.blob_hash, blob_info.length)
                         await self._decrypt_blob(blob, blob_info, self.descriptor.key)
                         break
-                    except ValueError as err:
-                        log.error("failed to decrypt blob %s for stream %s - %s", blob_info.blob_hash,
-                                  self.descriptor.sd_hash, str(err))
+                    except FileNotFoundError:
+                        log.debug("stream assembler stopped")
+                        return
+                    except (ValueError, IOError, OSError):
+                        log.warning("failed to decrypt blob %s for stream %s", blob_info.blob_hash,
+                                    self.descriptor.sd_hash)
                         continue
             if not self.wrote_bytes_event.is_set():
                 self.wrote_bytes_event.set()
             self.stream_finished_event.set()
+            await self.after_finished()
         finally:
             self.stream_handle.close()

     async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
-        f = asyncio.Future(loop=self.loop)
-        f.set_result(self.blob_manager.get_blob(blob_hash, length))
-        return await f
+        return self.blob_manager.get_blob(blob_hash, length)
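`setup`, `after_got_descriptor`, and `after_finished` are deliberate no-ops on `StreamAssembler`; `assemble_decrypted_stream` calls them at fixed points so subclasses such as `StreamDownloader` (next file) can attach behaviour without overriding the assembly loop. A hypothetical subclass to show the shape:

class VerboseAssembler(StreamAssembler):  # hypothetical, for illustration only
    async def setup(self):
        print("before fetching the sd blob")

    async def after_got_descriptor(self):
        print("descriptor parsed:", self.descriptor.sd_hash)

    async def after_finished(self):
        print("assembled to", self.output_path)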
lbrynet/stream/downloader.py

@@ -1,13 +1,14 @@
-import os
 import asyncio
 import typing
 import logging
-from lbrynet.utils import drain_tasks, cancel_task
+from lbrynet.utils import resolve_host
 from lbrynet.stream.assembler import StreamAssembler
-from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob
+from lbrynet.stream.descriptor import StreamDescriptor
+from lbrynet.blob_exchange.downloader import BlobDownloader
+from lbrynet.dht.peer import KademliaPeer
 if typing.TYPE_CHECKING:
+    from lbrynet.conf import Config
     from lbrynet.dht.node import Node
-    from lbrynet.dht.peer import KademliaPeer
     from lbrynet.blob.blob_manager import BlobFileManager
     from lbrynet.blob.blob_file import BlobFile
@@ -19,212 +20,70 @@ def drain_into(a: list, b: list):
         b.append(a.pop())


-class StreamDownloader(StreamAssembler):  # TODO: reduce duplication, refactor to inherit BlobDownloader
-    def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', sd_hash: str,
-                 peer_timeout: float, peer_connect_timeout: float, output_dir: typing.Optional[str] = None,
-                 output_file_name: typing.Optional[str] = None,
-                 fixed_peers: typing.Optional[typing.List['KademliaPeer']] = None,
-                 max_connections_per_stream: typing.Optional[int] = 8):
-        super().__init__(loop, blob_manager, sd_hash)
-        self.peer_timeout = peer_timeout
-        self.peer_connect_timeout = peer_connect_timeout
-        self.current_blob: 'BlobFile' = None
-        self.download_task: asyncio.Task = None
-        self.accumulate_connections_task: asyncio.Task = None
-        self.new_peer_event = asyncio.Event(loop=self.loop)
-        self.active_connections: typing.Dict['KademliaPeer', BlobExchangeClientProtocol] = {}
-        self.running_download_requests: typing.List[asyncio.Task] = []
-        self.requested_from: typing.Dict[str, typing.Dict['KademliaPeer', asyncio.Task]] = {}
-        self.output_dir = output_dir or os.getcwd()
-        self.output_file_name = output_file_name
-        self._lock = asyncio.Lock(loop=self.loop)
-        self.max_connections_per_stream = max_connections_per_stream
-        self.fixed_peers = fixed_peers or []
-
-    async def _update_current_blob(self, blob: 'BlobFile'):
-        async with self._lock:
-            drain_tasks(self.running_download_requests)
-            self.current_blob = blob
-            if not blob.get_is_verified():
-                self._update_requests()
-
-    async def _request_blob(self, peer: 'KademliaPeer'):
-        if self.current_blob.get_is_verified():
-            log.debug("already verified")
-            return
-        if peer not in self.active_connections:
-            log.warning("not active, adding: %s", str(peer))
-            self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.peer_timeout)
-        protocol = self.active_connections[peer]
-        success, keep_connection = await request_blob(self.loop, self.current_blob, protocol,
-                                                      peer.address, peer.tcp_port, self.peer_connect_timeout)
-        await protocol.close()
-        if not keep_connection:
-            log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
-            if peer in self.active_connections:
-                async with self._lock:
-                    del self.active_connections[peer]
-            return
-        log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
-
-    def _update_requests(self):
-        self.new_peer_event.clear()
-        if self.current_blob.blob_hash not in self.requested_from:
-            self.requested_from[self.current_blob.blob_hash] = {}
-        to_add = []
-        for peer in self.active_connections.keys():
-            if peer not in self.requested_from[self.current_blob.blob_hash] and peer not in to_add:
-                to_add.append(peer)
-        if to_add or self.running_download_requests:
-            log.debug("adding download probes for %i peers to %i already active",
-                      min(len(to_add), 8 - len(self.running_download_requests)),
-                      len(self.running_download_requests))
-        else:
-            log.info("downloader idle...")
-        for peer in to_add:
-            if len(self.running_download_requests) >= self.max_connections_per_stream:
-                break
-            task = self.loop.create_task(self._request_blob(peer))
-            self.requested_from[self.current_blob.blob_hash][peer] = task
-            self.running_download_requests.append(task)
-
-    async def wait_for_download_or_new_peer(self) -> typing.Optional['BlobFile']:
-        async with self._lock:
-            if len(self.running_download_requests) < self.max_connections_per_stream:
-                # update the running download requests
-                self._update_requests()
-
-            # drain the tasks into a temporary list
-            download_tasks = []
-            drain_into(self.running_download_requests, download_tasks)
-
-        got_new_peer = self.loop.create_task(self.new_peer_event.wait())
-
-        # wait for a new peer to be added or for a download attempt to finish
-        await asyncio.wait([got_new_peer] + download_tasks, return_when='FIRST_COMPLETED',
-                           loop=self.loop)
-        if got_new_peer and not got_new_peer.done():
-            got_new_peer.cancel()
-
-        async with self._lock:
-            if self.current_blob.get_is_verified():
-                # a download attempt finished
-                if got_new_peer and not got_new_peer.done():
-                    got_new_peer.cancel()
-                drain_tasks(download_tasks)
-                return self.current_blob
-            else:
-                # we got a new peer, re add the other pending download attempts
-                for task in download_tasks:
-                    if task and not task.done():
-                        self.running_download_requests.append(task)
-                return
-
-    async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
-        blob = self.blob_manager.get_blob(blob_hash, length)
-        await self._update_current_blob(blob)
-        if blob.get_is_verified():
-            return blob
-
-        # the blob must be downloaded
-        try:
-            while not self.current_blob.get_is_verified():
-                if not self.active_connections:  # wait for a new connection
-                    await self.new_peer_event.wait()
-                    continue
-                blob = await self.wait_for_download_or_new_peer()
-                if blob:
-                    drain_tasks(self.running_download_requests)
-                    return blob
-            return blob
-        except asyncio.CancelledError:
-            drain_tasks(self.running_download_requests)
-            raise
-
-    def _add_peer_protocols(self, peers: typing.List['KademliaPeer']):
-        added = 0
-        for peer in peers:
-            if peer not in self.active_connections:
-                self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.peer_timeout)
-                added += 1
-        if added:
-            if not self.new_peer_event.is_set():
-                log.debug("added %i new peers", len(peers))
-                self.new_peer_event.set()
-
-    async def _accumulate_connections(self, node: 'Node'):
-        blob_queue = asyncio.Queue(loop=self.loop)
-        blob_queue.put_nowait(self.sd_hash)
-        task = asyncio.create_task(self.got_descriptor.wait())
-        add_fixed_peers_timer: typing.Optional[asyncio.Handle] = None
-
-        if self.fixed_peers:
-            def check_added_peers():
-                self._add_peer_protocols(self.fixed_peers)
-                log.info("adding fixed peer %s:%i", self.fixed_peers[0].address, self.fixed_peers[0].tcp_port)
-
-            add_fixed_peers_timer = self.loop.call_later(2, check_added_peers)
-
-        def got_descriptor(f):
-            try:
-                f.result()
-            except asyncio.CancelledError:
-                return
-            log.info("add head blob hash to peer search")
-            blob_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
-
-        task.add_done_callback(got_descriptor)
-        try:
-            async with node.stream_peer_search_junction(blob_queue) as search_junction:
-                async for peers in search_junction:
-                    if peers:
-                        self._add_peer_protocols(peers)
-            return
-        except asyncio.CancelledError:
-            pass
-        finally:
-            if task and not task.done():
-                task.cancel()
-                log.info("cancelled head blob task")
-            if add_fixed_peers_timer and not add_fixed_peers_timer.cancelled():
-                add_fixed_peers_timer.cancel()
-
-    async def stop(self):
-        cancel_task(self.accumulate_connections_task)
-        self.accumulate_connections_task = None
-        drain_tasks(self.running_download_requests)
-
-        while self.requested_from:
-            _, peer_task_dict = self.requested_from.popitem()
-            while peer_task_dict:
-                peer, task = peer_task_dict.popitem()
-                try:
-                    cancel_task(task)
-                except asyncio.CancelledError:
-                    pass
-
-        while self.active_connections:
-            _, client = self.active_connections.popitem()
-            if client:
-                await client.close()
-        log.info("stopped downloader")
-
-    async def _download(self):
-        try:
-            log.info("download and decrypt stream")
-            await self.assemble_decrypted_stream(self.output_dir, self.output_file_name)
-            log.info(
-                "downloaded stream %s -> %s", self.sd_hash, self.output_path
-            )
-            await self.blob_manager.storage.change_file_status(
-                self.descriptor.stream_hash, 'finished'
-            )
-        except asyncio.CancelledError:
-            pass
-        finally:
-            await self.stop()
-
-    def download(self, node: 'Node'):
-        self.accumulate_connections_task = self.loop.create_task(self._accumulate_connections(node))
-        self.download_task = self.loop.create_task(self._download())
+class StreamDownloader(StreamAssembler):
+    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager', sd_hash: str,
+                 output_dir: typing.Optional[str] = None, output_file_name: typing.Optional[str] = None):
+        super().__init__(loop, blob_manager, sd_hash)
+        self.config = config
+        self.output_dir = output_dir or self.config.download_dir
+        self.output_file_name = output_file_name
+        self.blob_downloader: typing.Optional[BlobDownloader] = None
+        self.search_queue = asyncio.Queue(loop=loop)
+        self.peer_queue = asyncio.Queue(loop=loop)
+        self.accumulate_task: typing.Optional[asyncio.Task] = None
+        self.descriptor: typing.Optional[StreamDescriptor]
+        self.node: typing.Optional['Node'] = None
+        self.assemble_task: typing.Optional[asyncio.Task] = None
+        self.fixed_peers_handle: typing.Optional[asyncio.Handle] = None
+
+    async def setup(self):  # start the peer accumulator and initialize the downloader
+        if self.blob_downloader:
+            raise Exception("downloader is already set up")
+        if self.node:
+            _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
+        self.blob_downloader = BlobDownloader(self.loop, self.config, self.blob_manager, self.peer_queue)
+        self.search_queue.put_nowait(self.sd_hash)
+
+    async def after_got_descriptor(self):
+        self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
+        log.info("added head blob to search")
+
+    async def after_finished(self):
+        log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path)
+        await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished')
+
+    async def stop(self):
+        if self.accumulate_task and not self.accumulate_task.done():
+            self.accumulate_task.cancel()
+        self.accumulate_task = None
+        if self.assemble_task and not self.assemble_task.done():
+            self.assemble_task.cancel()
+        self.assemble_task = None
+        if self.fixed_peers_handle:
+            self.fixed_peers_handle.cancel()
+            self.fixed_peers_handle = None
+        self.blob_downloader = None
+
+    async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
+        return await self.blob_downloader.download_blob(blob_hash, length)
+
+    def add_fixed_peers(self):
+        async def _add_fixed_peers():
+            self.peer_queue.put_nowait([
+                KademliaPeer(self.loop, address=(await resolve_host(url)), tcp_port=port + 1)
+                for url, port in self.config.reflector_servers
+            ])
+        if self.config.reflector_servers:
+            self.fixed_peers_handle = self.loop.call_later(
+                self.config.fixed_peer_delay if (
+                        'dht' not in self.config.components_to_skip
+                        and self.node
+                        and len(self.node.protocol.routing_table.get_peers())
+                ) else 0.0,
+                self.loop.create_task, _add_fixed_peers()
+            )
+
+    def download(self, node: typing.Optional['Node'] = None):
+        self.node = node
+        self.assemble_task = self.loop.create_task(self.assemble_decrypted_stream(self.config.download_dir))
+        self.add_fixed_peers()
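`StreamDownloader` now composes a `BlobDownloader` rather than duplicating its probing logic, and plugs into the assembler hooks added above: `setup()` starts peer accumulation, `after_got_descriptor()` queues the head blob, and `after_finished()` marks the file finished. A sketch of driving it, based on the signatures in this hunk:

async def start_stream(loop, conf, blob_manager, sd_hash, node=None):
    downloader = StreamDownloader(loop, conf, blob_manager, sd_hash)
    downloader.download(node)               # node is optional; None means fixed (reflector) peers only
    await downloader.got_descriptor.wait()  # set once the sd blob is fetched and parsed
    return downloader                       # caller is responsible for eventually awaiting downloader.stop()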
lbrynet/stream/managed_stream.py

@@ -2,12 +2,14 @@ import os
 import asyncio
 import typing
 import logging
+import binascii
 from lbrynet.extras.daemon.mime_types import guess_media_type
 from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.reflector.client import StreamReflectorClient
+from lbrynet.extras.daemon.storage import StoredStreamClaim
 if typing.TYPE_CHECKING:
-    from lbrynet.extras.daemon.storage import StoredStreamClaim
+    from lbrynet.schema.claim import ClaimDict
     from lbrynet.blob.blob_manager import BlobFileManager

 log = logging.getLogger(__name__)
@@ -20,7 +22,7 @@ class ManagedStream:

     def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', descriptor: 'StreamDescriptor',
                  download_directory: str, file_name: str, downloader: typing.Optional[StreamDownloader] = None,
-                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional['StoredStreamClaim'] = None):
+                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None):
         self.loop = loop
         self.blob_manager = blob_manager
         self.download_directory = download_directory
@@ -99,11 +101,11 @@ class ManagedStream:

     def as_dict(self) -> typing.Dict:
         full_path = os.path.join(self.download_directory, self.file_name)
-        if not os.path.exists(full_path):
+        if not os.path.isfile(full_path):
             full_path = None
         mime_type = guess_media_type(os.path.basename(self.file_name))

-        if self.downloader:
+        if self.downloader and self.downloader.written_bytes:
             written_bytes = self.downloader.written_bytes
         elif full_path:
             written_bytes = os.stat(full_path).st_size
@@ -176,6 +178,11 @@ class ManagedStream:
                 sent_sd, needed = await protocol.send_descriptor()
                 if sent_sd:
                     sent.append(self.sd_hash)
+                if not sent_sd and not needed:
+                    if not self.fully_reflected.is_set():
+                        self.fully_reflected.set()
+                        await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
+                        return []
         except (asyncio.CancelledError, asyncio.TimeoutError, ValueError):
             if protocol.transport:
                 protocol.transport.close()
@@ -192,4 +199,13 @@ class ManagedStream:
             protocol.transport.close()
         if not self.fully_reflected.is_set():
             self.fully_reflected.set()
+            await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
         return sent
+
+    def set_claim(self, claim_info: typing.Dict, claim: 'ClaimDict'):
+        self.stream_claim_info = StoredStreamClaim(
+            self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
+            claim_info['name'], claim_info['amount'], claim_info['height'],
+            binascii.hexlify(claim.serialized).decode(), claim.certificate_id, claim_info['address'],
+            claim_info['claim_sequence'], claim_info.get('channel_name')
+        )
lbrynet/stream/stream_manager.py

@@ -7,7 +7,8 @@ import random
 from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.managed_stream import ManagedStream
 from lbrynet.schema.claim import ClaimDict
-from lbrynet.extras.daemon.storage import StoredStreamClaim, lbc_to_dewies
+from lbrynet.schema.decode import smart_decode
+from lbrynet.extras.daemon.storage import lbc_to_dewies
 if typing.TYPE_CHECKING:
     from lbrynet.conf import Config
     from lbrynet.blob.blob_manager import BlobFileManager
@@ -46,23 +47,22 @@ comparison_operators = {


 class StreamManager:
-    def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', wallet: 'LbryWalletManager',
-                 storage: 'SQLiteStorage', node: typing.Optional['Node'], peer_timeout: float,
-                 peer_connect_timeout: float, fixed_peers: typing.Optional[typing.List['KademliaPeer']] = None,
-                 reflector_servers: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
+    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
+                 wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node']):
         self.loop = loop
+        self.config = config
         self.blob_manager = blob_manager
         self.wallet = wallet
         self.storage = storage
         self.node = node
-        self.peer_timeout = peer_timeout
-        self.peer_connect_timeout = peer_connect_timeout
         self.streams: typing.Set[ManagedStream] = set()
         self.starting_streams: typing.Dict[str, asyncio.Future] = {}
         self.resume_downloading_task: asyncio.Task = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
-        self.fixed_peers = fixed_peers
-        self.reflector_servers = reflector_servers
+
+    async def _update_content_claim(self, stream: ManagedStream):
+        claim_info = await self.storage.get_content_claim(stream.stream_hash)
+        stream.set_claim(claim_info, smart_decode(claim_info['value']))

     async def load_streams_from_database(self):
         infos = await self.storage.get_all_lbry_files()
@@ -71,9 +71,9 @@ class StreamManager:
             if sd_blob.get_is_verified():
                 descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
                 downloader = StreamDownloader(
-                    self.loop, self.blob_manager, descriptor.sd_hash, self.peer_timeout,
-                    self.peer_connect_timeout, binascii.unhexlify(file_info['download_directory']).decode(),
-                    binascii.unhexlify(file_info['file_name']).decode(), self.fixed_peers
+                    self.loop, self.config, self.blob_manager, descriptor.sd_hash,
+                    binascii.unhexlify(file_info['download_directory']).decode(),
+                    binascii.unhexlify(file_info['file_name']).decode()
                 )
                 stream = ManagedStream(
                     self.loop, self.blob_manager, descriptor,
@@ -82,6 +82,7 @@ class StreamManager:
                     downloader, file_info['status'], file_info['claim']
                 )
                 self.streams.add(stream)
+                self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)

     async def resume(self):
         if not self.node:
@@ -128,8 +129,9 @@ class StreamManager:
                      iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
         stream = await ManagedStream.create(self.loop, self.blob_manager, file_path, key, iv_generator)
         self.streams.add(stream)
-        if self.reflector_servers:
-            host, port = random.choice(self.reflector_servers)
+        self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
+        if self.config.reflector_servers:
+            host, port = random.choice(self.config.reflector_servers)
             self.loop.create_task(stream.upload_to_reflector(host, port))
         return stream
@@ -165,9 +167,9 @@ class StreamManager:
     async def _download_stream_from_claim(self, node: 'Node', download_directory: str, claim_info: typing.Dict,
                                           file_name: typing.Optional[str] = None) -> typing.Optional[ManagedStream]:

-        claim = ClaimDict.load_dict(claim_info['value'])
-        downloader = StreamDownloader(self.loop, self.blob_manager, claim.source_hash.decode(), self.peer_timeout,
-                                      self.peer_connect_timeout, download_directory, file_name, self.fixed_peers)
+        claim = smart_decode(claim_info['value'])
+        downloader = StreamDownloader(self.loop, self.config, self.blob_manager, claim.source_hash.decode(),
+                                      download_directory, file_name)
         try:
             downloader.download(node)
             await downloader.got_descriptor.wait()
@@ -187,16 +189,9 @@ class StreamManager:
         await self.blob_manager.storage.save_content_claim(
             downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
         )
-
-        stored_claim = StoredStreamClaim(
-            downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
-            claim_info['name'], claim_info['amount'], claim_info['height'], claim_info['hex'],
-            claim.certificate_id, claim_info['address'], claim_info['claim_sequence'],
-            claim_info.get('channel_name')
-        )
         stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory,
-                               os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING,
-                               stored_claim)
+                               os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING)
+        stream.set_claim(claim_info, claim)
         self.streams.add(stream)
         try:
             await stream.downloader.wrote_bytes_event.wait()
@@ -205,7 +200,7 @@ class StreamManager:
         except asyncio.CancelledError:
             await downloader.stop()

-    async def download_stream_from_claim(self, node: 'Node', config: 'Config', claim_info: typing.Dict,
+    async def download_stream_from_claim(self, node: 'Node', claim_info: typing.Dict,
                                          file_name: typing.Optional[str] = None,
                                          timeout: typing.Optional[float] = 60,
                                          fee_amount: typing.Optional[float] = 0.0,
@@ -224,10 +219,10 @@ class StreamManager:

         self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop)
         stream_task = self.loop.create_task(
-            self._download_stream_from_claim(node, config.download_dir, claim_info, file_name)
+            self._download_stream_from_claim(node, self.config.download_dir, claim_info, file_name)
         )
         try:
-            await asyncio.wait_for(stream_task, timeout or config.download_timeout)
+            await asyncio.wait_for(stream_task, timeout or self.config.download_timeout)
|
||||||
stream = await stream_task
|
stream = await stream_task
|
||||||
self.starting_streams[sd_hash].set_result(stream)
|
self.starting_streams[sd_hash].set_result(stream)
|
||||||
if fee_address and fee_amount:
|
if fee_address and fee_amount:
|
||||||
|
|
|
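The callback wiring above is worth a note: the manager stores a zero-argument lambda per stream hash, and invoking it produces a fresh coroutine that the storage layer can await after persisting a claim. A minimal sketch of that shape, assuming a simplified stand-in for the storage side (ClaimCallbackRegistry is hypothetical; only the content_claim_callbacks attribute name mirrors the real code):

    import typing


    class ClaimCallbackRegistry:
        def __init__(self):
            # stream_hash -> zero-argument factory returning a coroutine
            self.content_claim_callbacks: typing.Dict[str, typing.Callable[[], typing.Awaitable]] = {}

        async def save_content_claim(self, stream_hash: str, outpoint: str):
            # ... persist the claim row here ...
            if stream_hash in self.content_claim_callbacks:
                # the manager registered `lambda: self._update_content_claim(stream)`,
                # so calling the factory yields a coroutine to await
                await self.content_claim_callbacks[stream_hash]()

The lambda indirection matters: a bare coroutine could only be awaited once, while the factory produces a new one on every claim update.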
@@ -8,6 +8,7 @@ import json
 import typing
 import asyncio
 import logging
+import ipaddress
 import pkg_resources
 from lbrynet.schema.claim import ClaimDict
 from lbrynet.cryptoutils import get_lbry_hash_obj
@@ -136,3 +137,16 @@ def cancel_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]):
 def drain_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]):
     while tasks:
         cancel_task(tasks.pop())
+
+
+async def resolve_host(url: str) -> str:
+    try:
+        if ipaddress.ip_address(url):
+            return url
+    except ValueError:
+        pass
+    loop = asyncio.get_running_loop()
+    return (await loop.getaddrinfo(
+        url, 'https',
+        proto=socket.IPPROTO_TCP,
+    ))[0][4][0]
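A hedged usage sketch for the new helper: a literal IP short-circuits, anything else goes through getaddrinfo and the first resolved address is returned. It assumes resolve_host is exposed from lbrynet.utils as the hunk suggests; the hostnames are illustrative:

    import asyncio
    from lbrynet.utils import resolve_host  # assumed import path for this branch


    async def demo():
        print(await resolve_host("127.0.0.1"))    # already an IP -> returned as-is
        print(await resolve_host("example.com"))  # resolved via the event loop


    asyncio.run(demo())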
@@ -2,6 +2,7 @@ import sys
 import os
 import asyncio
 import socket
+import ipaddress
 from lbrynet.conf import Config
 from lbrynet.extras.daemon.storage import SQLiteStorage
 from lbrynet.blob.blob_manager import BlobFileManager
@@ -17,6 +18,13 @@ async def main(blob_hash: str, url: str):
     conf = Config()
     loop = asyncio.get_running_loop()
     host_url, port = url.split(":")
+    try:
+        host = None
+        if ipaddress.ip_address(host_url):
+            host = host_url
+    except ValueError:
+        host = None
+    if not host:
         host_info = await loop.getaddrinfo(
             host_url, 'https',
             proto=socket.IPPROTO_TCP,
@@ -29,11 +37,13 @@ async def main(blob_hash: str, url: str):
     await blob_manager.setup()

     blob = blob_manager.get_blob(blob_hash)
-    protocol = BlobExchangeClientProtocol(loop, conf.blob_download_timeout)
-    success, keep = await request_blob(loop, blob, protocol, host, int(port), conf.peer_connect_timeout)
-    print(success, keep)
+    success, keep = await request_blob(loop, blob, host, int(port), conf.peer_connect_timeout,
+                                       conf.blob_download_timeout)
+    print(f"{'downloaded' if success else 'failed to download'} {blob_hash} from {host}:{port}\n"
+          f"keep connection: {keep}")
     if blob.get_is_verified():
         await blob_manager.delete_blobs([blob.blob_hash])
+        print(f"deleted {blob_hash}")


 if __name__ == "__main__":  # usage: python download_blob_from_peer.py <blob_hash> [host url:port]
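After this change request_blob owns the protocol's whole lifecycle: it constructs a BlobExchangeClientProtocol per call and closes it in a finally block, so callers pass only timeouts. A sketch of the new call shape; the import path and peer address are assumptions, not confirmed by this diff:

    import asyncio
    from lbrynet.blob_exchange.client import request_blob  # assumed module path


    async def fetch(loop: asyncio.BaseEventLoop, blob):
        # returns (downloaded, keep_connection); the protocol is created
        # and torn down inside request_blob itself
        downloaded, keep_connection = await request_blob(
            loop, blob, "127.0.0.1", 3333,  # hypothetical peer
            2.0,  # peer_connect_timeout
            3.0,  # blob_download_timeout
        )
        return downloaded, keep_connection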
scripts/repair_0_31_1_db.py (new file, 33 lines)
@@ -0,0 +1,33 @@
+import os
+import binascii
+import sqlite3
+from lbrynet.conf import Config
+
+
+def main():
+    conf = Config()
+    db = sqlite3.connect(os.path.join(conf.data_dir, 'lbrynet.sqlite'))
+    cur = db.cursor()
+    files = cur.execute("select stream_hash, file_name, download_directory from file").fetchall()
+    update = {}
+    for stream_hash, file_name, download_directory in files:
+        try:
+            binascii.unhexlify(file_name)
+        except binascii.Error:
+            try:
+                binascii.unhexlify(download_directory)
+            except binascii.Error:
+                update[stream_hash] = (
+                    binascii.hexlify(file_name.encode()).decode(), binascii.hexlify(download_directory.encode()).decode()
+                )
+    if update:
+        print(f"repair {len(update)} streams")
+        for stream_hash, (file_name, download_directory) in update.items():
+            cur.execute('update file set file_name=?, download_directory=? where stream_hash=?',
+                        (file_name, download_directory, stream_hash))
+        db.commit()
+    db.close()
+
+
+if __name__ == "__main__":
+    main()
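The repair heuristic turns on a single check: healthy rows hold hex-encoded file_name and download_directory values, so unhexlify succeeds, while rows written by the 0.31.1 bug hold raw strings and raise binascii.Error. A standalone sketch of that round trip (needs_repair is a hypothetical helper, not part of the script):

    import binascii


    def needs_repair(value: str) -> bool:
        try:
            binascii.unhexlify(value)  # valid hex -> already encoded
            return False
        except binascii.Error:
            return True  # raw string left behind by the 0.31.1 bug


    assert needs_repair("my file.mp4")  # raw name -> repair
    assert not needs_repair(binascii.hexlify(b"my file.mp4").decode())  # hex -> keep

Requiring both columns to fail, as the script does, guards against the rare raw name that happens to be valid hex.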
@@ -57,31 +57,29 @@ def variance(times):
     return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3)


-async def wait_for_done(api, uri):
+async def wait_for_done(conf, uri):
     name = uri.split("#")[0]
     last_complete = 0
     hang_count = 0
     while True:
-        files = await api.file_list(claim_name=name)
+        files = await daemon_rpc(conf, "file_list", claim_name=name)
         file = files[0]
         if file['status'] in ['finished', 'stopped']:
             return True, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed'])
         if last_complete < int(file['blobs_completed']):
-            print(f"{file['blobs_completed']}/{file['blobs_in_stream']}...")
             hang_count = 0
             last_complete = int(file['blobs_completed'])
         else:
             hang_count += 1
             await asyncio.sleep(1.0)
-        if hang_count > 30:
+        if hang_count > 10:
             return False, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed'])


-async def main(start_daemon=True, uris=None):
+async def main(uris=None):
     if not uris:
         uris = await get_frontpage_uris()
     conf = Config()
-    daemon = None
     try:
         await daemon_rpc(conf, 'status')
     except (ClientConnectorError, ConnectionError):
@@ -113,12 +111,12 @@ async def main(start_daemon=True, uris=None):
             first_byte = time.time()
             first_byte_times.append(first_byte - start)
             print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            # downloaded, msg, blobs_in_stream = await wait_for_done(api, uri)
+            # downloaded, msg, blobs_in_stream = await wait_for_done(conf, uri)
             # if downloaded:
             #     downloaded_times.append((time.time()-start) / downloaded)
-            #     print(f"{i + 1}/{len(uris)} - downloaded @ {(time.time()-start) / blobs_in_stream}, {msg} {uri}")
+            #     print(f"\tdownloaded {uri} @ {(time.time()-start) / blobs_in_stream} seconds per blob")
             # else:
-            #     print(f"failed to downlload {uri}, got {msg}")
+            #     print(f"\tfailed to download {uri}, got {msg}")
             #     download_failures.append(uri)
         except:
             print(f"{i + 1}/{len(uris)} - timeout in {time.time() - start} {uri}")
@@ -136,8 +134,7 @@ async def main(start_daemon=True, uris=None):
     nt = '\n\t'
     result += f"\nFailures:\n\t{nt.join([f for f in failures])}"
     print(result)
-    if daemon:
-        await daemon.shutdown()
     # webhook = os.environ.get('TTFB_SLACK_TOKEN', None)
     # if webhook:
     #     await report_to_slack(result, webhook)
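The polling loop above now aborts after ten consecutive one-second checks without progress (down from thirty), with any progress resetting the counter. A condensed sketch of that stall-detection shape; poll() is a hypothetical stand-in for the daemon_rpc file_list call:

    import asyncio


    async def wait_with_stall_detection(poll, max_stalls: int = 10) -> bool:
        last, stalls = 0, 0
        while True:
            done, completed = await poll()  # (finished?, blobs completed so far)
            if done:
                return True
            if completed > last:
                last, stalls = completed, 0  # progress resets the counter
            else:
                stalls += 1
                await asyncio.sleep(1.0)
            if stalls > max_stalls:
                return False  # no progress for max_stalls consecutive polls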
@@ -44,6 +44,7 @@ class CommandTestCase(IntegrationTestCase):
         conf.reflect_streams = False
         conf.blockchain_name = 'lbrycrd_regtest'
         conf.lbryum_servers = [('localhost', 50001)]
+        conf.reflector_servers = []
         conf.known_dht_nodes = []

         await self.account.ensure_address_gap()
@@ -18,6 +18,12 @@ from lbrynet.dht.peer import KademliaPeer, PeerManager
 # logging.getLogger("lbrynet").setLevel(logging.DEBUG)


+def mock_config():
+    config = Config()
+    config.fixed_peer_delay = 10000
+    return config
+
+
 class BlobExchangeTestBase(AsyncioTestCase):
     async def asyncSetUp(self):
         self.loop = asyncio.get_event_loop()
@@ -26,12 +32,15 @@ class BlobExchangeTestBase(AsyncioTestCase):
         self.server_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.client_dir)
         self.addCleanup(shutil.rmtree, self.server_dir)
-        self.server_storage = SQLiteStorage(Config(), os.path.join(self.server_dir, "lbrynet.sqlite"))
+        self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
+                                    reflector_servers=[])
+        self.server_storage = SQLiteStorage(self.server_config, os.path.join(self.server_dir, "lbrynet.sqlite"))
         self.server_blob_manager = BlobFileManager(self.loop, self.server_dir, self.server_storage)
         self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')

-        self.client_storage = SQLiteStorage(Config(), os.path.join(self.client_dir, "lbrynet.sqlite"))
+        self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir,
+                                    reflector_servers=[])
+        self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
         self.client_blob_manager = BlobFileManager(self.loop, self.client_dir, self.client_storage)
         self.client_peer_manager = PeerManager(self.loop)
         self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333)
@@ -57,12 +66,10 @@ class TestBlobExchange(BlobExchangeTestBase):

     async def _test_transfer_blob(self, blob_hash: str):
         client_blob = self.client_blob_manager.get_blob(blob_hash)
-        protocol = BlobExchangeClientProtocol(self.loop, 2)

         # download the blob
-        downloaded = await request_blob(self.loop, client_blob, protocol, self.server_from_client.address,
-                                        self.server_from_client.tcp_port, 2)
-        await protocol.close()
+        downloaded = await request_blob(self.loop, client_blob, self.server_from_client.address,
+                                        self.server_from_client.tcp_port, 2, 3)
         self.assertEqual(client_blob.get_is_verified(), True)
         self.assertTrue(downloaded)

@@ -95,17 +102,15 @@ class TestBlobExchange(BlobExchangeTestBase):
         await self._add_blob_to_server(blob_hash, mock_blob_bytes)

         second_client_blob = self.client_blob_manager.get_blob(blob_hash)
-        protocol = BlobExchangeClientProtocol(self.loop, 2)

         # download the blob
         await asyncio.gather(
             request_blob(
-                self.loop, second_client_blob, protocol, server_from_second_client.address,
-                server_from_second_client.tcp_port, 2
+                self.loop, second_client_blob, server_from_second_client.address,
+                server_from_second_client.tcp_port, 2, 3
             ),
             self._test_transfer_blob(blob_hash)
         )
-        await protocol.close()
         self.assertEqual(second_client_blob.get_is_verified(), True)

     async def test_host_different_blobs_to_multiple_peers_at_once(self):
@@ -129,16 +134,14 @@ class TestBlobExchange(BlobExchangeTestBase):
         await self._add_blob_to_server(sd_hash, mock_sd_blob_bytes)

         second_client_blob = self.client_blob_manager.get_blob(blob_hash)
-        protocol = BlobExchangeClientProtocol(self.loop, 2)

         await asyncio.gather(
             request_blob(
-                self.loop, second_client_blob, protocol, server_from_second_client.address,
-                server_from_second_client.tcp_port, 2
+                self.loop, second_client_blob, server_from_second_client.address,
+                server_from_second_client.tcp_port, 2, 3
             ),
             self._test_transfer_blob(sd_hash)
         )
-        await protocol.close()
         self.assertEqual(second_client_blob.get_is_verified(), True)

     async def test_server_chunked_request(self):
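Because each request_blob call now brings its own connection, the tests can simply gather concurrent calls with no shared protocol to create or close afterwards. A condensed sketch of that pattern, assuming request_blob is imported as in the test module; ports and timeouts are illustrative:

    import asyncio


    async def transfer_concurrently(loop, blob_a, blob_b, address, tcp_port):
        # each call opens and closes its own BlobExchangeClientProtocol
        return await asyncio.gather(
            request_blob(loop, blob_a, address, tcp_port, 2, 3),
            request_blob(loop, blob_b, address, tcp_port, 2, 3),
        )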
@@ -7,8 +7,8 @@ from lbrynet.conf import Config
 from lbrynet.blob.blob_manager import BlobFileManager
 from lbrynet.blob.blob_file import MAX_BLOB_SIZE
 from lbrynet.extras.daemon.storage import SQLiteStorage
-from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.assembler import StreamAssembler
+from lbrynet.stream.descriptor import StreamDescriptor


 class TestStreamAssembler(AsyncioTestCase):
@@ -1,7 +1,8 @@
 import os
+import unittest
 from unittest import mock
 import asyncio
-import contextlib
+from lbrynet.conf import Config
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.dht.node import Node
@@ -21,22 +22,21 @@ class TestStreamDownloader(BlobExchangeTestBase):
             f.write(self.stream_bytes)
         descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path)
         self.sd_hash = descriptor.calculate_sd_hash()
-        self.downloader = StreamDownloader(self.loop, self.client_blob_manager, self.sd_hash, 3, 3, self.client_dir)
+        conf = Config(data_dir=self.server_dir, wallet_dir=self.server_dir, download_dir=self.server_dir,
+                      reflector_servers=[])
+        self.downloader = StreamDownloader(self.loop, conf, self.client_blob_manager, self.sd_hash)

-    async def _test_transfer_stream(self, blob_count: int, mock_peer_search=None):
+    async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None):
         await self.setup_stream(blob_count)

         mock_node = mock.Mock(spec=Node)

-        @contextlib.asynccontextmanager
-        async def _mock_peer_search(*_):
-            async def _gen():
-                yield [self.server_from_client]
-                return
-
-            yield _gen()
-
-        mock_node.stream_peer_search_junction = mock_peer_search or _mock_peer_search
+        def _mock_accumulate_peers(q1, q2):
+            async def _task():
+                pass
+            q2.put_nowait([self.server_from_client])
+            return q2, self.loop.create_task(_task())

+        mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers
         self.downloader.download(mock_node)
         await self.downloader.stream_finished_event.wait()
@@ -48,32 +48,30 @@ class TestStreamDownloader(BlobExchangeTestBase):
     async def test_transfer_stream(self):
         await self._test_transfer_stream(10)

-    # async def test_transfer_hundred_blob_stream(self):
-    #     await self._test_transfer_stream(100)
+    @unittest.SkipTest
+    async def test_transfer_hundred_blob_stream(self):
+        await self._test_transfer_stream(100)

     async def test_transfer_stream_bad_first_peer_good_second(self):
         await self.setup_stream(2)

         mock_node = mock.Mock(spec=Node)
+        q = asyncio.Queue()

         bad_peer = KademliaPeer(self.loop, "127.0.0.1", b'2' * 48, tcp_port=3334)

-        @contextlib.asynccontextmanager
-        async def mock_peer_search(*_):
-            async def _gen():
-                await asyncio.sleep(0.05, loop=self.loop)
-                yield [bad_peer]
-                await asyncio.sleep(0.1, loop=self.loop)
-                yield [self.server_from_client]
-                return
-
-            yield _gen()
+        def _mock_accumulate_peers(q1, q2):
+            async def _task():
+                pass
+            q2.put_nowait([bad_peer])
+            self.loop.call_later(1, q2.put_nowait, [self.server_from_client])
+            return q2, self.loop.create_task(_task())

-        mock_node.stream_peer_search_junction = mock_peer_search
+        mock_node.accumulate_peers = _mock_accumulate_peers

         self.downloader.download(mock_node)
         await self.downloader.stream_finished_event.wait()
-        await self.downloader.stop()
         self.assertTrue(os.path.isfile(self.downloader.output_path))
         with open(self.downloader.output_path, 'rb') as f:
             self.assertEqual(f.read(), self.stream_bytes)
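The rewritten mocks encode the new Node.accumulate_peers contract: it receives a search queue and a result queue, pushes discovered peers onto the result queue, and returns (result_queue, search_task). A reusable sketch of that shape; the factory below is hypothetical, and the peers and delay are illustrative:

    import asyncio


    def make_mock_accumulate_peers(loop, first_peers, later_peers=None, delay=1.0):
        def _mock_accumulate_peers(search_queue, result_queue):
            async def _task():
                pass  # a real node would keep the DHT search running here

            result_queue.put_nowait(first_peers)  # hand peers to the downloader
            if later_peers:
                # emulate a slower second discovery, as in the bad-first-peer test
                loop.call_later(delay, result_queue.put_nowait, later_peers)
            return result_queue, loop.create_task(_task())

        return _mock_accumulate_peers

Pushing lists onto a queue (rather than yielding from an async generator behind a context manager, as before) lets the downloader consume peers as they arrive without holding the search open.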
@@ -21,7 +21,7 @@ class TestStreamAssembler(AsyncioTestCase):
         self.storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite"))
         await self.storage.open()
         self.blob_manager = BlobFileManager(self.loop, tmp_dir, self.storage)
-        self.stream_manager = StreamManager(self.loop, self.blob_manager, None, self.storage, None, 3.0, 3.0)
+        self.stream_manager = StreamManager(self.loop, Config(), self.blob_manager, None, self.storage, None)

         server_tmp_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(server_tmp_dir))