minor updates to stream_manager.py

This commit is contained in:
Lex Berezhny 2020-05-01 09:34:57 -04:00
parent 713c665588
commit 2f575a393f

View file

@ -13,8 +13,8 @@ from lbry.stream.descriptor import StreamDescriptor
from lbry.stream.managed_stream import ManagedStream from lbry.stream.managed_stream import ManagedStream
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.schema.url import URL from lbry.schema.url import URL
from lbry.wallet.dewies import dewies_to_lbc from lbry.blockchain.dewies import dewies_to_lbc
from lbry.wallet import Output from lbry.blockchain.transaction import Output
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.conf import Config from lbry.conf import Config
@ -23,6 +23,7 @@ if typing.TYPE_CHECKING:
from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbry.service.base import Service
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -64,12 +65,12 @@ def path_or_none(path) -> Optional[str]:
class StreamManager: class StreamManager:
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
wallet_manager: 'WalletManager', storage: 'SQLiteStorage', node: Optional['Node'], service: 'Service', storage: 'SQLiteStorage', node: Optional['Node'],
analytics_manager: Optional['AnalyticsManager'] = None): analytics_manager: Optional['AnalyticsManager'] = None):
self.loop = loop self.loop = loop
self.config = config self.config = config
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.wallet_manager = wallet_manager self.service = service
self.storage = storage self.storage = storage
self.node = node self.node = node
self.analytics_manager = analytics_manager self.analytics_manager = analytics_manager
@ -318,12 +319,13 @@ class StreamManager:
return None, None return None, None
@staticmethod @staticmethod
def _convert_to_old_resolve_output(wallet_manager, resolves): def _convert_to_old_resolve_output(service: 'Service', resolves):
result = {} result = {}
for url, txo in resolves.items(): for url, txo in resolves.items():
if isinstance(txo, Output): if isinstance(txo, Output):
tx_height = txo.tx_ref.height tx_height = txo.tx_ref.height
best_height = wallet_manager.ledger.headers.height # TODO: fix
#best_height = wallet_manager.ledger.headers.height
result[url] = { result[url] = {
'name': txo.claim_name, 'name': txo.claim_name,
'value': txo.claim, 'value': txo.claim,
@ -334,11 +336,13 @@ class StreamManager:
'amount': dewies_to_lbc(txo.amount), 'amount': dewies_to_lbc(txo.amount),
'effective_amount': txo.meta.get('effective_amount', 0), 'effective_amount': txo.meta.get('effective_amount', 0),
'height': tx_height, 'height': tx_height,
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height, # TODO: fix
'confirmations': 0, # (best_height+1) - tx_height if tx_height > 0 else tx_height,
'claim_sequence': -1, 'claim_sequence': -1,
'address': txo.get_address(wallet_manager.ledger), 'address': txo.get_address(service.ledger),
'valid_at_height': txo.meta.get('activation_height', None), 'valid_at_height': txo.meta.get('activation_height', None),
'timestamp': wallet_manager.ledger.headers.estimated_timestamp(tx_height), # TODO: fix
'timestamp': 0, # wallet_manager.ledger.headers.estimated_timestamp(tx_height),
'supports': [] 'supports': []
} }
else: else:
@ -353,8 +357,8 @@ class StreamManager:
save_file: Optional[bool] = None, save_file: Optional[bool] = None,
resolve_timeout: float = 3.0, resolve_timeout: float = 3.0,
wallet: Optional['Wallet'] = None) -> ManagedStream: wallet: Optional['Wallet'] = None) -> ManagedStream:
manager = self.wallet_manager service = self.service
wallet = wallet or manager.default_wallet wallet = wallet or self.service.wallet_manager.default_wallet
timeout = timeout or self.config.download_timeout timeout = timeout or self.config.download_timeout
start_time = self.loop.time() start_time = self.loop.time()
resolved_time = None resolved_time = None
@ -378,10 +382,10 @@ class StreamManager:
raise ResolveError("cannot download a channel claim, specify a /path") raise ResolveError("cannot download a channel claim, specify a /path")
try: try:
response = await asyncio.wait_for( response = await asyncio.wait_for(
manager.ledger.resolve(wallet.accounts, [uri], include_purchase_receipt=True), service.resolve(wallet.accounts, [uri], include_purchase_receipt=True),
resolve_timeout resolve_timeout
) )
resolved_result = self._convert_to_old_resolve_output(manager, response) resolved_result = self._convert_to_old_resolve_output(service, response)
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise ResolveTimeoutError(uri) raise ResolveTimeoutError(uri)
except Exception as err: except Exception as err:
@ -418,7 +422,7 @@ class StreamManager:
return updated_stream return updated_stream
if not to_replace and txo.has_price and not txo.purchase_receipt: if not to_replace and txo.has_price and not txo.purchase_receipt:
payment = await manager.create_purchase_transaction( payment = await wallet.create_purchase_transaction(
wallet.accounts, txo, exchange_rate_manager wallet.accounts, txo, exchange_rate_manager
) )
@ -436,7 +440,7 @@ class StreamManager:
await self.delete_stream(to_replace) await self.delete_stream(to_replace)
if payment is not None: if payment is not None:
await manager.broadcast_or_release(payment) await service.broadcast_or_release(payment)
payment = None # to avoid releasing in `finally` later payment = None # to avoid releasing in `finally` later
log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri) log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee) await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
@ -465,10 +469,11 @@ class StreamManager:
finally: finally:
if payment is not None: if payment is not None:
# payment is set to None after broadcasting, if we're here an exception probably happened # payment is set to None after broadcasting, if we're here an exception probably happened
await manager.ledger.release_tx(payment) await service.release_tx(payment)
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))): stream.downloader.time_to_first_bytes))):
server = self.wallet_manager.ledger.network.client.server # TODO: fix
# server = self.wallet_manager.ledger.network.client.server
self.loop.create_task( self.loop.create_task(
self.analytics_manager.send_time_to_first_bytes( self.analytics_manager.send_time_to_first_bytes(
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id, resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
@ -485,7 +490,8 @@ class StreamManager:
None if not stream else stream.downloader.time_to_first_bytes, None if not stream else stream.downloader.time_to_first_bytes,
None if not error else error.__class__.__name__, None if not error else error.__class__.__name__,
None if not error else str(error), None if not error else str(error),
None if not server else f"{server[0]}:{server[1]}" # TODO: fix
'fakespv.lbry.com:50001' # None if not server else f"{server[0]}:{server[1]}"
) )
) )