2019-01-22 18:54:17 +01:00
|
|
|
import asyncio
|
|
|
|
import typing
|
|
|
|
import logging
|
2019-03-31 03:07:43 +02:00
|
|
|
import binascii
|
2019-03-31 19:42:27 +02:00
|
|
|
from lbrynet.error import DownloadSDTimeout
|
2019-01-31 19:46:19 +01:00
|
|
|
from lbrynet.utils import resolve_host
|
2019-01-30 20:57:09 +01:00
|
|
|
from lbrynet.stream.descriptor import StreamDescriptor
|
|
|
|
from lbrynet.blob_exchange.downloader import BlobDownloader
|
|
|
|
from lbrynet.dht.peer import KademliaPeer
|
2019-01-22 18:54:17 +01:00
|
|
|
if typing.TYPE_CHECKING:
|
2019-01-30 20:57:09 +01:00
|
|
|
from lbrynet.conf import Config
|
2019-01-22 18:54:17 +01:00
|
|
|
from lbrynet.dht.node import Node
|
2019-03-28 19:51:55 +01:00
|
|
|
from lbrynet.blob.blob_manager import BlobManager
|
2019-03-31 03:07:43 +02:00
|
|
|
from lbrynet.blob.blob_file import AbstractBlob
|
|
|
|
from lbrynet.blob.blob_info import BlobInfo
|
2019-01-22 18:54:17 +01:00
|
|
|
|
|
|
|
# module-level logger, named after this module per the project's logging convention
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2019-03-31 03:07:43 +02:00
|
|
|
class StreamDownloader:
    """Downloads and decrypts the blobs of a single stream, identified by its sd hash.

    Peers are discovered through the DHT (via an accumulate task fed by `search_queue`)
    and, as a fallback, from the configured reflector servers ("fixed peers").
    """

    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager', sd_hash: str,
                 descriptor: typing.Optional['StreamDescriptor'] = None):
        """
        :param loop: event loop all queues/tasks are bound to
        :param config: daemon configuration (reflector servers, timeouts, fixed peer delay)
        :param blob_manager: stores and retrieves blobs and stream metadata
        :param sd_hash: hash of the stream descriptor blob
        :param descriptor: already-parsed descriptor, if available (skips sd blob download)
        """
        self.loop = loop
        self.config = config
        self.blob_manager = blob_manager
        self.sd_hash = sd_hash
        self.search_queue = asyncio.Queue(loop=loop)  # blob hashes to feed into the iterative finder
        self.peer_queue = asyncio.Queue(loop=loop)  # new peers to try
        self.blob_downloader = BlobDownloader(self.loop, self.config, self.blob_manager, self.peer_queue)
        self.descriptor: typing.Optional['StreamDescriptor'] = descriptor
        self.node: typing.Optional['Node'] = None
        self.accumulate_task: typing.Optional[asyncio.Task] = None
        self.fixed_peers_handle: typing.Optional[asyncio.Handle] = None
        self.fixed_peers_delay: typing.Optional[float] = None
        self.added_fixed_peers = False
        # timing metrics, each set at most once per download
        self.time_to_descriptor: typing.Optional[float] = None
        self.time_to_first_bytes: typing.Optional[float] = None

    async def add_fixed_peers(self):
        """Schedule the configured reflector servers to be offered as blob sources.

        If the DHT component is skipped or the routing table is empty, the fixed peers
        are added immediately; otherwise after `config.fixed_peer_delay` seconds, giving
        DHT-discovered peers a head start. No-op when no reflector servers are configured.
        """

        def _delayed_add_fixed_peers():
            self.added_fixed_peers = True
            self.peer_queue.put_nowait([
                # NOTE: reflector servers serve blobs on their configured port + 1
                KademliaPeer(self.loop, address=address, tcp_port=port + 1)
                for address, port in addresses
            ])

        if not self.config.reflector_servers:
            return
        addresses = [
            (await resolve_host(url, port + 1, proto='tcp'), port)
            for url, port in self.config.reflector_servers
        ]
        if 'dht' in self.config.components_to_skip or not self.node or not \
                len(self.node.protocol.routing_table.get_peers()):
            self.fixed_peers_delay = 0.0
        else:
            self.fixed_peers_delay = self.config.fixed_peer_delay
        self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers)

    async def load_descriptor(self):
        """Download (if not already verified locally) and parse the stream descriptor blob.

        Sets `self.descriptor` and records `time_to_descriptor` when a download occurred.

        :raises DownloadSDTimeout: if the sd blob can't be fetched within
            `config.blob_download_timeout` seconds
        """
        # download or get the sd blob
        sd_blob = self.blob_manager.get_blob(self.sd_hash)
        if not sd_blob.get_is_verified():
            try:
                now = self.loop.time()
                sd_blob = await asyncio.wait_for(
                    self.blob_downloader.download_blob(self.sd_hash),
                    self.config.blob_download_timeout, loop=self.loop
                )
                log.info("downloaded sd blob %s", self.sd_hash)
                self.time_to_descriptor = self.loop.time() - now
            except asyncio.TimeoutError:
                raise DownloadSDTimeout(self.sd_hash)

        # parse the descriptor
        self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
            self.loop, self.blob_manager.blob_dir, sd_blob
        )
        log.info("loaded stream manifest %s", self.sd_hash)

    async def start(self, node: typing.Optional['Node'] = None):
        """Kick off peer discovery, load the descriptor and register the stream in storage."""
        # set up peer accumulation
        if node:
            self.node = node
            _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
        await self.add_fixed_peers()
        # start searching for peers for the sd hash
        self.search_queue.put_nowait(self.sd_hash)
        log.info("searching for peers for stream %s", self.sd_hash)

        if not self.descriptor:
            await self.load_descriptor()

        # add the head blob to the peer search
        self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
        log.info("added head blob to peer search for stream %s", self.sd_hash)

        if not await self.blob_manager.storage.stream_exists(self.sd_hash):
            await self.blob_manager.storage.store_stream(
                self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
            )

    async def download_stream_blob(self, blob_info: 'BlobInfo') -> 'AbstractBlob':
        """Download one blob belonging to this stream.

        :raises ValueError: if `blob_info` is not one of this stream's (non-terminator) blobs
        """
        # FIX: the original used `if not filter(...)`, but filter() returns a lazy,
        # always-truthy filter object in Python 3, so the membership check could never
        # fail and the ValueError was unreachable; any() performs the intended check.
        # blobs[:-1] excludes the stream-terminator entry.
        if not any(blob.blob_hash == blob_info.blob_hash for blob in self.descriptor.blobs[:-1]):
            raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
        blob = await self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length)
        return blob

    def _decrypt_blob(self, blob_info: 'BlobInfo', blob: 'AbstractBlob') -> bytes:
        # blocking decryption helper; run in an executor by decrypt_blob()
        return blob.decrypt(
            binascii.unhexlify(self.descriptor.key.encode()), binascii.unhexlify(blob_info.iv.encode())
        )

    async def decrypt_blob(self, blob_info: 'BlobInfo', blob: 'AbstractBlob') -> bytes:
        """Decrypt `blob` off the event loop thread, using the stream key and the blob's iv."""
        return await self.loop.run_in_executor(None, self._decrypt_blob, blob_info, blob)

    async def read_blob(self, blob_info: 'BlobInfo') -> bytes:
        """Download and decrypt one stream blob, recording time-to-first-bytes once."""
        start = None
        if self.time_to_first_bytes is None:
            start = self.loop.time()
        blob = await self.download_stream_blob(blob_info)
        decrypted = await self.decrypt_blob(blob_info, blob)
        # `is not None` (rather than truthiness) so a 0.0 timestamp still records
        if start is not None:
            self.time_to_first_bytes = self.loop.time() - start
        return decrypted

    def stop(self):
        """Cancel background peer accumulation / fixed-peer scheduling and close the downloader."""
        if self.accumulate_task:
            self.accumulate_task.cancel()
            self.accumulate_task = None
        if self.fixed_peers_handle:
            self.fixed_peers_handle.cancel()
            self.fixed_peers_handle = None
        self.blob_downloader.close()
|