This commit is contained in:
Victor Shyba 2020-01-28 22:37:52 -03:00
parent 27739e0364
commit 2089059792
9 changed files with 67 additions and 185 deletions

View file

@ -24,10 +24,8 @@ from lbry.extras.daemon.storage import SQLiteStorage
from lbry.wallet import WalletManager
from lbry.wallet.usage_payment import WalletServerPayer
try:
import libtorrent
from lbry.torrent.session import TorrentSession
except ImportError:
libtorrent = None
TorrentSession = None
log = logging.getLogger(__name__)
@ -343,7 +341,7 @@ class FileManagerComponent(Component):
if not self.file_manager:
return
return {
'managed_files': len(self.file_manager._sources),
'managed_files': len(self.file_manager.get_filtered()),
}
async def start(self):
@ -386,7 +384,7 @@ class TorrentComponent(Component):
}
async def start(self):
if libtorrent:
if TorrentSession:
self.torrent_session = TorrentSession(asyncio.get_event_loop(), None)
await self.torrent_session.bind() # TODO: specify host/port

View file

@ -1,38 +1,28 @@
import time
import asyncio
import binascii
import logging
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.error import ResolveError, InvalidStreamDescriptorError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
from lbry.stream.managed_stream import ManagedStream
from lbry.utils import cache_concurrent
from lbry.schema.claim import Claim
from lbry.schema.url import URL
from lbry.wallet.dewies import dewies_to_lbc
from lbry.wallet.transaction import Output
from lbry.file.source_manager import SourceManager
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.wallet import LbryWalletManager
from lbry.wallet import WalletManager
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
log = logging.getLogger(__name__)
def path_or_none(p) -> Optional[str]:
if not p:
return
return binascii.unhexlify(p).decode()
class FileManager:
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'LbryWalletManager',
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'WalletManager',
storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
self.loop = loop
self.config = config
@ -141,8 +131,9 @@ class FileManager:
log.info("claim contains an update to a stream we have, downloading it")
if save_file and existing_for_claim_id[0].output_file_exists:
save_file = False
await existing_for_claim_id[0].start(node=self.node, timeout=timeout, save_now=save_file)
if not existing_for_claim_id[0].output_file_exists and (save_file or file_name or download_directory):
await existing_for_claim_id[0].start(timeout=timeout, save_now=save_file)
if not existing_for_claim_id[0].output_file_exists and (
save_file or file_name or download_directory):
await existing_for_claim_id[0].save_file(
file_name=file_name, download_directory=download_directory
)
@ -176,8 +167,8 @@ class FileManager:
if not claim.stream.source.bt_infohash:
stream = ManagedStream(
self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash, download_directory,
file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash,
download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
analytics_manager=self.analytics_manager
)
else:
@ -262,29 +253,6 @@ class FileManager:
"""
return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])
async def _check_update_or_replace(
self, outpoint: str, claim_id: str, claim: Claim
) -> typing.Tuple[Optional[ManagedDownloadSource], Optional[ManagedDownloadSource]]:
existing = self.get_filtered(outpoint=outpoint)
if existing:
return existing[0], None
existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash)
if existing and existing[0].claim_id != claim_id:
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}")
if existing:
log.info("claim contains a metadata only update to a stream we have")
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
)
await self._update_content_claim(existing[0])
return existing[0], None
else:
existing_for_claim_id = self.get_filtered(claim_id=claim_id)
if existing_for_claim_id:
log.info("claim contains an update to a stream we have, downloading it")
return None, existing_for_claim_id[0]
return None, None
async def delete(self, source: ManagedDownloadSource, delete_file=False):
for manager in self.source_managers.values():
return await manager.delete(source, delete_file)

View file

@ -16,20 +16,6 @@ if typing.TYPE_CHECKING:
log = logging.getLogger(__name__)
# def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
# base_name, ext = os.path.splitext(os.path.basename(file_name))
# i = 0
# while os.path.isfile(os.path.join(download_directory, file_name)):
# i += 1
# file_name = "%s_%i%s" % (base_name, i, ext)
#
# return file_name
#
#
# async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download_directory: str, file_name: str) -> str:
# return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
class ManagedDownloadSource:
STATUS_RUNNING = "running"
STATUS_STOPPED = "stopped"
@ -71,7 +57,7 @@ class ManagedDownloadSource:
# iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
# raise NotImplementedError()
async def start(self, timeout: Optional[float] = None):
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
raise NotImplementedError()
async def stop(self, finished: bool = False):

View file

@ -1,6 +1,5 @@
import os
import asyncio
import binascii
import logging
import typing
from typing import Optional
@ -12,7 +11,7 @@ if typing.TYPE_CHECKING:
log = logging.getLogger(__name__)
comparison_operators = {
COMPARISON_OPERATORS = {
'eq': lambda a, b: a == b,
'ne': lambda a, b: a != b,
'g': lambda a, b: a > b,
@ -22,12 +21,6 @@ comparison_operators = {
}
def path_or_none(p) -> Optional[str]:
if not p:
return
return binascii.unhexlify(p).decode()
class SourceManager:
filter_fields = {
'rowid',
@ -77,10 +70,11 @@ class SourceManager:
source.stop_tasks()
self.started.clear()
async def create(self, file_path: str, key: Optional[bytes] = None, **kw) -> ManagedDownloadSource:
async def create(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource:
raise NotImplementedError()
async def _delete(self, source: ManagedDownloadSource):
async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
raise NotImplementedError()
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
@ -101,11 +95,11 @@ class SourceManager:
"""
if sort_by and sort_by not in self.filter_fields:
raise ValueError(f"'{sort_by}' is not a valid field to sort by")
if comparison and comparison not in comparison_operators:
if comparison and comparison not in COMPARISON_OPERATORS:
raise ValueError(f"'{comparison}' is not a valid comparison")
if 'full_status' in search_by:
del search_by['full_status']
for search in search_by.keys():
for search in search_by:
if search not in self.filter_fields:
raise ValueError(f"'{search}' is not a valid search operation")
if search_by:
@ -113,7 +107,7 @@ class SourceManager:
sources = []
for stream in self._sources.values():
for search, val in search_by.items():
if comparison_operators[comparison](getattr(stream, search), val):
if COMPARISON_OPERATORS[comparison](getattr(stream, search), val):
sources.append(stream)
break
else:

View file

@ -6,7 +6,6 @@ import logging
import binascii
from typing import Optional
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.utils import generate_id
from lbry.error import DownloadSDTimeoutError
from lbry.schema.mime_types import guess_media_type
from lbry.stream.downloader import StreamDownloader
@ -21,7 +20,6 @@ if typing.TYPE_CHECKING:
from lbry.schema.claim import Claim
from lbry.blob.blob_manager import BlobManager
from lbry.blob.blob_info import BlobInfo
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.dht.node import Node
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.wallet.transaction import Transaction
@ -289,8 +287,7 @@ class ManagedStream(ManagedDownloadSource):
self.saving.clear()
self.finished_write_attempt.set()
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None,
node: Optional['Node'] = None):
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
await self.start()
if self.file_output_task and not self.file_output_task.done(): # cancel an already running save task
self.file_output_task.cancel()

View file

@ -23,24 +23,10 @@ if typing.TYPE_CHECKING:
log = logging.getLogger(__name__)
SET_FILTER_FIELDS = {
"claim_ids": "claim_id",
"channel_claim_ids": "channel_claim_id",
"outpoints": "outpoint"
}
COMPARISON_OPERATORS = {
'eq': lambda a, b: a == b,
'ne': lambda a, b: a != b,
'g': lambda a, b: a > b,
'l': lambda a, b: a < b,
'ge': lambda a, b: a >= b,
'le': lambda a, b: a <= b,
'in': lambda a, b: a in b
}
def path_or_none(p) -> Optional[str]:
if not p:
def path_or_none(encoded_path) -> Optional[str]:
if not encoded_path:
return
return binascii.unhexlify(p).decode()
return binascii.unhexlify(encoded_path).decode()
class StreamManager(SourceManager):
@ -235,60 +221,10 @@ class StreamManager(SourceManager):
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
await self.storage.delete(stream.descriptor)
def get_filtered_streams(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
comparison: Optional[str] = None,
**search_by) -> typing.List[ManagedStream]:
"""
Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
"""
if sort_by and sort_by not in FILTER_FIELDS:
raise ValueError(f"'{sort_by}' is not a valid field to sort by")
if comparison and comparison not in COMPARISON_OPERATORS:
raise ValueError(f"'{comparison}' is not a valid comparison")
if 'full_status' in search_by:
del search_by['full_status']
for search in search_by:
if search not in FILTER_FIELDS:
raise ValueError(f"'{search}' is not a valid search operation")
compare_sets = {}
if isinstance(search_by.get('claim_id'), list):
compare_sets['claim_ids'] = search_by.pop('claim_id')
if isinstance(search_by.get('outpoint'), list):
compare_sets['outpoints'] = search_by.pop('outpoint')
if isinstance(search_by.get('channel_claim_id'), list):
compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id')
if search_by:
comparison = comparison or 'eq'
streams = []
for stream in self.streams.values():
matched = False
for set_search, val in compare_sets.items():
if COMPARISON_OPERATORS[comparison](getattr(stream, SET_FILTER_FIELDS[set_search]), val):
streams.append(stream)
matched = True
break
if matched:
continue
for search, val in search_by.items():
this_stream = getattr(stream, search)
if COMPARISON_OPERATORS[comparison](this_stream, val):
streams.append(stream)
break
else:
streams = list(self.streams.values())
if sort_by:
streams.sort(key=lambda s: getattr(s, sort_by))
if reverse:
streams.reverse()
return streams
async def _delete(self, source: ManagedStream, delete_file: Optional[bool] = False):
blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
await self.storage.delete_stream(source.descriptor)
async def stream_partial_content(self, request: Request, sd_hash: str):
return await self._sources[sd_hash].stream_file(request, self.node)

View file

@ -4,29 +4,29 @@ import libtorrent
NOTIFICATION_MASKS = [
"error",
"peer",
"port_mapping",
"storage",
"tracker",
"debug",
"status",
"progress",
"ip_block",
"dht",
"stats",
"session_log",
"torrent_log",
"peer_log",
"incoming_request",
"dht_log",
"dht_operation",
"port_mapping_log",
"picker_log",
"file_progress",
"piece_progress",
"upload",
"block_progress"
"error",
"peer",
"port_mapping",
"storage",
"tracker",
"debug",
"status",
"progress",
"ip_block",
"dht",
"stats",
"session_log",
"torrent_log",
"peer_log",
"incoming_request",
"dht_log",
"dht_operation",
"port_mapping_log",
"picker_log",
"file_progress",
"piece_progress",
"upload",
"block_progress"
]
@ -90,11 +90,12 @@ class TorrentSession:
'enable_incoming_tcp': True
}
self._session = await self._loop.run_in_executor(
self._executor, libtorrent.session, settings
self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member
)
await self._loop.run_in_executor(
self._executor,
lambda: self._session.add_dht_router("router.utorrent.com", 6881)
# lambda necessary due boost functions raising errors when asyncio inspects them. try removing later
lambda: self._session.add_dht_router("router.utorrent.com", 6881) # pylint: disable=unnecessary-lambda
)
self._loop.create_task(self.process_alerts())
@ -110,12 +111,11 @@ class TorrentSession:
await asyncio.sleep(1, loop=self._loop)
async def pause(self):
state = await self._loop.run_in_executor(
self._executor, lambda: self._session.save_state()
)
# print(f"state:\n{state}")
await self._loop.run_in_executor(
self._executor, lambda: self._session.pause()
self._executor, lambda: self._session.save_state() # pylint: disable=unnecessary-lambda
)
await self._loop.run_in_executor(
self._executor, lambda: self._session.pause() # pylint: disable=unnecessary-lambda
)
async def resume(self):

View file

@ -18,17 +18,17 @@ class TorrentInfo:
self.total_size = total_size
@classmethod
def from_libtorrent_info(cls, ti):
def from_libtorrent_info(cls, torrent_info):
return cls(
ti.nodes(), tuple(
torrent_info.nodes(), tuple(
{
'url': web_seed['url'],
'type': web_seed['type'],
'auth': web_seed['auth']
} for web_seed in ti.web_seeds()
} for web_seed in torrent_info.web_seeds()
), tuple(
(tracker.url, tracker.tier) for tracker in ti.trackers()
), ti.total_size()
(tracker.url, tracker.tier) for tracker in torrent_info.trackers()
), torrent_info.total_size()
)
@ -41,9 +41,11 @@ class Torrent:
def _threaded_update_status(self):
status = self._handle.status()
if not status.is_seeding:
log.info('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s' % (
log.info(
'%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.state))
status.num_peers, status.state
)
elif not self.finished.is_set():
self.finished.set()

View file

@ -302,7 +302,8 @@ class TestStreamManager(BlobExchangeTestBase):
)
self.assertEqual(stored_status, "stopped")
await stream.save_file(node=self.stream_manager.node)
stream.node = self.stream_manager.node
await stream.save_file()
await stream.finished_writing.wait()
await asyncio.sleep(0, loop=self.loop)
self.assertTrue(stream.finished)