Merge pull request #2930 from lbryio/source_manager_

Jack Robison 2020-05-07 16:10:01 -04:00, committed by GitHub
commit 37a5f77415
26 changed files with 1510 additions and 637 deletions

@@ -41,7 +41,7 @@ test:datanetwork-integration:
   stage: test
   script:
     - pip install tox-travis
-    - tox -e datanetwork
+    - tox -e datanetwork --recreate

 test:blockchain-integration:
   stage: test

@@ -94,6 +94,7 @@ build:linux:
     - apt-get update
     - apt-get install -y --no-install-recommends python3.7-dev
     - python3.7 <(curl -q https://bootstrap.pypa.io/get-pip.py) # make sure we get pip with python3.7
+    - pip install lbry-libtorrent

 build:mac:
   extends: .build

@@ -1,6 +1,7 @@
 .PHONY: install tools lint test idea

 install:
+	pip install https://s3.amazonaws.com/files.lbry.io/python_libtorrent-1.2.4-py3-none-any.whl
 	CFLAGS="-DSQLITE_MAX_VARIABLE_NUMBER=2500000" pip install -U https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \
 		--global-option=fetch \
 		--global-option=--version --global-option=3.30.1 --global-option=--all \

@@ -17,11 +17,17 @@ from lbry.dht.blob_announcer import BlobAnnouncer
 from lbry.blob.blob_manager import BlobManager
 from lbry.blob_exchange.server import BlobServer
 from lbry.stream.stream_manager import StreamManager
+from lbry.file.file_manager import FileManager
 from lbry.extras.daemon.component import Component
 from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
 from lbry.extras.daemon.storage import SQLiteStorage
+from lbry.torrent.torrent_manager import TorrentManager
 from lbry.wallet import WalletManager
 from lbry.wallet.usage_payment import WalletServerPayer
+try:
+    from lbry.torrent.session import TorrentSession
+except ImportError:
+    TorrentSession = None

 log = logging.getLogger(__name__)

@@ -33,10 +39,11 @@ WALLET_COMPONENT = "wallet"
 WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments"
 DHT_COMPONENT = "dht"
 HASH_ANNOUNCER_COMPONENT = "hash_announcer"
-STREAM_MANAGER_COMPONENT = "stream_manager"
+FILE_MANAGER_COMPONENT = "file_manager"
 PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
 UPNP_COMPONENT = "upnp"
 EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
+LIBTORRENT_COMPONENT = "libtorrent_component"


 class DatabaseComponent(Component):

@@ -319,23 +326,23 @@ class HashAnnouncerComponent(Component):
         }


-class StreamManagerComponent(Component):
-    component_name = STREAM_MANAGER_COMPONENT
-    depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
+class FileManagerComponent(Component):
+    component_name = FILE_MANAGER_COMPONENT
+    depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, LIBTORRENT_COMPONENT]

     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.stream_manager: typing.Optional[StreamManager] = None
+        self.file_manager: typing.Optional[FileManager] = None

     @property
-    def component(self) -> typing.Optional[StreamManager]:
-        return self.stream_manager
+    def component(self) -> typing.Optional[FileManager]:
+        return self.file_manager

     async def get_status(self):
-        if not self.stream_manager:
+        if not self.file_manager:
             return
         return {
-            'managed_files': len(self.stream_manager.streams),
+            'managed_files': len(self.file_manager.get_filtered()),
         }

     async def start(self):

@@ -344,16 +351,52 @@ class StreamManagerComponent(Component):
         wallet = self.component_manager.get_component(WALLET_COMPONENT)
         node = self.component_manager.get_component(DHT_COMPONENT) \
             if self.component_manager.has_component(DHT_COMPONENT) else None
+        torrent = self.component_manager.get_component(LIBTORRENT_COMPONENT) if TorrentSession else None
         log.info('Starting the file manager')
         loop = asyncio.get_event_loop()
-        self.stream_manager = StreamManager(
-            loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager
+        self.file_manager = FileManager(
+            loop, self.conf, wallet, storage, self.component_manager.analytics_manager
         )
-        await self.stream_manager.start()
+        self.file_manager.source_managers['stream'] = StreamManager(
+            loop, self.conf, blob_manager, wallet, storage, node,
+        )
+        if TorrentSession:
+            self.file_manager.source_managers['torrent'] = TorrentManager(
+                loop, self.conf, torrent, storage, self.component_manager.analytics_manager
+            )
+        await self.file_manager.start()
         log.info('Done setting up file manager')

     async def stop(self):
-        self.stream_manager.stop()
+        self.file_manager.stop()
+
+
+class TorrentComponent(Component):
+    component_name = LIBTORRENT_COMPONENT
+
+    def __init__(self, component_manager):
+        super().__init__(component_manager)
+        self.torrent_session = None
+
+    @property
+    def component(self) -> typing.Optional[TorrentSession]:
+        return self.torrent_session
+
+    async def get_status(self):
+        if not self.torrent_session:
+            return
+        return {
+            'running': True,  # TODO: what to return here?
+        }
+
+    async def start(self):
+        if TorrentSession:
+            self.torrent_session = TorrentSession(asyncio.get_event_loop(), None)
+            await self.torrent_session.bind()  # TODO: specify host/port
+
+    async def stop(self):
+        if self.torrent_session:
+            await self.torrent_session.pause()


 class PeerProtocolServerComponent(Component):
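
The hunks above replace the single-purpose StreamManagerComponent with a FileManagerComponent that wires per-protocol source managers into one FileManager. A minimal sketch of that wiring, using only names shown in this diff (the conf, blob_manager, wallet, storage, and node arguments are assumed stand-ins for the component-provided objects):

    import asyncio

    from lbry.file.file_manager import FileManager
    from lbry.stream.stream_manager import StreamManager


    async def build_file_manager(conf, blob_manager, wallet, storage, node=None):
        loop = asyncio.get_event_loop()
        file_manager = FileManager(loop, conf, wallet, storage, analytics_manager=None)
        # each downloadable protocol registers a SourceManager under a string key
        file_manager.source_managers['stream'] = StreamManager(
            loop, conf, blob_manager, wallet, storage, node
        )
        await file_manager.start()  # starts every registered source manager
        return file_manager

Torrent support follows the same pattern: when libtorrent imports successfully, a 'torrent' key is registered alongside 'stream', and everything downstream stays protocol-agnostic.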

@@ -40,7 +40,7 @@ from lbry.error import (
 from lbry.extras import system_info
 from lbry.extras.daemon import analytics
 from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
-from lbry.extras.daemon.components import STREAM_MANAGER_COMPONENT
+from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT
 from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT
 from lbry.extras.daemon.componentmanager import RequiredCondition
 from lbry.extras.daemon.componentmanager import ComponentManager

@@ -57,8 +57,8 @@ if typing.TYPE_CHECKING:
     from lbry.extras.daemon.components import UPnPComponent
     from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
     from lbry.extras.daemon.storage import SQLiteStorage
-    from lbry.stream.stream_manager import StreamManager
     from lbry.wallet import WalletManager, Ledger
+    from lbry.file.file_manager import FileManager

 log = logging.getLogger(__name__)

@@ -372,8 +372,8 @@ class Daemon(metaclass=JSONRPCServerType):
         return self.component_manager.get_component(DATABASE_COMPONENT)

     @property
-    def stream_manager(self) -> typing.Optional['StreamManager']:
-        return self.component_manager.get_component(STREAM_MANAGER_COMPONENT)
+    def file_manager(self) -> typing.Optional['FileManager']:
+        return self.component_manager.get_component(FILE_MANAGER_COMPONENT)

     @property
     def exchange_rate_manager(self) -> typing.Optional['ExchangeRateManager']:

@@ -609,8 +609,8 @@ class Daemon(metaclass=JSONRPCServerType):
         else:
             name, claim_id = name_and_claim_id.split("/")
             uri = f"lbry://{name}#{claim_id}"
-        if not self.stream_manager.started.is_set():
-            await self.stream_manager.started.wait()
+        if not self.file_manager.started.is_set():
+            await self.file_manager.started.wait()
         stream = await self.jsonrpc_get(uri)
         if isinstance(stream, dict):
             raise web.HTTPServerError(text=stream['error'])

@@ -634,11 +634,11 @@ class Daemon(metaclass=JSONRPCServerType):

     async def _handle_stream_range_request(self, request: web.Request):
         sd_hash = request.path.split("/stream/")[1]
-        if not self.stream_manager.started.is_set():
-            await self.stream_manager.started.wait()
-        if sd_hash not in self.stream_manager.streams:
+        if not self.file_manager.started.is_set():
+            await self.file_manager.started.wait()
+        if sd_hash not in self.file_manager.streams:
             return web.HTTPNotFound()
-        return await self.stream_manager.stream_partial_content(request, sd_hash)
+        return await self.file_manager.stream_partial_content(request, sd_hash)

     async def _process_rpc_call(self, data):
         args = data.get('params', {})
@@ -858,7 +858,8 @@ class Daemon(metaclass=JSONRPCServerType):
             'exchange_rate_manager': (bool),
             'hash_announcer': (bool),
             'peer_protocol_server': (bool),
-            'stream_manager': (bool),
+            'file_manager': (bool),
+            'libtorrent_component': (bool),
             'upnp': (bool),
             'wallet': (bool),
         },

@@ -885,6 +886,9 @@ class Daemon(metaclass=JSONRPCServerType):
                     }
                 ],
             },
+            'libtorrent_component': {
+                'running': (bool) libtorrent was detected and started successfully,
+            },
             'dht': {
                 'node_id': (str) lbry dht node id - hex encoded,
                 'peers_in_routing_table': (int) the number of peers in the routing table,

@@ -906,7 +910,7 @@ class Daemon(metaclass=JSONRPCServerType):
             'hash_announcer': {
                 'announce_queue_size': (int) number of blobs currently queued to be announced
             },
-            'stream_manager': {
+            'file_manager': {
                 'managed_files': (int) count of files in the stream manager,
             },
             'upnp': {

@@ -1077,7 +1081,7 @@ class Daemon(metaclass=JSONRPCServerType):
         return results

     @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT,
-              STREAM_MANAGER_COMPONENT)
+              FILE_MANAGER_COMPONENT)
     async def jsonrpc_get(
             self, uri, file_name=None, download_directory=None, timeout=None, save_file=None, wallet_id=None):
         """
@@ -1103,7 +1107,7 @@ class Daemon(metaclass=JSONRPCServerType):
         if download_directory and not os.path.isdir(download_directory):
             return {"error": f"specified download directory \"{download_directory}\" does not exist"}
         try:
-            stream = await self.stream_manager.download_stream_from_uri(
+            stream = await self.file_manager.download_from_uri(
                 uri, self.exchange_rate_manager, timeout, file_name, download_directory,
                 save_file=save_file, wallet=wallet
             )

@@ -1949,7 +1953,7 @@ class Daemon(metaclass=JSONRPCServerType):
     File management.
     """

-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_list(self, sort=None, reverse=False, comparison=None, wallet_id=None, page=None,
                                 page_size=None, **kwargs):
         """

@@ -1994,7 +1998,7 @@ class Daemon(metaclass=JSONRPCServerType):
         comparison = comparison or 'eq'
         paginated = paginate_list(
-            self.stream_manager.get_filtered_streams(sort, reverse, comparison, **kwargs), page, page_size
+            self.file_manager.get_filtered(sort, reverse, comparison, **kwargs), page, page_size
         )
         if paginated['items']:
             receipts = {

@@ -2008,7 +2012,7 @@ class Daemon(metaclass=JSONRPCServerType):
                 stream.purchase_receipt = receipts.get(stream.claim_id)
         return paginated

-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_set_status(self, status, **kwargs):
         """
         Start or stop downloading a file

@@ -2032,12 +2036,12 @@ class Daemon(metaclass=JSONRPCServerType):
         if status not in ['start', 'stop']:
             raise Exception('Status must be "start" or "stop".')

-        streams = self.stream_manager.get_filtered_streams(**kwargs)
+        streams = self.file_manager.get_filtered(**kwargs)
         if not streams:
             raise Exception(f'Unable to find a file for {kwargs}')
         stream = streams[0]
         if status == 'start' and not stream.running:
-            await stream.save_file(node=self.stream_manager.node)
+            await stream.save_file()
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
             await stream.stop()

@@ -2049,7 +2053,7 @@ class Daemon(metaclass=JSONRPCServerType):
         )
         return msg

-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs):
         """
         Delete a LBRY file

@@ -2081,7 +2085,7 @@ class Daemon(metaclass=JSONRPCServerType):
             (bool) true if deletion was successful
         """

-        streams = self.stream_manager.get_filtered_streams(**kwargs)
+        streams = self.file_manager.get_filtered(**kwargs)

         if len(streams) > 1:
             if not delete_all:

@@ -2098,12 +2102,12 @@ class Daemon(metaclass=JSONRPCServerType):
         else:
             for stream in streams:
                 message = f"Deleted file {stream.file_name}"
-                await self.stream_manager.delete_stream(stream, delete_file=delete_from_download_dir)
+                await self.file_manager.delete(stream, delete_file=delete_from_download_dir)
                 log.info(message)
             result = True
         return result

-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_save(self, file_name=None, download_directory=None, **kwargs):
         """
         Start saving a file to disk.

@@ -2130,7 +2134,7 @@ class Daemon(metaclass=JSONRPCServerType):
         Returns: {File}
         """

-        streams = self.stream_manager.get_filtered_streams(**kwargs)
+        streams = self.file_manager.get_filtered(**kwargs)

         if len(streams) > 1:
             log.warning("There are %i matching files, use narrower filters to select one", len(streams))
@@ -2905,7 +2909,7 @@ class Daemon(metaclass=JSONRPCServerType):
     Create, update, abandon, list and inspect your stream claims.
     """

-    @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_publish(self, name, **kwargs):
         """
         Create or replace a stream claim at a given name (use 'stream create/update' for more control).

@@ -3027,7 +3031,7 @@ class Daemon(metaclass=JSONRPCServerType):
                 f"to update a specific stream claim."
             )

-    @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_stream_repost(self, name, bid, claim_id, allow_duplicate_name=False, channel_id=None,
                                     channel_name=None, channel_account_id=None, account_id=None, wallet_id=None,
                                     claim_address=None, funding_account_ids=None, preview=False, blocking=False):

@@ -3099,7 +3103,7 @@ class Daemon(metaclass=JSONRPCServerType):
         return tx

-    @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_stream_create(
             self, name, bid, file_path, allow_duplicate_name=False,
             channel_id=None, channel_name=None, channel_account_id=None,

@@ -3237,7 +3241,7 @@ class Daemon(metaclass=JSONRPCServerType):
         file_stream = None
         if not preview:
-            file_stream = await self.stream_manager.create_stream(file_path)
+            file_stream = await self.file_manager.create_stream(file_path)
             claim.stream.source.sd_hash = file_stream.sd_hash
             new_txo.script.generate()

@@ -3257,7 +3261,7 @@ class Daemon(metaclass=JSONRPCServerType):
         return tx

-    @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_stream_update(
             self, claim_id, bid=None, file_path=None,
             channel_id=None, channel_name=None, channel_account_id=None, clear_channel=False,

@@ -3447,11 +3451,12 @@ class Daemon(metaclass=JSONRPCServerType):
         stream_hash = None
         if not preview:
-            old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None)
+            old_stream = self.file_manager.get_filtered(sd_hash=old_txo.claim.stream.source.sd_hash)
+            old_stream = old_stream[0] if old_stream else None
             if file_path is not None:
                 if old_stream:
-                    await self.stream_manager.delete_stream(old_stream, delete_file=False)
-                file_stream = await self.stream_manager.create_stream(file_path)
+                    await self.file_manager.delete(old_stream, delete_file=False)
+                file_stream = await self.file_manager.create_stream(file_path)
                 new_txo.claim.stream.source.sd_hash = file_stream.sd_hash
                 new_txo.script.generate()
                 stream_hash = file_stream.stream_hash

@@ -4583,9 +4588,9 @@ class Daemon(metaclass=JSONRPCServerType):
         """
         if not blob_hash or not is_valid_blobhash(blob_hash):
             return f"Invalid blob hash to delete '{blob_hash}'"
-        streams = self.stream_manager.get_filtered_streams(sd_hash=blob_hash)
+        streams = self.file_manager.get_filtered(sd_hash=blob_hash)
         if streams:
-            await self.stream_manager.delete_stream(streams[0])
+            await self.file_manager.delete(streams[0])
         else:
             await self.blob_manager.delete_blobs([blob_hash])
         return "Deleted %s" % blob_hash
@@ -4758,7 +4763,7 @@ class Daemon(metaclass=JSONRPCServerType):
         raise NotImplementedError()

-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_reflect(self, **kwargs):
         """
         Reflect all the blobs in a file matching the filter criteria

@@ -4787,8 +4792,8 @@ class Daemon(metaclass=JSONRPCServerType):
         else:
             server, port = random.choice(self.conf.reflector_servers)
         reflected = await asyncio.gather(*[
-            self.stream_manager.reflect_stream(stream, server, port)
-            for stream in self.stream_manager.get_filtered_streams(**kwargs)
+            self.file_manager['stream'].reflect_stream(stream, server, port)
+            for stream in self.file_manager.get_filtered_streams(**kwargs)
         ])
         total = []
         for reflected_for_stream in reflected:

@@ -5334,10 +5339,10 @@ class Daemon(metaclass=JSONRPCServerType):
         results = await self.ledger.resolve(accounts, urls, **kwargs)
         if self.conf.save_resolved_claims and results:
             try:
-                claims = self.stream_manager._convert_to_old_resolve_output(self.wallet_manager, results)
-                await self.storage.save_claims_for_resolve([
-                    value for value in claims.values() if 'error' not in value
-                ])
+                await self.storage.save_claim_from_output(
+                    self.ledger,
+                    *(result for result in results.values() if isinstance(result, Output))
+                )
             except DecodeError:
                 pass
         return results
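
With the rename, every file RPC above funnels through FileManager.get_filtered rather than StreamManager.get_filtered_streams. A hedged usage sketch (the daemon instance is assumed to be started; filter fields are validated against SourceManager.filter_fields, shown later in this diff):

    files = daemon.file_manager.get_filtered(
        sort_by='added_on', reverse=True, comparison='eq', status='running'
    )
    for managed_file in files:
        print(managed_file.identifier, managed_file.claim_name)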

@@ -7,6 +7,7 @@ from json import JSONEncoder

 from google.protobuf.message import DecodeError
 from lbry.schema.claim import Claim
+from lbry.torrent.torrent_manager import TorrentSource
 from lbry.wallet import Wallet, Ledger, Account, Transaction, Output
 from lbry.wallet.bip32 import PubKey
 from lbry.wallet.dewies import dewies_to_lbc

@@ -126,7 +127,7 @@ class JSONResponseEncoder(JSONEncoder):
             return self.encode_account(obj)
         if isinstance(obj, Wallet):
             return self.encode_wallet(obj)
-        if isinstance(obj, ManagedStream):
+        if isinstance(obj, (ManagedStream, TorrentSource)):
             return self.encode_file(obj)
         if isinstance(obj, Transaction):
             return self.encode_transaction(obj)

@@ -273,26 +274,32 @@ class JSONResponseEncoder(JSONEncoder):
         output_exists = managed_stream.output_file_exists
         tx_height = managed_stream.stream_claim_info.height
         best_height = self.ledger.headers.height
-        return {
-            'streaming_url': managed_stream.stream_url,
+        is_stream = hasattr(managed_stream, 'stream_hash')
+        if is_stream:
+            total_bytes_lower_bound = managed_stream.descriptor.lower_bound_decrypted_length()
+            total_bytes = managed_stream.descriptor.upper_bound_decrypted_length()
+        else:
+            total_bytes_lower_bound = total_bytes = managed_stream.torrent_length
+        result = {
+            'streaming_url': None,
             'completed': managed_stream.completed,
-            'file_name': managed_stream.file_name if output_exists else None,
-            'download_directory': managed_stream.download_directory if output_exists else None,
-            'download_path': managed_stream.full_path if output_exists else None,
+            'file_name': None,
+            'download_directory': None,
+            'download_path': None,
             'points_paid': 0.0,
             'stopped': not managed_stream.running,
-            'stream_hash': managed_stream.stream_hash,
-            'stream_name': managed_stream.descriptor.stream_name,
-            'suggested_file_name': managed_stream.descriptor.suggested_file_name,
-            'sd_hash': managed_stream.descriptor.sd_hash,
-            'mime_type': managed_stream.mime_type,
-            'key': managed_stream.descriptor.key,
-            'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length(),
-            'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length(),
+            'stream_hash': None,
+            'stream_name': None,
+            'suggested_file_name': None,
+            'sd_hash': None,
+            'mime_type': None,
+            'key': None,
+            'total_bytes_lower_bound': total_bytes_lower_bound,
+            'total_bytes': total_bytes,
             'written_bytes': managed_stream.written_bytes,
-            'blobs_completed': managed_stream.blobs_completed,
-            'blobs_in_stream': managed_stream.blobs_in_stream,
-            'blobs_remaining': managed_stream.blobs_remaining,
+            'blobs_completed': None,
+            'blobs_in_stream': None,
+            'blobs_remaining': None,
             'status': managed_stream.status,
             'claim_id': managed_stream.claim_id,
             'txid': managed_stream.txid,

@@ -309,10 +316,37 @@ class JSONResponseEncoder(JSONEncoder):
             'height': tx_height,
             'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
             'timestamp': self.ledger.headers.estimated_timestamp(tx_height),
-            'is_fully_reflected': managed_stream.is_fully_reflected,
-            'reflector_progress': managed_stream.reflector_progress,
-            'uploading_to_reflector': managed_stream.uploading_to_reflector
+            'is_fully_reflected': False,
+            'reflector_progress': False,
+            'uploading_to_reflector': False
         }
+        if is_stream:
+            result.update({
+                'streaming_url': managed_stream.stream_url,
+                'stream_hash': managed_stream.stream_hash,
+                'stream_name': managed_stream.descriptor.stream_name,
+                'suggested_file_name': managed_stream.descriptor.suggested_file_name,
+                'sd_hash': managed_stream.descriptor.sd_hash,
+                'mime_type': managed_stream.mime_type,
+                'key': managed_stream.descriptor.key,
+                'blobs_completed': managed_stream.blobs_completed,
+                'blobs_in_stream': managed_stream.blobs_in_stream,
+                'blobs_remaining': managed_stream.blobs_remaining,
+                'is_fully_reflected': managed_stream.is_fully_reflected,
+                'reflector_progress': managed_stream.reflector_progress,
+                'uploading_to_reflector': managed_stream.uploading_to_reflector
+            })
+        else:
+            result.update({
+                'streaming_url': f'file://{managed_stream.full_path}',
+            })
+        if output_exists:
+            result.update({
+                'file_name': managed_stream.file_name,
+                'download_directory': managed_stream.download_directory,
+                'download_path': managed_stream.full_path,
+            })
+        return result

     def encode_claim(self, claim):
         encoded = getattr(claim, claim.claim_type).to_dict()
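
The encoder now builds a protocol-neutral result dict and only layers on stream-specific fields when the source quacks like a stream. The dispatch test is plain duck typing, as in this compressed model (the helper name is illustrative, not part of the PR):

    def is_stream_source(source) -> bool:
        # ManagedStream exposes a stream_hash property; TorrentSource does not
        return hasattr(source, 'stream_hash')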

@@ -9,7 +9,7 @@ from typing import Optional
 from lbry.wallet import SQLiteMixin
 from lbry.conf import Config
 from lbry.wallet.dewies import dewies_to_lbc, lbc_to_dewies
-from lbry.wallet.transaction import Transaction
+from lbry.wallet.transaction import Transaction, Output
 from lbry.schema.claim import Claim
 from lbry.dht.constants import DATA_EXPIRATION
 from lbry.blob.blob_info import BlobInfo

@@ -727,6 +727,19 @@ class SQLiteStorage(SQLiteMixin):
         if claim_id_to_supports:
             await self.save_supports(claim_id_to_supports)

+    def save_claim_from_output(self, ledger, *outputs: Output):
+        return self.save_claims([{
+            "claim_id": output.claim_id,
+            "name": output.claim_name,
+            "amount": dewies_to_lbc(output.amount),
+            "address": output.get_address(ledger),
+            "txid": output.tx_ref.id,
+            "nout": output.position,
+            "value": output.claim,
+            "height": output.tx_ref.height,
+            "claim_sequence": -1,
+        } for output in outputs])
+
     def save_claims_for_resolve(self, claim_infos):
         to_save = {}
         for info in claim_infos:

@@ -740,7 +753,8 @@ class SQLiteStorage(SQLiteMixin):
         return self.save_claims(to_save.values())

     @staticmethod
-    def _save_content_claim(transaction, claim_outpoint, stream_hash):
+    def _save_content_claim(transaction, claim_outpoint, stream_hash=None, bt_infohash=None):
+        assert stream_hash or bt_infohash
         # get the claim id and serialized metadata
         claim_info = transaction.execute(
             "select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,)

@@ -788,6 +802,19 @@ class SQLiteStorage(SQLiteMixin):
             if stream_hash in self.content_claim_callbacks:
                 await self.content_claim_callbacks[stream_hash]()

+    async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
+        def _save_torrent(transaction):
+            transaction.execute(
+                "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
+            ).fetchall()
+            transaction.execute(
+                "insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
+            ).fetchall()
+        await self.db.run(_save_torrent)
+        # update corresponding ManagedEncryptedFileDownloader object
+        if bt_infohash in self.content_claim_callbacks:
+            await self.content_claim_callbacks[bt_infohash]()
+
     async def get_content_claim(self, stream_hash: str, include_supports: typing.Optional[bool] = True) -> typing.Dict:
         claims = await self.db.run(get_claims_from_stream_hashes, [stream_hash])
         claim = None

@@ -799,6 +826,10 @@ class SQLiteStorage(SQLiteMixin):
             claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports)
         return claim

+    async def get_content_claim_for_torrent(self, bt_infohash):
+        claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
+        return claims[bt_infohash].as_dict() if claims else None
+
     # # # # # # # # # reflector functions # # # # # # # # #

     def update_reflected_stream(self, sd_hash, reflector_address, success=True):
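
save_claim_from_output is the helper the resolve hunk in the Daemon section above switches to: it maps resolved Output objects straight onto save_claims rows, replacing the old _convert_to_old_resolve_output round-trip. A minimal sketch of the call-site pattern, under the same assumptions as that hunk (an initialized storage, ledger, and accounts):

    from lbry.wallet import Output


    async def persist_resolved(storage, ledger, accounts, urls):
        results = await ledger.resolve(accounts, urls)
        await storage.save_claim_from_output(
            ledger,
            *(txo for txo in results.values() if isinstance(txo, Output))
        )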

lbry/file/__init__.py (new file, 0 lines)

lbry/file/file_manager.py (new file, 286 lines)

@@ -0,0 +1,286 @@
import asyncio
import logging
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
from lbry.stream.managed_stream import ManagedStream
from lbry.torrent.torrent_manager import TorrentSource
from lbry.utils import cache_concurrent
from lbry.schema.url import URL
from lbry.wallet.dewies import dewies_to_lbc
from lbry.file.source_manager import SourceManager
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
    from lbry.conf import Config
    from lbry.extras.daemon.analytics import AnalyticsManager
    from lbry.extras.daemon.storage import SQLiteStorage
    from lbry.wallet import WalletManager, Output
    from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager

log = logging.getLogger(__name__)


class FileManager:
    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'WalletManager',
                 storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
        self.loop = loop
        self.config = config
        self.wallet_manager = wallet_manager
        self.storage = storage
        self.analytics_manager = analytics_manager
        self.source_managers: typing.Dict[str, SourceManager] = {}
        self.started = asyncio.Event()

    @property
    def streams(self):
        return self.source_managers['stream']._sources

    async def create_stream(self, file_path: str, key: Optional[bytes] = None, **kwargs) -> ManagedDownloadSource:
        if 'stream' in self.source_managers:
            return await self.source_managers['stream'].create(file_path, key, **kwargs)
        raise NotImplementedError

    async def start(self):
        await asyncio.gather(*(source_manager.start() for source_manager in self.source_managers.values()))
        for manager in self.source_managers.values():
            await manager.started.wait()
        self.started.set()

    def stop(self):
        for manager in self.source_managers.values():
            # fixme: pop or not?
            manager.stop()
        self.started.clear()

    @cache_concurrent
    async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
                                timeout: Optional[float] = None, file_name: Optional[str] = None,
                                download_directory: Optional[str] = None,
                                save_file: Optional[bool] = None, resolve_timeout: float = 3.0,
                                wallet: Optional['Wallet'] = None) -> ManagedDownloadSource:
        wallet = wallet or self.wallet_manager.default_wallet
        timeout = timeout or self.config.download_timeout
        start_time = self.loop.time()
        resolved_time = None
        stream = None
        claim = None
        error = None
        outpoint = None
        if save_file is None:
            save_file = self.config.save_files
        if file_name and not save_file:
            save_file = True
        if save_file:
            download_directory = download_directory or self.config.download_dir
        else:
            download_directory = None

        payment = None
        try:
            # resolve the claim
            if not URL.parse(uri).has_stream:
                raise ResolveError("cannot download a channel claim, specify a /path")
            try:
                resolved_result = await asyncio.wait_for(
                    self.wallet_manager.ledger.resolve(wallet.accounts, [uri], include_purchase_receipt=True),
                    resolve_timeout
                )
            except asyncio.TimeoutError:
                raise ResolveTimeoutError(uri)
            except Exception as err:
                if isinstance(err, asyncio.CancelledError):
                    raise
                log.exception("Unexpected error resolving stream:")
                raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
            if 'error' in resolved_result:
                raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
            if not resolved_result or uri not in resolved_result:
                raise ResolveError(f"Failed to resolve stream at '{uri}'")
            txo = resolved_result[uri]
            if isinstance(txo, dict):
                raise ResolveError(f"Failed to resolve stream at '{uri}': {txo}")
            claim = txo.claim
            outpoint = f"{txo.tx_ref.id}:{txo.position}"
            resolved_time = self.loop.time() - start_time
            await self.storage.save_claim_from_output(self.wallet_manager.ledger, txo)

            ####################
            # update or replace
            ####################

            if claim.stream.source.bt_infohash:
                source_manager = self.source_managers['torrent']
                existing = source_manager.get_filtered(bt_infohash=claim.stream.source.bt_infohash)
            else:
                source_manager = self.source_managers['stream']
                existing = source_manager.get_filtered(sd_hash=claim.stream.source.sd_hash)

            # resume or update an existing stream, if the stream changed: download it and delete the old one after
            to_replace, updated_stream = None, None
            if existing and existing[0].claim_id != txo.claim_id:
                raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
            if existing:
                log.info("claim contains a metadata only update to a stream we have")
                if claim.stream.source.bt_infohash:
                    await self.storage.save_torrent_content_claim(
                        existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
                    )
                    claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
                    existing[0].set_claim(claim_info, claim)
                else:
                    await self.storage.save_content_claim(
                        existing[0].stream_hash, outpoint
                    )
                    await source_manager._update_content_claim(existing[0])
                updated_stream = existing[0]
            else:
                existing_for_claim_id = self.get_filtered(claim_id=txo.claim_id)
                if existing_for_claim_id:
                    log.info("claim contains an update to a stream we have, downloading it")
                    if save_file and existing_for_claim_id[0].output_file_exists:
                        save_file = False
                    await existing_for_claim_id[0].start(timeout=timeout, save_now=save_file)
                    if not existing_for_claim_id[0].output_file_exists and (
                            save_file or file_name or download_directory):
                        await existing_for_claim_id[0].save_file(
                            file_name=file_name, download_directory=download_directory
                        )
                    to_replace = existing_for_claim_id[0]

            # resume or update an existing stream, if the stream changed: download it and delete the old one after
            if updated_stream:
                log.info("already have stream for %s", uri)
                if save_file and updated_stream.output_file_exists:
                    save_file = False
                await updated_stream.start(timeout=timeout, save_now=save_file)
                if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
                    await updated_stream.save_file(
                        file_name=file_name, download_directory=download_directory
                    )
                return updated_stream

            ####################
            # pay fee
            ####################

            if not to_replace and txo.has_price and not txo.purchase_receipt:
                payment = await self.wallet_manager.create_purchase_transaction(
                    wallet.accounts, txo, exchange_rate_manager
                )

            ####################
            # make downloader and wait for start
            ####################

            if not claim.stream.source.bt_infohash:
                # fixme: this shouldnt be here
                stream = ManagedStream(
                    self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash,
                    download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
                    analytics_manager=self.analytics_manager
                )
                stream.downloader.node = source_manager.node
            else:
                stream = TorrentSource(
                    self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
                    file_name=file_name, download_directory=download_directory or self.config.download_dir,
                    status=ManagedStream.STATUS_RUNNING,
                    analytics_manager=self.analytics_manager,
                    torrent_session=source_manager.torrent_session
                )
            log.info("starting download for %s", uri)

            before_download = self.loop.time()
            await stream.start(timeout, save_file)

            ####################
            # success case: delete to_replace if applicable, broadcast fee payment
            ####################

            if to_replace:  # delete old stream now that the replacement has started downloading
                await source_manager.delete(to_replace)

            if payment is not None:
                await self.wallet_manager.broadcast_or_release(payment)
                payment = None  # to avoid releasing in `finally` later
                log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
                await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)

            source_manager.add(stream)

            if not claim.stream.source.bt_infohash:
                await self.storage.save_content_claim(stream.stream_hash, outpoint)
            else:
                await self.storage.save_torrent_content_claim(
                    stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
                )
                claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
                stream.set_claim(claim_info, claim)
            if save_file:
                await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download),
                                       loop=self.loop)
            return stream
        except asyncio.TimeoutError:
            error = DownloadDataTimeoutError(stream.sd_hash)
            raise error
        except Exception as err:  # forgive data timeout, don't delete stream
            expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
                        KeyFeeAboveMaxAllowedError)
            if isinstance(err, expected):
                log.warning("Failed to download %s: %s", uri, str(err))
            elif isinstance(err, asyncio.CancelledError):
                pass
            else:
                log.exception("Unexpected error downloading stream:")
            error = err
            raise
        finally:
            if payment is not None:
                # payment is set to None after broadcasting, if we're here an exception probably happened
                await self.wallet_manager.ledger.release_tx(payment)
            if self.analytics_manager and claim and claim.stream.source.bt_infohash:
                # TODO: analytics for torrents
                pass
            elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
                                                                   stream.downloader.time_to_first_bytes))):
                server = self.wallet_manager.ledger.network.client.server
                self.loop.create_task(
                    self.analytics_manager.send_time_to_first_bytes(
                        resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
                        uri, outpoint,
                        None if not stream else len(stream.downloader.blob_downloader.active_connections),
                        None if not stream else len(stream.downloader.blob_downloader.scores),
                        None if not stream else len(stream.downloader.blob_downloader.connection_failures),
                        False if not stream else stream.downloader.added_fixed_peers,
                        self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
                        None if not stream else stream.sd_hash,
                        None if not stream else stream.downloader.time_to_descriptor,
                        None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
                        None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
                        None if not stream else stream.downloader.time_to_first_bytes,
                        None if not error else error.__class__.__name__,
                        None if not error else str(error),
                        None if not server else f"{server[0]}:{server[1]}"
                    )
                )

    async def stream_partial_content(self, request: Request, sd_hash: str):
        return await self.source_managers['stream'].stream_partial_content(request, sd_hash)

    def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
        """
        Get a list of filtered and sorted ManagedStream objects

        :param sort_by: field to sort by
        :param reverse: reverse sorting
        :param comparison: comparison operator used for filtering
        :param search_by: fields and values to filter by
        """
        return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])

    async def delete(self, source: ManagedDownloadSource, delete_file=False):
        for manager in self.source_managers.values():
            await manager.delete(source, delete_file)
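
End to end, a caller now downloads through the unified manager regardless of protocol: download_from_uri resolves the claim, picks the 'stream' or 'torrent' source manager based on whether the claim carries a bt_infohash, and returns a ManagedDownloadSource. A hedged usage sketch (assumes a started FileManager and an ExchangeRateManager; the URI and timeout are illustrative):

    async def fetch(file_manager, exchange_rate_manager):
        source = await file_manager.download_from_uri(
            'lbry://example-claim', exchange_rate_manager, timeout=60.0, save_file=True
        )
        print(source.identifier, source.full_path)
        return source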

lbry/file/source.py (new file, 161 lines)

@@ -0,0 +1,161 @@
import os
import asyncio
import typing
import logging
import binascii
from typing import Optional
from lbry.utils import generate_id
from lbry.extras.daemon.storage import StoredContentClaim
if typing.TYPE_CHECKING:
    from lbry.conf import Config
    from lbry.extras.daemon.analytics import AnalyticsManager
    from lbry.wallet.transaction import Transaction
    from lbry.extras.daemon.storage import SQLiteStorage

log = logging.getLogger(__name__)


class ManagedDownloadSource:
    STATUS_RUNNING = "running"
    STATUS_STOPPED = "stopped"
    STATUS_FINISHED = "finished"

    SAVING_ID = 1
    STREAMING_ID = 2

    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
                 file_name: Optional[str] = None, download_directory: Optional[str] = None,
                 status: Optional[str] = STATUS_STOPPED, claim: Optional[StoredContentClaim] = None,
                 download_id: Optional[str] = None, rowid: Optional[int] = None,
                 content_fee: Optional['Transaction'] = None,
                 analytics_manager: Optional['AnalyticsManager'] = None,
                 added_on: Optional[int] = None):
        self.loop = loop
        self.storage = storage
        self.config = config
        self.identifier = identifier
        self.download_directory = download_directory
        self._file_name = file_name
        self._status = status
        self.stream_claim_info = claim
        self.download_id = download_id or binascii.hexlify(generate_id()).decode()
        self.rowid = rowid
        self.content_fee = content_fee
        self.purchase_receipt = None
        self._added_on = added_on
        self.analytics_manager = analytics_manager

        self.saving = asyncio.Event(loop=self.loop)
        self.finished_writing = asyncio.Event(loop=self.loop)
        self.started_writing = asyncio.Event(loop=self.loop)
        self.finished_write_attempt = asyncio.Event(loop=self.loop)

    # @classmethod
    # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', file_path: str,
    #                  key: Optional[bytes] = None,
    #                  iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
    #     raise NotImplementedError()

    async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
        raise NotImplementedError()

    async def stop(self, finished: bool = False):
        raise NotImplementedError()

    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
        raise NotImplementedError()

    def stop_tasks(self):
        raise NotImplementedError()

    def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
        self.stream_claim_info = StoredContentClaim(
            f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
            claim_info['name'], claim_info['amount'], claim_info['height'],
            binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
            claim_info['claim_sequence'], claim_info.get('channel_name')
        )

    # async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
    #     if not claim_info:
    #         claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
    #     self.set_claim(claim_info, claim_info['value'])

    @property
    def file_name(self) -> Optional[str]:
        return self._file_name

    @property
    def added_on(self) -> Optional[int]:
        return self._added_on

    @property
    def status(self) -> str:
        return self._status

    @property
    def completed(self):
        raise NotImplementedError()

    # @property
    # def stream_url(self):
    #     return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}

    @property
    def finished(self) -> bool:
        return self.status == self.STATUS_FINISHED

    @property
    def running(self) -> bool:
        return self.status == self.STATUS_RUNNING

    @property
    def claim_id(self) -> Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.claim_id

    @property
    def txid(self) -> Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.txid

    @property
    def nout(self) -> Optional[int]:
        return None if not self.stream_claim_info else self.stream_claim_info.nout

    @property
    def outpoint(self) -> Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.outpoint

    @property
    def claim_height(self) -> Optional[int]:
        return None if not self.stream_claim_info else self.stream_claim_info.height

    @property
    def channel_claim_id(self) -> Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id

    @property
    def channel_name(self) -> Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.channel_name

    @property
    def claim_name(self) -> Optional[str]:
        return None if not self.stream_claim_info else self.stream_claim_info.claim_name

    @property
    def metadata(self) -> Optional[typing.Dict]:
        return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()

    @property
    def metadata_protobuf(self) -> bytes:
        if self.stream_claim_info:
            return binascii.hexlify(self.stream_claim_info.claim.to_bytes())

    @property
    def full_path(self) -> Optional[str]:
        return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
            if self.file_name and self.download_directory else None

    @property
    def output_file_exists(self):
        return os.path.isfile(self.full_path) if self.full_path else False
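
ManagedDownloadSource is an abstract contract: subclasses supply the lifecycle methods while the base class owns claim bookkeeping and the common properties. An illustrative-only stub subclass showing the minimum a subclass implements (NullSource is hypothetical, not part of the PR):

    class NullSource(ManagedDownloadSource):
        """Hypothetical no-op source demonstrating the subclass contract."""

        async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
            self._status = self.STATUS_RUNNING

        async def stop(self, finished: bool = False):
            self._status = self.STATUS_FINISHED if finished else self.STATUS_STOPPED

        async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
            pass  # nothing to write for a stub

        def stop_tasks(self):
            pass

        @property
        def completed(self):
            return self.finished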

lbry/file/source_manager.py (new file, 134 lines)

@@ -0,0 +1,134 @@
import os
import asyncio
import logging
import typing
from typing import Optional
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
    from lbry.conf import Config
    from lbry.extras.daemon.analytics import AnalyticsManager
    from lbry.extras.daemon.storage import SQLiteStorage

log = logging.getLogger(__name__)

COMPARISON_OPERATORS = {
    'eq': lambda a, b: a == b,
    'ne': lambda a, b: a != b,
    'g': lambda a, b: a > b,
    'l': lambda a, b: a < b,
    'ge': lambda a, b: a >= b,
    'le': lambda a, b: a <= b,
}


class SourceManager:
    filter_fields = {
        'rowid',
        'status',
        'file_name',
        'added_on',
        'claim_name',
        'claim_height',
        'claim_id',
        'outpoint',
        'txid',
        'nout',
        'channel_claim_id',
        'channel_name'
    }

    source_class = ManagedDownloadSource

    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage',
                 analytics_manager: Optional['AnalyticsManager'] = None):
        self.loop = loop
        self.config = config
        self.storage = storage
        self.analytics_manager = analytics_manager
        self._sources: typing.Dict[str, ManagedDownloadSource] = {}
        self.started = asyncio.Event(loop=self.loop)

    def add(self, source: ManagedDownloadSource):
        self._sources[source.identifier] = source

    def remove(self, source: ManagedDownloadSource):
        if source.identifier not in self._sources:
            return
        self._sources.pop(source.identifier)
        source.stop_tasks()

    async def initialize_from_database(self):
        raise NotImplementedError()

    async def start(self):
        await self.initialize_from_database()
        self.started.set()

    def stop(self):
        while self._sources:
            _, source = self._sources.popitem()
            source.stop_tasks()
        self.started.clear()

    async def create(self, file_path: str, key: Optional[bytes] = None,
                     iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource:
        raise NotImplementedError()

    async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
        self.remove(source)
        if delete_file and source.output_file_exists:
            os.remove(source.full_path)

    def get_filtered(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
                     comparison: Optional[str] = None, **search_by) -> typing.List[ManagedDownloadSource]:
        """
        Get a list of filtered and sorted ManagedStream objects

        :param sort_by: field to sort by
        :param reverse: reverse sorting
        :param comparison: comparison operator used for filtering
        :param search_by: fields and values to filter by
        """
        if sort_by and sort_by not in self.filter_fields:
            raise ValueError(f"'{sort_by}' is not a valid field to sort by")
        if comparison and comparison not in COMPARISON_OPERATORS:
            raise ValueError(f"'{comparison}' is not a valid comparison")
        if 'full_status' in search_by:
            del search_by['full_status']
        for search in search_by:
            if search not in self.filter_fields:
                raise ValueError(f"'{search}' is not a valid search operation")

        compare_sets = {}
        if isinstance(search_by.get('claim_id'), list):
            compare_sets['claim_ids'] = search_by.pop('claim_id')
        if isinstance(search_by.get('outpoint'), list):
            compare_sets['outpoints'] = search_by.pop('outpoint')
        if isinstance(search_by.get('channel_claim_id'), list):
            compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id')

        if search_by:
            comparison = comparison or 'eq'
            streams = []
            for stream in self._sources.values():
                matched = False
                for set_search, val in compare_sets.items():
                    if COMPARISON_OPERATORS[comparison](getattr(stream, self.filter_fields[set_search]), val):
                        streams.append(stream)
                        matched = True
                        break
                if matched:
                    continue
                for search, val in search_by.items():
                    this_stream = getattr(stream, search)
                    if COMPARISON_OPERATORS[comparison](this_stream, val):
                        streams.append(stream)
                        break
        else:
            streams = list(self._sources.values())
        if sort_by:
            streams.sort(key=lambda s: getattr(s, sort_by))
            if reverse:
                streams.reverse()
        return streams
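
The comparison keyword selects one of the operators in COMPARISON_OPERATORS and applies it to every requested field. For example, 'g' is greater-than, so this hedged snippet (the manager instance and cutoff timestamp are assumed) selects sources added after a cutoff, newest first:

    recent = source_manager.get_filtered(
        sort_by='added_on', reverse=True, comparison='g', added_on=1588000000
    )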

@@ -92,8 +92,8 @@ class StreamDownloader:

     async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0):
         # set up peer accumulation
-        if node:
-            self.node = node
+        self.node = node or self.node  # fixme: this shouldnt be set here!
+        if self.node:
             if self.accumulate_task and not self.accumulate_task.done():
                 self.accumulate_task.cancel()
             _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)

@ -3,9 +3,8 @@ import asyncio
import time import time
import typing import typing
import logging import logging
import binascii from typing import Optional
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.utils import generate_id
from lbry.error import DownloadSDTimeoutError from lbry.error import DownloadSDTimeoutError
from lbry.schema.mime_types import guess_media_type from lbry.schema.mime_types import guess_media_type
from lbry.stream.downloader import StreamDownloader from lbry.stream.downloader import StreamDownloader
@ -13,6 +12,7 @@ from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
from lbry.stream.reflector.client import StreamReflectorClient from lbry.stream.reflector.client import StreamReflectorClient
from lbry.extras.daemon.storage import StoredContentClaim from lbry.extras.daemon.storage import StoredContentClaim
from lbry.blob import MAX_BLOB_SIZE from lbry.blob import MAX_BLOB_SIZE
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.conf import Config from lbry.conf import Config
@ -40,65 +40,20 @@ async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download
return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name) return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
-class ManagedStream:
-    STATUS_RUNNING = "running"
-    STATUS_STOPPED = "stopped"
-    STATUS_FINISHED = "finished"
-
-    SAVING_ID = 1
-    STREAMING_ID = 2
-
-    __slots__ = [
-        'loop',
-        'config',
-        'blob_manager',
-        'sd_hash',
-        'download_directory',
-        '_file_name',
-        '_added_on',
-        '_status',
-        'stream_claim_info',
-        'download_id',
-        'rowid',
-        'content_fee',
-        'purchase_receipt',
-        'downloader',
-        'analytics_manager',
-        'fully_reflected',
-        'reflector_progress',
-        'file_output_task',
-        'delayed_stop_task',
-        'streaming_responses',
-        'streaming',
-        '_running',
-        'saving',
-        'finished_writing',
-        'started_writing',
-        'finished_write_attempt',
-        'uploading_to_reflector'
-    ]
+class ManagedStream(ManagedDownloadSource):

     def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
-                 sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
-                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredContentClaim] = None,
-                 download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
-                 descriptor: typing.Optional[StreamDescriptor] = None,
-                 content_fee: typing.Optional['Transaction'] = None,
-                 analytics_manager: typing.Optional['AnalyticsManager'] = None,
-                 added_on: typing.Optional[int] = None):
-        self.loop = loop
-        self.config = config
+                 sd_hash: str, download_directory: Optional[str] = None, file_name: Optional[str] = None,
+                 status: Optional[str] = ManagedDownloadSource.STATUS_STOPPED,
+                 claim: Optional[StoredContentClaim] = None,
+                 download_id: Optional[str] = None, rowid: Optional[int] = None,
+                 descriptor: Optional[StreamDescriptor] = None,
+                 content_fee: Optional['Transaction'] = None,
+                 analytics_manager: Optional['AnalyticsManager'] = None,
+                 added_on: Optional[int] = None):
+        super().__init__(loop, config, blob_manager.storage, sd_hash, file_name, download_directory, status, claim,
+                         download_id, rowid, content_fee, analytics_manager, added_on)
         self.blob_manager = blob_manager
-        self.sd_hash = sd_hash
-        self.download_directory = download_directory
-        self._file_name = file_name
-        self._status = status
-        self.stream_claim_info = claim
-        self.download_id = download_id or binascii.hexlify(generate_id()).decode()
-        self.rowid = rowid
-        self.content_fee = content_fee
         self.purchase_receipt = None
-        self._added_on = added_on
         self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
         self.analytics_manager = analytics_manager
@@ -108,12 +63,13 @@ class ManagedStream:
         self.file_output_task: typing.Optional[asyncio.Task] = None
         self.delayed_stop_task: typing.Optional[asyncio.Task] = None
         self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = []
-        self.fully_reflected = asyncio.Event(loop=self.loop)
         self.streaming = asyncio.Event(loop=self.loop)
         self._running = asyncio.Event(loop=self.loop)
-        self.saving = asyncio.Event(loop=self.loop)
-        self.finished_writing = asyncio.Event(loop=self.loop)
-        self.started_writing = asyncio.Event(loop=self.loop)
-        self.finished_write_attempt = asyncio.Event(loop=self.loop)
+
+    @property
+    def sd_hash(self) -> str:
+        return self.identifier

     @property
     def is_fully_reflected(self) -> bool:
@@ -128,17 +84,9 @@ class ManagedStream:
         return self.descriptor.stream_hash

     @property
-    def file_name(self) -> typing.Optional[str]:
+    def file_name(self) -> Optional[str]:
         return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)

-    @property
-    def added_on(self) -> typing.Optional[int]:
-        return self._added_on
-
-    @property
-    def status(self) -> str:
-        return self._status
-
     @property
     def written_bytes(self) -> int:
         return 0 if not self.output_file_exists else os.stat(self.full_path).st_size
@@ -156,55 +104,6 @@ class ManagedStream:
         self._status = status
         await self.blob_manager.storage.change_file_status(self.stream_hash, status)

-    @property
-    def finished(self) -> bool:
-        return self.status == self.STATUS_FINISHED
-
-    @property
-    def running(self) -> bool:
-        return self.status == self.STATUS_RUNNING
-
-    @property
-    def claim_id(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.claim_id
-
-    @property
-    def txid(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.txid
-
-    @property
-    def nout(self) -> typing.Optional[int]:
-        return None if not self.stream_claim_info else self.stream_claim_info.nout
-
-    @property
-    def outpoint(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.outpoint
-
-    @property
-    def claim_height(self) -> typing.Optional[int]:
-        return None if not self.stream_claim_info else self.stream_claim_info.height
-
-    @property
-    def channel_claim_id(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id
-
-    @property
-    def channel_name(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.channel_name
-
-    @property
-    def claim_name(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.claim_name
-
-    @property
-    def metadata(self) -> typing.Optional[typing.Dict]:
-        return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()
-
-    @property
-    def metadata_protobuf(self) -> bytes:
-        if self.stream_claim_info:
-            return binascii.hexlify(self.stream_claim_info.claim.to_bytes())
-
     @property
     def blobs_completed(self) -> int:
         return sum([1 if b.blob_hash in self.blob_manager.completed_blob_hashes else 0
@@ -218,39 +117,30 @@ class ManagedStream:
     def blobs_remaining(self) -> int:
         return self.blobs_in_stream - self.blobs_completed

-    @property
-    def full_path(self) -> typing.Optional[str]:
-        return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
-            if self.file_name and self.download_directory else None
-
-    @property
-    def output_file_exists(self):
-        return os.path.isfile(self.full_path) if self.full_path else False
-
     @property
     def mime_type(self):
         return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]

-    @classmethod
-    async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
-                     file_path: str, key: typing.Optional[bytes] = None,
-                     iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
-        """
-        Generate a stream from a file and save it to the db
-        """
-        descriptor = await StreamDescriptor.create_stream(
-            loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
-            blob_completed_callback=blob_manager.blob_completed
-        )
-        await blob_manager.storage.store_stream(
-            blob_manager.get_blob(descriptor.sd_hash), descriptor
-        )
-        row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
-                                                                os.path.dirname(file_path), 0)
-        return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
-                   os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)
+    # @classmethod
+    # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config',
+    #                  file_path: str, key: Optional[bytes] = None,
+    #                  iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
+    #     """
+    #     Generate a stream from a file and save it to the db
+    #     """
+    #     descriptor = await StreamDescriptor.create_stream(
+    #         loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
+    #         blob_completed_callback=blob_manager.blob_completed
+    #     )
+    #     await blob_manager.storage.store_stream(
+    #         blob_manager.get_blob(descriptor.sd_hash), descriptor
+    #     )
+    #     row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
+    #                                                             os.path.dirname(file_path), 0)
+    #     return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
+    #                os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)

-    async def start(self, node: typing.Optional['Node'] = None, timeout: typing.Optional[float] = None,
+    async def start(self, timeout: Optional[float] = None,
                     save_now: bool = False):
         timeout = timeout or self.config.download_timeout
         if self._running.is_set():
@@ -258,7 +148,7 @@ class ManagedStream:
         log.info("start downloader for stream (sd hash: %s)", self.sd_hash)
         self._running.set()
         try:
-            await asyncio.wait_for(self.downloader.start(node), timeout, loop=self.loop)
+            await asyncio.wait_for(self.downloader.start(), timeout, loop=self.loop)
         except asyncio.TimeoutError:
             self._running.clear()
             raise DownloadSDTimeoutError(self.sd_hash)
@@ -268,6 +158,11 @@ class ManagedStream:
             self.delayed_stop_task = self.loop.create_task(self._delayed_stop())
         if not await self.blob_manager.storage.file_exists(self.sd_hash):
             if save_now:
+                if not self._file_name:
+                    self._file_name = await get_next_available_file_name(
+                        self.loop, self.download_directory,
+                        self._file_name or sanitize_file_name(self.descriptor.suggested_file_name)
+                    )
                 file_name, download_dir = self._file_name, self.download_directory
             else:
                 file_name, download_dir = None, None
@@ -287,7 +182,7 @@ class ManagedStream:
         if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
             await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)

-    async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0, connection_id: int = 0)\
+    async def _aiter_read_stream(self, start_blob_num: Optional[int] = 0, connection_id: int = 0)\
             -> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]:
         if start_blob_num >= len(self.descriptor.blobs[:-1]):
             raise IndexError(start_blob_num)
@@ -299,7 +194,7 @@ class ManagedStream:
             decrypted = await self.downloader.read_blob(blob_info, connection_id)
             yield (blob_info, decrypted)

-    async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
+    async def stream_file(self, request: Request, node: Optional['Node'] = None) -> StreamResponse:
         log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
                  self.sd_hash[:6])
         headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
@@ -391,9 +286,8 @@ class ManagedStream:
         self.saving.clear()
         self.finished_write_attempt.set()

-    async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None,
-                        node: typing.Optional['Node'] = None):
-        await self.start(node)
+    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
+        await self.start()
         if self.file_output_task and not self.file_output_task.done():  # cancel an already running save task
             self.file_output_task.cancel()
         self.download_directory = download_directory or self.download_directory or self.config.download_dir
@@ -468,15 +362,7 @@ class ManagedStream:
             await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
         return sent

-    def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
-        self.stream_claim_info = StoredContentClaim(
-            f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
-            claim_info['name'], claim_info['amount'], claim_info['height'],
-            binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
-            claim_info['claim_sequence'], claim_info.get('channel_name')
-        )
-
-    async def update_content_claim(self, claim_info: typing.Optional[typing.Dict] = None):
+    async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
         if not claim_info:
             claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
         self.set_claim(claim_info, claim_info['value'])
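The net effect of this file's refactor is that DHT node handling moved out of ManagedStream and into StreamDownloader, so start() and save_file() no longer take a node argument. A hedged sketch of driving the new surface, assuming an already-constructed ManagedStream named stream (the helper name is illustrative, not part of the commit):

    async def download_and_save(stream, timeout=30.0):
        # start() no longer accepts a node; the downloader keeps its own reference
        await stream.start(timeout=timeout, save_now=True)
        # save_file() likewise dropped its node parameter
        await stream.save_file()
        return stream.full_path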

View file

@@ -6,93 +6,65 @@ import random
 import typing
 from typing import Optional
 from aiohttp.web import Request
-from lbry.error import ResolveError, InvalidStreamDescriptorError, DownloadSDTimeoutError, InsufficientFundsError
-from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
-from lbry.utils import cache_concurrent
+from lbry.error import InvalidStreamDescriptorError
+from lbry.file.source_manager import SourceManager
 from lbry.stream.descriptor import StreamDescriptor
 from lbry.stream.managed_stream import ManagedStream
-from lbry.schema.claim import Claim
-from lbry.schema.url import URL
-from lbry.wallet.dewies import dewies_to_lbc
-from lbry.wallet import Output
+from lbry.file.source import ManagedDownloadSource

 if typing.TYPE_CHECKING:
     from lbry.conf import Config
     from lbry.blob.blob_manager import BlobManager
     from lbry.dht.node import Node
-    from lbry.wallet.wallet import WalletManager
-    from lbry.wallet.transaction import Transaction
     from lbry.extras.daemon.analytics import AnalyticsManager
     from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
-    from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
-    from lbry.wallet.wallet import Wallet
+    from lbry.wallet.transaction import Transaction
+    from lbry.wallet.manager import WalletManager

 log = logging.getLogger(__name__)
-FILTER_FIELDS = [
-    'rowid',
-    'status',
-    'file_name',
-    'added_on',
-    'sd_hash',
-    'stream_hash',
-    'claim_name',
-    'claim_height',
-    'claim_id',
-    'outpoint',
-    'txid',
-    'nout',
-    'channel_claim_id',
-    'channel_name',
-    'full_status',  # TODO: remove
-    'blobs_remaining',
-    'blobs_in_stream'
-]
-
-SET_FILTER_FIELDS = {
-    "claim_ids": "claim_id",
-    "channel_claim_ids": "channel_claim_id",
-    "outpoints": "outpoint"
-}
-
-COMPARISON_OPERATORS = {
-    'eq': lambda a, b: a == b,
-    'ne': lambda a, b: a != b,
-    'g': lambda a, b: a > b,
-    'l': lambda a, b: a < b,
-    'ge': lambda a, b: a >= b,
-    'le': lambda a, b: a <= b,
-    'in': lambda a, b: a in b
-}
-
-
-def path_or_none(path) -> Optional[str]:
-    if not path:
+def path_or_none(encoded_path) -> Optional[str]:
+    if not encoded_path:
         return
-    return binascii.unhexlify(path).decode()
+    return binascii.unhexlify(encoded_path).decode()
-class StreamManager:
+class StreamManager(SourceManager):
+    _sources: typing.Dict[str, ManagedStream]
+
+    filter_fields = SourceManager.filter_fields
+    filter_fields.update({
+        'sd_hash',
+        'stream_hash',
+        'full_status',  # TODO: remove
+        'blobs_remaining',
+        'blobs_in_stream'
+    })
+
     def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
                  wallet_manager: 'WalletManager', storage: 'SQLiteStorage', node: Optional['Node'],
                  analytics_manager: Optional['AnalyticsManager'] = None):
-        self.loop = loop
-        self.config = config
+        super().__init__(loop, config, storage, analytics_manager)
         self.blob_manager = blob_manager
         self.wallet_manager = wallet_manager
-        self.storage = storage
         self.node = node
-        self.analytics_manager = analytics_manager
-        self.streams: typing.Dict[str, ManagedStream] = {}
         self.resume_saving_task: Optional[asyncio.Task] = None
         self.re_reflect_task: Optional[asyncio.Task] = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
         self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {}
         self.started = asyncio.Event(loop=self.loop)

+    @property
+    def streams(self):
+        return self._sources
+
+    def add(self, source: ManagedStream):
+        super().add(source)
+        self.storage.content_claim_callbacks[source.stream_hash] = lambda: self._update_content_claim(source)
+
     async def _update_content_claim(self, stream: ManagedStream):
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
-        self.streams.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value'])
+        self._sources.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value'])
     async def recover_streams(self, file_infos: typing.List[typing.Dict]):
         to_restore = []
@@ -123,10 +95,10 @@ class StreamManager:
         # if self.blob_manager._save_blobs:
         #     log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))

-    async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
-                         download_directory: Optional[str], status: str,
-                         claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
-                         added_on: Optional[int], fully_reflected: bool):
+    async def _load_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
+                           download_directory: Optional[str], status: str,
+                           claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
+                           added_on: Optional[int], fully_reflected: Optional[bool]):
         try:
             descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
         except InvalidStreamDescriptorError as err:
@@ -139,10 +111,9 @@ class StreamManager:
         )
         if fully_reflected:
             stream.fully_reflected.set()
-        self.streams[sd_hash] = stream
-        self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
+        self.add(stream)

-    async def load_and_resume_streams_from_database(self):
+    async def initialize_from_database(self):
         to_recover = []
         to_start = []
@@ -156,7 +127,6 @@ class StreamManager:
                 to_recover.append(file_info)
             to_start.append(file_info)
         if to_recover:
-            log.info("Recover %i files", len(to_recover))
             await self.recover_streams(to_recover)

         log.info("Initializing %i files", len(to_start))
@@ -167,7 +137,7 @@ class StreamManager:
             download_directory = path_or_none(file_info['download_directory'])
             if file_name and download_directory and not file_info['saved_file'] and file_info['status'] == 'running':
                 to_resume_saving.append((file_name, download_directory, file_info['sd_hash']))
-            add_stream_tasks.append(self.loop.create_task(self.add_stream(
+            add_stream_tasks.append(self.loop.create_task(self._load_stream(
                 file_info['rowid'], file_info['sd_hash'], file_name,
                 download_directory, file_info['status'],
                 file_info['claim'], file_info['content_fee'],
@@ -175,25 +145,22 @@ class StreamManager:
             )))
         if add_stream_tasks:
             await asyncio.gather(*add_stream_tasks, loop=self.loop)
-        log.info("Started stream manager with %i files", len(self.streams))
+        log.info("Started stream manager with %i files", len(self._sources))
         if not self.node:
             log.info("no DHT node given, resuming downloads trusting that we can contact reflector")
         if to_resume_saving:
-            self.resume_saving_task = self.loop.create_task(self.resume(to_resume_saving))
-
-    async def resume(self, to_resume_saving):
-        log.info("Resuming saving %i files", len(to_resume_saving))
-        await asyncio.gather(
-            *(self.streams[sd_hash].save_file(file_name, download_directory, node=self.node)
-              for (file_name, download_directory, sd_hash) in to_resume_saving),
-            loop=self.loop
-        )
+            log.info("Resuming saving %i files", len(to_resume_saving))
+            self.resume_saving_task = asyncio.ensure_future(asyncio.gather(
+                *(self._sources[sd_hash].save_file(file_name, download_directory)
+                  for (file_name, download_directory, sd_hash) in to_resume_saving),
+                loop=self.loop
+            ))

     async def reflect_streams(self):
         while True:
             if self.config.reflect_streams and self.config.reflector_servers:
                 sd_hashes = await self.storage.get_streams_to_re_reflect()
-                sd_hashes = [sd for sd in sd_hashes if sd in self.streams]
+                sd_hashes = [sd for sd in sd_hashes if sd in self._sources]
                 batch = []
                 while sd_hashes:
                     stream = self.streams[sd_hashes.pop()]
@@ -209,18 +176,15 @@ class StreamManager:
             await asyncio.sleep(300, loop=self.loop)

     async def start(self):
-        await self.load_and_resume_streams_from_database()
+        await super().start()
         self.re_reflect_task = self.loop.create_task(self.reflect_streams())
-        self.started.set()

     def stop(self):
+        super().stop()
         if self.resume_saving_task and not self.resume_saving_task.done():
             self.resume_saving_task.cancel()
         if self.re_reflect_task and not self.re_reflect_task.done():
             self.re_reflect_task.cancel()
-        while self.streams:
-            _, stream = self.streams.popitem()
-            stream.stop_tasks()
         while self.update_stream_finished_futs:
             self.update_stream_finished_futs.pop().cancel()
         while self.running_reflector_uploads:
@@ -243,280 +207,42 @@ class StreamManager:
         )
         return task

-    async def create_stream(self, file_path: str, key: Optional[bytes] = None,
-                            iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
-        stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator)
+    async def create(self, file_path: str, key: Optional[bytes] = None,
+                     iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
+        descriptor = await StreamDescriptor.create_stream(
+            self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
+            blob_completed_callback=self.blob_manager.blob_completed
+        )
+        await self.storage.store_stream(
+            self.blob_manager.get_blob(descriptor.sd_hash), descriptor
+        )
+        row_id = await self.storage.save_published_file(
+            descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0
+        )
+        stream = ManagedStream(
+            self.loop, self.config, self.blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
+            os.path.basename(file_path), status=ManagedDownloadSource.STATUS_FINISHED,
+            rowid=row_id, descriptor=descriptor
+        )
         self.streams[stream.sd_hash] = stream
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
         if self.config.reflect_streams and self.config.reflector_servers:
             self.reflect_stream(stream)
         return stream

-    async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False):
-        if stream.sd_hash in self.running_reflector_uploads:
-            self.running_reflector_uploads[stream.sd_hash].cancel()
-        stream.stop_tasks()
-        if stream.sd_hash in self.streams:
-            del self.streams[stream.sd_hash]
-        blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
+    async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
+        if not isinstance(source, ManagedStream):
+            return
+        if source.identifier in self.running_reflector_uploads:
+            self.running_reflector_uploads[source.identifier].cancel()
+        source.stop_tasks()
+        if source.identifier in self.streams:
+            del self.streams[source.identifier]
+        blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
         await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
-        await self.storage.delete_stream(stream.descriptor)
-        if delete_file and stream.output_file_exists:
-            os.remove(stream.full_path)
+        await self.storage.delete_stream(source.descriptor)
+        if delete_file and source.output_file_exists:
+            os.remove(source.full_path)
def get_stream_by_stream_hash(self, stream_hash: str) -> Optional[ManagedStream]:
streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams.values()))
if streams:
return streams[0]
def get_filtered_streams(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
comparison: Optional[str] = None,
**search_by) -> typing.List[ManagedStream]:
"""
Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
"""
if sort_by and sort_by not in FILTER_FIELDS:
raise ValueError(f"'{sort_by}' is not a valid field to sort by")
if comparison and comparison not in COMPARISON_OPERATORS:
raise ValueError(f"'{comparison}' is not a valid comparison")
if 'full_status' in search_by:
del search_by['full_status']
for search in search_by:
if search not in FILTER_FIELDS:
raise ValueError(f"'{search}' is not a valid search operation")
compare_sets = {}
if isinstance(search_by.get('claim_id'), list):
compare_sets['claim_ids'] = search_by.pop('claim_id')
if isinstance(search_by.get('outpoint'), list):
compare_sets['outpoints'] = search_by.pop('outpoint')
if isinstance(search_by.get('channel_claim_id'), list):
compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id')
if search_by:
comparison = comparison or 'eq'
streams = []
for stream in self.streams.values():
matched = False
for set_search, val in compare_sets.items():
if COMPARISON_OPERATORS[comparison](getattr(stream, SET_FILTER_FIELDS[set_search]), val):
streams.append(stream)
matched = True
break
if matched:
continue
for search, val in search_by.items():
this_stream = getattr(stream, search)
if COMPARISON_OPERATORS[comparison](this_stream, val):
streams.append(stream)
break
else:
streams = list(self.streams.values())
if sort_by:
streams.sort(key=lambda s: getattr(s, sort_by))
if reverse:
streams.reverse()
return streams
async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: Claim
) -> typing.Tuple[Optional[ManagedStream], Optional[ManagedStream]]:
existing = self.get_filtered_streams(outpoint=outpoint)
if existing:
return existing[0], None
existing = self.get_filtered_streams(sd_hash=claim.stream.source.sd_hash)
if existing and existing[0].claim_id != claim_id:
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}")
if existing:
log.info("claim contains a metadata only update to a stream we have")
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
)
await self._update_content_claim(existing[0])
return existing[0], None
else:
existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id)
if existing_for_claim_id:
log.info("claim contains an update to a stream we have, downloading it")
return None, existing_for_claim_id[0]
return None, None
@staticmethod
def _convert_to_old_resolve_output(wallet_manager, resolves):
result = {}
for url, txo in resolves.items():
if isinstance(txo, Output):
tx_height = txo.tx_ref.height
best_height = wallet_manager.ledger.headers.height
result[url] = {
'name': txo.claim_name,
'value': txo.claim,
'protobuf': binascii.hexlify(txo.claim.to_bytes()),
'claim_id': txo.claim_id,
'txid': txo.tx_ref.id,
'nout': txo.position,
'amount': dewies_to_lbc(txo.amount),
'effective_amount': txo.meta.get('effective_amount', 0),
'height': tx_height,
'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height,
'claim_sequence': -1,
'address': txo.get_address(wallet_manager.ledger),
'valid_at_height': txo.meta.get('activation_height', None),
'timestamp': wallet_manager.ledger.headers.estimated_timestamp(tx_height),
'supports': []
}
else:
result[url] = txo
return result
@cache_concurrent
async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
timeout: Optional[float] = None,
file_name: Optional[str] = None,
download_directory: Optional[str] = None,
save_file: Optional[bool] = None,
resolve_timeout: float = 3.0,
wallet: Optional['Wallet'] = None) -> ManagedStream:
manager = self.wallet_manager
wallet = wallet or manager.default_wallet
timeout = timeout or self.config.download_timeout
start_time = self.loop.time()
resolved_time = None
stream = None
txo: Optional[Output] = None
error = None
outpoint = None
if save_file is None:
save_file = self.config.save_files
if file_name and not save_file:
save_file = True
if save_file:
download_directory = download_directory or self.config.download_dir
else:
download_directory = None
payment = None
try:
# resolve the claim
if not URL.parse(uri).has_stream:
raise ResolveError("cannot download a channel claim, specify a /path")
try:
response = await asyncio.wait_for(
manager.ledger.resolve(wallet.accounts, [uri], include_purchase_receipt=True),
resolve_timeout
)
resolved_result = self._convert_to_old_resolve_output(manager, response)
except asyncio.TimeoutError:
raise ResolveTimeoutError(uri)
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
await self.storage.save_claims_for_resolve([
value for value in resolved_result.values() if 'error' not in value
])
resolved = resolved_result.get(uri, {})
resolved = resolved if 'value' in resolved else resolved.get('claim')
if not resolved:
raise ResolveError(f"Failed to resolve stream at '{uri}'")
if 'error' in resolved:
raise ResolveError(f"error resolving stream: {resolved['error']}")
txo = response[uri]
claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
outpoint = f"{resolved['txid']}:{resolved['nout']}"
resolved_time = self.loop.time() - start_time
# resume or update an existing stream, if the stream changed: download it and delete the old one after
updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
if updated_stream:
log.info("already have stream for %s", uri)
if save_file and updated_stream.output_file_exists:
save_file = False
await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
await updated_stream.save_file(
file_name=file_name, download_directory=download_directory, node=self.node
)
return updated_stream
if not to_replace and txo.has_price and not txo.purchase_receipt:
payment = await manager.create_purchase_transaction(
wallet.accounts, txo, exchange_rate_manager
)
stream = ManagedStream(
self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
analytics_manager=self.analytics_manager
)
log.info("starting download for %s", uri)
before_download = self.loop.time()
await stream.start(self.node, timeout)
stream.set_claim(resolved, claim)
if to_replace: # delete old stream now that the replacement has started downloading
await self.delete_stream(to_replace)
if payment is not None:
await manager.broadcast_or_release(payment)
payment = None # to avoid releasing in `finally` later
log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
self.streams[stream.sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
await self.storage.save_content_claim(stream.stream_hash, outpoint)
if save_file:
await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
loop=self.loop)
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
raise error
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError)
if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err))
elif isinstance(err, asyncio.CancelledError):
pass
else:
log.exception("Unexpected error downloading stream:")
error = err
raise
finally:
if payment is not None:
# payment is set to None after broadcasting, if we're here an exception probably happened
await manager.ledger.release_tx(payment)
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))):
server = self.wallet_manager.ledger.network.client.server
self.loop.create_task(
self.analytics_manager.send_time_to_first_bytes(
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
uri, outpoint,
None if not stream else len(stream.downloader.blob_downloader.active_connections),
None if not stream else len(stream.downloader.blob_downloader.scores),
None if not stream else len(stream.downloader.blob_downloader.connection_failures),
False if not stream else stream.downloader.added_fixed_peers,
self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
None if not stream else stream.sd_hash,
None if not stream else stream.downloader.time_to_descriptor,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
None if not stream else stream.downloader.time_to_first_bytes,
None if not error else error.__class__.__name__,
None if not error else str(error),
None if not server else f"{server[0]}:{server[1]}"
)
)
     async def stream_partial_content(self, request: Request, sd_hash: str):
-        return await self.streams[sd_hash].stream_file(request, self.node)
+        return await self._sources[sd_hash].stream_file(request, self.node)

lbry/torrent/__init__.py Normal file (empty)
View file

lbry/torrent/session.py Normal file (290 lines)
View file

@@ -0,0 +1,290 @@
import asyncio
import binascii
import os
import logging
import random
from hashlib import sha1
from tempfile import mkdtemp
from typing import Optional
import libtorrent
NOTIFICATION_MASKS = [
"error",
"peer",
"port_mapping",
"storage",
"tracker",
"debug",
"status",
"progress",
"ip_block",
"dht",
"stats",
"session_log",
"torrent_log",
"peer_log",
"incoming_request",
"dht_log",
"dht_operation",
"port_mapping_log",
"picker_log",
"file_progress",
"piece_progress",
"upload",
"block_progress"
]
log = logging.getLogger(__name__)
DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted?
libtorrent.add_torrent_params_flags_t.flag_auto_managed
| libtorrent.add_torrent_params_flags_t.flag_update_subscribe
)
def get_notification_type(notification) -> str:
for i, notification_type in enumerate(NOTIFICATION_MASKS):
if (1 << i) & notification:
return notification_type
raise ValueError("unrecognized notification type")
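# Illustrative checks, not part of the commit: each libtorrent alert category
# is a single bit, so get_notification_type() maps bit i to NOTIFICATION_MASKS[i].
#     assert get_notification_type(1 << 0) == "error"
#     assert get_notification_type(1 << 9) == "dht"
#     assert get_notification_type(1 << 22) == "block_progress"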
class TorrentHandle:
def __init__(self, loop, executor, handle):
self._loop = loop
self._executor = executor
self._handle: libtorrent.torrent_handle = handle
self.started = asyncio.Event(loop=loop)
self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event(loop=loop)
self.size = 0
self.total_wanted_done = 0
self.name = ''
self.tasks = []
self.torrent_file: Optional[libtorrent.file_storage] = None
self._base_path = None
self._handle.set_sequential_download(1)
@property
def largest_file(self) -> Optional[str]:
if not self.torrent_file:
return None
index = self.largest_file_index
return os.path.join(self._base_path, self.torrent_file.at(index).path)
@property
def largest_file_index(self):
largest_size, index = 0, 0
for file_num in range(self.torrent_file.num_files()):
if self.torrent_file.file_size(file_num) > largest_size:
largest_size = self.torrent_file.file_size(file_num)
index = file_num
return index
def stop_tasks(self):
while self.tasks:
self.tasks.pop().cancel()
def _show_status(self):
# fixme: cleanup
if not self._handle.is_valid():
return
status = self._handle.status()
if status.has_metadata:
self.size = status.total_wanted
self.total_wanted_done = status.total_wanted_done
self.name = status.name
if not self.metadata_completed.is_set():
self.metadata_completed.set()
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
self.torrent_file = self._handle.get_torrent_info().files()
self._base_path = status.save_path
first_piece = self.torrent_file.at(self.largest_file_index).offset
if not self.started.is_set():
if self._handle.have_piece(first_piece):
self.started.set()
else:
# prioritize it
self._handle.set_piece_deadline(first_piece, 100)
if not status.is_seeding:
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.num_seeds, status.state, status.save_path)
elif not self.finished.is_set():
self.finished.set()
log.info("Torrent finished: %s", self.name)
async def status_loop(self):
while True:
self._show_status()
if self.finished.is_set():
break
await asyncio.sleep(0.1, loop=self._loop)
async def pause(self):
await self._loop.run_in_executor(
self._executor, self._handle.pause
)
async def resume(self):
await self._loop.run_in_executor(
self._executor, lambda: self._handle.resume() # pylint: disable=unnecessary-lambda
)
class TorrentSession:
def __init__(self, loop, executor):
self._loop = loop
self._executor = executor
self._session: Optional[libtorrent.session] = None
self._handles = {}
self.tasks = []
self.wait_start = True
async def add_fake_torrent(self):
tmpdir = mkdtemp()
info, btih = _create_fake_torrent(tmpdir)
flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
handle = self._session.add_torrent({
'ti': info, 'save_path': tmpdir, 'flags': flags
})
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
return btih
async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
settings = {
'listen_interfaces': f"{interface}:{port}",
'enable_outgoing_utp': True,
'enable_incoming_utp': True,
'enable_outgoing_tcp': False,
'enable_incoming_tcp': False
}
self._session = await self._loop.run_in_executor(
self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member
)
self.tasks.append(self._loop.create_task(self.process_alerts()))
def stop(self):
while self.tasks:
self.tasks.pop().cancel()
self._session.save_state()
self._session.pause()
self._session.stop_dht()
self._session.stop_lsd()
self._session.stop_natpmp()
self._session.stop_upnp()
self._session = None
def _pop_alerts(self):
for alert in self._session.pop_alerts():
log.info("torrent alert: %s", alert)
async def process_alerts(self):
while True:
await self._loop.run_in_executor(
self._executor, self._pop_alerts
)
await asyncio.sleep(1, loop=self._loop)
async def pause(self):
await self._loop.run_in_executor(
self._executor, lambda: self._session.save_state() # pylint: disable=unnecessary-lambda
)
await self._loop.run_in_executor(
self._executor, lambda: self._session.pause() # pylint: disable=unnecessary-lambda
)
async def resume(self):
await self._loop.run_in_executor(
self._executor, self._session.resume
)
def _add_torrent(self, btih: str, download_directory: Optional[str]):
params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS}
if download_directory:
params['save_path'] = download_directory
handle = self._session.add_torrent(params)
handle.force_dht_announce()
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
def full_path(self, btih):
return self._handles[btih].largest_file
async def add_torrent(self, btih, download_path):
await self._loop.run_in_executor(
self._executor, self._add_torrent, btih, download_path
)
self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
await self._handles[btih].metadata_completed.wait()
if self.wait_start:
# fixme: temporary until we add streaming support, otherwise playback fails!
await self._handles[btih].started.wait()
def remove_torrent(self, btih, remove_files=False):
if btih in self._handles:
handle = self._handles[btih]
handle.stop_tasks()
self._session.remove_torrent(handle._handle, 1 if remove_files else 0)
self._handles.pop(btih)
async def save_file(self, btih, download_directory):
handle = self._handles[btih]
await handle.resume()
def get_size(self, btih):
return self._handles[btih].size
def get_name(self, btih):
return self._handles[btih].name
def get_downloaded(self, btih):
return self._handles[btih].total_wanted_done
def is_completed(self, btih):
return self._handles[btih].finished.is_set()
def get_magnet_uri(btih):
return f"magnet:?xt=urn:btih:{btih}"
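# For illustration (not part of the commit), the info hash used by the demo
# main() below renders as a standard magnet link:
#     get_magnet_uri("dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c")
#     -> "magnet:?xt=urn:btih:dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c"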
def _create_fake_torrent(tmpdir):
# beware, that's just for testing
path = os.path.join(tmpdir, 'tmp')
with open(path, 'wb') as myfile:
size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024)
file_storage = libtorrent.file_storage()
file_storage.add_file('tmp', size)
t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024)
libtorrent.set_piece_hashes(t, tmpdir)
info = libtorrent.torrent_info(t.generate())
btih = sha1(info.metadata()).hexdigest()
return info, btih
async def main():
if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent"):
os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent")
if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso"):
os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso")
btih = "dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c"
executor = None
session = TorrentSession(asyncio.get_event_loop(), executor)
session2 = TorrentSession(asyncio.get_event_loop(), executor)
await session.bind('localhost', port=4040)
await session2.bind('localhost', port=4041)
btih = await session.add_fake_torrent()
session2._session.add_dht_node(('localhost', 4040))
await session2.add_torrent(btih, "/tmp/down")
while True:
await asyncio.sleep(100)
await session.pause()
executor.shutdown()
if __name__ == "__main__":
asyncio.run(main())

lbry/torrent/torrent.py Normal file (72 lines)
View file

@@ -0,0 +1,72 @@
import asyncio
import logging
import typing
log = logging.getLogger(__name__)
class TorrentInfo:
__slots__ = ('dht_seeds', 'http_seeds', 'trackers', 'total_size')
def __init__(self, dht_seeds: typing.Tuple[typing.Tuple[str, int]],
http_seeds: typing.Tuple[typing.Dict[str, typing.Any]],
trackers: typing.Tuple[typing.Tuple[str, int]], total_size: int):
self.dht_seeds = dht_seeds
self.http_seeds = http_seeds
self.trackers = trackers
self.total_size = total_size
@classmethod
def from_libtorrent_info(cls, torrent_info):
return cls(
torrent_info.nodes(), tuple(
{
'url': web_seed['url'],
'type': web_seed['type'],
'auth': web_seed['auth']
} for web_seed in torrent_info.web_seeds()
), tuple(
(tracker.url, tracker.tier) for tracker in torrent_info.trackers()
), torrent_info.total_size()
)
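# A minimal sketch (not part of the commit) of building a TorrentInfo by hand
# rather than via from_libtorrent_info(); every seed, tracker, and size value
# here is made up for illustration.
#     info = TorrentInfo(
#         dht_seeds=(("router.example.com", 6881),),
#         http_seeds=({"url": "http://example.com/seed", "type": 0, "auth": ""},),
#         trackers=(("udp://tracker.example.com:80", 0),),
#         total_size=1 << 20
#     )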
class Torrent:
def __init__(self, loop, handle):
self._loop = loop
self._handle = handle
self.finished = asyncio.Event(loop=loop)
def _threaded_update_status(self):
status = self._handle.status()
if not status.is_seeding:
log.info(
'%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.state
)
elif not self.finished.is_set():
self.finished.set()
async def wait_for_finished(self):
while True:
await self._loop.run_in_executor(
None, self._threaded_update_status
)
if self.finished.is_set():
log.info("finished downloading torrent!")
await self.pause()
break
await asyncio.sleep(1, loop=self._loop)
async def pause(self):
log.info("pause torrent")
await self._loop.run_in_executor(
None, self._handle.pause
)
async def resume(self):
await self._loop.run_in_executor(
None, self._handle.resume
)
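A hedged usage sketch for the wrapper above, assuming handle came from a libtorrent session's add_torrent() call (the helper name is illustrative, not part of the commit):

    async def download_with_wrapper(loop, handle):
        torrent = Torrent(loop, handle)
        # polls status once a second in an executor, pausing when seeding starts
        await torrent.wait_for_finished()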

View file

@@ -0,0 +1,140 @@
import asyncio
import binascii
import logging
import os
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.file.source_manager import SourceManager
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
from lbry.torrent.session import TorrentSession
from lbry.conf import Config
from lbry.wallet.transaction import Transaction
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
from lbry.extras.daemon.storage import StoredContentClaim
log = logging.getLogger(__name__)
def path_or_none(encoded_path) -> Optional[str]:
if not encoded_path:
return
return binascii.unhexlify(encoded_path).decode()
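# The storage layer keeps paths hex-encoded, so path_or_none() is just an
# unhexlify-and-decode with a None passthrough. Illustrative round trip
# (not part of the commit):
#     assert path_or_none("646f776e6c6f616473") == "downloads"  # hexlify(b"downloads")
#     assert path_or_none(None) is None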
class TorrentSource(ManagedDownloadSource):
STATUS_STOPPED = "stopped"
filter_fields = SourceManager.filter_fields
filter_fields.update({
'bt_infohash'
})
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
file_name: Optional[str] = None, download_directory: Optional[str] = None,
status: Optional[str] = STATUS_STOPPED, claim: Optional['StoredContentClaim'] = None,
download_id: Optional[str] = None, rowid: Optional[int] = None,
content_fee: Optional['Transaction'] = None,
analytics_manager: Optional['AnalyticsManager'] = None,
added_on: Optional[int] = None, torrent_session: Optional['TorrentSession'] = None):
super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
rowid, content_fee, analytics_manager, added_on)
self.torrent_session = torrent_session
@property
def full_path(self) -> Optional[str]:
full_path = self.torrent_session.full_path(self.identifier)
self.download_directory = os.path.dirname(full_path)
return full_path
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
await self.torrent_session.add_torrent(self.identifier, self.download_directory)
async def stop(self, finished: bool = False):
await self.torrent_session.remove_torrent(self.identifier)
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
await self.torrent_session.save_file(self.identifier, download_directory)
@property
def torrent_length(self):
return self.torrent_session.get_size(self.identifier)
@property
def written_bytes(self):
return self.torrent_session.get_downloaded(self.identifier)
@property
def torrent_name(self):
return self.torrent_session.get_name(self.identifier)
@property
def bt_infohash(self):
return self.identifier
def stop_tasks(self):
pass
@property
def completed(self):
return self.torrent_session.is_completed(self.identifier)
class TorrentManager(SourceManager):
_sources: typing.Dict[str, ManagedDownloadSource]
filter_fields = set(SourceManager.filter_fields)
filter_fields.update({
'bt_infohash',
'blobs_remaining',  # TODO: here they call them "parts", but it's pretty much the same concept
'blobs_in_stream'
})
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', torrent_session: 'TorrentSession',
storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
super().__init__(loop, config, storage, analytics_manager)
self.torrent_session: 'TorrentSession' = torrent_session
async def recover_streams(self, file_infos: typing.List[typing.Dict]):
raise NotImplementedError
async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
download_directory: Optional[str], status: str,
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
added_on: Optional[int]):
stream = TorrentSource(
self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
download_directory=download_directory, status=status, claim=claim, rowid=rowid,
content_fee=content_fee, analytics_manager=self.analytics_manager, added_on=added_on,
torrent_session=self.torrent_session
)
self.add(stream)
async def initialize_from_database(self):
pass
async def start(self):
await super().start()
def stop(self):
super().stop()
log.info("finished stopping the torrent manager")
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await super().delete(source, delete_file)
self.torrent_session.remove_torrent(source.identifier, delete_file)
async def create(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None):
raise NotImplementedError
async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
raise NotImplementedError
# blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
# await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
# await self.storage.delete_stream(source.descriptor)
async def stream_partial_content(self, request: Request, sd_hash: str):
raise NotImplementedError
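A sketch of how these pieces could be wired together, assuming already-initialized loop, config, and storage objects; the helper name is illustrative, not part of the commit (bind() falls back to 0.0.0.0:10889 as defined in session.py above):

    async def setup_torrents(loop, config, storage):
        session = TorrentSession(loop, None)
        await session.bind()  # 0.0.0.0:10889 by default
        return TorrentManager(loop, config, session, storage)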

View file

@@ -6,7 +6,7 @@ source =
     lbry
     .tox/*/lib/python*/site-packages/lbry

-[cryptography.*,coincurve.*,pbkdf2]
+[cryptography.*,coincurve.*,pbkdf2, libtorrent]
 ignore_missing_imports = True

 [pylint]
@@ -18,6 +18,7 @@ max-line-length=120
 good-names=T,t,n,i,j,k,x,y,s,f,d,h,c,e,op,db,tx,io,cachedproperty,log,id,r,iv,ts,l
 valid-metaclass-classmethod-first-arg=mcs
 disable=
+    c-extension-no-member,
     fixme,
     broad-except,
     no-else-return,
View file

@@ -2,10 +2,60 @@ import asyncio
 import os
 from binascii import hexlify

+from lbry.schema import Claim
 from lbry.testcase import CommandTestCase
+from lbry.torrent.session import TorrentSession
+from lbry.wallet import Transaction


 class FileCommands(CommandTestCase):
async def initialize_torrent(self, tx_to_update=None):
if not hasattr(self, 'seeder_session'):
self.seeder_session = TorrentSession(self.loop, None)
self.addCleanup(self.seeder_session.stop)
await self.seeder_session.bind(port=4040)
btih = await self.seeder_session.add_fake_torrent()
address = await self.account.receiving.get_or_create_usable_address()
if not tx_to_update:
claim = Claim()
claim.stream.update(bt_infohash=btih)
tx = await Transaction.claim_create(
'torrent', claim, 1, address, [self.account], self.account
)
else:
claim = tx_to_update.outputs[0].claim
claim.stream.update(bt_infohash=btih)
tx = await Transaction.claim_update(
tx_to_update.outputs[0], claim, 1, address, [self.account], self.account
)
await tx.sign([self.account])
await self.broadcast(tx)
await self.confirm_tx(tx.id)
self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session
self.client_session._session.add_dht_node(('localhost', 4040))
self.client_session.wait_start = False # fixme: this is super slow on tests
return tx, btih
async def test_download_torrent(self):
tx, btih = await self.initialize_torrent()
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
# second call, see its there and move on
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih)
self.assertIn(btih, self.client_session._handles)
tx, new_btih = await self.initialize_torrent(tx)
self.assertNotEqual(btih, new_btih)
# claim now points to another torrent, update to it
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
self.assertIn(new_btih, self.client_session._handles)
self.assertNotIn(btih, self.client_session._handles)
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
await self.daemon.jsonrpc_file_delete(delete_all=True)
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0)
self.assertNotIn(new_btih, self.client_session._handles)
async def create_streams_in_range(self, *args, **kwargs): async def create_streams_in_range(self, *args, **kwargs):
self.stream_claim_ids = [] self.stream_claim_ids = []
@ -228,11 +278,11 @@ class FileCommands(CommandTestCase):
await self.daemon.jsonrpc_get('lbry://foo') await self.daemon.jsonrpc_get('lbry://foo')
with open(original_path, 'wb') as handle: with open(original_path, 'wb') as handle:
handle.write(b'some other stuff was there instead') handle.write(b'some other stuff was there instead')
self.daemon.stream_manager.stop() self.daemon.file_manager.stop()
await self.daemon.stream_manager.start() await self.daemon.file_manager.start()
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed
# check that internal state got through up to the file list API # check that internal state got through up to the file list API
stream = self.daemon.stream_manager.get_stream_by_stream_hash(file_info['stream_hash']) stream = self.daemon.file_manager.get_filtered(stream_hash=file_info['stream_hash'])[0]
file_info = (await self.file_list())[0] file_info = (await self.file_list())[0]
self.assertEqual(stream.file_name, file_info['file_name']) self.assertEqual(stream.file_name, file_info['file_name'])
# checks if what the API shows is what he have at the very internal level. # checks if what the API shows is what he have at the very internal level.
@@ -255,7 +305,7 @@ class FileCommands(CommandTestCase):
         resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
         self.assertNotIn('error', resp)
         self.assertTrue(os.path.isfile(path))
-        self.daemon.stream_manager.stop()
+        self.daemon.file_manager.stop()
         await asyncio.sleep(0.01, loop=self.loop)  # FIXME: this sleep should not be needed
         self.assertFalse(os.path.isfile(path))
@@ -348,8 +398,8 @@ class FileCommands(CommandTestCase):
         # restart the daemon and make sure the fee is still there
-        self.daemon.stream_manager.stop()
-        await self.daemon.stream_manager.start()
+        self.daemon.file_manager.stop()
+        await self.daemon.file_manager.start()
         self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
         self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)
         await self.daemon.jsonrpc_file_delete(claim_name='icanpay')
View file
@@ -21,8 +21,8 @@ def get_random_bytes(n: int) -> bytes:
 class RangeRequests(CommandTestCase):
     async def _restart_stream_manager(self):
-        self.daemon.stream_manager.stop()
-        await self.daemon.stream_manager.start()
+        self.daemon.file_manager.stop()
+        await self.daemon.file_manager.start()
         return

     async def _setup_stream(self, data: bytes, save_blobs: bool = True, save_files: bool = False, file_size=0):
View file
@@ -6,7 +6,7 @@ from lbry.conf import Config
 from lbry.extras import cli
 from lbry.extras.daemon.components import (
     DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
-    HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
+    HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
     UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT
 )
 from lbry.extras.daemon.daemon import Daemon
@@ -21,7 +21,7 @@ class CLIIntegrationTest(AsyncioTestCase):
         conf.api = 'localhost:5299'
         conf.components_to_skip = (
             DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
-            HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
+            HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
             UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT
         )
         Daemon.component_attributes = {}
View file
@@ -16,6 +16,7 @@ class TestComponentManager(AsyncioTestCase):
             [
                 components.DatabaseComponent,
                 components.ExchangeRateManagerComponent,
+                components.TorrentComponent,
                 components.UPnPComponent
             ],
             [
@@ -24,9 +25,9 @@ class TestComponentManager(AsyncioTestCase):
                 components.WalletComponent
             ],
             [
+                components.FileManagerComponent,
                 components.HashAnnouncerComponent,
                 components.PeerProtocolServerComponent,
-                components.StreamManagerComponent,
                 components.WalletServerPaymentsComponent
             ]
         ]
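The nested lists above are start-up stages: every component in an inner list can start concurrently once all earlier stages are running. A rough sketch of deriving such stages from declared dependencies; this is a simplified topological grouping, not the SDK's actual ComponentManager:

# Simplified staging of components by declared dependencies.
from typing import Dict, List, Set


def stage_components(depends_on: Dict[str, Set[str]]) -> List[List[str]]:
    stages: List[List[str]] = []
    started: Set[str] = set()
    remaining = dict(depends_on)
    while remaining:
        # everything whose dependencies have already started goes in this stage
        ready = sorted(name for name, deps in remaining.items() if deps <= started)
        if not ready:
            raise ValueError(f"dependency cycle among: {sorted(remaining)}")
        stages.append(ready)
        started.update(ready)
        for name in ready:
            del remaining[name]
    return stages


print(stage_components({
    'database': set(),
    'blob_manager': {'database'},
    'wallet': {'database'},
    'file_manager': {'blob_manager', 'wallet'},
}))
# [['database'], ['blob_manager', 'wallet'], ['file_manager']]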
@@ -135,8 +136,8 @@ class FakeDelayedBlobManager(FakeComponent):
         await asyncio.sleep(1)


-class FakeDelayedStreamManager(FakeComponent):
-    component_name = "stream_manager"
+class FakeDelayedFileManager(FakeComponent):
+    component_name = "file_manager"
     depends_on = [FakeDelayedBlobManager.component_name]

     async def start(self):
@@ -153,7 +154,7 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase):
             PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT,
             EXCHANGE_RATE_MANAGER_COMPONENT],
             wallet=FakeDelayedWallet,
-            stream_manager=FakeDelayedStreamManager,
+            file_manager=FakeDelayedFileManager,
             blob_manager=FakeDelayedBlobManager
         )
@@ -163,17 +164,17 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase):
         await self.advance(0)
         self.assertTrue(self.component_manager.get_component('wallet').running)
         self.assertFalse(self.component_manager.get_component('blob_manager').running)
-        self.assertFalse(self.component_manager.get_component('stream_manager').running)
+        self.assertFalse(self.component_manager.get_component('file_manager').running)

         await self.advance(1)
         self.assertTrue(self.component_manager.get_component('wallet').running)
         self.assertTrue(self.component_manager.get_component('blob_manager').running)
-        self.assertFalse(self.component_manager.get_component('stream_manager').running)
+        self.assertFalse(self.component_manager.get_component('file_manager').running)

         await self.advance(1)
         self.assertTrue(self.component_manager.get_component('wallet').running)
         self.assertTrue(self.component_manager.get_component('blob_manager').running)
-        self.assertTrue(self.component_manager.get_component('stream_manager').running)
+        self.assertTrue(self.component_manager.get_component('file_manager').running)

     async def test_proper_stopping_of_components(self):
         asyncio.create_task(self.component_manager.start())
@@ -182,18 +183,18 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase):
         await self.advance(1)
         self.assertTrue(self.component_manager.get_component('wallet').running)
         self.assertTrue(self.component_manager.get_component('blob_manager').running)
-        self.assertTrue(self.component_manager.get_component('stream_manager').running)
+        self.assertTrue(self.component_manager.get_component('file_manager').running)

         asyncio.create_task(self.component_manager.stop())
         await self.advance(0)
-        self.assertFalse(self.component_manager.get_component('stream_manager').running)
+        self.assertFalse(self.component_manager.get_component('file_manager').running)
         self.assertTrue(self.component_manager.get_component('blob_manager').running)
         self.assertTrue(self.component_manager.get_component('wallet').running)
         await self.advance(1)
-        self.assertFalse(self.component_manager.get_component('stream_manager').running)
+        self.assertFalse(self.component_manager.get_component('file_manager').running)
         self.assertFalse(self.component_manager.get_component('blob_manager').running)
         self.assertTrue(self.component_manager.get_component('wallet').running)
         await self.advance(1)
-        self.assertFalse(self.component_manager.get_component('stream_manager').running)
+        self.assertFalse(self.component_manager.get_component('file_manager').running)
         self.assertFalse(self.component_manager.get_component('blob_manager').running)
         self.assertFalse(self.component_manager.get_component('wallet').running)
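These assertions pass because each fake component takes one mocked second to start or stop, so advancing the clock walks the dependency stages one at a time. A stand-in for such a fake, patterned after (but not identical to) the FakeDelayed* helpers:

import asyncio


class FakeDelayedComponent:
    component_name = "file_manager"  # illustrative name

    def __init__(self):
        self.running = False

    async def start(self):
        # becomes running one mocked second after its dependencies
        await asyncio.sleep(1)
        self.running = True

    async def stop(self):
        # takes one mocked second to wind down as well
        await asyncio.sleep(1)
        self.running = False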
View file
@@ -76,7 +76,8 @@ class TestManagedStream(BlobExchangeTestBase):
             return q2, self.loop.create_task(_task())

         mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers
-        await self.stream.save_file(node=mock_node)
+        self.stream.downloader.node = mock_node
+        await self.stream.save_file()
         await self.stream.finished_write_attempt.wait()
         self.assertTrue(os.path.isfile(self.stream.full_path))
         if stop_when_done:
@@ -109,7 +110,6 @@ class TestManagedStream(BlobExchangeTestBase):
         await self.setup_stream(2)
         mock_node = mock.Mock(spec=Node)
-        q = asyncio.Queue()

         bad_peer = make_kademlia_peer(b'2' * 48, "127.0.0.1", tcp_port=3334, allow_localhost=True)
@@ -123,7 +123,8 @@ class TestManagedStream(BlobExchangeTestBase):
         mock_node.accumulate_peers = _mock_accumulate_peers
-        await self.stream.save_file(node=mock_node)
+        self.stream.downloader.node = mock_node
+        await self.stream.save_file()
         await self.stream.finished_writing.wait()
         self.assertTrue(os.path.isfile(self.stream.full_path))
         with open(self.stream.full_path, 'rb') as f:
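Both hunks in this file make the same change: rather than passing the DHT node into save_file(node=...), the test injects it on the stream's downloader and then calls save_file() with no arguments. A minimal sketch of that injection, with stand-in classes and unittest.mock for the node as in the tests:

import asyncio
from unittest import mock


class FakeDownloader:
    def __init__(self):
        self.node = None  # set by whoever owns the DHT node


class FakeManagedStream:
    def __init__(self):
        self.downloader = FakeDownloader()

    async def save_file(self):
        # the downloader reaches the DHT through self.downloader.node,
        # so save_file() itself no longer needs a node parameter
        assert self.downloader.node is not None


stream = FakeManagedStream()
stream.downloader.node = mock.Mock()  # inject first, then call with no args
asyncio.run(stream.save_file())

Keeping the node off the call signature means stream and torrent sources can share one save_file() interface.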
View file
@@ -39,7 +39,7 @@ class TestStreamAssembler(AsyncioTestCase):
         with open(file_path, 'wb') as f:
             f.write(self.cleartext)
-        self.stream = await self.stream_manager.create_stream(file_path)
+        self.stream = await self.stream_manager.create(file_path)

     async def _test_reflect_stream(self, response_chunk_size):
         reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size)
View file
@@ -5,6 +5,8 @@ from unittest import mock
 import asyncio
 import json
 from decimal import Decimal
+
+from lbry.file.file_manager import FileManager
 from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase
 from lbry.testcase import get_fake_exchange_rate_manager
 from lbry.utils import generate_id
@@ -110,10 +112,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
     async def mock_resolve(*args, **kwargs):
         result = {txo.meta['permanent_url']: txo}
-        claims = [
-            StreamManager._convert_to_old_resolve_output(manager, result)[txo.meta['permanent_url']]
-        ]
-        await storage.save_claims(claims)
+        await storage.save_claim_from_output(ledger, txo)
         return result

     manager.ledger.resolve = mock_resolve
@@ -138,11 +137,20 @@ class TestStreamManager(BlobExchangeTestBase):
         )
         self.sd_hash = descriptor.sd_hash
         self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
-        self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
-                                            self.client_storage, get_mock_node(self.server_from_client),
-                                            AnalyticsManager(self.client_config, binascii.hexlify(generate_id()).decode(),
-                                                             binascii.hexlify(generate_id()).decode()))
+        analytics_manager = AnalyticsManager(
+            self.client_config,
+            binascii.hexlify(generate_id()).decode(),
+            binascii.hexlify(generate_id()).decode()
+        )
+        self.stream_manager = StreamManager(
+            self.loop, self.client_config, self.client_blob_manager, self.mock_wallet,
+            self.client_storage, get_mock_node(self.server_from_client),
+            analytics_manager
+        )
+        self.file_manager = FileManager(
+            self.loop, self.client_config, self.mock_wallet, self.client_storage, analytics_manager
+        )
+        self.file_manager.source_managers['stream'] = self.stream_manager
         self.exchange_rate_manager = get_fake_exchange_rate_manager()

     async def _test_time_to_first_bytes(self, check_post, error=None, after_setup=None):
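The setup above builds one AnalyticsManager, hands it to both managers, and registers the stream manager under the 'stream' key so downloads can dispatch to it. Condensed to the wiring alone, with stand-in classes and constructor arguments trimmed to what the pattern needs:

class FakeAnalytics:
    async def send_event(self, event: dict):
        pass  # stand-in for AnalyticsManager


class FakeStreamManager:
    def __init__(self, analytics):
        self.analytics_manager = analytics


class FakeFileManager:
    def __init__(self, analytics):
        self.analytics_manager = analytics
        self.source_managers = {}


analytics = FakeAnalytics()  # shared by both managers
stream_manager = FakeStreamManager(analytics)
file_manager = FakeFileManager(analytics)
file_manager.source_managers['stream'] = stream_manager  # register the source
assert file_manager.source_managers['stream'].analytics_manager is analytics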
@@ -159,9 +167,9 @@ class TestStreamManager(BlobExchangeTestBase):
         self.stream_manager.analytics_manager._post = _check_post
         if error:
             with self.assertRaises(error):
-                await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
+                await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
         else:
-            await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
+            await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
         await asyncio.sleep(0, loop=self.loop)
         self.assertTrue(checked_analytics_event)
@@ -281,7 +289,7 @@ class TestStreamManager(BlobExchangeTestBase):
         self.stream_manager.analytics_manager._post = check_post
         self.assertDictEqual(self.stream_manager.streams, {})
-        stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
+        stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
         stream_hash = stream.stream_hash
         self.assertDictEqual(self.stream_manager.streams, {stream.sd_hash: stream})
         self.assertTrue(stream.running)
@@ -302,7 +310,8 @@ class TestStreamManager(BlobExchangeTestBase):
         )
         self.assertEqual(stored_status, "stopped")
-        await stream.save_file(node=self.stream_manager.node)
+        stream.downloader.node = self.stream_manager.node
+        await stream.save_file()
         await stream.finished_writing.wait()
         await asyncio.sleep(0, loop=self.loop)
         self.assertTrue(stream.finished)
@@ -313,7 +322,7 @@ class TestStreamManager(BlobExchangeTestBase):
         )
         self.assertEqual(stored_status, "finished")
-        await self.stream_manager.delete_stream(stream, True)
+        await self.stream_manager.delete(stream, True)
         self.assertDictEqual(self.stream_manager.streams, {})
         self.assertFalse(os.path.isfile(os.path.join(self.client_dir, "test_file")))
         stored_status = await self.client_storage.run_and_return_one_or_none(
@@ -325,7 +334,7 @@ class TestStreamManager(BlobExchangeTestBase):
     async def _test_download_error_on_start(self, expected_error, timeout=None):
         error = None
         try:
-            await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout)
+            await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager, timeout)
         except Exception as err:
             if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
                 raise
@@ -401,7 +410,7 @@ class TestStreamManager(BlobExchangeTestBase):
             last_blob_hash = json.loads(sdf.read())['blobs'][-2]['blob_hash']
         self.server_blob_manager.delete_blob(last_blob_hash)
         self.client_config.blob_download_timeout = 0.1
-        stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
+        stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
         await stream.started_writing.wait()
         self.assertEqual('running', stream.status)
         self.assertIsNotNone(stream.full_path)
@@ -433,7 +442,7 @@ class TestStreamManager(BlobExchangeTestBase):
         self.stream_manager.analytics_manager._post = check_post
         self.assertDictEqual(self.stream_manager.streams, {})
-        stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
+        stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
         await stream.finished_writing.wait()
         await asyncio.sleep(0, loop=self.loop)
         self.stream_manager.stop()
View file
@@ -11,6 +11,7 @@ commands =
         --global-option=fetch \
         --global-option=--version --global-option=3.30.1 --global-option=--all \
         --global-option=build --global-option=--enable --global-option=fts5
+    pip install lbry-libtorrent
     orchstr8 download
     blockchain: coverage run -p --source={envsitepackagesdir}/lbry -m unittest discover -vv integration.blockchain {posargs}
     datanetwork: coverage run -p --source={envsitepackagesdir}/lbry -m unittest discover -vv integration.datanetwork {posargs}