forked from LBRYCommunity/lbry-sdk
Compare commits
10 commits
master
...
supernode_
Author | SHA1 | Date | |
---|---|---|---|
|
d14eb24141 | ||
|
820015872b | ||
|
f57b1b3c3d | ||
|
a85ceb3cca | ||
|
254ae66ed3 | ||
|
3206fe873e | ||
|
63046fce58 | ||
|
b76863d055 | ||
|
d7ed74ab94 | ||
|
7388f5f1cd |
11 changed files with 139 additions and 33 deletions
38
docker/Dockerfile.dht_node
Normal file
38
docker/Dockerfile.dht_node
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
FROM debian:10-slim
|
||||||
|
|
||||||
|
ARG user=lbry
|
||||||
|
ARG projects_dir=/home/$user
|
||||||
|
|
||||||
|
ARG DOCKER_TAG
|
||||||
|
ARG DOCKER_COMMIT=docker
|
||||||
|
ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT
|
||||||
|
|
||||||
|
RUN apt-get update && \
|
||||||
|
apt-get -y --no-install-recommends install \
|
||||||
|
wget \
|
||||||
|
automake libtool \
|
||||||
|
tar unzip \
|
||||||
|
build-essential \
|
||||||
|
pkg-config \
|
||||||
|
libleveldb-dev \
|
||||||
|
python3.7 \
|
||||||
|
python3-dev \
|
||||||
|
python3-pip \
|
||||||
|
python3-wheel \
|
||||||
|
python3-setuptools && \
|
||||||
|
update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
|
||||||
|
rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user
|
||||||
|
|
||||||
|
COPY . $projects_dir
|
||||||
|
RUN chown -R $user:$user $projects_dir
|
||||||
|
|
||||||
|
USER $user
|
||||||
|
WORKDIR $projects_dir
|
||||||
|
|
||||||
|
RUN make install
|
||||||
|
RUN python3 docker/set_build.py
|
||||||
|
RUN rm ~/.cache -rf
|
||||||
|
ENTRYPOINT ["python3", "scripts/dht_node.py"]
|
||||||
|
|
|
@ -18,7 +18,7 @@ MAX_REQUEST_SIZE = 1200
|
||||||
|
|
||||||
class BlobServerProtocol(asyncio.Protocol):
|
class BlobServerProtocol(asyncio.Protocol):
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
|
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
|
||||||
idle_timeout: float = 30.0, transfer_timeout: float = 60.0):
|
idle_timeout: float = 30.0, transfer_timeout: float = 60.0, blob_callback=None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.idle_timeout = idle_timeout
|
self.idle_timeout = idle_timeout
|
||||||
|
@ -32,6 +32,7 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
self.started_transfer = asyncio.Event(loop=self.loop)
|
self.started_transfer = asyncio.Event(loop=self.loop)
|
||||||
self.transfer_finished = asyncio.Event(loop=self.loop)
|
self.transfer_finished = asyncio.Event(loop=self.loop)
|
||||||
self.close_on_idle_task: typing.Optional[asyncio.Task] = None
|
self.close_on_idle_task: typing.Optional[asyncio.Task] = None
|
||||||
|
self.blob_handling_callback: typing.Optional[typing.Callable] = blob_callback
|
||||||
|
|
||||||
async def close_on_idle(self):
|
async def close_on_idle(self):
|
||||||
while self.transport:
|
while self.transport:
|
||||||
|
@ -92,6 +93,9 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
|
|
||||||
if download_request:
|
if download_request:
|
||||||
blob = self.blob_manager.get_blob(download_request.requested_blob)
|
blob = self.blob_manager.get_blob(download_request.requested_blob)
|
||||||
|
if self.blob_handling_callback:
|
||||||
|
await self.blob_handling_callback(blob)
|
||||||
|
blob = self.blob_manager.get_blob(download_request.requested_blob)
|
||||||
if blob.get_is_verified():
|
if blob.get_is_verified():
|
||||||
incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
|
incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
|
||||||
responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
|
responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
|
||||||
|
@ -152,7 +156,7 @@ class BlobServerProtocol(asyncio.Protocol):
|
||||||
|
|
||||||
class BlobServer:
|
class BlobServer:
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
|
def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
|
||||||
idle_timeout: float = 30.0, transfer_timeout: float = 60.0):
|
idle_timeout: float = 30.0, transfer_timeout: float = 60.0, blob_callback=None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.server_task: typing.Optional[asyncio.Task] = None
|
self.server_task: typing.Optional[asyncio.Task] = None
|
||||||
|
@ -161,6 +165,7 @@ class BlobServer:
|
||||||
self.idle_timeout = idle_timeout
|
self.idle_timeout = idle_timeout
|
||||||
self.transfer_timeout = transfer_timeout
|
self.transfer_timeout = transfer_timeout
|
||||||
self.server_protocol_class = BlobServerProtocol
|
self.server_protocol_class = BlobServerProtocol
|
||||||
|
self.blob_handling_callback: typing.Optional[typing.Callable] = blob_callback
|
||||||
|
|
||||||
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
||||||
if self.server_task is not None:
|
if self.server_task is not None:
|
||||||
|
@ -169,7 +174,8 @@ class BlobServer:
|
||||||
async def _start_server():
|
async def _start_server():
|
||||||
server = await self.loop.create_server(
|
server = await self.loop.create_server(
|
||||||
lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address,
|
lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address,
|
||||||
self.idle_timeout, self.transfer_timeout),
|
self.idle_timeout, self.transfer_timeout,
|
||||||
|
blob_callback=self.blob_handling_callback),
|
||||||
interface, port
|
interface, port
|
||||||
)
|
)
|
||||||
self.started_listening.set()
|
self.started_listening.set()
|
||||||
|
|
|
@ -692,6 +692,8 @@ class Config(CLIConfig):
|
||||||
('spv19.lbry.com', 50001),
|
('spv19.lbry.com', 50001),
|
||||||
])
|
])
|
||||||
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
|
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
|
||||||
|
('dht.lbry.grin.io', 4444), # Grin
|
||||||
|
('dht.lbry.madiator.com', 4444), # Madiator
|
||||||
('lbrynet1.lbry.com', 4444), # US EAST
|
('lbrynet1.lbry.com', 4444), # US EAST
|
||||||
('lbrynet2.lbry.com', 4444), # US WEST
|
('lbrynet2.lbry.com', 4444), # US WEST
|
||||||
('lbrynet3.lbry.com', 4444), # EU
|
('lbrynet3.lbry.com', 4444), # EU
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
import typing
|
||||||
import binascii
|
|
||||||
import socket
|
import socket
|
||||||
from lbry.utils import resolve_host
|
from lbry.utils import resolve_host
|
||||||
from lbry.dht import constants
|
from lbry.dht import constants
|
||||||
|
@ -80,7 +79,7 @@ class Node:
|
||||||
await fut
|
await fut
|
||||||
|
|
||||||
async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
|
async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
|
||||||
hash_value = binascii.unhexlify(blob_hash.encode())
|
hash_value = bytes.fromhex(blob_hash)
|
||||||
assert len(hash_value) == constants.HASH_LENGTH
|
assert len(hash_value) == constants.HASH_LENGTH
|
||||||
peers = await self.peer_search(hash_value)
|
peers = await self.peer_search(hash_value)
|
||||||
|
|
||||||
|
@ -95,7 +94,7 @@ class Node:
|
||||||
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
|
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
|
||||||
if stored_to:
|
if stored_to:
|
||||||
log.debug(
|
log.debug(
|
||||||
"Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
|
"Stored %s to %i of %i attempted peers", hash_value.hex()[:8],
|
||||||
len(stored_to), len(peers)
|
len(stored_to), len(peers)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
@ -223,7 +222,7 @@ class Node:
|
||||||
# prioritize peers who reply to a dht ping first
|
# prioritize peers who reply to a dht ping first
|
||||||
# this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers
|
# this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers
|
||||||
|
|
||||||
async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
|
async for results in self.get_iterative_value_finder(bytes.fromhex(blob_hash)):
|
||||||
to_put = []
|
to_put = []
|
||||||
for peer in results:
|
for peer in results:
|
||||||
if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port:
|
if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port:
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
import typing
|
import typing
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from binascii import hexlify
|
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
|
from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
|
||||||
|
@ -154,7 +153,7 @@ class KademliaPeer:
|
||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
if self._node_id is not None:
|
if self._node_id is not None:
|
||||||
if not len(self._node_id) == constants.HASH_LENGTH:
|
if not len(self._node_id) == constants.HASH_LENGTH:
|
||||||
raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode()))
|
raise ValueError("invalid node_id: {}".format(self._node_id.hex()))
|
||||||
if self.udp_port is not None and not 1024 <= self.udp_port <= 65535:
|
if self.udp_port is not None and not 1024 <= self.udp_port <= 65535:
|
||||||
raise ValueError(f"invalid udp port: {self.address}:{self.udp_port}")
|
raise ValueError(f"invalid udp port: {self.address}:{self.udp_port}")
|
||||||
if self.tcp_port is not None and not 1024 <= self.tcp_port <= 65535:
|
if self.tcp_port is not None and not 1024 <= self.tcp_port <= 65535:
|
||||||
|
|
|
@ -16,6 +16,9 @@ class DictDataStore:
|
||||||
self._peer_manager = peer_manager
|
self._peer_manager = peer_manager
|
||||||
self.completed_blobs: typing.Set[str] = set()
|
self.completed_blobs: typing.Set[str] = set()
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return self._data_store.__len__()
|
||||||
|
|
||||||
def removed_expired_peers(self):
|
def removed_expired_peers(self):
|
||||||
now = self.loop.time()
|
now = self.loop.time()
|
||||||
keys = list(self._data_store.keys())
|
keys = list(self._data_store.keys())
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
from binascii import hexlify
|
|
||||||
from itertools import chain
|
from itertools import chain
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import typing
|
import typing
|
||||||
|
@ -198,7 +197,7 @@ class IterativeFinder:
|
||||||
added += 1
|
added += 1
|
||||||
log.debug("running %d probes", len(self.running_probes))
|
log.debug("running %d probes", len(self.running_probes))
|
||||||
if not added and not self.running_probes:
|
if not added and not self.running_probes:
|
||||||
log.debug("search for %s exhausted", hexlify(self.key)[:8])
|
log.debug("search for %s exhausted", self.key.hex()[:8])
|
||||||
self.search_exhausted()
|
self.search_exhausted()
|
||||||
|
|
||||||
def _schedule_probe(self, peer: 'KademliaPeer'):
|
def _schedule_probe(self, peer: 'KademliaPeer'):
|
||||||
|
@ -271,7 +270,7 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
self.yielded_peers: typing.Set['KademliaPeer'] = set()
|
self.yielded_peers: typing.Set['KademliaPeer'] = set()
|
||||||
|
|
||||||
async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
|
async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
|
||||||
log.debug("probing %s:%d %s", peer.address, peer.udp_port, hexlify(peer.node_id)[:8] if peer.node_id else '')
|
log.debug("probing %s:%d %s", peer.address, peer.udp_port, peer.node_id.hex()[:8] if peer.node_id else '')
|
||||||
response = await self.protocol.get_rpc_peer(peer).find_node(self.key)
|
response = await self.protocol.get_rpc_peer(peer).find_node(self.key)
|
||||||
return FindNodeResponse(self.key, response)
|
return FindNodeResponse(self.key, response)
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,6 @@ import functools
|
||||||
import hashlib
|
import hashlib
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
import typing
|
||||||
import binascii
|
|
||||||
import random
|
import random
|
||||||
from asyncio.protocols import DatagramProtocol
|
from asyncio.protocols import DatagramProtocol
|
||||||
from asyncio.transports import DatagramTransport
|
from asyncio.transports import DatagramTransport
|
||||||
|
@ -97,8 +96,7 @@ class KademliaRPC:
|
||||||
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
|
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
|
||||||
]
|
]
|
||||||
# if we don't have k storing peers to return and we have this hash locally, include our contact information
|
# if we don't have k storing peers to return and we have this hash locally, include our contact information
|
||||||
if len(peers) < constants.K and binascii.hexlify(key).decode() in self.protocol.data_store.completed_blobs:
|
peers.append(self.compact_address())
|
||||||
peers.append(self.compact_address())
|
|
||||||
if not peers:
|
if not peers:
|
||||||
response[PAGE_KEY] = 0
|
response[PAGE_KEY] = 0
|
||||||
else:
|
else:
|
||||||
|
@ -415,8 +413,8 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
self._wakeup_routing_task.clear()
|
self._wakeup_routing_task.clear()
|
||||||
|
|
||||||
def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
|
def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
|
||||||
assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(),
|
assert sender_contact.node_id != self.node_id, (sender_contact.node_id.hex()[:8],
|
||||||
binascii.hexlify(self.node_id)[:8].decode())
|
self.node_id.hex()[:8])
|
||||||
method = message.method
|
method = message.method
|
||||||
if method not in [b'ping', b'store', b'findNode', b'findValue']:
|
if method not in [b'ping', b'store', b'findNode', b'findValue']:
|
||||||
raise AttributeError('Invalid method: %s' % message.method.decode())
|
raise AttributeError('Invalid method: %s' % message.method.decode())
|
||||||
|
@ -561,7 +559,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
message = decode_datagram(datagram)
|
message = decode_datagram(datagram)
|
||||||
except (ValueError, TypeError, DecodeError):
|
except (ValueError, TypeError, DecodeError):
|
||||||
self.peer_manager.report_failure(address[0], address[1])
|
self.peer_manager.report_failure(address[0], address[1])
|
||||||
log.warning("Couldn't decode dht datagram from %s: %s", address, binascii.hexlify(datagram).decode())
|
log.warning("Couldn't decode dht datagram from %s: %s", address, datagram.hex())
|
||||||
return
|
return
|
||||||
|
|
||||||
if isinstance(message, RequestDatagram):
|
if isinstance(message, RequestDatagram):
|
||||||
|
@ -603,7 +601,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
if len(data) > constants.MSG_SIZE_LIMIT:
|
if len(data) > constants.MSG_SIZE_LIMIT:
|
||||||
log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)",
|
log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)",
|
||||||
constants.MSG_SIZE_LIMIT, len(data))
|
constants.MSG_SIZE_LIMIT, len(data))
|
||||||
log.debug("Packet is too large to send: %s", binascii.hexlify(data[:3500]).decode())
|
log.debug("Packet is too large to send: %s", data[:3500].hex())
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)"
|
f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)"
|
||||||
)
|
)
|
||||||
|
@ -663,13 +661,13 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
res = await self.get_rpc_peer(peer).store(hash_value)
|
res = await self.get_rpc_peer(peer).store(hash_value)
|
||||||
if res != b"OK":
|
if res != b"OK":
|
||||||
raise ValueError(res)
|
raise ValueError(res)
|
||||||
log.debug("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
log.debug("Stored %s to %s", hash_value.hex()[:8], peer)
|
||||||
return peer.node_id, True
|
return peer.node_id, True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return await __store()
|
return await __store()
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)
|
log.debug("Timeout while storing blob_hash %s at %s", hash_value.hex()[:8], peer)
|
||||||
return peer.node_id, False
|
return peer.node_id, False
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
log.error("Unexpected response: %s", err)
|
log.error("Unexpected response: %s", err)
|
||||||
|
|
48
scripts/dht_node.py
Normal file
48
scripts/dht_node.py
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
import asyncio
|
||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from lbry.dht.constants import generate_id
|
||||||
|
from lbry.dht.node import Node
|
||||||
|
from lbry.dht.peer import PeerManager
|
||||||
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
|
from lbry.conf import Config
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str]):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
conf = Config()
|
||||||
|
storage = SQLiteStorage(conf, db_file_path, loop, loop.time)
|
||||||
|
if bootstrap_node:
|
||||||
|
nodes = bootstrap_node.split(':')
|
||||||
|
nodes = [(nodes[0], int(nodes[1]))]
|
||||||
|
else:
|
||||||
|
nodes = conf.known_dht_nodes
|
||||||
|
await storage.open()
|
||||||
|
node = Node(
|
||||||
|
loop, PeerManager(loop), generate_id(), port, port, 3333, None,
|
||||||
|
storage=storage
|
||||||
|
)
|
||||||
|
node.start(host, nodes)
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(10)
|
||||||
|
log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
|
||||||
|
len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store),
|
||||||
|
len(node.protocol.data_store.get_storing_contacts()))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Starts a single DHT node, which then can be used as a seed node or just a contributing node.")
|
||||||
|
parser.add_argument("--host", default='0.0.0.0', type=str, help="Host to listen for requests. Default: 0.0.0.0")
|
||||||
|
parser.add_argument("--port", default=4444, type=int, help="Port to listen for requests. Default: 4444")
|
||||||
|
parser.add_argument("--db_file", default='/tmp/dht.db', type=str, help="DB file to save peers. Default: /tmp/dht.db")
|
||||||
|
parser.add_argument("--bootstrap_node", default=None, type=str,
|
||||||
|
help="Node to connect for bootstraping this node. Leave unset to use the default ones. "
|
||||||
|
"Format: host:port Example: lbrynet1.lbry.com:4444")
|
||||||
|
args = parser.parse_args()
|
||||||
|
asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node))
|
|
@ -48,7 +48,7 @@ async def main(blob_hash: str, url: str):
|
||||||
)
|
)
|
||||||
host = host_info[0][4][0]
|
host = host_info[0][4][0]
|
||||||
|
|
||||||
storage = SQLiteStorage(conf, os.path.join(conf.data_dir, "lbrynet.sqlite"))
|
storage = SQLiteStorage(conf, ":memory:")
|
||||||
blob_manager = BlobManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage, conf)
|
blob_manager = BlobManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage, conf)
|
||||||
await storage.open()
|
await storage.open()
|
||||||
await blob_manager.setup()
|
await blob_manager.setup()
|
||||||
|
|
|
@ -1,31 +1,45 @@
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from lbry.blob_exchange.client import request_blob
|
||||||
|
from lbry.utils import resolve_host
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
|
||||||
from lbry.blob.blob_manager import BlobManager
|
from lbry.blob.blob_manager import BlobManager
|
||||||
from lbry.blob_exchange.server import BlobServer
|
from lbry.blob_exchange.server import BlobServer
|
||||||
from lbry.schema.address import decode_address
|
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
|
from lbry.wallet import Ledger
|
||||||
|
from lbry.conf import Config
|
||||||
|
|
||||||
|
|
||||||
async def main(address: str):
|
async def main(address: str):
|
||||||
try:
|
if not Ledger.is_pubkey_address(address):
|
||||||
decode_address(address)
|
|
||||||
except:
|
|
||||||
print(f"'{address}' is not a valid lbrycrd address")
|
print(f"'{address}' is not a valid lbrycrd address")
|
||||||
return 1
|
return 1
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
conf = Config()
|
||||||
|
|
||||||
storage = SQLiteStorage(os.path.expanduser("~/.lbrynet/lbrynet.sqlite"))
|
async def ensure_blob(blob):
|
||||||
|
upstream_host, upstream_port = conf.fixed_peers[0]
|
||||||
|
upstream_host = await resolve_host(upstream_host, upstream_port, 'tcp')
|
||||||
|
success, proto = await request_blob(loop, blob, upstream_host, int(upstream_port), conf.peer_connect_timeout,
|
||||||
|
conf.blob_download_timeout)
|
||||||
|
print(success, proto)
|
||||||
|
if proto:
|
||||||
|
proto.close()
|
||||||
|
|
||||||
|
storage = SQLiteStorage(conf, os.path.expanduser("/tmp/lbrynet.sqlite"))
|
||||||
await storage.open()
|
await storage.open()
|
||||||
blob_manager = BlobManager(loop, os.path.expanduser("~/.lbrynet/blobfiles"), storage)
|
blob_manager = BlobManager(loop, os.path.expanduser("/tmp/blobfiles"), storage, conf)
|
||||||
await blob_manager.setup()
|
await blob_manager.setup()
|
||||||
|
|
||||||
server = await loop.create_server(
|
server = BlobServer(loop, blob_manager, address, blob_callback=ensure_blob)
|
||||||
lambda: BlobServer(loop, blob_manager, address),
|
|
||||||
'0.0.0.0', 4444)
|
|
||||||
try:
|
try:
|
||||||
async with server:
|
server.start_server(6666, '0.0.0.0')
|
||||||
await server.serve_forever()
|
while True:
|
||||||
|
await asyncio.sleep(1)
|
||||||
finally:
|
finally:
|
||||||
await storage.close()
|
await storage.close()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue