diff --git a/lbry/blob_exchange/server.py b/lbry/blob_exchange/server.py index a2ff8b698..4c2017c0f 100644 --- a/lbry/blob_exchange/server.py +++ b/lbry/blob_exchange/server.py @@ -18,7 +18,7 @@ MAX_REQUEST_SIZE = 1200 class BlobServerProtocol(asyncio.Protocol): def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str, - idle_timeout: float = 30.0, transfer_timeout: float = 60.0): + idle_timeout: float = 30.0, transfer_timeout: float = 60.0, blob_callback=None): self.loop = loop self.blob_manager = blob_manager self.idle_timeout = idle_timeout @@ -32,6 +32,7 @@ class BlobServerProtocol(asyncio.Protocol): self.started_transfer = asyncio.Event(loop=self.loop) self.transfer_finished = asyncio.Event(loop=self.loop) self.close_on_idle_task: typing.Optional[asyncio.Task] = None + self.blob_handling_callback: typing.Optional[typing.Callable] = blob_callback async def close_on_idle(self): while self.transport: @@ -92,6 +93,9 @@ class BlobServerProtocol(asyncio.Protocol): if download_request: blob = self.blob_manager.get_blob(download_request.requested_blob) + if self.blob_handling_callback: + await self.blob_handling_callback(blob) + blob = self.blob_manager.get_blob(download_request.requested_blob) if blob.get_is_verified(): incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length} responses.append(BlobDownloadResponse(incoming_blob=incoming_blob)) @@ -152,7 +156,7 @@ class BlobServerProtocol(asyncio.Protocol): class BlobServer: def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str, - idle_timeout: float = 30.0, transfer_timeout: float = 60.0): + idle_timeout: float = 30.0, transfer_timeout: float = 60.0, blob_callback=None): self.loop = loop self.blob_manager = blob_manager self.server_task: typing.Optional[asyncio.Task] = None @@ -161,6 +165,7 @@ class BlobServer: self.idle_timeout = idle_timeout self.transfer_timeout = transfer_timeout self.server_protocol_class = BlobServerProtocol + self.blob_handling_callback: typing.Optional[typing.Callable] = blob_callback def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'): if self.server_task is not None: @@ -169,7 +174,8 @@ class BlobServer: async def _start_server(): server = await self.loop.create_server( lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address, - self.idle_timeout, self.transfer_timeout), + self.idle_timeout, self.transfer_timeout, + blob_callback=self.blob_handling_callback), interface, port ) self.started_listening.set() diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py index ae6721dbf..77eeb4793 100644 --- a/scripts/download_blob_from_peer.py +++ b/scripts/download_blob_from_peer.py @@ -48,7 +48,7 @@ async def main(blob_hash: str, url: str): ) host = host_info[0][4][0] - storage = SQLiteStorage(conf, os.path.join(conf.data_dir, "lbrynet.sqlite")) + storage = SQLiteStorage(conf, ":memory:") blob_manager = BlobManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage, conf) await storage.open() await blob_manager.setup() diff --git a/scripts/standalone_blob_server.py b/scripts/standalone_blob_server.py index 77436ba33..da7140bad 100644 --- a/scripts/standalone_blob_server.py +++ b/scripts/standalone_blob_server.py @@ -1,31 +1,45 @@ import sys import os import asyncio +import logging + +from lbry.blob_exchange.client import request_blob +from lbry.utils import resolve_host + +logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") from lbry.blob.blob_manager import BlobManager from lbry.blob_exchange.server import BlobServer -from lbry.schema.address import decode_address from lbry.extras.daemon.storage import SQLiteStorage +from lbry.wallet import Ledger +from lbry.conf import Config async def main(address: str): - try: - decode_address(address) - except: + if not Ledger.is_pubkey_address(address): print(f"'{address}' is not a valid lbrycrd address") return 1 loop = asyncio.get_running_loop() + conf = Config() - storage = SQLiteStorage(os.path.expanduser("~/.lbrynet/lbrynet.sqlite")) + async def ensure_blob(blob): + upstream_host, upstream_port = conf.fixed_peers[0] + upstream_host = await resolve_host(upstream_host, upstream_port, 'tcp') + success, proto = await request_blob(loop, blob, upstream_host, int(upstream_port), conf.peer_connect_timeout, + conf.blob_download_timeout) + print(success, proto) + if proto: + proto.close() + + storage = SQLiteStorage(conf, os.path.expanduser("/tmp/lbrynet.sqlite")) await storage.open() - blob_manager = BlobManager(loop, os.path.expanduser("~/.lbrynet/blobfiles"), storage) + blob_manager = BlobManager(loop, os.path.expanduser("/tmp/blobfiles"), storage, conf) await blob_manager.setup() - server = await loop.create_server( - lambda: BlobServer(loop, blob_manager, address), - '0.0.0.0', 4444) + server = BlobServer(loop, blob_manager, address, blob_callback=ensure_blob) try: - async with server: - await server.serve_forever() + server.start_server(6666, '0.0.0.0') + while True: + await asyncio.sleep(1) finally: await storage.close()