forked from LBRYCommunity/lbry-sdk
adds more torrent parts
parent 6865ddfc12
commit dd26a96828
6 changed files with 118 additions and 41 deletions

--- a/lbry/extras/daemon/daemon.py
+++ b/lbry/extras/daemon/daemon.py
@@ -57,8 +57,8 @@ if typing.TYPE_CHECKING:
     from lbry.extras.daemon.components import UPnPComponent
     from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
     from lbry.extras.daemon.storage import SQLiteStorage
-    from lbry.stream.stream_manager import StreamManager
     from lbry.wallet import WalletManager, Ledger
+    from lbry.file.file_manager import FileManager

 log = logging.getLogger(__name__)

@@ -372,7 +372,7 @@ class Daemon(metaclass=JSONRPCServerType):
         return self.component_manager.get_component(DATABASE_COMPONENT)

     @property
-    def file_manager(self) -> typing.Optional['StreamManager']:
+    def file_manager(self) -> typing.Optional['FileManager']:
         return self.component_manager.get_component(FILE_MANAGER_COMPONENT)

     @property

@@ -3447,11 +3447,11 @@ class Daemon(metaclass=JSONRPCServerType):

         stream_hash = None
         if not preview:
-            old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None)
+            old_stream = self.file_manager.get_filtered(sd_hash=old_txo.claim.stream.source.sd_hash)[0]
             if file_path is not None:
                 if old_stream:
-                    await self.stream_manager.delete_stream(old_stream, delete_file=False)
-                file_stream = await self.stream_manager.create_stream(file_path)
+                    await self.file_manager.delete(old_stream, delete_file=False)
+                file_stream = await self.file_manager.create_stream(file_path)
                 new_txo.claim.stream.source.sd_hash = file_stream.sd_hash
                 new_txo.script.generate()
                 stream_hash = file_stream.stream_hash
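
Note for reviewers: the old dict lookup returned None when no stream matched, while get_filtered(...)[0] raises IndexError on an empty result. A defensive variant would look like the following sketch (hypothetical, not part of this commit):

    matches = self.file_manager.get_filtered(sd_hash=old_txo.claim.stream.source.sd_hash)
    old_stream = matches[0] if matches else None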

--- a/lbry/file/file_manager.py
+++ b/lbry/file/file_manager.py
@@ -98,8 +98,10 @@ class FileManager:
                     raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
             if not resolved_result or uri not in resolved_result:
                 raise ResolveError(f"Failed to resolve stream at '{uri}'")

             txo = resolved_result[uri]
+            if isinstance(txo, dict):
+                raise ResolveError(f"Failed to resolve stream at '{uri}': {txo}")
             claim = txo.claim
             outpoint = f"{txo.tx_ref.id}:{txo.position}"
             resolved_time = self.loop.time() - start_time
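
The new isinstance guard covers resolve returning an error payload, a plain dict under the uri key, instead of a transaction output; without it the failure would surface later as an AttributeError on txo.claim. The shape being guarded against, assumed for illustration:

    resolved_result = {uri: {'error': {'name': 'NOT_FOUND', 'text': '...'}}}  # a dict, not a txo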

@@ -179,7 +180,7 @@ class FileManager:
                 self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
                 file_name=file_name, download_directory=download_directory or self.config.download_dir,
                 status=ManagedStream.STATUS_RUNNING,
-                claim=claim, analytics_manager=self.analytics_manager,
+                analytics_manager=self.analytics_manager,
                 torrent_session=source_manager.torrent_session
             )
             log.info("starting download for %s", uri)

--- a/lbry/file/source_manager.py
+++ b/lbry/file/source_manager.py
@@ -74,11 +74,7 @@ class SourceManager:
                      iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource:
         raise NotImplementedError()

-    async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
-        raise NotImplementedError()
-
     async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
-        await self._delete(source)
         self.remove(source)
         if delete_file and source.output_file_exists:
             os.remove(source.full_path)
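
With the abstract _delete hook removed, SourceManager.delete is reduced to detaching the source and optionally deleting the file on disk; cleanup of blobs and database rows now lives entirely in the subclass override (see the stream_manager.py hunks below). The resulting base method reads:

    async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
        self.remove(source)
        if delete_file and source.output_file_exists:
            os.remove(source.full_path)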

--- a/lbry/stream/stream_manager.py
+++ b/lbry/stream/stream_manager.py
@@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]:
 class StreamManager(SourceManager):
     _sources: typing.Dict[str, ManagedStream]

-    filter_fields = set(SourceManager.filter_fields)
+    filter_fields = SourceManager.filter_fields
     filter_fields.update({
         'sd_hash',
         'stream_hash',
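
Dropping the set(...) copy makes filter_fields an alias of SourceManager.filter_fields, so the update(...) that follows mutates the set shared with the base class and every other subclass (TorrentSource in torrent_manager.py below does the same). A minimal sketch of the aliasing effect, with hypothetical names:

    class Base:
        filter_fields = {'status'}

    class Child(Base):
        filter_fields = Base.filter_fields  # alias, not a copy
        filter_fields.update({'sd_hash'})

    assert Base.filter_fields == {'status', 'sd_hash'}  # the base set was mutated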

@@ -180,6 +180,7 @@ class StreamManager(SourceManager):
         self.re_reflect_task = self.loop.create_task(self.reflect_streams())

     def stop(self):
+        super().stop()
         if self.resume_saving_task and not self.resume_saving_task.done():
             self.resume_saving_task.cancel()
         if self.re_reflect_task and not self.re_reflect_task.done():

@@ -206,16 +207,30 @@ class StreamManager(SourceManager):
         )
         return task

-    async def create_stream(self, file_path: str, key: Optional[bytes] = None,
+    async def create(self, file_path: str, key: Optional[bytes] = None,
                      iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
-        stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator)
+        descriptor = await StreamDescriptor.create_stream(
+            self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
+            blob_completed_callback=self.blob_manager.blob_completed
+        )
+        await self.storage.store_stream(
+            self.blob_manager.get_blob(descriptor.sd_hash), descriptor
+        )
+        row_id = await self.storage.save_published_file(
+            descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0
+        )
+        stream = ManagedStream(
+            self.loop, self.config, self.blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
+            os.path.basename(file_path), status=ManagedDownloadSource.STATUS_FINISHED,
+            rowid=row_id, descriptor=descriptor
+        )
         self.streams[stream.sd_hash] = stream
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
         if self.config.reflect_streams and self.config.reflector_servers:
             self.reflect_stream(stream)
         return stream

-    async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False):
+    async def delete(self, stream: ManagedStream, delete_file: Optional[bool] = False):
         if stream.sd_hash in self.running_reflector_uploads:
             self.running_reflector_uploads[stream.sd_hash].cancel()
         stream.stop_tasks()
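
The new create() inlines what ManagedStream.create previously encapsulated: build the stream descriptor, persist it, record the published file row, then construct the ManagedStream directly with STATUS_FINISHED. A hedged usage sketch, assuming a started StreamManager bound as stream_manager:

    stream = await stream_manager.create('/path/to/file')  # formerly create_stream(...)
    print(stream.sd_hash, stream.stream_hash)              # descriptor is already persisted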

@@ -223,12 +238,9 @@ class StreamManager(SourceManager):
             del self.streams[stream.sd_hash]
         blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
         await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
-        await self.storage.delete(stream.descriptor)
-
-    async def _delete(self, source: ManagedStream, delete_file: Optional[bool] = False):
-        blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
-        await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
-        await self.storage.delete_stream(source.descriptor)
+        await self.storage.delete_stream(stream.descriptor)
+        if delete_file and stream.output_file_exists:
+            os.remove(stream.full_path)

     async def stream_partial_content(self, request: Request, sd_hash: str):
         return await self._sources[sd_hash].stream_file(request, self.node)
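
This folds the short-lived _delete helper back into a single delete override and settles on storage.delete_stream(...). Reconstructed from the hunks above, the resulting method should read roughly:

    async def delete(self, stream: ManagedStream, delete_file: Optional[bool] = False):
        if stream.sd_hash in self.running_reflector_uploads:
            self.running_reflector_uploads[stream.sd_hash].cancel()
        stream.stop_tasks()
        if stream.sd_hash in self.streams:
            del self.streams[stream.sd_hash]
        blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
        await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
        await self.storage.delete_stream(stream.descriptor)
        if delete_file and stream.output_file_exists:
            os.remove(stream.full_path)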

--- a/lbry/torrent/session.py
+++ b/lbry/torrent/session.py
@@ -1,5 +1,8 @@
 import asyncio
 import binascii
+import os
+from hashlib import sha1
+from tempfile import mkdtemp
 from typing import Optional

 import libtorrent

@@ -33,10 +36,8 @@ NOTIFICATION_MASKS = [


 DEFAULT_FLAGS = (  # fixme: somehow the logic here is inverted?
-    libtorrent.add_torrent_params_flags_t.flag_paused
-    | libtorrent.add_torrent_params_flags_t.flag_auto_managed
+    libtorrent.add_torrent_params_flags_t.flag_auto_managed
     | libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error
-    | libtorrent.add_torrent_params_flags_t.flag_upload_mode
     | libtorrent.add_torrent_params_flags_t.flag_update_subscribe
 )
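
The torrent is no longer added paused or in upload-only mode; auto-management, duplicate-is-error, and update-subscribe remain. These flags are plain integer bitmasks, which is why _add_torrent below can combine them with | and toggle them with ^; a minimal illustration with hypothetical flag values:

    FLAG_A, FLAG_B = 0b01, 0b10
    flags = FLAG_A | FLAG_B
    flags ^= FLAG_A            # toggles FLAG_A off, since it was set
    assert flags == FLAG_B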

@@ -52,15 +53,23 @@ class TorrentHandle:
     def __init__(self, loop, executor, handle):
         self._loop = loop
         self._executor = executor
-        self._handle = handle
+        self._handle: libtorrent.torrent_handle = handle
         self.finished = asyncio.Event(loop=loop)
+        self.metadata_completed = asyncio.Event(loop=loop)

     def _show_status(self):
+        # fixme: cleanup
         status = self._handle.status()
+        if status.has_metadata:
+            self.metadata_completed.set()
+            # metadata: libtorrent.torrent_info = self._handle.get_torrent_info()
+            # print(metadata)
+            # print(metadata.files())
+            # print(type(self._handle))
         if not status.is_seeding:
-            print('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s' % (
+            print('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s' % (
                 status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
-                status.num_peers, status.state))
+                status.num_peers, status.num_seeds, status.state, status.save_path))
         elif not self.finished.is_set():
             self.finished.set()
             print("finished!")
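
The new metadata_completed event fires as soon as libtorrent reports has_metadata, i.e. the point where the torrent's file list and sizes become queryable, well before the download finishes. Intended use, assuming a TorrentHandle bound as handle:

    await handle.metadata_completed.wait()      # metadata known; safe to inspect
    # info = handle._handle.get_torrent_info()  # as in the commented-out lines above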

@@ -72,7 +81,7 @@ class TorrentHandle:
             )
             if self.finished.is_set():
                 break
-            await asyncio.sleep(1, loop=self._loop)
+            await asyncio.sleep(0.1, loop=self._loop)

     async def pause(self):
         await self._loop.run_in_executor(

@@ -89,25 +98,44 @@ class TorrentSession:
     def __init__(self, loop, executor):
         self._loop = loop
         self._executor = executor
-        self._session = None
+        self._session: Optional[libtorrent.session] = None
         self._handles = {}

-    async def bind(self, interface: str = '0.0.0.0', port: int = 6881):
+    async def add_fake_torrent(self):
+        dir = mkdtemp()
+        info, btih = self._create_fake(dir)
+        flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode
+        handle = self._session.add_torrent({
+            'ti': info, 'save_path': dir, 'flags': flags
+        })
+        self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
+        return btih
+
+    def _create_fake(self, dir):
+        # beware, that's just for testing
+        path = os.path.join(dir, 'tmp')
+        with open(path, 'wb') as myfile:
+            size = myfile.write(b'0' * 40 * 1024 * 1024)
+        fs = libtorrent.file_storage()
+        fs.add_file('tmp', size)
+        print(fs.file_path(0))
+        t = libtorrent.create_torrent(fs, 0, 4 * 1024 * 1024)
+        libtorrent.set_piece_hashes(t, dir)
+        info = libtorrent.torrent_info(t.generate())
+        btih = sha1(info.metadata()).hexdigest()
+        return info, btih
+
+    async def bind(self, interface: str = '0.0.0.0', port: int = 10889):
         settings = {
             'listen_interfaces': f"{interface}:{port}",
             'enable_outgoing_utp': True,
             'enable_incoming_utp': True,
-            'enable_outgoing_tcp': True,
-            'enable_incoming_tcp': True
+            'enable_outgoing_tcp': False,
+            'enable_incoming_tcp': False
         }
         self._session = await self._loop.run_in_executor(
             self._executor, libtorrent.session, settings  # pylint: disable=c-extension-no-member
         )
-        await self._loop.run_in_executor(
-            self._executor,
-            # lambda necessary due boost functions raising errors when asyncio inspects them. try removing later
-            lambda: self._session.add_dht_router("router.utorrent.com", 6881)  # pylint: disable=unnecessary-lambda
-        )
         self._loop.create_task(self.process_alerts())

     def _pop_alerts(self):
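
add_fake_torrent seeds a locally generated 40 MiB single-file torrent; flag_seed_mode tells libtorrent to trust the data just written instead of re-checking it. The returned btih is the torrent's infohash, the SHA-1 of the bencoded info dictionary, hence sha1(info.metadata()).hexdigest(). Typical use from a test, assuming an event loop is running:

    seed = TorrentSession(asyncio.get_event_loop(), None)
    await seed.bind('localhost', port=4040)
    btih = await seed.add_fake_torrent()  # hex infohash of the generated torrent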

@@ -135,17 +163,25 @@ class TorrentSession:
         )

     def _add_torrent(self, btih: str, download_directory: Optional[str]):
-        params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS}
+        params = {'info_hash': binascii.unhexlify(btih.encode())}
+        flags = DEFAULT_FLAGS
+        print(bin(flags))
+        flags ^= libtorrent.add_torrent_params_flags_t.flag_paused
+        # flags ^= libtorrent.add_torrent_params_flags_t.flag_auto_managed
+        # flags ^= libtorrent.add_torrent_params_flags_t.flag_stop_when_ready
+        print(bin(flags))
+        # params['flags'] = flags
         if download_directory:
             params['save_path'] = download_directory
-        self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params))
+        handle = self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params))
+        handle._handle.force_dht_announce()

     async def add_torrent(self, btih, download_path):
         await self._loop.run_in_executor(
             self._executor, self._add_torrent, btih, download_path
         )
         self._loop.create_task(self._handles[btih].status_loop())
-        await self._handles[btih].finished.wait()
+        await self._handles[btih].metadata_completed.wait()

     async def remove_torrent(self, btih, remove_files=False):
         if btih in self._handles:
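
Since params['flags'] = flags stays commented out, the XOR juggling is computed and printed but never applied, so libtorrent falls back to its own default add_torrent flags; force_dht_announce() then makes the fresh handle announce to the DHT immediately. Note also that add_torrent now resolves once metadata is known rather than when the download completes. The dead code, condensed:

    flags = DEFAULT_FLAGS
    flags ^= libtorrent.add_torrent_params_flags_t.flag_paused  # computed and printed...
    # params['flags'] = flags                                   # ...but never stored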

@@ -156,3 +192,31 @@ class TorrentSession:

 def get_magnet_uri(btih):
     return f"magnet:?xt=urn:btih:{btih}"
+
+
+async def main():
+    if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent"):
+        os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent")
+    if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso"):
+        os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso")
+
+    btih = "dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c"
+
+    executor = None
+    session = TorrentSession(asyncio.get_event_loop(), executor)
+    session2 = TorrentSession(asyncio.get_event_loop(), executor)
+    await session.bind('localhost', port=4040)
+    await session2.bind('localhost', port=4041)
+    btih = await session.add_fake_torrent()
+    session2._session.add_dht_node(('localhost', 4040))
+    await session2.add_torrent(btih, "/tmp/down")
+    print('added')
+    while True:
+        print("idling")
+        await asyncio.sleep(100)
+    await session.pause()
+    executor.shutdown()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
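
A few rough edges in this smoke test worth flagging: os.path.exists does not expand ~, so the two cleanup branches never fire (that needs os.path.expanduser); the while True loop makes pause() and executor.shutdown() unreachable; and executor is None, so shutdown() would raise even if it were reached. A sketch of the tilde fix:

    torrent_path = os.path.expanduser("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent")
    if os.path.exists(torrent_path):
        os.remove(torrent_path)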

--- a/lbry/torrent/torrent_manager.py
+++ b/lbry/torrent/torrent_manager.py
@@ -26,6 +26,10 @@ def path_or_none(encoded_path) -> Optional[str]:

 class TorrentSource(ManagedDownloadSource):
     STATUS_STOPPED = "stopped"
+    filter_fields = SourceManager.filter_fields
+    filter_fields.update({
+        'bt_infohash'
+    })

     def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
                  file_name: Optional[str] = None, download_directory: Optional[str] = None,