fix file/download paths

move download-from-uri logic into stream manager
This commit is contained in:
Jack Robison 2019-02-01 20:46:09 -05:00
parent efe4afd09e
commit e96b75a0d0
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 107 additions and 61 deletions

View file

@ -20,7 +20,7 @@ from lbrynet.conf import Config, Setting, SLACK_WEBHOOK
from lbrynet.blob.blob_file import is_valid_blobhash
from lbrynet.blob_exchange.downloader import download_blob
from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted
from lbrynet.error import NullFundsError, NegativeFundsError, ResolveError, ComponentStartConditionNotMet
from lbrynet.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet
from lbrynet.extras import system_info
from lbrynet.extras.daemon import analytics
from lbrynet.extras.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
@ -1557,47 +1557,12 @@ class Daemon(metaclass=JSONRPCServerType):
}
"""
parsed_uri = parse_lbry_uri(uri)
if parsed_uri.is_channel:
raise Exception("cannot download a channel claim, specify a /path")
resolved = (await self.wallet_manager.resolve(uri)).get(uri, {})
resolved = resolved if 'value' in resolved else resolved.get('claim')
if not resolved:
raise ResolveError(
"Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
)
if 'error' in resolved:
raise ResolveError(f"error resolving stream: {resolved['error']}")
claim = ClaimDict.load_dict(resolved['value'])
fee_amount, fee_address = None, None
if claim.has_fee:
fee_amount = round(self.exchange_rate_manager.convert_currency(
claim.source_fee.currency, "LBC", claim.source_fee.amount
), 5)
fee_address = claim.source_fee.address
outpoint = f"{resolved['txid']}:{resolved['nout']}"
existing = self.stream_manager.get_filtered_streams(outpoint=outpoint)
if not existing:
existing.extend(self.stream_manager.get_filtered_streams(claim_id=resolved['claim_id'],
sd_hash=claim.source_hash))
if existing:
log.info("already have matching stream for %s", uri)
stream = existing[0]
if not stream.running:
full_path = os.path.join(stream.download_directory, stream.file_name)
if not os.path.isfile(full_path):
log.info("resuming download")
await self.stream_manager.start_stream(stream)
else:
stream = await self.stream_manager.download_stream_from_claim(
self.dht_node, resolved, file_name, timeout, fee_amount, fee_address
)
stream = await self.stream_manager.download_stream_from_uri(
uri, self.exchange_rate_manager, file_name, timeout
)
if stream:
return stream.as_dict()
raise DownloadSDTimeout(resolved['value']['stream']['source']['source'])
raise DownloadSDTimeout(uri)
@requires(STREAM_MANAGER_COMPONENT)
async def jsonrpc_file_set_status(self, status, **kwargs):

View file

@ -447,10 +447,10 @@ class SQLiteStorage(SQLiteMixin):
log.info("update file status %s -> %s", stream_hash, new_status)
return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
def change_file_download_dir(self, stream_hash: str, download_dir: str):
log.info("update file status %s -> %s", stream_hash, download_dir)
return self.db.execute("update file set download_directory=? where stream_hash=?", (
binascii.hexlify(download_dir.encode()).decode(), stream_hash
def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: str, file_name: str):
return self.db.execute("update file set download_directory=?, file_name=? where stream_hash=?", (
binascii.hexlify(download_dir.encode()).decode(), binascii.hexlify(file_name.encode()).decode(),
stream_hash
))
def get_all_stream_hashes(self):

View file

@ -77,12 +77,11 @@ class StreamAssembler:
await self.blob_manager.blob_completed(self.sd_blob)
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
self.sd_blob)
self.output_path = await get_next_available_file_name(self.loop, output_dir,
output_file_name or self.descriptor.suggested_file_name)
if not self.got_descriptor.is_set():
self.got_descriptor.set()
await self.after_got_descriptor()
self.output_path = await get_next_available_file_name(self.loop, output_dir,
output_file_name or self.descriptor.suggested_file_name)
self.stream_handle = open(self.output_path, 'wb')
await self.blob_manager.storage.store_stream(
self.sd_blob, self.descriptor

View file

@ -4,9 +4,11 @@ import typing
import binascii
import logging
import random
from lbrynet.error import ResolveError
from lbrynet.stream.downloader import StreamDownloader
from lbrynet.stream.managed_stream import ManagedStream
from lbrynet.schema.claim import ClaimDict
from lbrynet.schema.uri import parse_lbry_uri
from lbrynet.schema.decode import smart_decode
from lbrynet.extras.daemon.storage import lbc_to_dewies
if typing.TYPE_CHECKING:
@ -15,6 +17,7 @@ if typing.TYPE_CHECKING:
from lbrynet.dht.node import Node
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.wallet import LbryWalletManager
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
log = logging.getLogger(__name__)
@ -65,24 +68,44 @@ class StreamManager:
claim_info = await self.storage.get_content_claim(stream.stream_hash)
stream.set_claim(claim_info, smart_decode(claim_info['value']))
async def start_stream(self, stream: ManagedStream):
async def start_stream(self, stream: ManagedStream) -> bool:
"""
Resume or rebuild a partial or completed stream
"""
path = os.path.join(stream.download_directory, stream.file_name)
if not stream.running and not os.path.isfile(path):
if stream.downloader:
stream.downloader.stop()
stream.downloader = None
if not os.path.isfile(path) and not os.path.isfile(
os.path.join(self.config.download_dir, stream.file_name)):
await self.storage.change_file_download_dir(stream.stream_hash, self.config.download_dir)
# the directory is gone, can happen when the folder that contains a published file is deleted
# reset the download directory to the default and update the file name
if not os.path.isdir(stream.download_directory):
stream.download_directory = self.config.download_dir
stream.downloader = self.make_downloader(
stream.sd_hash, stream.download_directory, stream.file_name
stream.sd_hash, stream.download_directory, stream.descriptor.suggested_file_name
)
if stream.status != ManagedStream.STATUS_FINISHED:
await self.storage.change_file_status(stream.stream_hash, 'running')
stream.update_status('running')
stream.start_download(self.node)
await self.storage.change_file_status(stream.stream_hash, 'running')
stream.update_status('running')
try:
await asyncio.wait_for(self.loop.create_task(stream.downloader.got_descriptor.wait()),
self.config.download_timeout)
except asyncio.TimeoutError:
stream.stop_download()
stream.downloader = None
return False
file_name = os.path.basename(stream.downloader.output_path)
await self.storage.change_file_download_dir_and_file_name(
stream.stream_hash, self.config.download_dir, file_name
)
self.wait_for_stream_finished(stream)
return True
return True
def make_downloader(self, sd_hash: str, download_directory: str, file_name: str):
return StreamDownloader(
@ -204,18 +227,20 @@ class StreamManager:
downloader.stop()
log.info("stopped stream")
return
file_name = os.path.basename(downloader.output_path)
download_directory = os.path.dirname(downloader.output_path)
if not await self.blob_manager.storage.stream_exists(downloader.sd_hash):
await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor)
if not await self.blob_manager.storage.file_exists(downloader.sd_hash):
await self.blob_manager.storage.save_downloaded_file(
downloader.descriptor.stream_hash, os.path.basename(downloader.output_path), download_directory,
downloader.descriptor.stream_hash, file_name, download_directory,
0.0
)
await self.blob_manager.storage.save_content_claim(
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
)
stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory,
os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING)
file_name, downloader, ManagedStream.STATUS_RUNNING)
stream.set_claim(claim_info, claim)
self.streams.add(stream)
try:
@ -230,18 +255,18 @@ class StreamManager:
file_name: typing.Optional[str] = None,
timeout: typing.Optional[float] = 60,
fee_amount: typing.Optional[float] = 0.0,
fee_address: typing.Optional[str] = None) -> typing.Optional[ManagedStream]:
fee_address: typing.Optional[str] = None,
should_pay: typing.Optional[bool] = True) -> typing.Optional[ManagedStream]:
log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id'])
claim = ClaimDict.load_dict(claim_info['value'])
if fee_address and fee_amount:
if fee_amount > await self.wallet.default_account.get_balance():
raise Exception("not enough funds")
sd_hash = claim.source_hash.decode()
if sd_hash in self.starting_streams:
return await self.starting_streams[sd_hash]
already_started = tuple(filter(lambda s: s.descriptor.sd_hash == sd_hash, self.streams))
if already_started:
return already_started[0]
if should_pay and fee_address and fee_amount and fee_amount > await self.wallet.default_account.get_balance():
raise Exception("not enough funds")
self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop)
stream_task = self.loop.create_task(
@ -251,7 +276,7 @@ class StreamManager:
await asyncio.wait_for(stream_task, timeout or self.config.download_timeout)
stream = await stream_task
self.starting_streams[sd_hash].set_result(stream)
if fee_address and fee_amount:
if should_pay and fee_address and fee_amount:
await self.wallet.send_amount_to_address(lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
return stream
except (asyncio.TimeoutError, asyncio.CancelledError):
@ -301,3 +326,60 @@ class StreamManager:
if reverse:
streams.reverse()
return streams
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
file_name: typing.Optional[str] = None,
timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
timeout = timeout or self.config.download_timeout
parsed_uri = parse_lbry_uri(uri)
if parsed_uri.is_channel:
raise Exception("cannot download a channel claim, specify a /path")
resolved = (await self.wallet.resolve(uri)).get(uri, {})
resolved = resolved if 'value' in resolved else resolved.get('claim')
if not resolved:
raise ResolveError(
"Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
)
if 'error' in resolved:
raise ResolveError(f"error resolving stream: {resolved['error']}")
claim = ClaimDict.load_dict(resolved['value'])
fee_amount, fee_address = None, None
if claim.has_fee:
fee_amount = round(exchange_rate_manager.convert_currency(
claim.source_fee.currency, "LBC", claim.source_fee.amount
), 5)
fee_address = claim.source_fee.address
outpoint = f"{resolved['txid']}:{resolved['nout']}"
existing = self.get_filtered_streams(outpoint=outpoint)
if not existing:
existing.extend(self.get_filtered_streams(sd_hash=claim.source_hash.decode()))
if existing and existing[0].claim_id != resolved['claim_id']:
raise Exception(f"stream for {existing[0].claim_id} collides with existing "
f"download {resolved['claim_id']}")
elif not existing:
existing.extend(self.get_filtered_streams(claim_id=resolved['claim_id']))
if existing and existing[0].sd_hash != claim.source_hash.decode():
log.info("claim contains an update to a stream we have, downloading it")
stream = await self.download_stream_from_claim(
self.node, resolved, file_name, timeout, fee_amount, fee_address, False
)
log.info("started new stream, deleting old one")
await self.delete_stream(existing[0])
return stream
elif existing:
log.info("already have matching stream for %s", uri)
stream = existing[0]
await self.start_stream(stream)
return stream
else:
stream = existing[0]
await self.start_stream(stream)
return stream
log.info("download stream from %s", uri)
return await self.download_stream_from_claim(
self.node, resolved, file_name, timeout, fee_amount, fee_address
)