This commit is contained in:
Victor Shyba 2019-09-08 23:23:14 -03:00
parent 368c6ab4a0
commit a394713171
4 changed files with 15 additions and 4 deletions

View file

@ -227,6 +227,7 @@ class JSONResponseEncoder(JSONEncoder):
'file_name': managed_stream.file_name if output_exists else None, 'file_name': managed_stream.file_name if output_exists else None,
'download_directory': managed_stream.download_directory if output_exists else None, 'download_directory': managed_stream.download_directory if output_exists else None,
'download_path': managed_stream.full_path if output_exists else None, 'download_path': managed_stream.full_path if output_exists else None,
'claim_output': managed_stream.claim_output,
'points_paid': 0.0, 'points_paid': 0.0,
'stopped': not managed_stream.running, 'stopped': not managed_stream.running,
'stream_hash': managed_stream.stream_hash, 'stream_hash': managed_stream.stream_hash,

View file

@ -18,7 +18,7 @@ if typing.TYPE_CHECKING:
from lbry.blob.blob_info import BlobInfo from lbry.blob.blob_info import BlobInfo
from lbry.dht.node import Node from lbry.dht.node import Node
from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.wallet.transaction import Transaction from lbry.wallet.transaction import Transaction, Output
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -68,7 +68,8 @@ class ManagedStream:
'saving', 'saving',
'finished_writing', 'finished_writing',
'started_writing', 'started_writing',
'finished_write_attempt' 'finished_write_attempt',
'claim_output'
] ]
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
@ -77,7 +78,8 @@ class ManagedStream:
download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None, download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
descriptor: typing.Optional[StreamDescriptor] = None, descriptor: typing.Optional[StreamDescriptor] = None,
content_fee: typing.Optional['Transaction'] = None, content_fee: typing.Optional['Transaction'] = None,
analytics_manager: typing.Optional['AnalyticsManager'] = None): analytics_manager: typing.Optional['AnalyticsManager'] = None,
output: typing.Optional['Output'] = None):
self.loop = loop self.loop = loop
self.config = config self.config = config
self.blob_manager = blob_manager self.blob_manager = blob_manager
@ -91,6 +93,7 @@ class ManagedStream:
self.content_fee = content_fee self.content_fee = content_fee
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor) self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
self.analytics_manager = analytics_manager self.analytics_manager = analytics_manager
self.claim_output = output
self.fully_reflected = asyncio.Event(loop=self.loop) self.fully_reflected = asyncio.Event(loop=self.loop)
self.file_output_task: typing.Optional[asyncio.Task] = None self.file_output_task: typing.Optional[asyncio.Task] = None

View file

@ -1,5 +1,6 @@
import os import os
import asyncio import asyncio
import time
import typing import typing
import binascii import binascii
import logging import logging
@ -126,12 +127,15 @@ class StreamManager:
claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor, claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor,
analytics_manager=self.analytics_manager analytics_manager=self.analytics_manager
) )
tx = await self.wallet.get_transaction(claim.txid, save_missing=True)
stream.claim_output = tx.outputs[claim.nout]
self.streams[sd_hash] = stream self.streams[sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
async def load_and_resume_streams_from_database(self): async def load_and_resume_streams_from_database(self):
to_recover = [] to_recover = []
to_start = [] to_start = []
start = time.time()
await self.storage.update_manually_removed_files_since_last_run() await self.storage.update_manually_removed_files_since_last_run()
@ -161,6 +165,7 @@ class StreamManager:
if add_stream_tasks: if add_stream_tasks:
await asyncio.gather(*add_stream_tasks, loop=self.loop) await asyncio.gather(*add_stream_tasks, loop=self.loop)
log.info("Started stream manager with %i files", len(self.streams)) log.info("Started stream manager with %i files", len(self.streams))
log.info("took %s seconds", time.time() - start)
if not self.node: if not self.node:
log.warning("no DHT node given, resuming downloads trusting that we can contact reflector") log.warning("no DHT node given, resuming downloads trusting that we can contact reflector")
if to_resume_saving: if to_resume_saving:

View file

@ -194,7 +194,7 @@ class LbryWalletManager(BaseWalletManager):
await account.ledger.broadcast(tx) await account.ledger.broadcast(tx)
return tx return tx
async def get_transaction(self, txid): async def get_transaction(self, txid, save_missing=False):
tx = await self.db.get_transaction(txid=txid) tx = await self.db.get_transaction(txid=txid)
if not tx: if not tx:
try: try:
@ -206,6 +206,8 @@ class LbryWalletManager(BaseWalletManager):
return {'success': False, 'code': e.code, 'message': e.message} return {'success': False, 'code': e.code, 'message': e.message}
tx = self.ledger.transaction_class(unhexlify(raw)) tx = self.ledger.transaction_class(unhexlify(raw))
await self.ledger.maybe_verify_transaction(tx, height) await self.ledger.maybe_verify_transaction(tx, height)
if save_missing:
await self.db.insert_transaction(tx)
return tx return tx
def save(self): def save(self):