From 2f575a393f8966b2354d198e1cfa99116218949f Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Fri, 1 May 2020 09:34:57 -0400 Subject: [PATCH] minor updates to stream_manager.py --- lbry/stream/stream_manager.py | 42 ++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 0ff206936..1dac94c5c 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -13,8 +13,8 @@ from lbry.stream.descriptor import StreamDescriptor from lbry.stream.managed_stream import ManagedStream from lbry.schema.claim import Claim from lbry.schema.url import URL -from lbry.wallet.dewies import dewies_to_lbc -from lbry.wallet import Output +from lbry.blockchain.dewies import dewies_to_lbc +from lbry.blockchain.transaction import Output if typing.TYPE_CHECKING: from lbry.conf import Config @@ -23,6 +23,7 @@ if typing.TYPE_CHECKING: from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager + from lbry.service.base import Service log = logging.getLogger(__name__) @@ -64,12 +65,12 @@ def path_or_none(path) -> Optional[str]: class StreamManager: def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', - wallet_manager: 'WalletManager', storage: 'SQLiteStorage', node: Optional['Node'], + service: 'Service', storage: 'SQLiteStorage', node: Optional['Node'], analytics_manager: Optional['AnalyticsManager'] = None): self.loop = loop self.config = config self.blob_manager = blob_manager - self.wallet_manager = wallet_manager + self.service = service self.storage = storage self.node = node self.analytics_manager = analytics_manager @@ -318,12 +319,13 @@ class StreamManager: return None, None @staticmethod - def _convert_to_old_resolve_output(wallet_manager, resolves): + def _convert_to_old_resolve_output(service: 'Service', resolves): result = {} for url, txo in resolves.items(): if isinstance(txo, Output): tx_height = txo.tx_ref.height - best_height = wallet_manager.ledger.headers.height + # TODO: fix + #best_height = wallet_manager.ledger.headers.height result[url] = { 'name': txo.claim_name, 'value': txo.claim, @@ -334,11 +336,13 @@ class StreamManager: 'amount': dewies_to_lbc(txo.amount), 'effective_amount': txo.meta.get('effective_amount', 0), 'height': tx_height, - 'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height, + # TODO: fix + 'confirmations': 0, # (best_height+1) - tx_height if tx_height > 0 else tx_height, 'claim_sequence': -1, - 'address': txo.get_address(wallet_manager.ledger), + 'address': txo.get_address(service.ledger), 'valid_at_height': txo.meta.get('activation_height', None), - 'timestamp': wallet_manager.ledger.headers.estimated_timestamp(tx_height), + # TODO: fix + 'timestamp': 0, # wallet_manager.ledger.headers.estimated_timestamp(tx_height), 'supports': [] } else: @@ -353,8 +357,8 @@ class StreamManager: save_file: Optional[bool] = None, resolve_timeout: float = 3.0, wallet: Optional['Wallet'] = None) -> ManagedStream: - manager = self.wallet_manager - wallet = wallet or manager.default_wallet + service = self.service + wallet = wallet or self.service.wallet_manager.default_wallet timeout = timeout or self.config.download_timeout start_time = self.loop.time() resolved_time = None @@ -378,10 +382,10 @@ class StreamManager: raise ResolveError("cannot download a channel claim, specify a /path") try: response = await asyncio.wait_for( - manager.ledger.resolve(wallet.accounts, [uri], include_purchase_receipt=True), + service.resolve(wallet.accounts, [uri], include_purchase_receipt=True), resolve_timeout ) - resolved_result = self._convert_to_old_resolve_output(manager, response) + resolved_result = self._convert_to_old_resolve_output(service, response) except asyncio.TimeoutError: raise ResolveTimeoutError(uri) except Exception as err: @@ -418,7 +422,7 @@ class StreamManager: return updated_stream if not to_replace and txo.has_price and not txo.purchase_receipt: - payment = await manager.create_purchase_transaction( + payment = await wallet.create_purchase_transaction( wallet.accounts, txo, exchange_rate_manager ) @@ -436,7 +440,7 @@ class StreamManager: await self.delete_stream(to_replace) if payment is not None: - await manager.broadcast_or_release(payment) + await service.broadcast_or_release(payment) payment = None # to avoid releasing in `finally` later log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri) await self.storage.save_content_fee(stream.stream_hash, stream.content_fee) @@ -465,10 +469,11 @@ class StreamManager: finally: if payment is not None: # payment is set to None after broadcasting, if we're here an exception probably happened - await manager.ledger.release_tx(payment) + await service.release_tx(payment) if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or stream.downloader.time_to_first_bytes))): - server = self.wallet_manager.ledger.network.client.server + # TODO: fix + # server = self.wallet_manager.ledger.network.client.server self.loop.create_task( self.analytics_manager.send_time_to_first_bytes( resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id, @@ -485,7 +490,8 @@ class StreamManager: None if not stream else stream.downloader.time_to_first_bytes, None if not error else error.__class__.__name__, None if not error else str(error), - None if not server else f"{server[0]}:{server[1]}" + # TODO: fix + 'fakespv.lbry.com:50001' # None if not server else f"{server[0]}:{server[1]}" ) )