remove StreamAssembler, refactor ManagedStream and StreamDownloader (WIP)

Jack Robison 2019-03-30 21:07:43 -04:00
parent c663e5a3cf
commit 93267efe0b
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 422 additions and 450 deletions

lbrynet/extras/daemon/storage.py

@@ -195,12 +195,16 @@ def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor
     transaction.executemany("delete from blob where blob_hash=?", blob_hashes)


-def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: str, download_directory: str,
-               data_payment_rate: float, status: str) -> int:
+def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
+               download_directory: typing.Optional[str], data_payment_rate: float, status: str) -> int:
+    if not file_name and not download_directory:
+        encoded_file_name, encoded_download_dir = "{stream}", "{stream}"
+    else:
+        encoded_file_name = binascii.hexlify(file_name.encode()).decode()
+        encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
     transaction.execute(
         "insert or replace into file values (?, ?, ?, ?, ?)",
-        (stream_hash, binascii.hexlify(file_name.encode()).decode(),
-         binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status)
+        (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status)
     )
     return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
@@ -481,6 +485,12 @@ class SQLiteStorage(SQLiteMixin):
             "select stream_hash from stream where sd_hash = ?", sd_blob_hash
         )

+    def get_stream_info_for_sd_hash(self, sd_blob_hash):
+        return self.run_and_return_one_or_none(
+            "select stream_hash, stream_name, suggested_filename, stream_key from stream where sd_hash = ?",
+            sd_blob_hash
+        )
+
     def delete_stream(self, descriptor: 'StreamDescriptor'):
         return self.db.run_with_foreign_keys_disabled(delete_stream, descriptor)
@@ -492,7 +502,8 @@ class SQLiteStorage(SQLiteMixin):
             stream_hash, file_name, download_directory, data_payment_rate, status="running"
         )

-    def save_published_file(self, stream_hash: str, file_name: str, download_directory: str, data_payment_rate: float,
+    def save_published_file(self, stream_hash: str, file_name: typing.Optional[str],
+                            download_directory: typing.Optional[str], data_payment_rate: float,
                             status="finished") -> typing.Awaitable[int]:
        return self.db.run(store_file, stream_hash, file_name, download_directory, data_payment_rate, status)
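
Note on the change above: store_file can now be called with file_name=None and download_directory=None, writing the literal sentinel "{stream}" instead of hex-encoded paths, so a database row can represent a stream with no file on disk. A minimal round-trip sketch (not part of the diff; encode_path is a hypothetical helper mirroring store_file, and path_or_none mirrors the decoder added to stream_manager.py later in this commit):

import binascii
import typing


def encode_path(p: typing.Optional[str]) -> str:
    # hypothetical helper mirroring store_file: None becomes the sentinel
    return "{stream}" if p is None else binascii.hexlify(p.encode()).decode()


def path_or_none(p: str) -> typing.Optional[str]:
    # mirrors the decoder added to stream_manager.py in this commit
    try:
        return binascii.unhexlify(p).decode()
    except binascii.Error as err:
        if p == '{stream}':
            return None
        raise err


assert path_or_none(encode_path(None)) is None
assert path_or_none(encode_path("video.mp4")) == "video.mp4"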

lbrynet/stream/assembler.py (deleted)

@@ -1,136 +0,0 @@
-import os
-import binascii
-import logging
-import typing
-import asyncio
-from lbrynet.blob import MAX_BLOB_SIZE
-from lbrynet.stream.descriptor import StreamDescriptor
-if typing.TYPE_CHECKING:
-    from lbrynet.blob.blob_manager import BlobFileManager
-    from lbrynet.blob.blob_info import BlobInfo
-    from lbrynet.blob.blob_file import BlobFile
-
-log = logging.getLogger(__name__)
-
-
-def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
-    base_name, ext = os.path.splitext(os.path.basename(file_name))
-    i = 0
-    while os.path.isfile(os.path.join(download_directory, file_name)):
-        i += 1
-        file_name = "%s_%i%s" % (base_name, i, ext)
-    return file_name
-
-
-async def get_next_available_file_name(loop: asyncio.BaseEventLoop, download_directory: str, file_name: str) -> str:
-    return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
-
-
-class StreamAssembler:
-    def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', sd_hash: str,
-                 output_file_name: typing.Optional[str] = None):
-        self.output_file_name = output_file_name
-        self.loop = loop
-        self.blob_manager = blob_manager
-        self.sd_hash = sd_hash
-        self.sd_blob: 'BlobFile' = None
-        self.descriptor: StreamDescriptor = None
-        self.got_descriptor = asyncio.Event(loop=self.loop)
-        self.wrote_bytes_event = asyncio.Event(loop=self.loop)
-        self.stream_finished_event = asyncio.Event(loop=self.loop)
-        self.output_path = ''
-        self.stream_handle = None
-        self.written_bytes: int = 0
-
-    async def _decrypt_blob(self, blob: 'BlobFile', blob_info: 'BlobInfo', key: str):
-        if not blob or not self.stream_handle or self.stream_handle.closed:
-            return False
-
-        def _decrypt_and_write():
-            offset = blob_info.blob_num * (MAX_BLOB_SIZE - 1)
-            self.stream_handle.seek(offset)
-            _decrypted = blob.decrypt(
-                binascii.unhexlify(key), binascii.unhexlify(blob_info.iv.encode())
-            )
-            self.stream_handle.write(_decrypted)
-            self.stream_handle.flush()
-            self.written_bytes += len(_decrypted)
-            log.debug("decrypted %s", blob.blob_hash[:8])
-
-        await self.loop.run_in_executor(None, _decrypt_and_write)
-        return True
-
-    async def setup(self):
-        pass
-
-    async def after_got_descriptor(self):
-        pass
-
-    async def after_finished(self):
-        pass
-
-    async def assemble_decrypted_stream(self, output_dir: str, output_file_name: typing.Optional[str] = None):
-        if not os.path.isdir(output_dir):
-            raise OSError(f"output directory does not exist: '{output_dir}' '{output_file_name}'")
-        await self.setup()
-        self.sd_blob = await self.get_blob(self.sd_hash)
-        self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
-                                                                             self.sd_blob)
-        await self.after_got_descriptor()
-        self.output_file_name = output_file_name or self.descriptor.suggested_file_name
-        self.output_file_name = await get_next_available_file_name(self.loop, output_dir, self.output_file_name)
-        self.output_path = os.path.join(output_dir, self.output_file_name)
-        if not self.got_descriptor.is_set():
-            self.got_descriptor.set()
-        await self.blob_manager.storage.store_stream(
-            self.sd_blob, self.descriptor
-        )
-        await self.blob_manager.blob_completed(self.sd_blob)
-        written_blobs = None
-        save_tasks = []
-        try:
-            with open(self.output_path, 'wb') as stream_handle:
-                self.stream_handle = stream_handle
-                for i, blob_info in enumerate(self.descriptor.blobs[:-1]):
-                    if blob_info.blob_num != i:
-                        log.error("sd blob %s is invalid, cannot assemble stream", self.descriptor.sd_hash)
-                        return
-                    while self.stream_handle and not self.stream_handle.closed:
-                        try:
-                            blob = await self.get_blob(blob_info.blob_hash, blob_info.length)
-                            if blob and blob.length != blob_info.length:
-                                log.warning("Found incomplete, deleting: %s", blob_info.blob_hash)
-                                await self.blob_manager.delete_blobs([blob_info.blob_hash])
-                                continue
-                            if await self._decrypt_blob(blob, blob_info, self.descriptor.key):
-                                save_tasks.append(asyncio.ensure_future(self.blob_manager.blob_completed(blob)))
-                                written_blobs = i
-                                if not self.wrote_bytes_event.is_set():
-                                    self.wrote_bytes_event.set()
-                                log.debug("written %i/%i", written_blobs, len(self.descriptor.blobs) - 2)
-                                break
-                        except FileNotFoundError:
-                            log.debug("stream assembler stopped")
-                            return
-                        except (ValueError, IOError, OSError):
-                            log.warning("failed to decrypt blob %s for stream %s", blob_info.blob_hash,
-                                        self.descriptor.sd_hash)
-                            continue
-        finally:
-            if written_blobs == len(self.descriptor.blobs) - 2:
-                log.debug("finished decrypting and assembling stream")
-                if save_tasks:
-                    await asyncio.wait(save_tasks)
-                await self.after_finished()
-                self.stream_finished_event.set()
-            else:
-                log.debug("stream decryption and assembly did not finish (%i/%i blobs are done)", written_blobs or 0,
-                          len(self.descriptor.blobs) - 2)
-                if self.output_path and os.path.isfile(self.output_path):
-                    log.debug("erasing incomplete file assembly: %s", self.output_path)
-                    os.unlink(self.output_path)
-
-    async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
-        return self.blob_manager.get_blob(blob_hash, length)
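
The only logic that survives this deletion is the filename-collision helper, which reappears verbatim in managed_stream.py below. A quick usage sketch (not part of the diff) showing how it probes the directory for a free name:

import os
import tempfile


def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
    # same logic as the deleted helper above (and its new home in managed_stream.py)
    base_name, ext = os.path.splitext(os.path.basename(file_name))
    i = 0
    while os.path.isfile(os.path.join(download_directory, file_name)):
        i += 1
        file_name = "%s_%i%s" % (base_name, i, ext)
    return file_name


with tempfile.TemporaryDirectory() as d:
    for existing in ("video.mp4", "video_1.mp4"):
        open(os.path.join(d, existing), 'wb').close()
    assert _get_next_available_file_name(d, "video.mp4") == "video_2.mp4"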

lbrynet/stream/downloader.py

@@ -1,9 +1,8 @@
-import os
 import asyncio
 import typing
 import logging
+import binascii
 from lbrynet.utils import resolve_host
-from lbrynet.stream.assembler import StreamAssembler
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.blob_exchange.downloader import BlobDownloader
 from lbrynet.dht.peer import KademliaPeer
@@ -11,98 +10,109 @@ if typing.TYPE_CHECKING:
     from lbrynet.conf import Config
     from lbrynet.dht.node import Node
     from lbrynet.blob.blob_manager import BlobManager
-    from lbrynet.blob.blob_file import BlobFile
+    from lbrynet.blob.blob_file import AbstractBlob
+    from lbrynet.blob.blob_info import BlobInfo

 log = logging.getLogger(__name__)


-def drain_into(a: list, b: list):
-    while a:
-        b.append(a.pop())
-
-
-class StreamDownloader(StreamAssembler):
-    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager', sd_hash: str,
-                 output_dir: typing.Optional[str] = None, output_file_name: typing.Optional[str] = None):
-        super().__init__(loop, blob_manager, sd_hash, output_file_name)
+class StreamDownloader:
+    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager', sd_hash: str,
+                 descriptor: typing.Optional[StreamDescriptor] = None):
+        self.loop = loop
         self.config = config
-        self.output_dir = output_dir or self.config.download_dir
-        self.output_file_name = output_file_name
-        self.blob_downloader: typing.Optional[BlobDownloader] = None
-        self.search_queue = asyncio.Queue(loop=loop)
-        self.peer_queue = asyncio.Queue(loop=loop)
-        self.accumulate_task: typing.Optional[asyncio.Task] = None
-        self.descriptor: typing.Optional[StreamDescriptor]
+        self.blob_manager = blob_manager
+        self.sd_hash = sd_hash
+        self.search_queue = asyncio.Queue(loop=loop)  # blob hashes to feed into the iterative finder
+        self.peer_queue = asyncio.Queue(loop=loop)  # new peers to try
+        self.blob_downloader = BlobDownloader(self.loop, self.config, self.blob_manager, self.peer_queue)
+        self.descriptor: typing.Optional[StreamDescriptor] = descriptor
         self.node: typing.Optional['Node'] = None
-        self.assemble_task: typing.Optional[asyncio.Task] = None
+        self.accumulate_task: typing.Optional[asyncio.Task] = None
         self.fixed_peers_handle: typing.Optional[asyncio.Handle] = None
         self.fixed_peers_delay: typing.Optional[float] = None
         self.added_fixed_peers = False

-    async def setup(self):  # start the peer accumulator and initialize the downloader
-        if self.blob_downloader:
-            raise Exception("downloader is already set up")
-        if self.node:
-            _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
-        self.blob_downloader = BlobDownloader(self.loop, self.config, self.blob_manager, self.peer_queue)
-        self.search_queue.put_nowait(self.sd_hash)
-
-    async def after_got_descriptor(self):
-        self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
-        log.info("added head blob to search")
-
-    async def after_finished(self):
-        log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path)
-        await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished')
-        self.blob_downloader.close()
-
-    def stop(self):
-        if self.accumulate_task:
-            self.accumulate_task.cancel()
-            self.accumulate_task = None
-        if self.assemble_task:
-            self.assemble_task.cancel()
-            self.assemble_task = None
-        if self.fixed_peers_handle:
-            self.fixed_peers_handle.cancel()
-            self.fixed_peers_handle = None
-        self.blob_downloader = None
-        if self.stream_handle:
-            if not self.stream_handle.closed:
-                self.stream_handle.close()
-            self.stream_handle = None
-        if not self.stream_finished_event.is_set() and self.wrote_bytes_event.is_set() and \
-                self.output_path and os.path.isfile(self.output_path):
-            os.remove(self.output_path)
-
-    async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
-        return await self.blob_downloader.download_blob(blob_hash, length)
-
-    def add_fixed_peers(self):
-        async def _add_fixed_peers():
-            addresses = [
-                (await resolve_host(url, port + 1, proto='tcp'), port)
-                for url, port in self.config.reflector_servers
-            ]
-
-            def _delayed_add_fixed_peers():
-                self.added_fixed_peers = True
-                self.peer_queue.put_nowait([
-                    KademliaPeer(self.loop, address=address, tcp_port=port + 1)
-                    for address, port in addresses
-                ])
-
-            self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers)
+    async def add_fixed_peers(self):
+        def _delayed_add_fixed_peers():
+            self.added_fixed_peers = True
+            self.peer_queue.put_nowait([
+                KademliaPeer(self.loop, address=address, tcp_port=port + 1)
+                for address, port in addresses
+            ])
+
         if not self.config.reflector_servers:
             return
+        addresses = [
+            (await resolve_host(url, port + 1, proto='tcp'), port)
+            for url, port in self.config.reflector_servers
+        ]
         if 'dht' in self.config.components_to_skip or not self.node or not \
                 len(self.node.protocol.routing_table.get_peers()):
             self.fixed_peers_delay = 0.0
         else:
             self.fixed_peers_delay = self.config.fixed_peer_delay
-        self.loop.create_task(_add_fixed_peers())
-
-    def download(self, node: typing.Optional['Node'] = None):
-        self.node = node
-        self.assemble_task = self.loop.create_task(self.assemble_decrypted_stream(self.config.download_dir))
-        self.add_fixed_peers()
+        self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _delayed_add_fixed_peers)
+
+    async def load_descriptor(self):
+        # download or get the sd blob
+        sd_blob = self.blob_manager.get_blob(self.sd_hash)
+        if not sd_blob.get_is_verified():
+            sd_blob = await self.blob_downloader.download_blob(self.sd_hash)
+            log.info("downloaded sd blob %s", self.sd_hash)
+        # parse the descriptor
+        self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
+            self.loop, self.blob_manager.blob_dir, sd_blob
+        )
+        log.info("loaded stream manifest %s", self.sd_hash)
+
+    async def start(self, node: typing.Optional['Node'] = None):
+        # set up peer accumulation
+        if node:
+            self.node = node
+            _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
+        await self.add_fixed_peers()
+        # start searching for peers for the sd hash
+        self.search_queue.put_nowait(self.sd_hash)
+        log.info("searching for peers for stream %s", self.sd_hash)
+        if not self.descriptor:
+            await self.load_descriptor()
+        # add the head blob to the peer search
+        self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
+        log.info("added head blob to peer search for stream %s", self.sd_hash)
+        if not await self.blob_manager.storage.stream_exists(self.sd_hash):
+            await self.blob_manager.storage.store_stream(
+                self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
+            )
+
+    async def download_stream_blob(self, blob_info: 'BlobInfo') -> 'AbstractBlob':
+        if not filter(lambda blob: blob.blob_hash == blob_info.blob_hash, self.descriptor.blobs[:-1]):
+            raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
+        blob = await self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length)
+        return blob
+
+    def _decrypt_blob(self, blob_info: 'BlobInfo', blob: 'AbstractBlob'):
+        return blob.decrypt(
+            binascii.unhexlify(self.descriptor.key.encode()), binascii.unhexlify(blob_info.iv.encode())
+        )
+
+    async def decrypt_blob(self, blob_info: 'BlobInfo', blob: 'AbstractBlob'):
+        return await self.loop.run_in_executor(None, self._decrypt_blob, blob_info, blob)
+
+    async def read_blob(self, blob_info: 'BlobInfo') -> bytes:
+        blob = await self.download_stream_blob(blob_info)
+        return await self.decrypt_blob(blob_info, blob)
+
+    def stop(self):
+        if self.accumulate_task:
+            self.accumulate_task.cancel()
+            self.accumulate_task = None
+        if self.fixed_peers_handle:
+            self.fixed_peers_handle.cancel()
+            self.fixed_peers_handle = None
+        self.blob_downloader.close()
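
With the assembler gone, StreamDownloader no longer owns a file handle: start() resolves peers and the descriptor, and read_blob() fetches and decrypts one blob at a time, leaving writing to the caller. A sketch of the consumption pattern (not part of the diff; loop, config, blob_manager, sd_hash and node are assumed to be supplied by the surrounding application):

# sketch: drive the refactored StreamDownloader end to end
async def fetch_decrypted_stream(loop, config, blob_manager, sd_hash, node) -> bytes:
    downloader = StreamDownloader(loop, config, blob_manager, sd_hash)  # class defined above
    await downloader.start(node)  # peer search + sd blob + descriptor
    decrypted = b''
    try:
        # descriptor.blobs ends with a stream terminator, hence the [:-1]
        for blob_info in downloader.descriptor.blobs[:-1]:
            decrypted += await downloader.read_blob(blob_info)  # download + decrypt one blob
    finally:
        downloader.stop()
    return decrypted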

lbrynet/stream/managed_stream.py

@@ -9,42 +9,70 @@ from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.stream.reflector.client import StreamReflectorClient
 from lbrynet.extras.daemon.storage import StoredStreamClaim
+from lbrynet.blob import MAX_BLOB_SIZE
 if typing.TYPE_CHECKING:
+    from lbrynet.conf import Config
     from lbrynet.schema.claim import Claim
     from lbrynet.blob.blob_manager import BlobManager
+    from lbrynet.blob.blob_info import BlobInfo
     from lbrynet.dht.node import Node

 log = logging.getLogger(__name__)


+def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
+    base_name, ext = os.path.splitext(os.path.basename(file_name))
+    i = 0
+    while os.path.isfile(os.path.join(download_directory, file_name)):
+        i += 1
+        file_name = "%s_%i%s" % (base_name, i, ext)
+    return file_name
+
+
+async def get_next_available_file_name(loop: asyncio.BaseEventLoop, download_directory: str, file_name: str) -> str:
+    return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
+
+
 class ManagedStream:
     STATUS_RUNNING = "running"
     STATUS_STOPPED = "stopped"
     STATUS_FINISHED = "finished"

-    def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobManager', rowid: int,
-                 descriptor: 'StreamDescriptor', download_directory: str, file_name: typing.Optional[str],
-                 downloader: typing.Optional[StreamDownloader] = None,
-                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
-                 download_id: typing.Optional[str] = None):
+    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
+                 sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
+                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
+                 download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
+                 descriptor: typing.Optional[StreamDescriptor] = None):
         self.loop = loop
+        self.config = config
         self.blob_manager = blob_manager
-        self.rowid = rowid
+        self.sd_hash = sd_hash
         self.download_directory = download_directory
         self._file_name = file_name
-        self.descriptor = descriptor
-        self.downloader = downloader
-        self.stream_hash = descriptor.stream_hash
-        self.stream_claim_info = claim
         self._status = status
-        self.fully_reflected = asyncio.Event(loop=self.loop)
-        self.tx = None
+        self.stream_claim_info = claim
         self.download_id = download_id or binascii.hexlify(generate_id()).decode()
+        self.rowid = rowid
+        self.written_bytes = 0
+        self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
+        self.fully_reflected = asyncio.Event(loop=self.loop)
+        self.file_output_task: typing.Optional[asyncio.Task] = None
+        self.delayed_stop: typing.Optional[asyncio.Handle] = None
+        self.saving = asyncio.Event(loop=self.loop)
+        self.finished_writing = asyncio.Event(loop=self.loop)
+
+    @property
+    def descriptor(self) -> StreamDescriptor:
+        return self.downloader.descriptor
+
+    @property
+    def stream_hash(self) -> str:
+        return self.descriptor.stream_hash

     @property
     def file_name(self) -> typing.Optional[str]:
-        return self.downloader.output_file_name if self.downloader else self._file_name
+        return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)

     @property
     def status(self) -> str:
@@ -112,28 +140,29 @@ class ManagedStream:
     def blobs_in_stream(self) -> int:
         return len(self.descriptor.blobs) - 1

-    @property
-    def sd_hash(self):
-        return self.descriptor.sd_hash
-
     @property
     def blobs_remaining(self) -> int:
         return self.blobs_in_stream - self.blobs_completed

     @property
     def full_path(self) -> typing.Optional[str]:
-        return os.path.join(self.download_directory, os.path.basename(self.file_name)) if self.file_name else None
+        return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
+            if self.file_name and self.download_directory else None

     @property
     def output_file_exists(self):
         return os.path.isfile(self.full_path) if self.full_path else False

+    @property
+    def mime_type(self):
+        return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))
+
     def as_dict(self) -> typing.Dict:
         full_path = self.full_path if self.output_file_exists else None
-        mime_type = guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
-        if self.downloader and self.downloader.written_bytes:
-            written_bytes = self.downloader.written_bytes
+        if self.written_bytes:
+            written_bytes = self.written_bytes
         elif full_path:
             written_bytes = os.stat(full_path).st_size
         else:
@@ -143,14 +172,13 @@ class ManagedStream:
             'file_name': self.file_name,
             'download_directory': self.download_directory,
             'points_paid': 0.0,
-            'tx': self.tx,
             'stopped': not self.running,
             'stream_hash': self.stream_hash,
             'stream_name': self.descriptor.stream_name,
             'suggested_file_name': self.descriptor.suggested_file_name,
             'sd_hash': self.descriptor.sd_hash,
             'download_path': full_path,
-            'mime_type': mime_type,
+            'mime_type': self.mime_type,
             'key': self.descriptor.key,
             'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
             'total_bytes': self.descriptor.upper_bound_decrypted_length(),
@@ -171,7 +199,7 @@ class ManagedStream:
         }

     @classmethod
-    async def create(cls, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager',
+    async def create(cls, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                      file_path: str, key: typing.Optional[bytes] = None,
                      iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
         descriptor = await StreamDescriptor.create_stream(
@@ -186,17 +214,105 @@ class ManagedStream:
         await blob_manager.blob_completed(blob_manager.get_blob(blob.blob_hash, blob.length))
         row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
                                                                 os.path.dirname(file_path), 0)
-        return cls(loop, blob_manager, row_id, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
-                   status=cls.STATUS_FINISHED)
+        return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
+                   os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)

-    def start_download(self, node: typing.Optional['Node']):
-        self.downloader.download(node)
-        self.update_status(self.STATUS_RUNNING)
+    async def setup(self, node: typing.Optional['Node'] = None, save_file: typing.Optional[bool] = True):
+        await self.downloader.start(node)
+        if not save_file:
+            if not await self.blob_manager.storage.file_exists(self.sd_hash):
+                self.rowid = self.blob_manager.storage.save_downloaded_file(
+                    self.stream_hash, None, None, 0.0
+                )
+            self.update_delayed_stop()
+        else:
+            await self.save_file()
+        self.update_status(ManagedStream.STATUS_RUNNING)
+        await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
+
+    def update_delayed_stop(self):
+        def _delayed_stop():
+            log.info("Stopping inactive download for stream %s", self.sd_hash)
+            self.stop_download()
+
+        log.info("update delayed stop")
+        if self.delayed_stop:
+            self.delayed_stop.cancel()
+        self.delayed_stop = self.loop.call_later(60, _delayed_stop)
+
+    async def aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0) -> typing.AsyncIterator[
+            typing.Tuple['BlobInfo', bytes]]:
+        if start_blob_num >= len(self.descriptor.blobs[:-1]):
+            raise IndexError(start_blob_num)
+        for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
+            assert i + start_blob_num == blob_info.blob_num
+            if self.delayed_stop:
+                self.delayed_stop.cancel()
+            try:
+                decrypted = await self.downloader.read_blob(blob_info)
+                yield (blob_info, decrypted)
+            except asyncio.CancelledError:
+                if not self.saving.is_set() and not self.finished_writing.is_set():
+                    self.update_delayed_stop()
+                raise
+
+    async def _save_file(self, output_path: str):
+        self.saving.set()
+        self.finished_writing.clear()
+        try:
+            with open(output_path, 'wb') as file_write_handle:
+                async for blob_info, decrypted in self.aiter_read_stream():
+                    log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
+                    file_write_handle.write(decrypted)
+                    file_write_handle.flush()
+                    self.written_bytes += len(decrypted)
+            self.finished_writing.set()
+        except Exception as err:
+            if not isinstance(err, asyncio.CancelledError):
+                log.exception("unexpected error encountered writing file for stream %s", self.sd_hash)
+            if os.path.isfile(output_path):
+                log.info("removing incomplete download %s", output_path)
+                os.remove(output_path)
+            raise err
+        finally:
+            self.saving.clear()
+
+    async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None):
+        if self.file_output_task and not self.file_output_task.done():
+            self.file_output_task.cancel()
+        if self.delayed_stop:
+            log.info('cancel delayed stop')
+            self.delayed_stop.cancel()
+            self.delayed_stop = None
+        self.download_directory = download_directory or self.download_directory
+        if not self.download_directory:
+            raise ValueError("no directory to download to")
+        if not (file_name or self._file_name or self.descriptor.suggested_file_name):
+            raise ValueError("no file name to download to")
+        if not os.path.isdir(self.download_directory):
+            log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
+            os.mkdir(self.download_directory)
+        self._file_name = await get_next_available_file_name(
+            self.loop, self.download_directory,
+            file_name or self._file_name or self.descriptor.suggested_file_name
+        )
+        if not await self.blob_manager.storage.file_exists(self.sd_hash):
+            self.rowid = self.blob_manager.storage.save_downloaded_file(
+                self.stream_hash, self.file_name, self.download_directory, 0.0
+            )
+        else:
+            await self.blob_manager.storage.change_file_download_dir_and_file_name(
+                self.stream_hash, self.download_directory, self.file_name
+            )
+        self.written_bytes = 0
+        self.file_output_task = self.loop.create_task(self._save_file(self.full_path))

     def stop_download(self):
-        if self.downloader:
-            self.downloader.stop()
-            self.downloader = None
+        if self.file_output_task and not self.file_output_task.done():
+            self.file_output_task.cancel()
+        self.file_output_task = None
+        self.downloader.stop()

     async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
         sent = []
@@ -212,7 +328,9 @@ class ManagedStream:
                 self.fully_reflected.set()
                 await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
                 return []
-            we_have = [blob_hash for blob_hash in needed if blob_hash in self.blob_manager.completed_blob_hashes]
+            we_have = [
+                blob_hash for blob_hash in needed if blob_hash in self.blob_manager.completed_blob_hashes
+            ]
             for blob_hash in we_have:
                 await protocol.send_blob(blob_hash)
                 sent.append(blob_hash)
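
ManagedStream now exposes the download as an async iterator, so a consumer can read decrypted bytes without ever saving a file (the save_file path above is just one consumer of aiter_read_stream). A sketch (not part of the diff) of such a fileless consumer; per the deleted assembler's offset math, each decrypted blob carries at most MAX_BLOB_SIZE - 1 bytes, so start_blob_num seeks in those units:

# sketch: consume a stream without writing it to disk
async def iter_stream_bytes(stream: 'ManagedStream', start_blob_num: int = 0):
    # `stream` is assumed to have had setup(save_file=False) awaited already;
    # start_blob_num effectively seeks in (MAX_BLOB_SIZE - 1)-byte units
    async for blob_info, decrypted in stream.aiter_read_stream(start_blob_num):
        yield decrypted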

lbrynet/stream/stream_manager.py

@@ -5,11 +5,10 @@ import binascii
 import logging
 import random
 from decimal import Decimal
-from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError, \
-    DownloadDataTimeout, DownloadSDTimeout
-from lbrynet.utils import generate_id
+from lbrynet.error import ResolveError, InvalidStreamDescriptorError, KeyFeeAboveMaxAllowed, InsufficientFundsError
+# DownloadDataTimeout, DownloadSDTimeout
+from lbrynet.utils import generate_id, cache_concurrent
 from lbrynet.stream.descriptor import StreamDescriptor
-from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.managed_stream import ManagedStream
 from lbrynet.schema.claim import Claim
 from lbrynet.schema.uri import parse_lbry_uri
@@ -54,8 +53,17 @@ comparison_operators = {
 }


+def path_or_none(p) -> typing.Optional[str]:
+    try:
+        return binascii.unhexlify(p).decode()
+    except binascii.Error as err:
+        if p == '{stream}':
+            return None
+        raise err
+
+
 class StreamManager:
-    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
+    def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                  wallet: 'LbryWalletManager', storage: 'SQLiteStorage', node: typing.Optional['Node'],
                  analytics_manager: typing.Optional['AnalyticsManager'] = None):
         self.loop = loop
@@ -65,8 +73,7 @@ class StreamManager:
         self.storage = storage
         self.node = node
         self.analytics_manager = analytics_manager
-        self.streams: typing.Set[ManagedStream] = set()
-        self.starting_streams: typing.Dict[str, asyncio.Future] = {}
+        self.streams: typing.Dict[str, ManagedStream] = {}
         self.resume_downloading_task: asyncio.Task = None
         self.re_reflect_task: asyncio.Task = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
@@ -76,46 +83,6 @@ class StreamManager:
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
         stream.set_claim(claim_info, claim_info['value'])

-    async def start_stream(self, stream: ManagedStream) -> bool:
-        """
-        Resume or rebuild a partial or completed stream
-        """
-        if not stream.running and not stream.output_file_exists:
-            if stream.downloader:
-                stream.downloader.stop()
-                stream.downloader = None
-            # the directory is gone, can happen when the folder that contains a published file is deleted
-            # reset the download directory to the default and update the file name
-            if not os.path.isdir(stream.download_directory):
-                stream.download_directory = self.config.download_dir
-            stream.downloader = self.make_downloader(
-                stream.sd_hash, stream.download_directory, stream.descriptor.suggested_file_name
-            )
-            if stream.status != ManagedStream.STATUS_FINISHED:
-                await self.storage.change_file_status(stream.stream_hash, 'running')
-                stream.update_status('running')
-            stream.start_download(self.node)
-            try:
-                await asyncio.wait_for(self.loop.create_task(stream.downloader.wrote_bytes_event.wait()),
-                                       self.config.download_timeout)
-            except asyncio.TimeoutError:
-                await self.stop_stream(stream)
-                if stream in self.streams:
-                    self.streams.remove(stream)
-                return False
-            file_name = os.path.basename(stream.downloader.output_path)
-            output_dir = os.path.dirname(stream.downloader.output_path)
-            await self.storage.change_file_download_dir_and_file_name(
-                stream.stream_hash, output_dir, file_name
-            )
-            stream._file_name = file_name
-            stream.download_directory = output_dir
-            self.wait_for_stream_finished(stream)
-            return True
-        return True
-
     async def stop_stream(self, stream: ManagedStream):
         stream.stop_download()
         if not stream.finished and stream.output_file_exists:
@@ -128,10 +95,12 @@ class StreamManager:
         stream.update_status(ManagedStream.STATUS_STOPPED)
         await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_STOPPED)

-    def make_downloader(self, sd_hash: str, download_directory: str, file_name: str):
-        return StreamDownloader(
-            self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name
-        )
+    async def start_stream(self, stream: ManagedStream):
+        await stream.setup(self.node, save_file=not self.config.streaming_only)
+        self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
+        stream.update_status(ManagedStream.STATUS_RUNNING)
+        await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_RUNNING)
+        self.wait_for_stream_finished(stream)

     async def recover_streams(self, file_infos: typing.List[typing.Dict]):
         to_restore = []
@@ -156,70 +125,63 @@ class StreamManager:
         if to_restore:
             await self.storage.recover_streams(to_restore, self.config.download_dir)
-        log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))
+        # if self.blob_manager._save_blobs:
+        #     log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))

-    async def add_stream(self, rowid: int, sd_hash: str, file_name: str, download_directory: str, status: str,
+    async def add_stream(self, rowid: int, sd_hash: str, file_name: typing.Optional[str],
+                         download_directory: typing.Optional[str], status: str,
                          claim: typing.Optional['StoredStreamClaim']):
-        sd_blob = self.blob_manager.get_blob(sd_hash)
-        if not sd_blob.get_is_verified():
-            return
         try:
-            descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
+            descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
         except InvalidStreamDescriptorError as err:
             log.warning("Failed to start stream for sd %s - %s", sd_hash, str(err))
             return
-        if status == ManagedStream.STATUS_RUNNING:
-            downloader = self.make_downloader(descriptor.sd_hash, download_directory, file_name)
-        else:
-            downloader = None
         stream = ManagedStream(
-            self.loop, self.blob_manager, rowid, descriptor, download_directory, file_name, downloader, status, claim
+            self.loop, self.config, self.blob_manager, descriptor.sd_hash, download_directory, file_name, status,
+            claim, rowid=rowid, descriptor=descriptor
         )
-        self.streams.add(stream)
-        self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
+        self.streams[sd_hash] = stream

     async def load_streams_from_database(self):
         to_recover = []
+        to_start = []
         for file_info in await self.storage.get_all_lbry_files():
             if not self.blob_manager.get_blob(file_info['sd_hash']).get_is_verified():
                 to_recover.append(file_info)
+            to_start.append(file_info)
         if to_recover:
-            log.info("Attempting to recover %i streams", len(to_recover))
+            # if self.blob_manager._save_blobs:
+            #     log.info("Attempting to recover %i streams", len(to_recover))
             await self.recover_streams(to_recover)

-        to_start = []
-        for file_info in await self.storage.get_all_lbry_files():
-            if self.blob_manager.get_blob(file_info['sd_hash']).get_is_verified():
-                to_start.append(file_info)
         log.info("Initializing %i files", len(to_start))
-        if to_start:
-            await asyncio.gather(*[
-                self.add_stream(
-                    file_info['rowid'], file_info['sd_hash'], binascii.unhexlify(file_info['file_name']).decode(),
-                    binascii.unhexlify(file_info['download_directory']).decode(), file_info['status'],
-                    file_info['claim']
-                ) for file_info in to_start
-            ])
+        await asyncio.gather(*[
+            self.add_stream(
+                file_info['rowid'], file_info['sd_hash'], path_or_none(file_info['file_name']),
+                path_or_none(file_info['download_directory']), file_info['status'],
+                file_info['claim']
+            ) for file_info in to_start
+        ])
         log.info("Started stream manager with %i files", len(self.streams))

     async def resume(self):
-        if self.node:
-            await self.node.joined.wait()
-        else:
+        if not self.node:
             log.warning("no DHT node given, resuming downloads trusting that we can contact reflector")
         t = [
-            (stream.start_download(self.node), self.wait_for_stream_finished(stream))
-            for stream in self.streams if stream.status == ManagedStream.STATUS_RUNNING
+            self.loop.create_task(self.start_stream(stream)) for stream in self.streams.values()
+            if stream.running
         ]
         if t:
             log.info("resuming %i downloads", len(t))
+            await asyncio.gather(*t, loop=self.loop)

     async def reflect_streams(self):
         while True:
             if self.config.reflect_streams and self.config.reflector_servers:
                 sd_hashes = await self.storage.get_streams_to_re_reflect()
-                streams = list(filter(lambda s: s.sd_hash in sd_hashes, self.streams))
+                streams = list(filter(lambda s: s in sd_hashes, self.streams.keys()))
                 batch = []
                 while streams:
                     stream = streams.pop()
@@ -236,7 +198,7 @@ class StreamManager:
     async def start(self):
         await self.load_streams_from_database()
         self.resume_downloading_task = self.loop.create_task(self.resume())
-        self.re_reflect_task = self.loop.create_task(self.reflect_streams())
+        # self.re_reflect_task = self.loop.create_task(self.reflect_streams())

     def stop(self):
         if self.resume_downloading_task and not self.resume_downloading_task.done():
@@ -244,7 +206,7 @@ class StreamManager:
         if self.re_reflect_task and not self.re_reflect_task.done():
             self.re_reflect_task.cancel()
         while self.streams:
-            stream = self.streams.pop()
+            _, stream = self.streams.popitem()
             stream.stop_download()
         while self.update_stream_finished_futs:
             self.update_stream_finished_futs.pop().cancel()
@@ -253,8 +215,8 @@ class StreamManager:
     async def create_stream(self, file_path: str, key: typing.Optional[bytes] = None,
                             iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
-        stream = await ManagedStream.create(self.loop, self.blob_manager, file_path, key, iv_generator)
-        self.streams.add(stream)
+        stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator)
+        self.streams[stream.sd_hash] = stream
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
         if self.config.reflect_streams and self.config.reflector_servers:
             host, port = random.choice(self.config.reflector_servers)
@@ -268,8 +230,8 @@ class StreamManager:
     async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):
         await self.stop_stream(stream)
-        if stream in self.streams:
-            self.streams.remove(stream)
+        if stream.sd_hash in self.streams:
+            del self.streams[stream.sd_hash]
         blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
         await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
         await self.storage.delete_stream(stream.descriptor)
@@ -277,7 +239,7 @@ class StreamManager:
             os.remove(stream.full_path)

     def get_stream_by_stream_hash(self, stream_hash: str) -> typing.Optional[ManagedStream]:
-        streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams))
+        streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams.values()))
         if streams:
             return streams[0]
@@ -304,13 +266,13 @@ class StreamManager:
         if search_by:
             comparison = comparison or 'eq'
             streams = []
-            for stream in self.streams:
+            for stream in self.streams.values():
                 for search, val in search_by.items():
                     if comparison_operators[comparison](getattr(stream, search), val):
                         streams.append(stream)
                         break
         else:
-            streams = list(self.streams)
+            streams = list(self.streams.values())
         if sort_by:
             streams.sort(key=lambda s: getattr(s, sort_by))
             if reverse:
@@ -320,7 +282,7 @@ class StreamManager:
     def wait_for_stream_finished(self, stream: ManagedStream):
         async def _wait_for_stream_finished():
             if stream.downloader and stream.running:
-                await stream.downloader.stream_finished_event.wait()
+                await stream.finished_writing.wait()
                 stream.update_status(ManagedStream.STATUS_FINISHED)
                 if self.analytics_manager:
                     self.loop.create_task(self.analytics_manager.send_download_finished(
@@ -334,24 +296,12 @@ class StreamManager:
             self.update_stream_finished_futs.remove(task)
         )

-    async def _store_stream(self, downloader: StreamDownloader) -> int:
-        file_name = os.path.basename(downloader.output_path)
-        download_directory = os.path.dirname(downloader.output_path)
-        if not await self.storage.stream_exists(downloader.sd_hash):
-            await self.storage.store_stream(downloader.sd_blob, downloader.descriptor)
-        if not await self.storage.file_exists(downloader.sd_hash):
-            return await self.storage.save_downloaded_file(
-                downloader.descriptor.stream_hash, file_name, download_directory,
-                0.0
-            )
-        else:
-            return await self.storage.rowid_for_stream(downloader.descriptor.stream_hash)
-
     async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: Claim) -> typing.Tuple[
             typing.Optional[ManagedStream], typing.Optional[ManagedStream]]:
         existing = self.get_filtered_streams(outpoint=outpoint)
         if existing:
-            await self.start_stream(existing[0])
+            if not existing[0].running:
+                await self.start_stream(existing[0])
             return existing[0], None
         existing = self.get_filtered_streams(sd_hash=claim.stream.source.sd_hash)
         if existing and existing[0].claim_id != claim_id:
@@ -363,7 +313,8 @@ class StreamManager:
                 existing[0].stream_hash, outpoint
             )
             await self._update_content_claim(existing[0])
-            await self.start_stream(existing[0])
+            if not existing[0].running:
+                await self.start_stream(existing[0])
             return existing[0], None
         else:
             existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id)
@@ -372,26 +323,27 @@ class StreamManager:
                 return None, existing_for_claim_id[0]
         return None, None

-    async def start_downloader(self, got_descriptor_time: asyncio.Future, downloader: StreamDownloader,
-                               download_id: str, outpoint: str, claim: Claim, resolved: typing.Dict,
-                               file_name: typing.Optional[str] = None) -> ManagedStream:
-        start_time = self.loop.time()
-        downloader.download(self.node)
-        await downloader.got_descriptor.wait()
-        got_descriptor_time.set_result(self.loop.time() - start_time)
-        rowid = await self._store_stream(downloader)
-        await self.storage.save_content_claim(
-            downloader.descriptor.stream_hash, outpoint
-        )
-        stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, self.config.download_dir,
-                               file_name, downloader, ManagedStream.STATUS_RUNNING, download_id=download_id)
-        stream.set_claim(resolved, claim)
-        await stream.downloader.wrote_bytes_event.wait()
-        self.streams.add(stream)
-        return stream
+    # async def start_downloader(self, got_descriptor_time: asyncio.Future, downloader: EncryptedStreamDownloader,
+    #                            download_id: str, outpoint: str, claim: Claim, resolved: typing.Dict,
+    #                            file_name: typing.Optional[str] = None) -> ManagedStream:
+    #     start_time = self.loop.time()
+    #     downloader.download(self.node)
+    #     await downloader.got_descriptor.wait()
+    #     got_descriptor_time.set_result(self.loop.time() - start_time)
+    #     rowid = await self._store_stream(downloader)
+    #     await self.storage.save_content_claim(
+    #         downloader.descriptor.stream_hash, outpoint
+    #     )
+    #     stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, self.config.download_dir,
+    #                            file_name, downloader, ManagedStream.STATUS_RUNNING, download_id=download_id)
+    #     stream.set_claim(resolved, claim)
+    #     await stream.downloader.wrote_bytes_event.wait()
+    #     self.streams.add(stream)
+    #     return stream

-    async def _download_stream_from_uri(self, uri, timeout: float, exchange_rate_manager: 'ExchangeRateManager',
-                                        file_name: typing.Optional[str] = None) -> ManagedStream:
+    @cache_concurrent
+    async def download_stream_from_uri(self, uri, timeout: float, exchange_rate_manager: 'ExchangeRateManager',
+                                       file_name: typing.Optional[str] = None) -> ManagedStream:
         start_time = self.loop.time()
         parsed_uri = parse_lbry_uri(uri)
         if parsed_uri.is_channel:
@@ -434,80 +386,97 @@ class StreamManager:
             log.warning(msg)
             raise InsufficientFundsError(msg)
         fee_address = claim.stream.fee.address
+        # content_fee_tx = await self.wallet.send_amount_to_address(
+        #     lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')
+        # )
+        handled_fee_time = self.loop.time() - resolved_time - start_time

         # download the stream
         download_id = binascii.hexlify(generate_id()).decode()
-        downloader = StreamDownloader(self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash,
-                                      self.config.download_dir, file_name)
-
-        stream = None
-        descriptor_time_fut = self.loop.create_future()
-        start_download_time = self.loop.time()
-        time_to_descriptor = None
-        time_to_first_bytes = None
-        error = None
-        try:
-            stream = await asyncio.wait_for(
-                asyncio.ensure_future(
-                    self.start_downloader(descriptor_time_fut, downloader, download_id, outpoint, claim, resolved,
-                                          file_name)
-                ), timeout
-            )
-            time_to_descriptor = await descriptor_time_fut
-            time_to_first_bytes = self.loop.time() - start_download_time - time_to_descriptor
-            self.wait_for_stream_finished(stream)
-            if fee_address and fee_amount and not to_replace:
-                stream.tx = await self.wallet.send_amount_to_address(
-                    lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
-            elif to_replace:  # delete old stream now that the replacement has started downloading
-                await self.delete_stream(to_replace)
-        except asyncio.TimeoutError:
-            if descriptor_time_fut.done():
-                time_to_descriptor = descriptor_time_fut.result()
-                error = DownloadDataTimeout(downloader.sd_hash)
-                self.blob_manager.delete_blob(downloader.sd_hash)
-                await self.storage.delete_stream(downloader.descriptor)
-            else:
-                descriptor_time_fut.cancel()
-                error = DownloadSDTimeout(downloader.sd_hash)
-            if stream:
-                await self.stop_stream(stream)
-            else:
-                downloader.stop()
-        if error:
-            log.warning(error)
-        if self.analytics_manager:
-            self.loop.create_task(
-                self.analytics_manager.send_time_to_first_bytes(
-                    resolved_time, self.loop.time() - start_time, download_id, parse_lbry_uri(uri).name, outpoint,
-                    None if not stream else len(stream.downloader.blob_downloader.active_connections),
-                    None if not stream else len(stream.downloader.blob_downloader.scores),
-                    False if not downloader else downloader.added_fixed_peers,
-                    self.config.fixed_peer_delay if not downloader else downloader.fixed_peers_delay,
-                    claim.stream.source.sd_hash, time_to_descriptor,
-                    None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
-                    None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
-                    time_to_first_bytes, None if not error else error.__class__.__name__
-                )
-            )
-        if error:
-            raise error
+        download_dir = self.config.download_dir
+        save_file = True
+        if not file_name and self.config.streaming_only:
+            download_dir, file_name = None, None
+            save_file = False
+        stream = ManagedStream(
+            self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_dir,
+            file_name, ManagedStream.STATUS_RUNNING, download_id=download_id
+        )
+        await stream.setup(self.node, save_file=save_file)
+        stream.set_claim(resolved, claim)
+        await self.storage.save_content_claim(stream.stream_hash, outpoint)
+        self.streams[stream.sd_hash] = stream
+
+        # stream = None
+        # descriptor_time_fut = self.loop.create_future()
+        # start_download_time = self.loop.time()
+        # time_to_descriptor = None
+        # time_to_first_bytes = None
+        # error = None
+        # try:
+        #     stream = await asyncio.wait_for(
+        #         asyncio.ensure_future(
+        #             self.start_downloader(descriptor_time_fut, downloader, download_id, outpoint, claim, resolved,
+        #                                   file_name)
+        #         ), timeout
+        #     )
+        #     time_to_descriptor = await descriptor_time_fut
+        #     time_to_first_bytes = self.loop.time() - start_download_time - time_to_descriptor
+        #     self.wait_for_stream_finished(stream)
+        #     if fee_address and fee_amount and not to_replace:
+        #
+        #     elif to_replace:  # delete old stream now that the replacement has started downloading
+        #         await self.delete_stream(to_replace)
+        # except asyncio.TimeoutError:
+        #     if descriptor_time_fut.done():
+        #         time_to_descriptor = descriptor_time_fut.result()
+        #         error = DownloadDataTimeout(downloader.sd_hash)
+        #         self.blob_manager.delete_blob(downloader.sd_hash)
+        #         await self.storage.delete_stream(downloader.descriptor)
+        #     else:
+        #         descriptor_time_fut.cancel()
+        #         error = DownloadSDTimeout(downloader.sd_hash)
+        #     if stream:
+        #         await self.stop_stream(stream)
+        #     else:
+        #         downloader.stop()
+        # if error:
+        #     log.warning(error)
+        # if self.analytics_manager:
+        #     self.loop.create_task(
+        #         self.analytics_manager.send_time_to_first_bytes(
+        #             resolved_time, self.loop.time() - start_time, download_id, parse_lbry_uri(uri).name, outpoint,
+        #             None if not stream else len(stream.downloader.blob_downloader.active_connections),
+        #             None if not stream else len(stream.downloader.blob_downloader.scores),
+        #             False if not downloader else downloader.added_fixed_peers,
+        #             self.config.fixed_peer_delay if not downloader else downloader.fixed_peers_delay,
+        #             claim.source_hash.decode(), time_to_descriptor,
+        #             None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
+        #             None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
+        #             time_to_first_bytes, None if not error else error.__class__.__name__
+        #         )
+        #     )
+        # if error:
+        #     raise error
         return stream

-    async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
-                                       file_name: typing.Optional[str] = None,
-                                       timeout: typing.Optional[float] = None) -> ManagedStream:
-        timeout = timeout or self.config.download_timeout
-        if uri in self.starting_streams:
-            return await self.starting_streams[uri]
-        fut = asyncio.Future(loop=self.loop)
-        self.starting_streams[uri] = fut
-        try:
-            stream = await self._download_stream_from_uri(uri, timeout, exchange_rate_manager, file_name)
-            fut.set_result(stream)
-        except Exception as err:
-            fut.set_exception(err)
-        try:
-            return await fut
-        finally:
-            del self.starting_streams[uri]
+    # async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
+    #                                    file_name: typing.Optional[str] = None,
+    #                                    timeout: typing.Optional[float] = None) -> ManagedStream:
+    #     timeout = timeout or self.config.download_timeout
+    #     if uri in self.starting_streams:
+    #         return await self.starting_streams[uri]
+    #     fut = asyncio.Future(loop=self.loop)
+    #     self.starting_streams[uri] = fut
+    #     try:
+    #         stream = await self._download_stream_from_uri(uri, timeout, exchange_rate_manager, file_name)
+    #         fut.set_result(stream)
+    #     except Exception as err:
+    #         fut.set_exception(err)
+    #     try:
+    #         return await fut
+    #     finally:
+    #         del self.starting_streams[uri]
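
download_stream_from_uri is now guarded by @cache_concurrent instead of the hand-rolled starting_streams future map commented out above. lbrynet.utils.cache_concurrent itself is not shown in this diff; the following is only a plausible sketch of such a decorator, assuming hashable arguments:

import asyncio
import functools


def cache_concurrent(fn):
    # hypothetical stand-in: collapse concurrent calls with identical
    # (hashable) arguments into one shared in-flight task, which is what
    # the deleted starting_streams bookkeeping did by hand for uris
    in_flight = {}

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        key = (args, tuple(sorted(kwargs.items())))
        if key not in in_flight:
            task = asyncio.ensure_future(fn(*args, **kwargs))
            task.add_done_callback(lambda _: in_flight.pop(key, None))
            in_flight[key] = task
        # shield so one caller cancelling does not cancel the shared task
        return await asyncio.shield(in_flight[key])
    return wrapper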