adds more torrent parts

This commit is contained in:
Victor Shyba 2020-02-07 12:32:39 -03:00
parent 507db5f79a
commit 34c6e09e6f
6 changed files with 118 additions and 198 deletions

View file

@@ -55,8 +55,8 @@ if typing.TYPE_CHECKING:
from lbry.extras.daemon.components import UPnPComponent
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.stream.stream_manager import StreamManager
from lbry.wallet import WalletManager, Ledger
from lbry.file.file_manager import FileManager
log = logging.getLogger(__name__)
@@ -341,7 +341,7 @@ class Daemon(metaclass=JSONRPCServerType):
return self.component_manager.get_component(DATABASE_COMPONENT)
@property
def file_manager(self) -> typing.Optional['StreamManager']:
def file_manager(self) -> typing.Optional['FileManager']:
return self.component_manager.get_component(FILE_MANAGER_COMPONENT)
@property
@@ -3346,11 +3346,11 @@ class Daemon(metaclass=JSONRPCServerType):
stream_hash = None
if not preview:
old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None)
old_stream = self.file_manager.get_filtered(sd_hash=old_txo.claim.stream.source.sd_hash)[0]
if file_path is not None:
if old_stream:
await self.stream_manager.delete_stream(old_stream, delete_file=False)
file_stream = await self.stream_manager.create_stream(file_path)
await self.file_manager.delete(old_stream, delete_file=False)
file_stream = await self.file_manager.create_stream(file_path)
new_txo.claim.stream.source.sd_hash = file_stream.sd_hash
new_txo.script.generate()
stream_hash = file_stream.stream_hash

View file

@@ -98,8 +98,9 @@ class FileManager:
raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
if not resolved_result or uri not in resolved_result:
raise ResolveError(f"Failed to resolve stream at '{uri}'")
txo = resolved_result[uri]
if isinstance(txo, dict):
raise ResolveError(f"Failed to resolve stream at '{uri}': {txo}")
claim = txo.claim
outpoint = f"{txo.tx_ref.id}:{txo.position}"
resolved_time = self.loop.time() - start_time
@@ -179,7 +180,7 @@ class FileManager:
self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
file_name=file_name, download_directory=download_directory or self.config.download_dir,
status=ManagedStream.STATUS_RUNNING,
claim=claim, analytics_manager=self.analytics_manager,
analytics_manager=self.analytics_manager,
torrent_session=source_manager.torrent_session
)
log.info("starting download for %s", uri)

View file

@@ -74,11 +74,7 @@ class SourceManager:
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource:
raise NotImplementedError()
async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
raise NotImplementedError()
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await self._delete(source)
self.remove(source)
if delete_file and source.output_file_exists:
os.remove(source.full_path)

View file

@@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]:
class StreamManager(SourceManager):
_sources: typing.Dict[str, ManagedStream]
filter_fields = set(SourceManager.filter_fields)
filter_fields = SourceManager.filter_fields
filter_fields.update({
'sd_hash',
'stream_hash',
@@ -180,6 +180,7 @@ class StreamManager(SourceManager):
self.re_reflect_task = self.loop.create_task(self.reflect_streams())
def stop(self):
super().stop()
if self.resume_saving_task and not self.resume_saving_task.done():
self.resume_saving_task.cancel()
if self.re_reflect_task and not self.re_reflect_task.done():
@@ -206,16 +207,30 @@ class StreamManager(SourceManager):
)
return task
async def create_stream(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator)
async def create(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
descriptor = await StreamDescriptor.create_stream(
self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
blob_completed_callback=self.blob_manager.blob_completed
)
await self.storage.store_stream(
self.blob_manager.get_blob(descriptor.sd_hash), descriptor
)
row_id = await self.storage.save_published_file(
descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0
)
stream = ManagedStream(
self.loop, self.config, self.blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
os.path.basename(file_path), status=ManagedDownloadSource.STATUS_FINISHED,
rowid=row_id, descriptor=descriptor
)
self.streams[stream.sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
if self.config.reflect_streams and self.config.reflector_servers:
self.reflect_stream(stream)
return stream
async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False):
async def delete(self, stream: ManagedStream, delete_file: Optional[bool] = False):
if stream.sd_hash in self.running_reflector_uploads:
self.running_reflector_uploads[stream.sd_hash].cancel()
stream.stop_tasks()
@@ -223,151 +238,10 @@ class StreamManager(SourceManager):
del self.streams[stream.sd_hash]
blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
await self.storage.delete(stream.descriptor)
await self.storage.delete_stream(stream.descriptor)
if delete_file and stream.output_file_exists:
os.remove(stream.full_path)
# @cache_concurrent
# async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
# timeout: Optional[float] = None,
# file_name: Optional[str] = None,
# download_directory: Optional[str] = None,
# save_file: Optional[bool] = None,
# resolve_timeout: float = 3.0,
# wallet: Optional['Wallet'] = None) -> ManagedStream:
# manager = self.wallet_manager
# wallet = wallet or manager.default_wallet
# timeout = timeout or self.config.download_timeout
# start_time = self.loop.time()
# resolved_time = None
# stream = None
# txo: Optional[Output] = None
# error = None
# outpoint = None
# if save_file is None:
# save_file = self.config.save_files
# if file_name and not save_file:
# save_file = True
# if save_file:
# download_directory = download_directory or self.config.download_dir
# else:
# download_directory = None
#
# payment = None
# try:
# # resolve the claim
# if not URL.parse(uri).has_stream:
# raise ResolveError("cannot download a channel claim, specify a /path")
# try:
# response = await asyncio.wait_for(
# manager.ledger.resolve(wallet.accounts, [uri]),
# resolve_timeout
# )
# resolved_result = self._convert_to_old_resolve_output(manager, response)
# except asyncio.TimeoutError:
# raise ResolveTimeoutError(uri)
# except Exception as err:
# if isinstance(err, asyncio.CancelledError):
# raise
# log.exception("Unexpected error resolving stream:")
# raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
# await self.storage.save_claims_for_resolve([
# value for value in resolved_result.values() if 'error' not in value
# ])
# resolved = resolved_result.get(uri, {})
# resolved = resolved if 'value' in resolved else resolved.get('claim')
# if not resolved:
# raise ResolveError(f"Failed to resolve stream at '{uri}'")
# if 'error' in resolved:
# raise ResolveError(f"error resolving stream: {resolved['error']}")
# txo = response[uri]
#
# claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
# outpoint = f"{resolved['txid']}:{resolved['nout']}"
# resolved_time = self.loop.time() - start_time
#
# # resume or update an existing stream, if the stream changed: download it and delete the old one after
# updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
# if updated_stream:
# log.info("already have stream for %s", uri)
# if save_file and updated_stream.output_file_exists:
# save_file = False
# await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
# if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
# await updated_stream.save_file(
# file_name=file_name, download_directory=download_directory, node=self.node
# )
# return updated_stream
#
# if not to_replace and txo.has_price and not txo.purchase_receipt:
# payment = await manager.create_purchase_transaction(
# wallet.accounts, txo, exchange_rate_manager
# )
#
# stream = ManagedStream(
# self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
# file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
# analytics_manager=self.analytics_manager
# )
# log.info("starting download for %s", uri)
#
# before_download = self.loop.time()
# await stream.start(self.node, timeout)
# stream.set_claim(resolved, claim)
# if to_replace: # delete old stream now that the replacement has started downloading
# await self.delete(to_replace)
#
# if payment is not None:
# await manager.broadcast_or_release(payment)
# payment = None # to avoid releasing in `finally` later
# log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
# await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
#
# self._sources[stream.sd_hash] = stream
# self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
# await self.storage.save_content_claim(stream.stream_hash, outpoint)
# if save_file:
# await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
# loop=self.loop)
# return stream
# except asyncio.TimeoutError:
# error = DownloadDataTimeoutError(stream.sd_hash)
# raise error
# except Exception as err: # forgive data timeout, don't delete stream
# expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
# KeyFeeAboveMaxAllowedError)
# if isinstance(err, expected):
# log.warning("Failed to download %s: %s", uri, str(err))
# elif isinstance(err, asyncio.CancelledError):
# pass
# else:
# log.exception("Unexpected error downloading stream:")
# error = err
# raise
# finally:
# if payment is not None:
# # payment is set to None after broadcasting, if we're here an exception probably happened
# await manager.ledger.release_tx(payment)
# if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
# stream.downloader.time_to_first_bytes))):
# server = self.wallet_manager.ledger.network.client.server
# self.loop.create_task(
# self.analytics_manager.send_time_to_first_bytes(
# resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
# uri, outpoint,
# None if not stream else len(stream.downloader.blob_downloader.active_connections),
# None if not stream else len(stream.downloader.blob_downloader.scores),
# None if not stream else len(stream.downloader.blob_downloader.connection_failures),
# False if not stream else stream.downloader.added_fixed_peers,
# self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
# None if not stream else stream.sd_hash,
# None if not stream else stream.downloader.time_to_descriptor,
# None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
# None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
# None if not stream else stream.downloader.time_to_first_bytes,
# None if not error else error.__class__.__name__,
# None if not error else str(error),
# None if not server else f"{server[0]}:{server[1]}"
# )
# )
# =======
# self.running_reflector_uploads.pop().cancel()
# super().stop()
@@ -385,21 +259,6 @@ class StreamManager(SourceManager):
#
# async def create(self, file_path: str, key: Optional[bytes] = None,
# iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
# descriptor = await StreamDescriptor.create_stream(
# self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
# blob_completed_callback=self.blob_manager.blob_completed
# )
# await self.storage.store_stream(
# self.blob_manager.get_blob(descriptor.sd_hash), descriptor
# )
# row_id = await self.storage.save_published_file(
# descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0
# )
# source = ManagedStream(
# self.loop, self.config, self.blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
# os.path.basename(file_path), status=ManagedDownloadSource.STATUS_FINISHED,
# rowid=row_id, descriptor=descriptor
# )
# self.add(source)
# if self.config.reflect_streams:
# self._upload_stream_to_reflector(source)
@@ -407,10 +266,6 @@ class StreamManager(SourceManager):
#
# async def _delete(self, stream: ManagedStream, delete_file: Optional[bool] = False):
# >>>>>>> ManagedDownloadSource and SourceManager refactor
async def _delete(self, source: ManagedStream, delete_file: Optional[bool] = False):
blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
await self.storage.delete_stream(source.descriptor)
async def stream_partial_content(self, request: Request, sd_hash: str):
return await self._sources[sd_hash].stream_file(request, self.node)

View file

@@ -1,5 +1,8 @@
import asyncio
import binascii
import os
from hashlib import sha1
from tempfile import mkdtemp
from typing import Optional
import libtorrent
@@ -33,10 +36,8 @@ NOTIFICATION_MASKS = [
DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted?
libtorrent.add_torrent_params_flags_t.flag_paused
| libtorrent.add_torrent_params_flags_t.flag_auto_managed
libtorrent.add_torrent_params_flags_t.flag_auto_managed
| libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error
| libtorrent.add_torrent_params_flags_t.flag_upload_mode
| libtorrent.add_torrent_params_flags_t.flag_update_subscribe
)
@@ -52,15 +53,23 @@ class TorrentHandle:
def __init__(self, loop, executor, handle):
self._loop = loop
self._executor = executor
self._handle = handle
self._handle: libtorrent.torrent_handle = handle
self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event(loop=loop)
def _show_status(self):
# fixme: cleanup
status = self._handle.status()
if status.has_metadata:
self.metadata_completed.set()
# metadata: libtorrent.torrent_info = self._handle.get_torrent_info()
# print(metadata)
# print(metadata.files())
# print(type(self._handle))
if not status.is_seeding:
print('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s' % (
print('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s' % (
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.state))
status.num_peers, status.num_seeds, status.state, status.save_path))
elif not self.finished.is_set():
self.finished.set()
print("finished!")
@@ -72,7 +81,7 @@ class TorrentHandle:
)
if self.finished.is_set():
break
await asyncio.sleep(1, loop=self._loop)
await asyncio.sleep(0.1, loop=self._loop)
async def pause(self):
await self._loop.run_in_executor(
@@ -89,25 +98,44 @@ class TorrentSession:
def __init__(self, loop, executor):
self._loop = loop
self._executor = executor
self._session = None
self._session: Optional[libtorrent.session] = None
self._handles = {}
async def bind(self, interface: str = '0.0.0.0', port: int = 6881):
async def add_fake_torrent(self):
dir = mkdtemp()
info, btih = self._create_fake(dir)
flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
handle = self._session.add_torrent({
'ti': info, 'save_path': dir, 'flags': flags
})
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
return btih
def _create_fake(self, dir):
# beware, that's just for testing
path = os.path.join(dir, 'tmp')
with open(path, 'wb') as myfile:
size = myfile.write(b'0' * 40 * 1024 * 1024)
fs = libtorrent.file_storage()
fs.add_file('tmp', size)
print(fs.file_path(0))
t = libtorrent.create_torrent(fs, 0, 4 * 1024 * 1024)
libtorrent.set_piece_hashes(t, dir)
info = libtorrent.torrent_info(t.generate())
btih = sha1(info.metadata()).hexdigest()
return info, btih
async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
settings = {
'listen_interfaces': f"{interface}:{port}",
'enable_outgoing_utp': True,
'enable_incoming_utp': True,
'enable_outgoing_tcp': True,
'enable_incoming_tcp': True
'enable_outgoing_tcp': False,
'enable_incoming_tcp': False
}
self._session = await self._loop.run_in_executor(
self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member
)
await self._loop.run_in_executor(
self._executor,
# lambda necessary due boost functions raising errors when asyncio inspects them. try removing later
lambda: self._session.add_dht_router("router.utorrent.com", 6881) # pylint: disable=unnecessary-lambda
)
self._loop.create_task(self.process_alerts())
def _pop_alerts(self):
@@ -135,17 +163,25 @@ class TorrentSession:
)
def _add_torrent(self, btih: str, download_directory: Optional[str]):
params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS}
params = {'info_hash': binascii.unhexlify(btih.encode())}
flags = DEFAULT_FLAGS
print(bin(flags))
flags ^= libtorrent.add_torrent_params_flags_t.flag_paused
# flags ^= libtorrent.add_torrent_params_flags_t.flag_auto_managed
# flags ^= libtorrent.add_torrent_params_flags_t.flag_stop_when_ready
print(bin(flags))
# params['flags'] = flags
if download_directory:
params['save_path'] = download_directory
self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params))
handle = self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params))
handle._handle.force_dht_announce()
async def add_torrent(self, btih, download_path):
await self._loop.run_in_executor(
self._executor, self._add_torrent, btih, download_path
)
self._loop.create_task(self._handles[btih].status_loop())
await self._handles[btih].finished.wait()
await self._handles[btih].metadata_completed.wait()
async def remove_torrent(self, btih, remove_files=False):
if btih in self._handles:
@@ -156,3 +192,31 @@ class TorrentSession:
def get_magnet_uri(btih):
return f"magnet:?xt=urn:btih:{btih}"
async def main():
if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent"):
os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent")
if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso"):
os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso")
btih = "dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c"
executor = None
session = TorrentSession(asyncio.get_event_loop(), executor)
session2 = TorrentSession(asyncio.get_event_loop(), executor)
await session.bind('localhost', port=4040)
await session2.bind('localhost', port=4041)
btih = await session.add_fake_torrent()
session2._session.add_dht_node(('localhost', 4040))
await session2.add_torrent(btih, "/tmp/down")
print('added')
while True:
print("idling")
await asyncio.sleep(100)
await session.pause()
executor.shutdown()
if __name__ == "__main__":
asyncio.run(main())

View file

@@ -26,6 +26,10 @@ def path_or_none(encoded_path) -> Optional[str]:
class TorrentSource(ManagedDownloadSource):
STATUS_STOPPED = "stopped"
filter_fields = SourceManager.filter_fields
filter_fields.update({
'bt_infohash'
})
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
file_name: Optional[str] = None, download_directory: Optional[str] = None,