forked from LBRYCommunity/lbry-sdk
dht_crawler: store data
This commit is contained in:
parent
2361e34541
commit
7ea88e7b31
1 changed files with 156 additions and 33 deletions
|
@ -1,3 +1,4 @@
|
||||||
|
import datetime
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
|
@ -8,13 +9,50 @@ from lbry.dht.constants import generate_id
|
||||||
from lbry.dht.node import Node
|
from lbry.dht.node import Node
|
||||||
from lbry.dht.peer import make_kademlia_peer, PeerManager
|
from lbry.dht.peer import make_kademlia_peer, PeerManager
|
||||||
from lbry.dht.protocol.distance import Distance
|
from lbry.dht.protocol.distance import Distance
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteMixin
|
||||||
from lbry.conf import Config
|
from lbry.conf import Config
|
||||||
from lbry.utils import resolve_host
|
from lbry.utils import resolve_host
|
||||||
|
|
||||||
|
|
||||||
|
from sqlalchemy.orm import declarative_base, relationship
|
||||||
|
import sqlalchemy as sqla
|
||||||
|
|
||||||
|
|
||||||
# Root-logger setup for the crawler script: timestamped, level-tagged lines
# including the emitting module and line number.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
log = logging.getLogger(__name__)

# Declarative base shared by the crawler's ORM models (DHTPeer, DHTConnection).
Base = declarative_base()
||||||
|
|
||||||
|
|
||||||
|
class DHTPeer(Base):
    """ORM row describing one DHT peer endpoint observed by the crawler.

    A row is keyed by a surrogate ``peer_id``; the logical identity of a peer
    is its (node_id, udp_port) endpoint, enforced unique below.
    """
    __tablename__ = "peer"
    peer_id = sqla.Column(sqla.Integer(), sqla.Identity(), primary_key=True)
    # DHT node id; width 96 suggests a hex encoding of the 48-byte id — TODO confirm
    node_id = sqla.Column(sqla.String(96))
    address = sqla.Column(sqla.String())
    udp_port = sqla.Column(sqla.Integer())
    tcp_port = sqla.Column(sqla.Integer())
    first_online = sqla.Column(sqla.DateTime())
    errors = sqla.Column(sqla.Integer(), default=0)
    # seconds the peer had been up when it was last seen going offline
    last_churn = sqla.Column(sqla.Integer())
    added_on = sqla.Column(sqla.DateTime(), nullable=False, default=datetime.datetime.utcnow)
    last_check = sqla.Column(sqla.DateTime())
    last_seen = sqla.Column(sqla.DateTime())
    latency = sqla.Column(sqla.Integer())
    endpoint_unique = sqla.UniqueConstraint("node_id", "udp_port")
    # FIX: a UniqueConstraint assigned to a plain class attribute is silently
    # ignored by the declarative mapper; table-level constraints must be
    # registered through __table_args__ for the UNIQUE index to be created.
    __table_args__ = (endpoint_unique,)

    @classmethod
    def from_kad_peer(cls, peer):
        """Build a row from a kademlia peer object (node_id may be None)."""
        return DHTPeer(node_id=peer.node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port)

    def to_kad_peer(self):
        """Convert this row back into a kademlia peer object."""
        return make_kademlia_peer(self.node_id, self.address, self.udp_port, self.tcp_port)
|
class DHTConnection(Base):
    """Directed edge between two peers: ``from_peer`` reported ``to_peer``
    in its routing table during a crawl. The pair of foreign keys forms the
    composite primary key, so each edge is stored at most once.
    """
    __tablename__ = "connection"
    from_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True)
    # reverse access: DHTPeer.known_by lists the connections naming that peer as source
    connected_by = relationship("DHTPeer", backref="known_by", primaryjoin=(DHTPeer.peer_id == from_peer_id))
    to_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True)
    # reverse access: DHTPeer.connections lists the connections naming that peer as target
    connected_to = relationship("DHTPeer", backref="connections", primaryjoin=(DHTPeer.peer_id == to_peer_id))
||||||
def new_node(address="0.0.0.0", udp_port=4444, node_id=None):
|
def new_node(address="0.0.0.0", udp_port=4444, node_id=None):
|
||||||
|
@ -24,13 +62,85 @@ def new_node(address="0.0.0.0", udp_port=4444, node_id=None):
|
||||||
|
|
||||||
|
|
||||||
class Crawler:
|
class Crawler:
|
||||||
def __init__(self):
|
def __init__(self, db_path: str):
|
||||||
self.node = new_node()
|
self.node = new_node()
|
||||||
self.crawled = set()
|
self.semaphore = asyncio.Semaphore(20)
|
||||||
self.known_peers = set()
|
engine = sqla.create_engine(f"sqlite:///{db_path}")
|
||||||
self.unreachable = set()
|
Base.metadata.create_all(engine)
|
||||||
self.error = set()
|
session = sqla.orm.sessionmaker(engine)
|
||||||
self.semaphore = asyncio.Semaphore(10)
|
self.db = session()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def recent_peers_query(self):
|
||||||
|
half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
|
||||||
|
return self.db.query(DHTPeer).filter(DHTPeer.last_seen > half_hour_ago)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def all_peers(self):
|
||||||
|
return set([peer.to_kad_peer() for peer in self.recent_peers_query.all()])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def unreachable_peers_count(self):
|
||||||
|
return self.recent_peers_query.filter(DHTPeer.latency == None).count()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def peers_with_errors_count(self):
|
||||||
|
return self.recent_peers_query.filter(DHTPeer.errors > 0).count()
|
||||||
|
|
||||||
|
def get_peers_needing_check(self):
|
||||||
|
half_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(minutes=30)
|
||||||
|
return set([peer.to_kad_peer() for peer in self.recent_peers_query.filter(
|
||||||
|
sqla.or_(DHTPeer.last_check == None, DHTPeer.last_check < half_hour_ago)).all()])
|
||||||
|
|
||||||
|
    def add_peers(self, *peers):
        """Insert rows for unseen peer endpoints and backfill the node id on
        endpoints that were previously stored without one."""
        for peer in peers:
            db_peer = self.get_from_peer(peer)
            if db_peer and db_peer.node_id is None and peer.node_id:
                # endpoint already known but anonymous: record its node id
                db_peer.node_id = peer.node_id
                self.db.add(db_peer)
            elif not db_peer:
                self.db.add(DHTPeer.from_kad_peer(peer))
        # NOTE(review): commit placed once per batch, after the loop — the
        # original indentation is ambiguous in this view; confirm against history.
        self.db.commit()
|
|
||||||
|
def get_from_peer(self, peer):
|
||||||
|
return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first()
|
||||||
|
|
||||||
|
def set_latency(self, peer, latency=None):
|
||||||
|
db_peer = self.get_from_peer(peer)
|
||||||
|
db_peer.latency = latency
|
||||||
|
if not db_peer.node_id:
|
||||||
|
db_peer.node_id = peer.node_id
|
||||||
|
if db_peer.first_online and latency is None:
|
||||||
|
db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds
|
||||||
|
elif latency is not None and db_peer.first_online is None:
|
||||||
|
db_peer.first_online = datetime.datetime.utcnow()
|
||||||
|
db_peer.last_check = datetime.datetime.utcnow()
|
||||||
|
self.db.add(db_peer)
|
||||||
|
self.db.commit()
|
||||||
|
|
||||||
|
def inc_errors(self, peer):
|
||||||
|
db_peer = self.get_from_peer(peer)
|
||||||
|
db_peer.errors += 1
|
||||||
|
self.db.add(db_peer)
|
||||||
|
self.db.commit()
|
||||||
|
|
||||||
|
    def count_peers(self):
        """Total number of peer rows ever stored (no recency filter)."""
        return self.db.query(DHTPeer).count()
|
|
||||||
|
def associate_peers(self, target_peer, peers):
|
||||||
|
db_peer = self.get_from_peer(target_peer)
|
||||||
|
connections = [
|
||||||
|
DHTConnection(
|
||||||
|
from_peer_id=db_peer.peer_id,
|
||||||
|
to_peer_id=self.get_from_peer(peer).peer_id)
|
||||||
|
for peer in peers
|
||||||
|
]
|
||||||
|
for peer in peers:
|
||||||
|
self.db.query(DHTPeer).filter(DHTPeer.address == peer.address, DHTPeer.udp_port == peer.udp_port).update(
|
||||||
|
{DHTPeer.last_seen: datetime.datetime.utcnow()})
|
||||||
|
self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete()
|
||||||
|
self.db.add_all(connections)
|
||||||
|
self.db.commit()
|
||||||
|
|
||||||
async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
|
async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
|
||||||
async with self.semaphore:
|
async with self.semaphore:
|
||||||
|
@ -45,30 +155,32 @@ class Crawler:
|
||||||
except lbry.dht.error.RemoteException as e:
|
except lbry.dht.error.RemoteException as e:
|
||||||
log.info('Previously responding peer errored: %s:%d attempt #%d - %s',
|
log.info('Previously responding peer errored: %s:%d attempt #%d - %s',
|
||||||
host, port, (attempt + 1), str(e))
|
host, port, (attempt + 1), str(e))
|
||||||
self.error.add((host, port))
|
self.inc_errors(peer)
|
||||||
continue
|
continue
|
||||||
return []
|
return []
|
||||||
|
|
||||||
async def crawl_routing_table(self, host, port):
|
async def crawl_routing_table(self, host, port):
|
||||||
start = time.time()
|
start = time.time()
|
||||||
log.info("querying %s:%d", host, port)
|
log.info("querying %s:%d", host, port)
|
||||||
self.known_peers.add((host, port))
|
|
||||||
self.crawled.add((host, port))
|
|
||||||
address = await resolve_host(host, port, 'udp')
|
address = await resolve_host(host, port, 'udp')
|
||||||
|
self.add_peers(make_kademlia_peer(None, address, port))
|
||||||
key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
||||||
if not key:
|
latency = None
|
||||||
for _ in range(3):
|
for _ in range(3):
|
||||||
try:
|
try:
|
||||||
async with self.semaphore:
|
async with self.semaphore:
|
||||||
await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping()
|
ping_start = time.perf_counter_ns()
|
||||||
key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping()
|
||||||
except asyncio.TimeoutError:
|
key = self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
|
||||||
pass
|
latency = time.perf_counter_ns() - ping_start
|
||||||
except lbry.dht.error.RemoteException:
|
except asyncio.TimeoutError:
|
||||||
self.error.add((host, port))
|
pass
|
||||||
if not key:
|
except lbry.dht.error.RemoteException:
|
||||||
self.unreachable.add((host, port))
|
self.inc_errors(make_kademlia_peer(None, address, port))
|
||||||
return set()
|
pass
|
||||||
|
self.set_latency(make_kademlia_peer(key, address, port), latency)
|
||||||
|
if not latency:
|
||||||
|
return set()
|
||||||
node_id = key
|
node_id = key
|
||||||
distance = Distance(key)
|
distance = Distance(key)
|
||||||
max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
|
max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
|
||||||
|
@ -98,7 +210,8 @@ class Crawler:
|
||||||
factor = 2048
|
factor = 2048
|
||||||
log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
|
log.info("Done querying %s:%d in %.2f seconds: %d peers found over %d requests.",
|
||||||
host, port, (time.time() - start), len(peers), i)
|
host, port, (time.time() - start), len(peers), i)
|
||||||
self.crawled.update(peers)
|
self.add_peers(*peers)
|
||||||
|
self.associate_peers(make_kademlia_peer(key, address, port), peers)
|
||||||
return peers
|
return peers
|
||||||
|
|
||||||
async def process(self):
|
async def process(self):
|
||||||
|
@ -109,21 +222,31 @@ class Crawler:
|
||||||
to_process[_peer] = f
|
to_process[_peer] = f
|
||||||
f.add_done_callback(lambda _: to_process.pop(_peer))
|
f.add_done_callback(lambda _: to_process.pop(_peer))
|
||||||
|
|
||||||
while to_process or len(self.known_peers) < len(self.crawled):
|
to_check = self.get_peers_needing_check()
|
||||||
log.info("%d known, %d unreachable, %d error.. %d processing",
|
while True:
|
||||||
len(self.known_peers), len(self.unreachable), len(self.error), len(to_process))
|
for peer in to_check:
|
||||||
for peer in self.crawled.difference(self.known_peers):
|
if peer not in to_process:
|
||||||
self.known_peers.add(peer)
|
submit(peer)
|
||||||
submit(peer)
|
if len(to_process) > 20:
|
||||||
|
break
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
log.info("%d known, %d unreachable, %d error, %d processing, %d on queue",
|
||||||
|
self.recent_peers_query.count(), self.unreachable_peers_count, self.peers_with_errors_count,
|
||||||
|
len(to_process), len(to_check))
|
||||||
await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED)
|
await asyncio.wait(to_process.values(), return_when=asyncio.FIRST_COMPLETED)
|
||||||
|
to_check = self.get_peers_needing_check()
|
||||||
|
while not to_check and not to_process:
|
||||||
|
log.info("Idle, sleeping a minute.")
|
||||||
|
await asyncio.sleep(60.0)
|
||||||
|
to_check = self.get_peers_needing_check()
|
||||||
|
|
||||||
|
|
||||||
async def test():
    """Ad-hoc entry point: start a crawler node and resume crawling from the
    peers already stored in /tmp/a.db."""
    crawler = Crawler("/tmp/a.db")
    await crawler.node.start_listening()
    conf = Config()
    # bootstrap pass disabled in this commit: the crawl resumes from rows
    # persisted by a previous run instead of the configured seed nodes
    #for (host, port) in conf.known_dht_nodes:
    #    await crawler.crawl_routing_table(host, port)
    await crawler.process()
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|
Loading…
Reference in a new issue