Merge pull request #1844 from lbryio/reassemble-when-deleted-from-downloads
Reassemble when deleted from downloads
This commit is contained in:
commit
ba818354af
10 changed files with 168 additions and 75 deletions
|
@ -57,7 +57,7 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
|
incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
|
||||||
responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
|
responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
|
||||||
self.send_response(responses)
|
self.send_response(responses)
|
||||||
log.info("send %s to %s:%i", blob.blob_hash[:8], peer_address, peer_port)
|
log.debug("send %s to %s:%i", blob.blob_hash[:8], peer_address, peer_port)
|
||||||
try:
|
try:
|
||||||
sent = await blob.sendfile(self)
|
sent = await blob.sendfile(self)
|
||||||
except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError):
|
except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError):
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
import typing
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbrynet.dht.node import Node
|
from lbrynet.dht.node import Node
|
||||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||||
|
@ -9,7 +10,8 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class BlobAnnouncer:
|
class BlobAnnouncer:
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, node: 'Node', storage: 'SQLiteStorage'):
|
def __init__(self, loop: asyncio.BaseEventLoop, node: 'Node', storage: 'SQLiteStorage',
|
||||||
|
time_getter: typing.Callable[[], float] = time.time):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.node = node
|
self.node = node
|
||||||
self.storage = storage
|
self.storage = storage
|
||||||
|
@ -17,6 +19,7 @@ class BlobAnnouncer:
|
||||||
self.announce_task: asyncio.Task = None
|
self.announce_task: asyncio.Task = None
|
||||||
self.running = False
|
self.running = False
|
||||||
self.announce_queue: typing.List[str] = []
|
self.announce_queue: typing.List[str] = []
|
||||||
|
self.time_getter = time_getter
|
||||||
|
|
||||||
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
async def _announce(self, batch_size: typing.Optional[int] = 10):
|
||||||
if not batch_size:
|
if not batch_size:
|
||||||
|
@ -41,7 +44,7 @@ class BlobAnnouncer:
|
||||||
to_await.append(batch.pop())
|
to_await.append(batch.pop())
|
||||||
if to_await:
|
if to_await:
|
||||||
await asyncio.gather(*tuple(to_await), loop=self.loop)
|
await asyncio.gather(*tuple(to_await), loop=self.loop)
|
||||||
await self.storage.update_last_announced_blobs(announced, self.loop.time())
|
await self.storage.update_last_announced_blobs(announced, self.time_getter())
|
||||||
log.info("announced %i blobs", len(announced))
|
log.info("announced %i blobs", len(announced))
|
||||||
if self.running:
|
if self.running:
|
||||||
self.pending_call = self.loop.call_later(60, self.announce, batch_size)
|
self.pending_call = self.loop.call_later(60, self.announce, batch_size)
|
||||||
|
|
|
@ -79,8 +79,7 @@ class Node:
|
||||||
|
|
||||||
if not self.protocol.external_ip:
|
if not self.protocol.external_ip:
|
||||||
raise Exception("Cannot determine external IP")
|
raise Exception("Cannot determine external IP")
|
||||||
log.info("Store to %i peers", len(peers))
|
log.debug("Store to %i peers", len(peers))
|
||||||
log.info(peers)
|
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port)
|
log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port)
|
||||||
stored_to_tup = await asyncio.gather(
|
stored_to_tup = await asyncio.gather(
|
||||||
|
|
|
@ -288,7 +288,7 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
found = response.found and self.key != self.protocol.node_id
|
found = response.found and self.key != self.protocol.node_id
|
||||||
|
|
||||||
if found:
|
if found:
|
||||||
log.info("found")
|
log.debug("found")
|
||||||
return self.put_result(self.shortlist, finish=True)
|
return self.put_result(self.shortlist, finish=True)
|
||||||
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
|
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
|
||||||
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
|
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
|
||||||
|
@ -302,7 +302,7 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
log.info("limit hit")
|
log.info("limit hit")
|
||||||
self.put_result(self.active, True)
|
self.put_result(self.active, True)
|
||||||
elif self.max_results and len(self.active) - len(self.yielded_peers) >= self.max_results:
|
elif self.max_results and len(self.active) - len(self.yielded_peers) >= self.max_results:
|
||||||
log.info("max results")
|
log.debug("max results")
|
||||||
self.put_result(self.active, True)
|
self.put_result(self.active, True)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -601,7 +601,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
res = await self.get_rpc_peer(peer).store(hash_value)
|
res = await self.get_rpc_peer(peer).store(hash_value)
|
||||||
if res != b"OK":
|
if res != b"OK":
|
||||||
raise ValueError(res)
|
raise ValueError(res)
|
||||||
log.info("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
log.debug("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
||||||
return peer.node_id, True
|
return peer.node_id, True
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
||||||
|
|
|
@ -20,7 +20,7 @@ from lbrynet.conf import Config, Setting, SLACK_WEBHOOK
|
||||||
from lbrynet.blob.blob_file import is_valid_blobhash
|
from lbrynet.blob.blob_file import is_valid_blobhash
|
||||||
from lbrynet.blob_exchange.downloader import download_blob
|
from lbrynet.blob_exchange.downloader import download_blob
|
||||||
from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted
|
from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted
|
||||||
from lbrynet.error import NullFundsError, NegativeFundsError, ResolveError, ComponentStartConditionNotMet
|
from lbrynet.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet
|
||||||
from lbrynet.extras import system_info
|
from lbrynet.extras import system_info
|
||||||
from lbrynet.extras.daemon import analytics
|
from lbrynet.extras.daemon import analytics
|
||||||
from lbrynet.extras.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
|
from lbrynet.extras.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
|
||||||
|
@ -1308,8 +1308,9 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
file_list [--sd_hash=<sd_hash>] [--file_name=<file_name>] [--stream_hash=<stream_hash>]
|
file_list [--sd_hash=<sd_hash>] [--file_name=<file_name>] [--stream_hash=<stream_hash>]
|
||||||
[--rowid=<rowid>] [--claim_id=<claim_id>] [--outpoint=<outpoint>] [--txid=<txid>] [--nout=<nout>]
|
[--rowid=<rowid>] [--claim_id=<claim_id>] [--outpoint=<outpoint>] [--txid=<txid>] [--nout=<nout>]
|
||||||
[--channel_claim_id=<channel_claim_id>] [--channel_name=<channel_name>]
|
[--channel_claim_id=<channel_claim_id>] [--channel_name=<channel_name>]
|
||||||
[--claim_name=<claim_name>] [--sort=<sort_by>] [--reverse] [--comparison=<comparison>]
|
[--claim_name=<claim_name>] [--blobs_in_stream=<blobs_in_stream>]
|
||||||
[--full_status=<full_status>]
|
[--blobs_remaining=<blobs_remaining>] [--sort=<sort_by>]
|
||||||
|
[--comparison=<comparison>] [--full_status=<full_status>] [--reverse]
|
||||||
|
|
||||||
Options:
|
Options:
|
||||||
--sd_hash=<sd_hash> : (str) get file with matching sd hash
|
--sd_hash=<sd_hash> : (str) get file with matching sd hash
|
||||||
|
@ -1324,9 +1325,10 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
--channel_claim_id=<channel_claim_id> : (str) get file with matching channel claim id
|
--channel_claim_id=<channel_claim_id> : (str) get file with matching channel claim id
|
||||||
--channel_name=<channel_name> : (str) get file with matching channel name
|
--channel_name=<channel_name> : (str) get file with matching channel name
|
||||||
--claim_name=<claim_name> : (str) get file with matching claim name
|
--claim_name=<claim_name> : (str) get file with matching claim name
|
||||||
--sort=<sort_method> : (str) sort by any property, like 'file_name'
|
--blobs_in_stream<blobs_in_stream> : (int) get file with matching blobs in stream
|
||||||
or 'metadata.author'; to specify direction
|
--blobs_remaining=<blobs_remaining> : (int) amount of remaining blobs to download
|
||||||
append ',asc' or ',desc'
|
--sort=<sort_by> : (str) field to sort by (one of the above filter fields)
|
||||||
|
--comparison=<comparison> : (str) logical comparision, (eq | ne | g | ge | l | le)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
(list) List of files
|
(list) List of files
|
||||||
|
@ -1345,21 +1347,24 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
'download_path': (str) download path of file,
|
'download_path': (str) download path of file,
|
||||||
'mime_type': (str) mime type of file,
|
'mime_type': (str) mime type of file,
|
||||||
'key': (str) key attached to file,
|
'key': (str) key attached to file,
|
||||||
'total_bytes': (int) file size in bytes,
|
'total_bytes_lower_bound': (int) lower bound file size in bytes,
|
||||||
|
'total_bytes': (int) file upper bound size in bytes,
|
||||||
'written_bytes': (int) written size in bytes,
|
'written_bytes': (int) written size in bytes,
|
||||||
'blobs_completed': (int) number of fully downloaded blobs,
|
'blobs_completed': (int) number of fully downloaded blobs,
|
||||||
'blobs_in_stream': (int) total blobs on stream,
|
'blobs_in_stream': (int) total blobs on stream,
|
||||||
|
'blobs_remaining': (int) total blobs remaining to download,
|
||||||
'status': (str) downloader status
|
'status': (str) downloader status
|
||||||
'claim_id': (str) None if claim is not found else the claim id,
|
'claim_id': (str) None if claim is not found else the claim id,
|
||||||
'outpoint': (str) None if claim is not found else the tx and output,
|
|
||||||
'txid': (str) None if claim is not found else the transaction id,
|
'txid': (str) None if claim is not found else the transaction id,
|
||||||
'nout': (int) None if claim is not found else the transaction output index,
|
'nout': (int) None if claim is not found else the transaction output index,
|
||||||
|
'outpoint': (str) None if claim is not found else the tx and output,
|
||||||
'metadata': (dict) None if claim is not found else the claim metadata,
|
'metadata': (dict) None if claim is not found else the claim metadata,
|
||||||
'channel_claim_id': (str) None if claim is not found or not signed,
|
'channel_claim_id': (str) None if claim is not found or not signed,
|
||||||
'channel_name': (str) None if claim is not found or not signed,
|
'channel_name': (str) None if claim is not found or not signed,
|
||||||
'claim_name': (str) None if claim is not found else the claim name
|
'claim_name': (str) None if claim is not found else the claim name
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
}
|
||||||
"""
|
"""
|
||||||
sort = sort or 'status'
|
sort = sort or 'status'
|
||||||
comparison = comparison or 'eq'
|
comparison = comparison or 'eq'
|
||||||
|
@ -1552,42 +1557,12 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
parsed_uri = parse_lbry_uri(uri)
|
stream = await self.stream_manager.download_stream_from_uri(
|
||||||
if parsed_uri.is_channel:
|
uri, self.exchange_rate_manager, file_name, timeout
|
||||||
raise Exception("cannot download a channel claim, specify a /path")
|
|
||||||
|
|
||||||
resolved = (await self.wallet_manager.resolve(uri)).get(uri, {})
|
|
||||||
resolved = resolved if 'value' in resolved else resolved.get('claim')
|
|
||||||
|
|
||||||
if not resolved:
|
|
||||||
raise ResolveError(
|
|
||||||
"Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
|
|
||||||
)
|
|
||||||
if 'error' in resolved:
|
|
||||||
raise ResolveError(f"error resolving stream: {resolved['error']}")
|
|
||||||
|
|
||||||
claim = ClaimDict.load_dict(resolved['value'])
|
|
||||||
fee_amount, fee_address = None, None
|
|
||||||
if claim.has_fee:
|
|
||||||
fee_amount = round(self.exchange_rate_manager.convert_currency(
|
|
||||||
claim.source_fee.currency, "LBC", claim.source_fee.amount
|
|
||||||
), 5)
|
|
||||||
fee_address = claim.source_fee.address
|
|
||||||
outpoint = f"{resolved['txid']}:{resolved['nout']}"
|
|
||||||
existing = self.stream_manager.get_filtered_streams(outpoint=outpoint)
|
|
||||||
if not existing:
|
|
||||||
existing.extend(self.stream_manager.get_filtered_streams(claim_id=resolved['claim_id'],
|
|
||||||
sd_hash=claim.source_hash))
|
|
||||||
if existing:
|
|
||||||
log.info("already have matching stream for %s", uri)
|
|
||||||
stream = existing[0]
|
|
||||||
else:
|
|
||||||
stream = await self.stream_manager.download_stream_from_claim(
|
|
||||||
self.dht_node, resolved, file_name, timeout, fee_amount, fee_address
|
|
||||||
)
|
)
|
||||||
if stream:
|
if stream:
|
||||||
return stream.as_dict()
|
return stream.as_dict()
|
||||||
raise DownloadSDTimeout(resolved['value']['stream']['source']['source'])
|
raise DownloadSDTimeout(uri)
|
||||||
|
|
||||||
@requires(STREAM_MANAGER_COMPONENT)
|
@requires(STREAM_MANAGER_COMPONENT)
|
||||||
async def jsonrpc_file_set_status(self, status, **kwargs):
|
async def jsonrpc_file_set_status(self, status, **kwargs):
|
||||||
|
@ -1617,8 +1592,8 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
if not streams:
|
if not streams:
|
||||||
raise Exception(f'Unable to find a file for {kwargs}')
|
raise Exception(f'Unable to find a file for {kwargs}')
|
||||||
stream = streams[0]
|
stream = streams[0]
|
||||||
if status == 'start' and not stream.running and not stream.finished:
|
if status == 'start' and not stream.running:
|
||||||
stream.downloader.download(self.dht_node)
|
await self.stream_manager.start_stream(stream)
|
||||||
msg = "Resumed download"
|
msg = "Resumed download"
|
||||||
elif status == 'stop' and stream.running:
|
elif status == 'stop' and stream.running:
|
||||||
stream.stop_download()
|
stream.stop_download()
|
||||||
|
|
|
@ -447,6 +447,12 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
log.info("update file status %s -> %s", stream_hash, new_status)
|
log.info("update file status %s -> %s", stream_hash, new_status)
|
||||||
return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
|
return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
|
||||||
|
|
||||||
|
def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: str, file_name: str):
|
||||||
|
return self.db.execute("update file set download_directory=?, file_name=? where stream_hash=?", (
|
||||||
|
binascii.hexlify(download_dir.encode()).decode(), binascii.hexlify(file_name.encode()).decode(),
|
||||||
|
stream_hash
|
||||||
|
))
|
||||||
|
|
||||||
def get_all_stream_hashes(self):
|
def get_all_stream_hashes(self):
|
||||||
return self.run_and_return_list("select stream_hash from stream")
|
return self.run_and_return_list("select stream_hash from stream")
|
||||||
|
|
||||||
|
|
|
@ -77,12 +77,11 @@ class StreamAssembler:
|
||||||
await self.blob_manager.blob_completed(self.sd_blob)
|
await self.blob_manager.blob_completed(self.sd_blob)
|
||||||
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
|
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
|
||||||
self.sd_blob)
|
self.sd_blob)
|
||||||
|
self.output_path = await get_next_available_file_name(self.loop, output_dir,
|
||||||
|
output_file_name or self.descriptor.suggested_file_name)
|
||||||
if not self.got_descriptor.is_set():
|
if not self.got_descriptor.is_set():
|
||||||
self.got_descriptor.set()
|
self.got_descriptor.set()
|
||||||
await self.after_got_descriptor()
|
await self.after_got_descriptor()
|
||||||
self.output_path = await get_next_available_file_name(self.loop, output_dir,
|
|
||||||
output_file_name or self.descriptor.suggested_file_name)
|
|
||||||
|
|
||||||
self.stream_handle = open(self.output_path, 'wb')
|
self.stream_handle = open(self.output_path, 'wb')
|
||||||
await self.blob_manager.storage.store_stream(
|
await self.blob_manager.storage.store_stream(
|
||||||
self.sd_blob, self.descriptor
|
self.sd_blob, self.descriptor
|
||||||
|
|
|
@ -11,6 +11,7 @@ from lbrynet.extras.daemon.storage import StoredStreamClaim
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbrynet.schema.claim import ClaimDict
|
from lbrynet.schema.claim import ClaimDict
|
||||||
from lbrynet.blob.blob_manager import BlobFileManager
|
from lbrynet.blob.blob_manager import BlobFileManager
|
||||||
|
from lbrynet.dht.node import Node
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -99,6 +100,10 @@ class ManagedStream:
|
||||||
def sd_hash(self):
|
def sd_hash(self):
|
||||||
return self.descriptor.sd_hash
|
return self.descriptor.sd_hash
|
||||||
|
|
||||||
|
@property
|
||||||
|
def blobs_remaining(self) -> int:
|
||||||
|
return self.blobs_in_stream - self.blobs_completed
|
||||||
|
|
||||||
def as_dict(self) -> typing.Dict:
|
def as_dict(self) -> typing.Dict:
|
||||||
full_path = os.path.join(self.download_directory, self.file_name)
|
full_path = os.path.join(self.download_directory, self.file_name)
|
||||||
if not os.path.isfile(full_path):
|
if not os.path.isfile(full_path):
|
||||||
|
@ -129,6 +134,7 @@ class ManagedStream:
|
||||||
'written_bytes': written_bytes,
|
'written_bytes': written_bytes,
|
||||||
'blobs_completed': self.blobs_completed,
|
'blobs_completed': self.blobs_completed,
|
||||||
'blobs_in_stream': self.blobs_in_stream,
|
'blobs_in_stream': self.blobs_in_stream,
|
||||||
|
'blobs_remaining': self.blobs_remaining,
|
||||||
'status': self.status,
|
'status': self.status,
|
||||||
'claim_id': self.claim_id,
|
'claim_id': self.claim_id,
|
||||||
'txid': self.txid,
|
'txid': self.txid,
|
||||||
|
@ -155,6 +161,10 @@ class ManagedStream:
|
||||||
return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
|
return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
|
||||||
status=cls.STATUS_FINISHED)
|
status=cls.STATUS_FINISHED)
|
||||||
|
|
||||||
|
def start_download(self, node: typing.Optional['Node']):
|
||||||
|
self.downloader.download(node)
|
||||||
|
self.update_status(self.STATUS_RUNNING)
|
||||||
|
|
||||||
def stop_download(self):
|
def stop_download(self):
|
||||||
if self.downloader:
|
if self.downloader:
|
||||||
self.downloader.stop()
|
self.downloader.stop()
|
||||||
|
|
|
@ -4,9 +4,11 @@ import typing
|
||||||
import binascii
|
import binascii
|
||||||
import logging
|
import logging
|
||||||
import random
|
import random
|
||||||
|
from lbrynet.error import ResolveError
|
||||||
from lbrynet.stream.downloader import StreamDownloader
|
from lbrynet.stream.downloader import StreamDownloader
|
||||||
from lbrynet.stream.managed_stream import ManagedStream
|
from lbrynet.stream.managed_stream import ManagedStream
|
||||||
from lbrynet.schema.claim import ClaimDict
|
from lbrynet.schema.claim import ClaimDict
|
||||||
|
from lbrynet.schema.uri import parse_lbry_uri
|
||||||
from lbrynet.schema.decode import smart_decode
|
from lbrynet.schema.decode import smart_decode
|
||||||
from lbrynet.extras.daemon.storage import lbc_to_dewies
|
from lbrynet.extras.daemon.storage import lbc_to_dewies
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
|
@ -15,6 +17,7 @@ if typing.TYPE_CHECKING:
|
||||||
from lbrynet.dht.node import Node
|
from lbrynet.dht.node import Node
|
||||||
from lbrynet.extras.daemon.storage import SQLiteStorage
|
from lbrynet.extras.daemon.storage import SQLiteStorage
|
||||||
from lbrynet.extras.wallet import LbryWalletManager
|
from lbrynet.extras.wallet import LbryWalletManager
|
||||||
|
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -32,7 +35,9 @@ filter_fields = [
|
||||||
'nout',
|
'nout',
|
||||||
'channel_claim_id',
|
'channel_claim_id',
|
||||||
'channel_name',
|
'channel_name',
|
||||||
'full_status'
|
'full_status', # TODO: remove
|
||||||
|
'blobs_remaining',
|
||||||
|
'blobs_in_stream'
|
||||||
]
|
]
|
||||||
|
|
||||||
comparison_operators = {
|
comparison_operators = {
|
||||||
|
@ -63,15 +68,55 @@ class StreamManager:
|
||||||
claim_info = await self.storage.get_content_claim(stream.stream_hash)
|
claim_info = await self.storage.get_content_claim(stream.stream_hash)
|
||||||
stream.set_claim(claim_info, smart_decode(claim_info['value']))
|
stream.set_claim(claim_info, smart_decode(claim_info['value']))
|
||||||
|
|
||||||
|
async def start_stream(self, stream: ManagedStream) -> bool:
|
||||||
|
"""
|
||||||
|
Resume or rebuild a partial or completed stream
|
||||||
|
"""
|
||||||
|
|
||||||
|
path = os.path.join(stream.download_directory, stream.file_name)
|
||||||
|
|
||||||
|
if not stream.running and not os.path.isfile(path):
|
||||||
|
if stream.downloader:
|
||||||
|
stream.downloader.stop()
|
||||||
|
stream.downloader = None
|
||||||
|
|
||||||
|
# the directory is gone, can happen when the folder that contains a published file is deleted
|
||||||
|
# reset the download directory to the default and update the file name
|
||||||
|
if not os.path.isdir(stream.download_directory):
|
||||||
|
stream.download_directory = self.config.download_dir
|
||||||
|
|
||||||
|
stream.downloader = self.make_downloader(
|
||||||
|
stream.sd_hash, stream.download_directory, stream.descriptor.suggested_file_name
|
||||||
|
)
|
||||||
|
if stream.status != ManagedStream.STATUS_FINISHED:
|
||||||
|
await self.storage.change_file_status(stream.stream_hash, 'running')
|
||||||
|
stream.update_status('running')
|
||||||
|
stream.start_download(self.node)
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self.loop.create_task(stream.downloader.got_descriptor.wait()),
|
||||||
|
self.config.download_timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
stream.stop_download()
|
||||||
|
stream.downloader = None
|
||||||
|
return False
|
||||||
|
file_name = os.path.basename(stream.downloader.output_path)
|
||||||
|
await self.storage.change_file_download_dir_and_file_name(
|
||||||
|
stream.stream_hash, self.config.download_dir, file_name
|
||||||
|
)
|
||||||
|
self.wait_for_stream_finished(stream)
|
||||||
|
return True
|
||||||
|
return True
|
||||||
|
|
||||||
|
def make_downloader(self, sd_hash: str, download_directory: str, file_name: str):
|
||||||
|
return StreamDownloader(
|
||||||
|
self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name
|
||||||
|
)
|
||||||
|
|
||||||
async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim):
|
async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim):
|
||||||
sd_blob = self.blob_manager.get_blob(sd_hash)
|
sd_blob = self.blob_manager.get_blob(sd_hash)
|
||||||
if sd_blob.get_is_verified():
|
if sd_blob.get_is_verified():
|
||||||
descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
|
descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
|
||||||
downloader = StreamDownloader(
|
downloader = self.make_downloader(descriptor.sd_hash, download_directory, file_name)
|
||||||
self.loop, self.config, self.blob_manager, descriptor.sd_hash,
|
|
||||||
download_directory,
|
|
||||||
file_name
|
|
||||||
)
|
|
||||||
stream = ManagedStream(
|
stream = ManagedStream(
|
||||||
self.loop, self.blob_manager, descriptor,
|
self.loop, self.blob_manager, descriptor,
|
||||||
download_directory,
|
download_directory,
|
||||||
|
@ -96,13 +141,10 @@ class StreamManager:
|
||||||
return
|
return
|
||||||
await self.node.joined.wait()
|
await self.node.joined.wait()
|
||||||
resumed = 0
|
resumed = 0
|
||||||
for stream in self.streams:
|
t = [self.start_stream(stream) for stream in self.streams if stream.status == ManagedStream.STATUS_RUNNING]
|
||||||
if stream.status == ManagedStream.STATUS_RUNNING:
|
|
||||||
resumed += 1
|
|
||||||
stream.downloader.download(self.node)
|
|
||||||
self.wait_for_stream_finished(stream)
|
|
||||||
if resumed:
|
if resumed:
|
||||||
log.info("resuming %i downloads", resumed)
|
log.info("resuming %i downloads", t)
|
||||||
|
await asyncio.gather(*t, loop=self.loop)
|
||||||
|
|
||||||
async def reflect_streams(self):
|
async def reflect_streams(self):
|
||||||
streams = list(self.streams)
|
streams = list(self.streams)
|
||||||
|
@ -185,18 +227,20 @@ class StreamManager:
|
||||||
downloader.stop()
|
downloader.stop()
|
||||||
log.info("stopped stream")
|
log.info("stopped stream")
|
||||||
return
|
return
|
||||||
|
file_name = os.path.basename(downloader.output_path)
|
||||||
|
download_directory = os.path.dirname(downloader.output_path)
|
||||||
if not await self.blob_manager.storage.stream_exists(downloader.sd_hash):
|
if not await self.blob_manager.storage.stream_exists(downloader.sd_hash):
|
||||||
await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor)
|
await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor)
|
||||||
if not await self.blob_manager.storage.file_exists(downloader.sd_hash):
|
if not await self.blob_manager.storage.file_exists(downloader.sd_hash):
|
||||||
await self.blob_manager.storage.save_downloaded_file(
|
await self.blob_manager.storage.save_downloaded_file(
|
||||||
downloader.descriptor.stream_hash, os.path.basename(downloader.output_path), download_directory,
|
downloader.descriptor.stream_hash, file_name, download_directory,
|
||||||
0.0
|
0.0
|
||||||
)
|
)
|
||||||
await self.blob_manager.storage.save_content_claim(
|
await self.blob_manager.storage.save_content_claim(
|
||||||
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
|
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
|
||||||
)
|
)
|
||||||
stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory,
|
stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory,
|
||||||
os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING)
|
file_name, downloader, ManagedStream.STATUS_RUNNING)
|
||||||
stream.set_claim(claim_info, claim)
|
stream.set_claim(claim_info, claim)
|
||||||
self.streams.add(stream)
|
self.streams.add(stream)
|
||||||
try:
|
try:
|
||||||
|
@ -211,18 +255,18 @@ class StreamManager:
|
||||||
file_name: typing.Optional[str] = None,
|
file_name: typing.Optional[str] = None,
|
||||||
timeout: typing.Optional[float] = 60,
|
timeout: typing.Optional[float] = 60,
|
||||||
fee_amount: typing.Optional[float] = 0.0,
|
fee_amount: typing.Optional[float] = 0.0,
|
||||||
fee_address: typing.Optional[str] = None) -> typing.Optional[ManagedStream]:
|
fee_address: typing.Optional[str] = None,
|
||||||
|
should_pay: typing.Optional[bool] = True) -> typing.Optional[ManagedStream]:
|
||||||
log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id'])
|
log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id'])
|
||||||
claim = ClaimDict.load_dict(claim_info['value'])
|
claim = ClaimDict.load_dict(claim_info['value'])
|
||||||
if fee_address and fee_amount:
|
|
||||||
if fee_amount > await self.wallet.default_account.get_balance():
|
|
||||||
raise Exception("not enough funds")
|
|
||||||
sd_hash = claim.source_hash.decode()
|
sd_hash = claim.source_hash.decode()
|
||||||
if sd_hash in self.starting_streams:
|
if sd_hash in self.starting_streams:
|
||||||
return await self.starting_streams[sd_hash]
|
return await self.starting_streams[sd_hash]
|
||||||
already_started = tuple(filter(lambda s: s.descriptor.sd_hash == sd_hash, self.streams))
|
already_started = tuple(filter(lambda s: s.descriptor.sd_hash == sd_hash, self.streams))
|
||||||
if already_started:
|
if already_started:
|
||||||
return already_started[0]
|
return already_started[0]
|
||||||
|
if should_pay and fee_address and fee_amount and fee_amount > await self.wallet.default_account.get_balance():
|
||||||
|
raise Exception("not enough funds")
|
||||||
|
|
||||||
self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop)
|
self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop)
|
||||||
stream_task = self.loop.create_task(
|
stream_task = self.loop.create_task(
|
||||||
|
@ -232,7 +276,7 @@ class StreamManager:
|
||||||
await asyncio.wait_for(stream_task, timeout or self.config.download_timeout)
|
await asyncio.wait_for(stream_task, timeout or self.config.download_timeout)
|
||||||
stream = await stream_task
|
stream = await stream_task
|
||||||
self.starting_streams[sd_hash].set_result(stream)
|
self.starting_streams[sd_hash].set_result(stream)
|
||||||
if fee_address and fee_amount:
|
if should_pay and fee_address and fee_amount:
|
||||||
await self.wallet.send_amount_to_address(lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
|
await self.wallet.send_amount_to_address(lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
|
||||||
return stream
|
return stream
|
||||||
except (asyncio.TimeoutError, asyncio.CancelledError):
|
except (asyncio.TimeoutError, asyncio.CancelledError):
|
||||||
|
@ -282,3 +326,60 @@ class StreamManager:
|
||||||
if reverse:
|
if reverse:
|
||||||
streams.reverse()
|
streams.reverse()
|
||||||
return streams
|
return streams
|
||||||
|
|
||||||
|
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
|
||||||
|
file_name: typing.Optional[str] = None,
|
||||||
|
timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
|
||||||
|
timeout = timeout or self.config.download_timeout
|
||||||
|
parsed_uri = parse_lbry_uri(uri)
|
||||||
|
if parsed_uri.is_channel:
|
||||||
|
raise Exception("cannot download a channel claim, specify a /path")
|
||||||
|
|
||||||
|
resolved = (await self.wallet.resolve(uri)).get(uri, {})
|
||||||
|
resolved = resolved if 'value' in resolved else resolved.get('claim')
|
||||||
|
|
||||||
|
if not resolved:
|
||||||
|
raise ResolveError(
|
||||||
|
"Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
|
||||||
|
)
|
||||||
|
if 'error' in resolved:
|
||||||
|
raise ResolveError(f"error resolving stream: {resolved['error']}")
|
||||||
|
|
||||||
|
claim = ClaimDict.load_dict(resolved['value'])
|
||||||
|
fee_amount, fee_address = None, None
|
||||||
|
if claim.has_fee:
|
||||||
|
fee_amount = round(exchange_rate_manager.convert_currency(
|
||||||
|
claim.source_fee.currency, "LBC", claim.source_fee.amount
|
||||||
|
), 5)
|
||||||
|
fee_address = claim.source_fee.address
|
||||||
|
outpoint = f"{resolved['txid']}:{resolved['nout']}"
|
||||||
|
existing = self.get_filtered_streams(outpoint=outpoint)
|
||||||
|
|
||||||
|
if not existing:
|
||||||
|
existing.extend(self.get_filtered_streams(sd_hash=claim.source_hash.decode()))
|
||||||
|
if existing and existing[0].claim_id != resolved['claim_id']:
|
||||||
|
raise Exception(f"stream for {existing[0].claim_id} collides with existing "
|
||||||
|
f"download {resolved['claim_id']}")
|
||||||
|
elif not existing:
|
||||||
|
existing.extend(self.get_filtered_streams(claim_id=resolved['claim_id']))
|
||||||
|
if existing and existing[0].sd_hash != claim.source_hash.decode():
|
||||||
|
log.info("claim contains an update to a stream we have, downloading it")
|
||||||
|
stream = await self.download_stream_from_claim(
|
||||||
|
self.node, resolved, file_name, timeout, fee_amount, fee_address, False
|
||||||
|
)
|
||||||
|
log.info("started new stream, deleting old one")
|
||||||
|
await self.delete_stream(existing[0])
|
||||||
|
return stream
|
||||||
|
elif existing:
|
||||||
|
log.info("already have matching stream for %s", uri)
|
||||||
|
stream = existing[0]
|
||||||
|
await self.start_stream(stream)
|
||||||
|
return stream
|
||||||
|
else:
|
||||||
|
stream = existing[0]
|
||||||
|
await self.start_stream(stream)
|
||||||
|
return stream
|
||||||
|
log.info("download stream from %s", uri)
|
||||||
|
return await self.download_stream_from_claim(
|
||||||
|
self.node, resolved, file_name, timeout, fee_amount, fee_address
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue