fix standalone blob server and make it fetch from upstream
This commit is contained in:
parent
f57b1b3c3d
commit
820015872b
3 changed files with 35 additions and 15 deletions
|
@ -18,7 +18,7 @@ MAX_REQUEST_SIZE = 1200
|
||||||
|
|
||||||
class BlobServerProtocol(asyncio.Protocol):
|
class BlobServerProtocol(asyncio.Protocol):
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
|
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
|
||||||
idle_timeout: float = 30.0, transfer_timeout: float = 60.0):
|
idle_timeout: float = 30.0, transfer_timeout: float = 60.0, blob_callback=None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.idle_timeout = idle_timeout
|
self.idle_timeout = idle_timeout
|
||||||
|
@ -32,6 +32,7 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
self.started_transfer = asyncio.Event(loop=self.loop)
|
self.started_transfer = asyncio.Event(loop=self.loop)
|
||||||
self.transfer_finished = asyncio.Event(loop=self.loop)
|
self.transfer_finished = asyncio.Event(loop=self.loop)
|
||||||
self.close_on_idle_task: typing.Optional[asyncio.Task] = None
|
self.close_on_idle_task: typing.Optional[asyncio.Task] = None
|
||||||
|
self.blob_handling_callback: typing.Optional[typing.Callable] = blob_callback
|
||||||
|
|
||||||
async def close_on_idle(self):
|
async def close_on_idle(self):
|
||||||
while self.transport:
|
while self.transport:
|
||||||
|
@ -92,6 +93,9 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
|
|
||||||
if download_request:
|
if download_request:
|
||||||
blob = self.blob_manager.get_blob(download_request.requested_blob)
|
blob = self.blob_manager.get_blob(download_request.requested_blob)
|
||||||
|
if self.blob_handling_callback:
|
||||||
|
await self.blob_handling_callback(blob)
|
||||||
|
blob = self.blob_manager.get_blob(download_request.requested_blob)
|
||||||
if blob.get_is_verified():
|
if blob.get_is_verified():
|
||||||
incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
|
incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
|
||||||
responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
|
responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
|
||||||
|
@ -152,7 +156,7 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
|
|
||||||
class BlobServer:
|
class BlobServer:
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
|
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
|
||||||
idle_timeout: float = 30.0, transfer_timeout: float = 60.0):
|
idle_timeout: float = 30.0, transfer_timeout: float = 60.0, blob_callback=None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.server_task: typing.Optional[asyncio.Task] = None
|
self.server_task: typing.Optional[asyncio.Task] = None
|
||||||
|
@ -161,6 +165,7 @@ class BlobServer:
|
||||||
self.idle_timeout = idle_timeout
|
self.idle_timeout = idle_timeout
|
||||||
self.transfer_timeout = transfer_timeout
|
self.transfer_timeout = transfer_timeout
|
||||||
self.server_protocol_class = BlobServerProtocol
|
self.server_protocol_class = BlobServerProtocol
|
||||||
|
self.blob_handling_callback: typing.Optional[typing.Callable] = blob_callback
|
||||||
|
|
||||||
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
||||||
if self.server_task is not None:
|
if self.server_task is not None:
|
||||||
|
@ -169,7 +174,8 @@ class BlobServer:
|
||||||
async def _start_server():
|
async def _start_server():
|
||||||
server = await self.loop.create_server(
|
server = await self.loop.create_server(
|
||||||
lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address,
|
lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address,
|
||||||
self.idle_timeout, self.transfer_timeout),
|
self.idle_timeout, self.transfer_timeout,
|
||||||
|
blob_callback=self.blob_handling_callback),
|
||||||
interface, port
|
interface, port
|
||||||
)
|
)
|
||||||
self.started_listening.set()
|
self.started_listening.set()
|
||||||
|
|
|
@ -48,7 +48,7 @@ async def main(blob_hash: str, url: str):
|
||||||
)
|
)
|
||||||
host = host_info[0][4][0]
|
host = host_info[0][4][0]
|
||||||
|
|
||||||
storage = SQLiteStorage(conf, os.path.join(conf.data_dir, "lbrynet.sqlite"))
|
storage = SQLiteStorage(conf, ":memory:")
|
||||||
blob_manager = BlobManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage, conf)
|
blob_manager = BlobManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage, conf)
|
||||||
await storage.open()
|
await storage.open()
|
||||||
await blob_manager.setup()
|
await blob_manager.setup()
|
||||||
|
|
|
@ -1,31 +1,45 @@
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from lbry.blob_exchange.client import request_blob
|
||||||
|
from lbry.utils import resolve_host
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||||
from lbry.blob.blob_manager import BlobManager
|
from lbry.blob.blob_manager import BlobManager
|
||||||
from lbry.blob_exchange.server import BlobServer
|
from lbry.blob_exchange.server import BlobServer
|
||||||
from lbry.schema.address import decode_address
|
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
|
from lbry.wallet import Ledger
|
||||||
|
from lbry.conf import Config
|
||||||
|
|
||||||
|
|
||||||
async def main(address: str):
|
async def main(address: str):
|
||||||
try:
|
if not Ledger.is_pubkey_address(address):
|
||||||
decode_address(address)
|
|
||||||
except:
|
|
||||||
print(f"'{address}' is not a valid lbrycrd address")
|
print(f"'{address}' is not a valid lbrycrd address")
|
||||||
return 1
|
return 1
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
conf = Config()
|
||||||
|
|
||||||
storage = SQLiteStorage(os.path.expanduser("~/.lbrynet/lbrynet.sqlite"))
|
async def ensure_blob(blob):
|
||||||
|
upstream_host, upstream_port = conf.fixed_peers[0]
|
||||||
|
upstream_host = await resolve_host(upstream_host, upstream_port, 'tcp')
|
||||||
|
success, proto = await request_blob(loop, blob, upstream_host, int(upstream_port), conf.peer_connect_timeout,
|
||||||
|
conf.blob_download_timeout)
|
||||||
|
print(success, proto)
|
||||||
|
if proto:
|
||||||
|
proto.close()
|
||||||
|
|
||||||
|
storage = SQLiteStorage(conf, os.path.expanduser("/tmp/lbrynet.sqlite"))
|
||||||
await storage.open()
|
await storage.open()
|
||||||
blob_manager = BlobManager(loop, os.path.expanduser("~/.lbrynet/blobfiles"), storage)
|
blob_manager = BlobManager(loop, os.path.expanduser("/tmp/blobfiles"), storage, conf)
|
||||||
await blob_manager.setup()
|
await blob_manager.setup()
|
||||||
|
|
||||||
server = await loop.create_server(
|
server = BlobServer(loop, blob_manager, address, blob_callback=ensure_blob)
|
||||||
lambda: BlobServer(loop, blob_manager, address),
|
|
||||||
'0.0.0.0', 4444)
|
|
||||||
try:
|
try:
|
||||||
async with server:
|
server.start_server(6666, '0.0.0.0')
|
||||||
await server.serve_forever()
|
while True:
|
||||||
|
await asyncio.sleep(1)
|
||||||
finally:
|
finally:
|
||||||
await storage.close()
|
await storage.close()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue