Compare commits


10 commits

Author        SHA1        Message                                                      Date
Victor Shyba  d14eb24141  always add itself                                            2021-09-29 23:09:13 -03:00
Victor Shyba  820015872b  fix standalone blob server and make it fetch from upstream  2021-09-29 23:04:58 -03:00
Victor Shyba  f57b1b3c3d  dht: use bytes hex/fromhex instead of binascii               2021-09-29 21:27:53 -03:00
Victor Shyba  a85ceb3cca  add grin to dht known list                                   2021-09-29 11:49:53 -03:00
Victor Shyba  254ae66ed3  add madiator to known dht nodes                              2021-09-29 00:24:51 -03:00
Victor Shyba  3206fe873e  add option to set bootstrap_node                             2021-09-28 18:52:23 -03:00
Victor Shyba  63046fce58  add dockerfile for dht node                                  2021-09-28 18:43:35 -03:00
Victor Shyba  b76863d055  configure where to save peers                                2021-09-28 18:43:35 -03:00
Victor Shyba  d7ed74ab94  define arg types                                             2021-09-28 18:43:35 -03:00
Victor Shyba  7388f5f1cd  add dht seed node script                                     2021-09-28 18:43:35 -03:00
11 changed files with 139 additions and 33 deletions

docker/Dockerfile.dht_node Normal file

@@ -0,0 +1,38 @@
FROM debian:10-slim
ARG user=lbry
ARG projects_dir=/home/$user
ARG DOCKER_TAG
ARG DOCKER_COMMIT=docker
ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT
RUN apt-get update && \
apt-get -y --no-install-recommends install \
wget \
automake libtool \
tar unzip \
build-essential \
pkg-config \
libleveldb-dev \
python3.7 \
python3-dev \
python3-pip \
python3-wheel \
python3-setuptools && \
update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
rm -rf /var/lib/apt/lists/*
RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user
COPY . $projects_dir
RUN chown -R $user:$user $projects_dir
USER $user
WORKDIR $projects_dir
RUN make install
RUN python3 docker/set_build.py
RUN rm ~/.cache -rf
ENTRYPOINT ["python3", "scripts/dht_node.py"]
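
Assuming this file lands at docker/Dockerfile.dht_node (the commit message only says "add dockerfile for dht node"), a typical build and run from the repository root would be docker build -f docker/Dockerfile.dht_node -t dht-node . followed by docker run -p 4444:4444/udp dht-node, with the entrypoint starting the seed-node script added later in this changeset.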

lbry/blob_exchange/server.py

@@ -18,7 +18,7 @@ MAX_REQUEST_SIZE = 1200
 class BlobServerProtocol(asyncio.Protocol):
     def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
-                 idle_timeout: float = 30.0, transfer_timeout: float = 60.0):
+                 idle_timeout: float = 30.0, transfer_timeout: float = 60.0, blob_callback=None):
         self.loop = loop
         self.blob_manager = blob_manager
         self.idle_timeout = idle_timeout
@@ -32,6 +32,7 @@ class BlobServerProtocol(asyncio.Protocol):
         self.started_transfer = asyncio.Event(loop=self.loop)
         self.transfer_finished = asyncio.Event(loop=self.loop)
         self.close_on_idle_task: typing.Optional[asyncio.Task] = None
+        self.blob_handling_callback: typing.Optional[typing.Callable] = blob_callback

     async def close_on_idle(self):
         while self.transport:
@@ -92,6 +93,9 @@ class BlobServerProtocol(asyncio.Protocol):
         if download_request:
             blob = self.blob_manager.get_blob(download_request.requested_blob)
+            if self.blob_handling_callback:
+                await self.blob_handling_callback(blob)
+                blob = self.blob_manager.get_blob(download_request.requested_blob)
             if blob.get_is_verified():
                 incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
                 responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
@@ -152,7 +156,7 @@ class BlobServerProtocol(asyncio.Protocol):
 class BlobServer:
     def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
-                 idle_timeout: float = 30.0, transfer_timeout: float = 60.0):
+                 idle_timeout: float = 30.0, transfer_timeout: float = 60.0, blob_callback=None):
         self.loop = loop
         self.blob_manager = blob_manager
         self.server_task: typing.Optional[asyncio.Task] = None
@@ -161,6 +165,7 @@ class BlobServer:
         self.idle_timeout = idle_timeout
         self.transfer_timeout = transfer_timeout
         self.server_protocol_class = BlobServerProtocol
+        self.blob_handling_callback: typing.Optional[typing.Callable] = blob_callback

     def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
         if self.server_task is not None:
@@ -169,7 +174,8 @@ class BlobServer:
         async def _start_server():
             server = await self.loop.create_server(
                 lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address,
-                                                   self.idle_timeout, self.transfer_timeout),
+                                                   self.idle_timeout, self.transfer_timeout,
+                                                   blob_callback=self.blob_handling_callback),
                 interface, port
             )
             self.started_listening.set()
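
The blob_callback hook added above lets an embedding application intercept each download request before the blob's verification status is checked: if the callback fetches a missing blob, the protocol re-reads it and serves it once verified. A minimal sketch of the contract, with a deliberately empty hook body (the paths, port, and on_blob_request name are illustrative, not part of this change; scripts/standalone_blob_server.py below is the real consumer):

import asyncio
from lbry.blob.blob_manager import BlobManager
from lbry.blob_exchange.server import BlobServer
from lbry.conf import Config
from lbry.extras.daemon.storage import SQLiteStorage

async def serve(address: str):  # address: a lbrycrd address for payment requests
    loop = asyncio.get_running_loop()
    conf = Config()
    storage = SQLiteStorage(conf, ":memory:")
    await storage.open()
    blob_manager = BlobManager(loop, "/tmp/blobfiles", storage, conf)
    await blob_manager.setup()

    async def on_blob_request(blob):
        # Hypothetical hook: download `blob` from an upstream source here when
        # blob.get_is_verified() is False; the protocol re-gets the blob after
        # this coroutine returns and only serves it if it is then verified.
        pass

    server = BlobServer(loop, blob_manager, address, blob_callback=on_blob_request)
    server.start_server(6666, '0.0.0.0')
    await server.started_listening.wait()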

lbry/conf.py

@@ -692,6 +692,8 @@ class Config(CLIConfig):
         ('spv19.lbry.com', 50001),
     ])
     known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
+        ('dht.lbry.grin.io', 4444),  # Grin
+        ('dht.lbry.madiator.com', 4444),  # Madiator
         ('lbrynet1.lbry.com', 4444),  # US EAST
         ('lbrynet2.lbry.com', 4444),  # US WEST
         ('lbrynet3.lbry.com', 4444),  # EU

lbry/dht/node.py

@@ -1,7 +1,6 @@
 import logging
 import asyncio
 import typing
-import binascii
 import socket
 from lbry.utils import resolve_host
 from lbry.dht import constants
@@ -80,7 +79,7 @@ class Node:
             await fut

     async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
-        hash_value = binascii.unhexlify(blob_hash.encode())
+        hash_value = bytes.fromhex(blob_hash)
         assert len(hash_value) == constants.HASH_LENGTH
         peers = await self.peer_search(hash_value)
@@ -95,7 +94,7 @@ class Node:
         stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
         if stored_to:
             log.debug(
-                "Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
+                "Stored %s to %i of %i attempted peers", hash_value.hex()[:8],
                 len(stored_to), len(peers)
             )
         else:
@@ -223,7 +222,7 @@ class Node:
         # prioritize peers who reply to a dht ping first
         # this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers
-        async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
+        async for results in self.get_iterative_value_finder(bytes.fromhex(blob_hash)):
             to_put = []
             for peer in results:
                 if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port:
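
The binascii-to-bytes swap here and in the files below is a pure refactor: since Python 3.5 the bytes type carries hex() and fromhex() with identical behavior, minus the encode/decode round-trips. A quick illustration:

import binascii

blob_hash = "deadbeef" * 12  # 96 hex chars, the length of a sha384 lbry blob hash
digest = bytes.fromhex(blob_hash)            # replaces binascii.unhexlify(blob_hash.encode())
assert digest == binascii.unhexlify(blob_hash.encode())
assert digest.hex() == binascii.hexlify(digest).decode()  # replaces hexlify(...).decode()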

lbry/dht/peer.py

@@ -1,7 +1,6 @@
 import typing
 import asyncio
 import logging
-from binascii import hexlify
 from dataclasses import dataclass, field
 from functools import lru_cache
 from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
@@ -154,7 +153,7 @@ class KademliaPeer:
     def __post_init__(self):
         if self._node_id is not None:
             if not len(self._node_id) == constants.HASH_LENGTH:
-                raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode()))
+                raise ValueError("invalid node_id: {}".format(self._node_id.hex()))
         if self.udp_port is not None and not 1024 <= self.udp_port <= 65535:
             raise ValueError(f"invalid udp port: {self.address}:{self.udp_port}")
         if self.tcp_port is not None and not 1024 <= self.tcp_port <= 65535:

lbry/dht/protocol/data_store.py

@@ -16,6 +16,9 @@ class DictDataStore:
         self._peer_manager = peer_manager
         self.completed_blobs: typing.Set[str] = set()

+    def __len__(self):
+        return self._data_store.__len__()
+
     def removed_expired_peers(self):
         now = self.loop.time()
         keys = list(self._data_store.keys())
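
The added __len__ makes len() work directly on the data store, reporting how many keys currently have stored contact information; the new scripts/dht_node.py below relies on exactly that in its periodic status log.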

lbry/dht/protocol/iterative_find.py

@@ -1,5 +1,4 @@
 import asyncio
-from binascii import hexlify
 from itertools import chain
 from collections import defaultdict
 import typing
@@ -198,7 +197,7 @@ class IterativeFinder:
                 added += 1
         log.debug("running %d probes", len(self.running_probes))
         if not added and not self.running_probes:
-            log.debug("search for %s exhausted", hexlify(self.key)[:8])
+            log.debug("search for %s exhausted", self.key.hex()[:8])
             self.search_exhausted()

     def _schedule_probe(self, peer: 'KademliaPeer'):
@@ -271,7 +270,7 @@ class IterativeNodeFinder(IterativeFinder):
         self.yielded_peers: typing.Set['KademliaPeer'] = set()

     async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
-        log.debug("probing %s:%d %s", peer.address, peer.udp_port, hexlify(peer.node_id)[:8] if peer.node_id else '')
+        log.debug("probing %s:%d %s", peer.address, peer.udp_port, peer.node_id.hex()[:8] if peer.node_id else '')
         response = await self.protocol.get_rpc_peer(peer).find_node(self.key)
         return FindNodeResponse(self.key, response)

lbry/dht/protocol/protocol.py

@@ -4,7 +4,6 @@ import functools
 import hashlib
 import asyncio
 import typing
-import binascii
 import random
 from asyncio.protocols import DatagramProtocol
 from asyncio.transports import DatagramTransport
@@ -97,8 +96,7 @@ class KademliaRPC:
                 if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
             ]
             # if we don't have k storing peers to return and we have this hash locally, include our contact information
-            if len(peers) < constants.K and binascii.hexlify(key).decode() in self.protocol.data_store.completed_blobs:
-                peers.append(self.compact_address())
+            peers.append(self.compact_address())
             if not peers:
                 response[PAGE_KEY] = 0
             else:
@@ -415,8 +413,8 @@ class KademliaProtocol(DatagramProtocol):
         self._wakeup_routing_task.clear()

     def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
-        assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(),
-                                                        binascii.hexlify(self.node_id)[:8].decode())
+        assert sender_contact.node_id != self.node_id, (sender_contact.node_id.hex()[:8],
+                                                        self.node_id.hex()[:8])
         method = message.method
         if method not in [b'ping', b'store', b'findNode', b'findValue']:
             raise AttributeError('Invalid method: %s' % message.method.decode())
@@ -561,7 +559,7 @@ class KademliaProtocol(DatagramProtocol):
             message = decode_datagram(datagram)
         except (ValueError, TypeError, DecodeError):
             self.peer_manager.report_failure(address[0], address[1])
-            log.warning("Couldn't decode dht datagram from %s: %s", address, binascii.hexlify(datagram).decode())
+            log.warning("Couldn't decode dht datagram from %s: %s", address, datagram.hex())
             return

         if isinstance(message, RequestDatagram):
@@ -603,7 +601,7 @@ class KademliaProtocol(DatagramProtocol):
         if len(data) > constants.MSG_SIZE_LIMIT:
             log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)",
                         constants.MSG_SIZE_LIMIT, len(data))
-            log.debug("Packet is too large to send: %s", binascii.hexlify(data[:3500]).decode())
+            log.debug("Packet is too large to send: %s", data[:3500].hex())
             raise ValueError(
                 f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)"
             )
@@ -663,13 +661,13 @@ class KademliaProtocol(DatagramProtocol):
             res = await self.get_rpc_peer(peer).store(hash_value)
             if res != b"OK":
                 raise ValueError(res)
-            log.debug("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer)
+            log.debug("Stored %s to %s", hash_value.hex()[:8], peer)
             return peer.node_id, True

         try:
             return await __store()
         except asyncio.TimeoutError:
-            log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)
+            log.debug("Timeout while storing blob_hash %s at %s", hash_value.hex()[:8], peer)
             return peer.node_id, False
         except ValueError as err:
             log.error("Unexpected response: %s", err)

scripts/dht_node.py Normal file

@@ -0,0 +1,48 @@
import asyncio
import argparse
import logging
from typing import Optional
from lbry.dht.constants import generate_id
from lbry.dht.node import Node
from lbry.dht.peer import PeerManager
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.conf import Config

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
log = logging.getLogger(__name__)


async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str]):
    loop = asyncio.get_event_loop()
    conf = Config()
    storage = SQLiteStorage(conf, db_file_path, loop, loop.time)
    if bootstrap_node:
        nodes = bootstrap_node.split(':')
        nodes = [(nodes[0], int(nodes[1]))]
    else:
        nodes = conf.known_dht_nodes
    await storage.open()
    node = Node(
        loop, PeerManager(loop), generate_id(), port, port, 3333, None,
        storage=storage
    )
    node.start(host, nodes)
    while True:
        await asyncio.sleep(10)
        log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
                 len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store),
                 len(node.protocol.data_store.get_storing_contacts()))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Starts a single DHT node, which then can be used as a seed node or just a contributing node.")
    parser.add_argument("--host", default='0.0.0.0', type=str, help="Host to listen for requests. Default: 0.0.0.0")
    parser.add_argument("--port", default=4444, type=int, help="Port to listen for requests. Default: 4444")
    parser.add_argument("--db_file", default='/tmp/dht.db', type=str, help="DB file to save peers. Default: /tmp/dht.db")
    parser.add_argument("--bootstrap_node", default=None, type=str,
                        help="Node to connect to for bootstrapping this node. Leave unset to use the default ones. "
                             "Format: host:port  Example: lbrynet1.lbry.com:4444")
    args = parser.parse_args()
    asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node))
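
With the defaults above, a seed node pinned to one bootstrap peer can be started with, for example, python scripts/dht_node.py --host 0.0.0.0 --port 4444 --db_file /tmp/dht.db --bootstrap_node lbrynet1.lbry.com:4444 (the example peer is the one named in the script's own help text); leaving --bootstrap_node unset falls back to conf.known_dht_nodes, which this changeset also extends.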

scripts/download_blob_from_peer.py

@@ -48,7 +48,7 @@ async def main(blob_hash: str, url: str):
     )
     host = host_info[0][4][0]
-    storage = SQLiteStorage(conf, os.path.join(conf.data_dir, "lbrynet.sqlite"))
+    storage = SQLiteStorage(conf, ":memory:")
     blob_manager = BlobManager(loop, os.path.join(conf.data_dir, "blobfiles"), storage, conf)
     await storage.open()
     await blob_manager.setup()

scripts/standalone_blob_server.py

@@ -1,31 +1,45 @@
 import sys
 import os
 import asyncio
+import logging
+from lbry.blob_exchange.client import request_blob
+from lbry.utils import resolve_host
+logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
 from lbry.blob.blob_manager import BlobManager
 from lbry.blob_exchange.server import BlobServer
-from lbry.schema.address import decode_address
 from lbry.extras.daemon.storage import SQLiteStorage
+from lbry.wallet import Ledger
+from lbry.conf import Config


 async def main(address: str):
-    try:
-        decode_address(address)
-    except:
+    if not Ledger.is_pubkey_address(address):
         print(f"'{address}' is not a valid lbrycrd address")
         return 1
     loop = asyncio.get_running_loop()
+    conf = Config()

-    storage = SQLiteStorage(os.path.expanduser("~/.lbrynet/lbrynet.sqlite"))
+    async def ensure_blob(blob):
+        upstream_host, upstream_port = conf.fixed_peers[0]
+        upstream_host = await resolve_host(upstream_host, upstream_port, 'tcp')
+        success, proto = await request_blob(loop, blob, upstream_host, int(upstream_port), conf.peer_connect_timeout,
+                                            conf.blob_download_timeout)
+        print(success, proto)
+        if proto:
+            proto.close()
+
+    storage = SQLiteStorage(conf, os.path.expanduser("/tmp/lbrynet.sqlite"))
     await storage.open()
-    blob_manager = BlobManager(loop, os.path.expanduser("~/.lbrynet/blobfiles"), storage)
+    blob_manager = BlobManager(loop, os.path.expanduser("/tmp/blobfiles"), storage, conf)
     await blob_manager.setup()
-    server = await loop.create_server(
-        lambda: BlobServer(loop, blob_manager, address),
-        '0.0.0.0', 4444)
+    server = BlobServer(loop, blob_manager, address, blob_callback=ensure_blob)
     try:
-        async with server:
-            await server.serve_forever()
+        server.start_server(6666, '0.0.0.0')
+        while True:
+            await asyncio.sleep(1)
     finally:
         await storage.close()
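
Since ensure_blob is passed as blob_callback, the rewritten server fetches each requested blob from the first host in conf.fixed_peers before answering, so a freshly started standalone server can serve blobs it does not yet hold locally; that is the "fetch from upstream" behavior named in the commit message. The rewrite also fixes the original bug of handing BlobServer, which is not an asyncio.Protocol, directly to loop.create_server.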