ManagedDownloadSource and SourceManager refactor

Jack Robison 2020-01-15 10:18:38 -05:00 committed by Victor Shyba
parent 8be1c8310d
commit 179383540f
5 changed files with 387 additions and 412 deletions
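In short: the stream-specific download plumbing is split into a generic base layer. ManagedDownloadSource (new, lbry/file/source.py) carries the claim, status and file bookkeeping that used to live on ManagedStream, and SourceManager (new, lbry/file/source_manager.py) carries the add/remove/filter bookkeeping that used to live on StreamManager; ManagedStream and StreamManager become subclasses of them. A rough sketch of the intended extension pattern (illustrative only; ExampleSource and ExampleSourceManager are hypothetical, the base classes are the ones introduced below):

# Illustrative sketch of how the new base classes are meant to be reused;
# ExampleSource / ExampleSourceManager are hypothetical, not part of the commit.
from lbry.file.source import ManagedDownloadSource
from lbry.file.source_manager import SourceManager

class ExampleSource(ManagedDownloadSource):
    """A concrete source implements start / stop / save_file / stop_tasks."""

class ExampleSourceManager(SourceManager):
    source_class = ExampleSource

    async def initialize_from_database(self):
        pass  # load rows from storage and self.add(...) each source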

lbry/file/__init__.py (new file, 0 lines)

lbry/file/source.py (new file, 175 lines)

@ -0,0 +1,175 @@
import os
import asyncio
import typing
import logging
import binascii
from typing import Optional
from lbry.utils import generate_id
from lbry.extras.daemon.storage import StoredContentClaim
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.wallet.transaction import Transaction
from lbry.extras.daemon.storage import SQLiteStorage
log = logging.getLogger(__name__)
# def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
# base_name, ext = os.path.splitext(os.path.basename(file_name))
# i = 0
# while os.path.isfile(os.path.join(download_directory, file_name)):
# i += 1
# file_name = "%s_%i%s" % (base_name, i, ext)
#
# return file_name
#
#
# async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download_directory: str, file_name: str) -> str:
# return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
class ManagedDownloadSource:
STATUS_RUNNING = "running"
STATUS_STOPPED = "stopped"
STATUS_FINISHED = "finished"
SAVING_ID = 1
STREAMING_ID = 2
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
file_name: Optional[str] = None, download_directory: Optional[str] = None,
status: Optional[str] = STATUS_STOPPED, claim: Optional[StoredContentClaim] = None,
download_id: Optional[str] = None, rowid: Optional[int] = None,
content_fee: Optional['Transaction'] = None,
analytics_manager: Optional['AnalyticsManager'] = None,
added_on: Optional[int] = None):
self.loop = loop
self.storage = storage
self.config = config
self.identifier = identifier
self.download_directory = download_directory
self._file_name = file_name
self._status = status
self.stream_claim_info = claim
self.download_id = download_id or binascii.hexlify(generate_id()).decode()
self.rowid = rowid
self.content_fee = content_fee
self.purchase_receipt = None
self._added_on = added_on
self.analytics_manager = analytics_manager
self.saving = asyncio.Event(loop=self.loop)
self.finished_writing = asyncio.Event(loop=self.loop)
self.started_writing = asyncio.Event(loop=self.loop)
self.finished_write_attempt = asyncio.Event(loop=self.loop)
# @classmethod
# async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', file_path: str,
# key: Optional[bytes] = None,
# iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
# raise NotImplementedError()
async def start(self, timeout: Optional[float] = None):
raise NotImplementedError()
async def stop(self, finished: bool = False):
raise NotImplementedError()
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
raise NotImplementedError()
def stop_tasks(self):
raise NotImplementedError()
# def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
# self.stream_claim_info = StoredContentClaim(
# f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
# claim_info['name'], claim_info['amount'], claim_info['height'],
# binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
# claim_info['claim_sequence'], claim_info.get('channel_name')
# )
#
# async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
# if not claim_info:
# claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
# self.set_claim(claim_info, claim_info['value'])
@property
def file_name(self) -> Optional[str]:
return self._file_name
@property
def added_on(self) -> Optional[int]:
return self._added_on
@property
def status(self) -> str:
return self._status
@property
def completed(self):
raise NotImplementedError()
# @property
# def stream_url(self):
# return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}
@property
def finished(self) -> bool:
return self.status == self.STATUS_FINISHED
@property
def running(self) -> bool:
return self.status == self.STATUS_RUNNING
@property
def claim_id(self) -> Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.claim_id
@property
def txid(self) -> Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.txid
@property
def nout(self) -> Optional[int]:
return None if not self.stream_claim_info else self.stream_claim_info.nout
@property
def outpoint(self) -> Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.outpoint
@property
def claim_height(self) -> Optional[int]:
return None if not self.stream_claim_info else self.stream_claim_info.height
@property
def channel_claim_id(self) -> Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id
@property
def channel_name(self) -> Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.channel_name
@property
def claim_name(self) -> Optional[str]:
return None if not self.stream_claim_info else self.stream_claim_info.claim_name
@property
def metadata(self) -> Optional[typing.Dict]:
return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()
@property
def metadata_protobuf(self) -> bytes:
if self.stream_claim_info:
return binascii.hexlify(self.stream_claim_info.claim.to_bytes())
@property
def full_path(self) -> Optional[str]:
return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
if self.file_name and self.download_directory else None
@property
def output_file_exists(self):
return os.path.isfile(self.full_path) if self.full_path else False
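ManagedDownloadSource is deliberately abstract: start, stop, save_file, stop_tasks and completed all raise NotImplementedError, while the claim/status properties above are shared bookkeeping. A minimal sketch of what a concrete subclass has to fill in (hypothetical FileSource, not part of this commit):

# Hypothetical minimal subclass, shown only to illustrate which members a
# concrete source must override; everything else is inherited from the base.
class FileSource(ManagedDownloadSource):
    async def start(self, timeout: Optional[float] = None):
        self._status = self.STATUS_RUNNING

    async def stop(self, finished: bool = False):
        self._status = self.STATUS_FINISHED if finished else self.STATUS_STOPPED

    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
        self.started_writing.set()
        self.finished_writing.set()

    def stop_tasks(self):
        pass

    @property
    def completed(self):
        return self.finished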

lbry/file/source_manager.py (new file, 125 lines)

@ -0,0 +1,125 @@
import os
import asyncio
import binascii
import logging
import typing
from typing import Optional
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage
log = logging.getLogger(__name__)
comparison_operators = {
'eq': lambda a, b: a == b,
'ne': lambda a, b: a != b,
'g': lambda a, b: a > b,
'l': lambda a, b: a < b,
'ge': lambda a, b: a >= b,
'le': lambda a, b: a <= b,
}
def path_or_none(p) -> Optional[str]:
if not p:
return
return binascii.unhexlify(p).decode()
class SourceManager:
filter_fields = {
'rowid',
'status',
'file_name',
'added_on',
'claim_name',
'claim_height',
'claim_id',
'outpoint',
'txid',
'nout',
'channel_claim_id',
'channel_name'
}
source_class = ManagedDownloadSource
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage',
analytics_manager: Optional['AnalyticsManager'] = None):
self.loop = loop
self.config = config
self.storage = storage
self.analytics_manager = analytics_manager
self._sources: typing.Dict[str, ManagedDownloadSource] = {}
self.started = asyncio.Event(loop=self.loop)
def add(self, source: ManagedDownloadSource):
self._sources[source.identifier] = source
def remove(self, source: ManagedDownloadSource):
if source.identifier not in self._sources:
return
self._sources.pop(source.identifier)
source.stop_tasks()
async def initialize_from_database(self):
raise NotImplementedError()
async def start(self):
await self.initialize_from_database()
self.started.set()
def stop(self):
while self._sources:
_, source = self._sources.popitem()
source.stop_tasks()
self.started.clear()
async def create(self, file_path: str, key: Optional[bytes] = None, **kw) -> ManagedDownloadSource:
raise NotImplementedError()
async def _delete(self, source: ManagedDownloadSource):
raise NotImplementedError()
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await self._delete(source)
self.remove(source)
if delete_file and source.output_file_exists:
os.remove(source.full_path)
def get_filtered(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
comparison: Optional[str] = None, **search_by) -> typing.List[ManagedDownloadSource]:
"""
Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
"""
if sort_by and sort_by not in self.filter_fields:
raise ValueError(f"'{sort_by}' is not a valid field to sort by")
if comparison and comparison not in comparison_operators:
raise ValueError(f"'{comparison}' is not a valid comparison")
if 'full_status' in search_by:
del search_by['full_status']
for search in search_by.keys():
if search not in self.filter_fields:
raise ValueError(f"'{search}' is not a valid search operation")
if search_by:
comparison = comparison or 'eq'
sources = []
for stream in self._sources.values():
for search, val in search_by.items():
if comparison_operators[comparison](getattr(stream, search), val):
sources.append(stream)
break
else:
sources = list(self._sources.values())
if sort_by:
sources.sort(key=lambda s: getattr(s, sort_by))
if reverse:
sources.reverse()
return sources
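get_filtered is what the manager's list/filter queries are built on: each search_by keyword is compared against the corresponding source attribute with the chosen operator ('eq' by default), then the matches are optionally sorted. A usage sketch, assuming `manager` is an already populated SourceManager subclass (hypothetical variable and values):

# Hypothetical usage of SourceManager.get_filtered; `manager` is assumed to be
# a started SourceManager subclass that already has sources added to it.
running = manager.get_filtered(status=ManagedDownloadSource.STATUS_RUNNING)      # 'eq' comparison by default
recent = manager.get_filtered(comparison='ge', added_on=1579000000,
                              sort_by='added_on', reverse=True)                  # added_on >= 1579000000, newest first
# manager.get_filtered(sort_by='size') would raise ValueError: 'size' is not in filter_fields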

lbry/stream/managed_stream.py (modified)

@ -4,6 +4,7 @@ import time
import typing
import logging
import binascii
+ from typing import Optional
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.utils import generate_id
from lbry.error import DownloadSDTimeoutError
@ -13,12 +14,14 @@ from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
from lbry.stream.reflector.client import StreamReflectorClient
from lbry.extras.daemon.storage import StoredContentClaim
from lbry.blob import MAX_BLOB_SIZE
+ from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.schema.claim import Claim
from lbry.blob.blob_manager import BlobManager
from lbry.blob.blob_info import BlobInfo
+ from lbry.extras.daemon.storage import SQLiteStorage
from lbry.dht.node import Node
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.wallet.transaction import Transaction
@ -40,65 +43,20 @@ async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download
return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
- class ManagedStream:
- STATUS_RUNNING = "running"
- STATUS_STOPPED = "stopped"
- STATUS_FINISHED = "finished"
- SAVING_ID = 1
- STREAMING_ID = 2
- __slots__ = [
- 'loop',
- 'config',
- 'blob_manager',
- 'sd_hash',
- 'download_directory',
- '_file_name',
- '_added_on',
- '_status',
- 'stream_claim_info',
- 'download_id',
- 'rowid',
- 'content_fee',
- 'purchase_receipt',
- 'downloader',
- 'analytics_manager',
- 'fully_reflected',
- 'reflector_progress',
- 'file_output_task',
- 'delayed_stop_task',
- 'streaming_responses',
- 'streaming',
- '_running',
- 'saving',
- 'finished_writing',
- 'started_writing',
- 'finished_write_attempt',
- 'uploading_to_reflector'
- ]
+ class ManagedStream(ManagedDownloadSource):
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
- sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
- status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredContentClaim] = None,
- download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
- descriptor: typing.Optional[StreamDescriptor] = None,
- content_fee: typing.Optional['Transaction'] = None,
- analytics_manager: typing.Optional['AnalyticsManager'] = None,
- added_on: typing.Optional[int] = None):
+ sd_hash: str, download_directory: Optional[str] = None, file_name: Optional[str] = None,
+ status: Optional[str] = ManagedDownloadSource.STATUS_STOPPED,
+ claim: Optional[StoredContentClaim] = None,
+ download_id: Optional[str] = None, rowid: Optional[int] = None,
+ descriptor: Optional[StreamDescriptor] = None,
+ content_fee: Optional['Transaction'] = None,
+ analytics_manager: Optional['AnalyticsManager'] = None,
+ added_on: Optional[int] = None):
- self.loop = loop
- self.config = config
+ super().__init__(loop, config, blob_manager.storage, sd_hash, file_name, download_directory, status, claim,
+ download_id, rowid, content_fee, analytics_manager, added_on)
self.blob_manager = blob_manager
- self.sd_hash = sd_hash
- self.download_directory = download_directory
- self._file_name = file_name
- self._status = status
- self.stream_claim_info = claim
- self.download_id = download_id or binascii.hexlify(generate_id()).decode()
- self.rowid = rowid
- self.content_fee = content_fee
self.purchase_receipt = None
- self._added_on = added_on
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
self.analytics_manager = analytics_manager
@ -108,12 +66,13 @@ class ManagedStream:
self.file_output_task: typing.Optional[asyncio.Task] = None
self.delayed_stop_task: typing.Optional[asyncio.Task] = None
self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = []
- self.fully_reflected = asyncio.Event(loop=self.loop)
self.streaming = asyncio.Event(loop=self.loop)
self._running = asyncio.Event(loop=self.loop)
- self.saving = asyncio.Event(loop=self.loop)
- self.finished_writing = asyncio.Event(loop=self.loop)
- self.started_writing = asyncio.Event(loop=self.loop)
- self.finished_write_attempt = asyncio.Event(loop=self.loop)
+ @property
+ def sd_hash(self) -> str:
+ return self.identifier
@property
def is_fully_reflected(self) -> bool:
@ -128,17 +87,9 @@ class ManagedStream:
return self.descriptor.stream_hash
@property
- def file_name(self) -> typing.Optional[str]:
+ def file_name(self) -> Optional[str]:
return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)
- @property
- def added_on(self) -> typing.Optional[int]:
- return self._added_on
- @property
- def status(self) -> str:
- return self._status
@property
def written_bytes(self) -> int:
return 0 if not self.output_file_exists else os.stat(self.full_path).st_size
@ -156,55 +107,6 @@ class ManagedStream:
self._status = status
await self.blob_manager.storage.change_file_status(self.stream_hash, status)
- @property
- def finished(self) -> bool:
- return self.status == self.STATUS_FINISHED
- @property
- def running(self) -> bool:
- return self.status == self.STATUS_RUNNING
- @property
- def claim_id(self) -> typing.Optional[str]:
- return None if not self.stream_claim_info else self.stream_claim_info.claim_id
- @property
- def txid(self) -> typing.Optional[str]:
- return None if not self.stream_claim_info else self.stream_claim_info.txid
- @property
- def nout(self) -> typing.Optional[int]:
- return None if not self.stream_claim_info else self.stream_claim_info.nout
- @property
- def outpoint(self) -> typing.Optional[str]:
- return None if not self.stream_claim_info else self.stream_claim_info.outpoint
- @property
- def claim_height(self) -> typing.Optional[int]:
- return None if not self.stream_claim_info else self.stream_claim_info.height
- @property
- def channel_claim_id(self) -> typing.Optional[str]:
- return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id
- @property
- def channel_name(self) -> typing.Optional[str]:
- return None if not self.stream_claim_info else self.stream_claim_info.channel_name
- @property
- def claim_name(self) -> typing.Optional[str]:
- return None if not self.stream_claim_info else self.stream_claim_info.claim_name
- @property
- def metadata(self) -> typing.Optional[typing.Dict]:
- return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()
- @property
- def metadata_protobuf(self) -> bytes:
- if self.stream_claim_info:
- return binascii.hexlify(self.stream_claim_info.claim.to_bytes())
@property
def blobs_completed(self) -> int:
return sum([1 if b.blob_hash in self.blob_manager.completed_blob_hashes else 0
@ -218,39 +120,30 @@ class ManagedStream:
def blobs_remaining(self) -> int:
return self.blobs_in_stream - self.blobs_completed
- @property
- def full_path(self) -> typing.Optional[str]:
- return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
- if self.file_name and self.download_directory else None
- @property
- def output_file_exists(self):
- return os.path.isfile(self.full_path) if self.full_path else False
@property
def mime_type(self):
return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
- @classmethod
- async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
- file_path: str, key: typing.Optional[bytes] = None,
- iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
- """
- Generate a stream from a file and save it to the db
- """
- descriptor = await StreamDescriptor.create_stream(
- loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
- blob_completed_callback=blob_manager.blob_completed
- )
- await blob_manager.storage.store_stream(
- blob_manager.get_blob(descriptor.sd_hash), descriptor
- )
- row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
- os.path.dirname(file_path), 0)
- return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
- os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)
+ # @classmethod
+ # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config',
+ # file_path: str, key: Optional[bytes] = None,
+ # iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
+ # """
+ # Generate a stream from a file and save it to the db
+ # """
+ # descriptor = await StreamDescriptor.create_stream(
+ # loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
+ # blob_completed_callback=blob_manager.blob_completed
+ # )
+ # await blob_manager.storage.store_stream(
+ # blob_manager.get_blob(descriptor.sd_hash), descriptor
+ # )
+ # row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
+ # os.path.dirname(file_path), 0)
+ # return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
+ # os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)
- async def start(self, node: typing.Optional['Node'] = None, timeout: typing.Optional[float] = None,
+ async def start(self, node: Optional['Node'] = None, timeout: Optional[float] = None,
save_now: bool = False):
timeout = timeout or self.config.download_timeout
if self._running.is_set():
@ -287,7 +180,7 @@ class ManagedStream:
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
- async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0, connection_id: int = 0)\
+ async def _aiter_read_stream(self, start_blob_num: Optional[int] = 0, connection_id: int = 0)\
-> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]:
if start_blob_num >= len(self.descriptor.blobs[:-1]):
raise IndexError(start_blob_num)
@ -299,7 +192,7 @@ class ManagedStream:
decrypted = await self.downloader.read_blob(blob_info, connection_id)
yield (blob_info, decrypted)
- async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
+ async def stream_file(self, request: Request, node: Optional['Node'] = None) -> StreamResponse:
log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
self.sd_hash[:6])
headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
@ -391,8 +284,8 @@ class ManagedStream:
self.saving.clear()
self.finished_write_attempt.set()
- async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None,
- node: typing.Optional['Node'] = None):
+ async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None,
+ node: Optional['Node'] = None):
await self.start(node)
if self.file_output_task and not self.file_output_task.done():  # cancel an already running save task
self.file_output_task.cancel()
@ -476,7 +369,7 @@ class ManagedStream:
claim_info['claim_sequence'], claim_info.get('channel_name')
)
- async def update_content_claim(self, claim_info: typing.Optional[typing.Dict] = None):
+ async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
if not claim_info:
claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
self.set_claim(claim_info, claim_info['value'])

lbry/stream/stream_manager.py (modified)

@ -15,11 +15,14 @@ from lbry.schema.claim import Claim
from lbry.schema.url import URL
from lbry.wallet.dewies import dewies_to_lbc
from lbry.wallet import Output
+ from lbry.source_manager import SourceManager
+ from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.blob.blob_manager import BlobManager
from lbry.dht.node import Node
+ from lbry.wallet.wallet import WalletManager
+ from lbry.wallet.transaction import Transaction
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
@ -29,32 +32,12 @@ if typing.TYPE_CHECKING:
log = logging.getLogger(__name__)
- FILTER_FIELDS = [
- 'rowid',
- 'status',
- 'file_name',
- 'added_on',
- 'sd_hash',
- 'stream_hash',
- 'claim_name',
- 'claim_height',
- 'claim_id',
- 'outpoint',
- 'txid',
- 'nout',
- 'channel_claim_id',
- 'channel_name',
- 'full_status', # TODO: remove
- 'blobs_remaining',
- 'blobs_in_stream'
- ]
SET_FILTER_FIELDS = {
"claim_ids": "claim_id",
"channel_claim_ids": "channel_claim_id",
"outpoints": "outpoint"
}
COMPARISON_OPERATORS = {
'eq': lambda a, b: a == b,
'ne': lambda a, b: a != b,
@ -64,35 +47,44 @@ COMPARISON_OPERATORS = {
'le': lambda a, b: a <= b,
'in': lambda a, b: a in b
}
- def path_or_none(path) -> Optional[str]:
- if not path:
+ def path_or_none(p) -> Optional[str]:
+ if not p:
return
- return binascii.unhexlify(path).decode()
+ return binascii.unhexlify(p).decode()
- class StreamManager:
+ class StreamManager(SourceManager):
+ _sources: typing.Dict[str, ManagedStream]
+ filter_fields = set(SourceManager.filter_fields)
+ filter_fields.update({
+ 'sd_hash',
+ 'stream_hash',
+ 'full_status', # TODO: remove
+ 'blobs_remaining',
+ 'blobs_in_stream'
+ })
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
wallet_manager: 'WalletManager', storage: 'SQLiteStorage', node: Optional['Node'],
analytics_manager: Optional['AnalyticsManager'] = None):
- self.loop = loop
- self.config = config
+ super().__init__(loop, config, storage, analytics_manager)
self.blob_manager = blob_manager
self.wallet_manager = wallet_manager
- self.storage = storage
self.node = node
- self.analytics_manager = analytics_manager
- self.streams: typing.Dict[str, ManagedStream] = {}
self.resume_saving_task: Optional[asyncio.Task] = None
self.re_reflect_task: Optional[asyncio.Task] = None
self.update_stream_finished_futs: typing.List[asyncio.Future] = []
self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {}
self.started = asyncio.Event(loop=self.loop)
+ def add(self, source: ManagedStream):
+ super().add(source)
+ self.storage.content_claim_callbacks[source.stream_hash] = lambda: self._update_content_claim(source)
async def _update_content_claim(self, stream: ManagedStream):
claim_info = await self.storage.get_content_claim(stream.stream_hash)
- self.streams.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value'])
+ self._sources.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value'])
async def recover_streams(self, file_infos: typing.List[typing.Dict]):
to_restore = []
@ -123,10 +115,10 @@ class StreamManager:
# if self.blob_manager._save_blobs:
# log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))
- async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
+ async def _load_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
download_directory: Optional[str], status: str,
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
- added_on: Optional[int], fully_reflected: bool):
+ added_on: Optional[int]):
try:
descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
except InvalidStreamDescriptorError as err:
@ -139,10 +131,9 @@ class StreamManager:
)
if fully_reflected:
stream.fully_reflected.set()
- self.streams[sd_hash] = stream
+ self.add(stream)
- self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
- async def load_and_resume_streams_from_database(self):
+ async def initialize_from_database(self):
to_recover = []
to_start = []
@ -156,7 +147,6 @@ class StreamManager:
to_recover.append(file_info)
to_start.append(file_info)
if to_recover:
- log.info("Recover %i files", len(to_recover))
await self.recover_streams(to_recover)
log.info("Initializing %i files", len(to_start))
@ -167,7 +157,7 @@ class StreamManager:
download_directory = path_or_none(file_info['download_directory'])
if file_name and download_directory and not file_info['saved_file'] and file_info['status'] == 'running':
to_resume_saving.append((file_name, download_directory, file_info['sd_hash']))
- add_stream_tasks.append(self.loop.create_task(self.add_stream(
+ add_stream_tasks.append(self.loop.create_task(self._load_stream(
file_info['rowid'], file_info['sd_hash'], file_name,
download_directory, file_info['status'],
file_info['claim'], file_info['content_fee'],
@ -175,25 +165,22 @@ class StreamManager:
)))
if add_stream_tasks:
await asyncio.gather(*add_stream_tasks, loop=self.loop)
- log.info("Started stream manager with %i files", len(self.streams))
+ log.info("Started stream manager with %i files", len(self._sources))
if not self.node:
log.info("no DHT node given, resuming downloads trusting that we can contact reflector")
if to_resume_saving:
- self.resume_saving_task = self.loop.create_task(self.resume(to_resume_saving))
- async def resume(self, to_resume_saving):
- log.info("Resuming saving %i files", len(to_resume_saving))
- await asyncio.gather(
- *(self.streams[sd_hash].save_file(file_name, download_directory, node=self.node)
- for (file_name, download_directory, sd_hash) in to_resume_saving),
- loop=self.loop
- )
+ log.info("Resuming saving %i files", len(to_resume_saving))
+ self.resume_saving_task = self.loop.create_task(asyncio.gather(
+ *(self._sources[sd_hash].save_file(file_name, download_directory, node=self.node)
+ for (file_name, download_directory, sd_hash) in to_resume_saving),
+ loop=self.loop
+ ))
async def reflect_streams(self):
while True:
if self.config.reflect_streams and self.config.reflector_servers:
sd_hashes = await self.storage.get_streams_to_re_reflect()
- sd_hashes = [sd for sd in sd_hashes if sd in self.streams]
+ sd_hashes = [sd for sd in sd_hashes if sd in self._sources]
batch = []
while sd_hashes:
stream = self.streams[sd_hashes.pop()]
@ -209,18 +196,14 @@ class StreamManager:
await asyncio.sleep(300, loop=self.loop)
async def start(self):
- await self.load_and_resume_streams_from_database()
+ await super().start()
self.re_reflect_task = self.loop.create_task(self.reflect_streams())
- self.started.set()
def stop(self):
if self.resume_saving_task and not self.resume_saving_task.done():
self.resume_saving_task.cancel()
if self.re_reflect_task and not self.re_reflect_task.done():
self.re_reflect_task.cancel()
- while self.streams:
- _, stream = self.streams.popitem()
- stream.stop_tasks()
while self.update_stream_finished_futs:
self.update_stream_finished_futs.pop().cancel()
while self.running_reflector_uploads:
@ -260,14 +243,7 @@ class StreamManager:
del self.streams[stream.sd_hash]
blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
- await self.storage.delete_stream(stream.descriptor)
+ await self.storage.delete(stream.descriptor)
- if delete_file and stream.output_file_exists:
- os.remove(stream.full_path)
- def get_stream_by_stream_hash(self, stream_hash: str) -> Optional[ManagedStream]:
- streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams.values()))
- if streams:
- return streams[0]
def get_filtered_streams(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
comparison: Optional[str] = None,
@ -324,199 +300,5 @@ class StreamManager:
streams.reverse()
return streams
async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: Claim
) -> typing.Tuple[Optional[ManagedStream], Optional[ManagedStream]]:
existing = self.get_filtered_streams(outpoint=outpoint)
if existing:
return existing[0], None
existing = self.get_filtered_streams(sd_hash=claim.stream.source.sd_hash)
if existing and existing[0].claim_id != claim_id:
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}")
if existing:
log.info("claim contains a metadata only update to a stream we have")
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
)
await self._update_content_claim(existing[0])
return existing[0], None
else:
existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id)
if existing_for_claim_id:
log.info("claim contains an update to a stream we have, downloading it")
return None, existing_for_claim_id[0]
return None, None
@staticmethod
def _convert_to_old_resolve_output(wallet_manager, resolves):
result = {}
for url, txo in resolves.items():
if isinstance(txo, Output):
tx_height = txo.tx_ref.height
best_height = wallet_manager.ledger.headers.height
result[url] = {
'name': txo.claim_name,
'value': txo.claim,
'protobuf': binascii.hexlify(txo.claim.to_bytes()),
'claim_id': txo.claim_id,
'txid': txo.tx_ref.id,
'nout': txo.position,
'amount': dewies_to_lbc(txo.amount),
'effective_amount': txo.meta.get('effective_amount', 0),
'height': tx_height,
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height,
'claim_sequence': -1,
'address': txo.get_address(wallet_manager.ledger),
'valid_at_height': txo.meta.get('activation_height', None),
'timestamp': wallet_manager.ledger.headers.estimated_timestamp(tx_height),
'supports': []
}
else:
result[url] = txo
return result
@cache_concurrent
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
timeout: Optional[float] = None,
file_name: Optional[str] = None,
download_directory: Optional[str] = None,
save_file: Optional[bool] = None,
resolve_timeout: float = 3.0,
wallet: Optional['Wallet'] = None) -> ManagedStream:
manager = self.wallet_manager
wallet = wallet or manager.default_wallet
timeout = timeout or self.config.download_timeout
start_time = self.loop.time()
resolved_time = None
stream = None
txo: Optional[Output] = None
error = None
outpoint = None
if save_file is None:
save_file = self.config.save_files
if file_name and not save_file:
save_file = True
if save_file:
download_directory = download_directory or self.config.download_dir
else:
download_directory = None
payment = None
try:
# resolve the claim
if not URL.parse(uri).has_stream:
raise ResolveError("cannot download a channel claim, specify a /path")
try:
response = await asyncio.wait_for(
manager.ledger.resolve(wallet.accounts, [uri], include_purchase_receipt=True),
resolve_timeout
)
resolved_result = self._convert_to_old_resolve_output(manager, response)
except asyncio.TimeoutError:
raise ResolveTimeoutError(uri)
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
await self.storage.save_claims_for_resolve([
value for value in resolved_result.values() if 'error' not in value
])
resolved = resolved_result.get(uri, {})
resolved = resolved if 'value' in resolved else resolved.get('claim')
if not resolved:
raise ResolveError(f"Failed to resolve stream at '{uri}'")
if 'error' in resolved:
raise ResolveError(f"error resolving stream: {resolved['error']}")
txo = response[uri]
claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
outpoint = f"{resolved['txid']}:{resolved['nout']}"
resolved_time = self.loop.time() - start_time
# resume or update an existing stream, if the stream changed: download it and delete the old one after
updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
if updated_stream:
log.info("already have stream for %s", uri)
if save_file and updated_stream.output_file_exists:
save_file = False
await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
await updated_stream.save_file(
file_name=file_name, download_directory=download_directory, node=self.node
)
return updated_stream
if not to_replace and txo.has_price and not txo.purchase_receipt:
payment = await manager.create_purchase_transaction(
wallet.accounts, txo, exchange_rate_manager
)
stream = ManagedStream(
self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
analytics_manager=self.analytics_manager
)
log.info("starting download for %s", uri)
before_download = self.loop.time()
await stream.start(self.node, timeout)
stream.set_claim(resolved, claim)
if to_replace: # delete old stream now that the replacement has started downloading
await self.delete_stream(to_replace)
if payment is not None:
await manager.broadcast_or_release(payment)
payment = None # to avoid releasing in `finally` later
log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
self.streams[stream.sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
await self.storage.save_content_claim(stream.stream_hash, outpoint)
if save_file:
await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
loop=self.loop)
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
raise error
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError)
if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err))
elif isinstance(err, asyncio.CancelledError):
pass
else:
log.exception("Unexpected error downloading stream:")
error = err
raise
finally:
if payment is not None:
# payment is set to None after broadcasting, if we're here an exception probably happened
await manager.ledger.release_tx(payment)
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))):
server = self.wallet_manager.ledger.network.client.server
self.loop.create_task(
self.analytics_manager.send_time_to_first_bytes(
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
uri, outpoint,
None if not stream else len(stream.downloader.blob_downloader.active_connections),
None if not stream else len(stream.downloader.blob_downloader.scores),
None if not stream else len(stream.downloader.blob_downloader.connection_failures),
False if not stream else stream.downloader.added_fixed_peers,
self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
None if not stream else stream.sd_hash,
None if not stream else stream.downloader.time_to_descriptor,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
None if not stream else stream.downloader.time_to_first_bytes,
None if not error else error.__class__.__name__,
None if not error else str(error),
None if not server else f"{server[0]}:{server[1]}"
)
)
async def stream_partial_content(self, request: Request, sd_hash: str):
- return await self.streams[sd_hash].stream_file(request, self.node)
+ return await self._sources[sd_hash].stream_file(request, self.node)
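Taken together, the stream_manager.py changes replace StreamManager's private bookkeeping (self.streams, its own start/stop and resume logic) with the inherited SourceManager machinery, leaving only the stream-specific parts in the subclass. A condensed, hypothetical view of the resulting flow (a sketch, not code from the diff):

# Condensed sketch of how StreamManager now rides on SourceManager.
class StreamManager(SourceManager):
    def add(self, source):
        super().add(source)                        # register in the shared _sources dict
        # plus the content-claim callback wired up in the diff above

    async def initialize_from_database(self):
        ...                                        # recover descriptors, then self.add(stream) per row

    async def start(self):
        await super().start()                      # runs initialize_from_database() and sets self.started
        self.re_reflect_task = self.loop.create_task(self.reflect_streams())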