refactor script, remove dep

parent 47a5d37d7c
commit 13af7800c2

2 changed files with 154 additions and 103 deletions
Dockerfile:

@@ -31,7 +31,7 @@ RUN chown -R $user:$user $projects_dir
 USER $user
 WORKDIR $projects_dir
 
-RUN python3 -m pip install -U setuptools pip sqlalchemy
+RUN python3 -m pip install -U setuptools pip
 RUN make install
 RUN python3 docker/set_build.py
 RUN rm ~/.cache -rf
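The dependency being removed is sqlalchemy: the script below swaps its ORM models for a plain-SQLite `PeerStorage`, so the image needs nothing beyond the interpreter's bundled driver. A trivial check of that assumption:

```python
# sqlite3 is part of the CPython standard library; nothing to pip-install.
import sqlite3
print(sqlite3.sqlite_version)  # version of the bundled SQLite engine
```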
Python crawler script:

@@ -3,9 +3,10 @@ import logging
 import asyncio
 import time
 import typing
+from dataclasses import dataclass, astuple, replace
 
 from aiohttp import web
-from prometheus_client import Gauge, Counter, generate_latest as prom_generate_latest
+from prometheus_client import Gauge, generate_latest as prom_generate_latest
 
 import lbry.dht.error
 from lbry.dht.constants import generate_id
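The new `dataclasses` import carries the refactor: `DHTPeer` becomes a frozen dataclass, so every field update below goes through `replace()`, and `astuple()` feeds rows to SQL. A minimal stand-alone sketch of that copy-on-write style (the names here are illustrative, not from the script):

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Sample:
    address: str
    latency: int = None

s = Sample("1.2.3.4")
s = replace(s, latency=42)  # builds a new instance; frozen fields never mutate
print(s)                    # Sample(address='1.2.3.4', latency=42)
```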
@@ -17,56 +18,92 @@ from lbry.conf import Config
 from lbry.utils import resolve_host
 
 
-from sqlalchemy.orm import declarative_base, relationship
-import sqlalchemy as sqla
-
-
-@sqla.event.listens_for(sqla.engine.Engine, "connect")
-def set_sqlite_pragma(dbapi_connection, _):
-    cursor = dbapi_connection.cursor()
-    cursor.execute("PRAGMA journal_mode=WAL")
-    cursor.close()
-
-
 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
 log = logging.getLogger(__name__)
-Base = declarative_base()
 
 
-class DHTPeer(Base):
-    __tablename__ = "peer"
-    peer_id = sqla.Column(sqla.Integer(), sqla.Identity(), primary_key=True)
-    node_id = sqla.Column(sqla.String(96))
-    address = sqla.Column(sqla.String())
-    udp_port = sqla.Column(sqla.Integer())
-    tcp_port = sqla.Column(sqla.Integer())
-    first_online = sqla.Column(sqla.DateTime())
-    errors = sqla.Column(sqla.Integer(), default=0)
-    last_churn = sqla.Column(sqla.Integer())
-    added_on = sqla.Column(sqla.DateTime(), nullable=False, default=datetime.datetime.utcnow)
-    last_check = sqla.Column(sqla.DateTime())
-    last_seen = sqla.Column(sqla.DateTime())
-    latency = sqla.Column(sqla.Integer())
-    endpoint_unique = sqla.UniqueConstraint("node_id", "udp_port")
+class PeerStorage(SQLiteMixin):
+    CREATE_TABLES_QUERY = """
+        PRAGMA JOURNAL_MODE=WAL;
+        CREATE TABLE IF NOT EXISTS peer (
+            peer_id INTEGER NOT NULL,
+            node_id VARCHAR(96),
+            address VARCHAR,
+            udp_port INTEGER,
+            tcp_port INTEGER,
+            first_online DATETIME,
+            errors INTEGER,
+            last_churn INTEGER,
+            added_on DATETIME NOT NULL,
+            last_check DATETIME,
+            last_seen DATETIME,
+            latency INTEGER,
+            PRIMARY KEY (peer_id)
+        );
+        CREATE TABLE IF NOT EXISTS connection (
+            from_peer_id INTEGER NOT NULL,
+            to_peer_id INTEGER NOT NULL,
+            PRIMARY KEY (from_peer_id, to_peer_id),
+            FOREIGN KEY(from_peer_id) REFERENCES peer (peer_id),
+            FOREIGN KEY(to_peer_id) REFERENCES peer (peer_id)
+        );
+    """
+
+    async def open(self):
+        await super().open()
+        self.db.writer_connection.row_factory = dict_row_factory
+
+    async def all_peers(self):
+        return [DHTPeer(**peer) for peer in await self.db.execute_fetchall("select * from peer")]
+
+    async def save_peers(self, *peers):
+        log.info("Saving graph nodes (peers) to DB")
+        await self.db.executemany(
+            "INSERT OR REPLACE INTO peer("
+            "node_id, address, udp_port, tcp_port, first_online, errors, last_churn,"
+            "added_on, last_check, last_seen, latency, peer_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
+            [astuple(peer) for peer in peers]
+        )
+        log.info("Finished saving graph nodes (peers) to DB")
+
+    async def save_connections(self, connections_map):
+        log.info("Saving graph edges (connections) to DB")
+        await self.db.executemany(
+            "DELETE FROM connection WHERE from_peer_id = ?", [(key,) for key in connections_map])
+        for from_peer_id in connections_map:
+            await self.db.executemany(
+                "INSERT INTO connection(from_peer_id, to_peer_id) VALUES(?,?)",
+                [(from_peer_id, to_peer_id) for to_peer_id in connections_map[from_peer_id]])
+        log.info("Finished saving graph edges (connections) to DB")
+
+
+@dataclass(frozen=True)
+class DHTPeer:
+    node_id: str
+    address: str
+    udp_port: int
+    tcp_port: int = None
+    first_online: datetime.datetime = None
+    errors: int = None
+    last_churn: int = None
+    added_on: datetime.datetime = None
+    last_check: datetime.datetime = None
+    last_seen: datetime.datetime = None
+    latency: int = None
+    peer_id: int = None
 
     @classmethod
-    def from_kad_peer(cls, peer):
+    def from_kad_peer(cls, peer, peer_id):
         node_id = peer.node_id.hex() if peer.node_id else None
-        return DHTPeer(node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port)
+        return DHTPeer(
+            node_id=node_id, address=peer.address, udp_port=peer.udp_port, tcp_port=peer.tcp_port,
+            peer_id=peer_id, added_on=datetime.datetime.utcnow())
 
     def to_kad_peer(self):
         node_id = bytes.fromhex(self.node_id) if self.node_id else None
         return make_kademlia_peer(node_id, self.address, self.udp_port, self.tcp_port)
 
 
-class DHTConnection(Base):
-    __tablename__ = "connection"
-    from_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True)
-    connected_by = relationship("DHTPeer", backref="known_by", primaryjoin=(DHTPeer.peer_id == from_peer_id))
-    to_peer_id = sqla.Column(sqla.Integer(), sqla.ForeignKey("peer.peer_id"), primary_key=True)
-    connected_to = relationship("DHTPeer", backref="connections", primaryjoin=(DHTPeer.peer_id == to_peer_id))
-
-
 def new_node(address="0.0.0.0", udp_port=0, node_id=None):
     node_id = node_id or generate_id()
     loop = asyncio.get_event_loop()
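Two details of `PeerStorage` (which builds on lbry's `SQLiteMixin`, imported outside this hunk) are worth noting: the WAL pragma moves from a sqlalchemy connect-event hook into the create script itself, and `save_peers` relies on `astuple()` emitting fields in declaration order, which must match the column list of the `INSERT OR REPLACE` (it does: `node_id` first, `peer_id` last). A self-contained sketch of that contract, using a made-up table:

```python
import sqlite3
from dataclasses import dataclass, astuple

@dataclass(frozen=True)
class Row:
    name: str           # field order must mirror the INSERT column list
    value: int = None
    row_id: int = None  # primary key last, as with peer_id above

db = sqlite3.connect(":memory:")
db.executescript("CREATE TABLE t (name TEXT, value INTEGER, row_id INTEGER PRIMARY KEY)")
db.executemany("INSERT OR REPLACE INTO t(name, value, row_id) VALUES (?,?,?)",
               [astuple(r) for r in (Row("a", 1, 1), Row("b", 2, 2))])
print(db.execute("SELECT * FROM t").fetchall())  # [('a', 1, 1), ('b', 2, 2)]
```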
@@ -102,15 +139,17 @@ class Crawler:
         "host_latency", "Time spent on the last request, in nanoseconds.", namespace="dht_crawler_node",
         labelnames=("host", "port")
     )
 
     def __init__(self, db_path: str):
         self.node = new_node()
         self.semaphore = asyncio.Semaphore(200)
-        engine = sqla.create_engine(f"sqlite:///{db_path}")
-        Base.metadata.create_all(engine)
-        session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False)
-        self.db = session()
+        self.db = PeerStorage(db_path)
+        self._memory_peers = {}
+        self._connections = {}
+
+    async def open(self):
+        await self.db.open()
         self._memory_peers = {
-            (peer.address, peer.udp_port): peer for peer in self.db.query(DHTPeer).all()
+            (peer.address, peer.udp_port): peer for peer in await self.db.all_peers()
         }
 
     @property
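The constructor is now synchronous and cheap; everything that needs the event loop (opening the database, loading saved peers) moves into `open()`, since `__init__` cannot await. The general shape, reduced to a toy with assumed names:

```python
import asyncio

class Store:
    def __init__(self, path):
        self.path = path   # synchronous setup only
        self.cache = {}

    async def open(self):
        await asyncio.sleep(0)           # stands in for real async I/O
        self.cache = {"loaded": True}    # e.g. rows read from disk

async def main():
    store = Store("/tmp/example.db")
    await store.open()                   # awaitable work happens here

asyncio.run(main())
```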
@@ -146,20 +185,20 @@ class Crawler:
         return to_check
 
     def add_peers(self, *peers):
-        db_peers = []
         for peer in peers:
             db_peer = self.get_from_peer(peer)
-            if db_peer and db_peer.node_id is None and peer.node_id:
-                db_peer.node_id = peer.node_id.hex()
+            if db_peer and db_peer.node_id is None and peer.node_id is not None:
+                db_peer = replace(db_peer, node_id=peer.node_id.hex())
             elif not db_peer:
-                db_peer = DHTPeer.from_kad_peer(peer)
-                self._memory_peers[(peer.address, peer.udp_port)] = db_peer
-            db_peer.last_seen = datetime.datetime.utcnow()
-            db_peers.append(db_peer)
+                db_peer = DHTPeer.from_kad_peer(peer, len(self._memory_peers) + 1)
+            db_peer = replace(db_peer, last_seen=datetime.datetime.utcnow())
+            self._memory_peers[(peer.address, peer.udp_port)] = db_peer
 
-    def flush_to_db(self):
-        self.db.add_all(self._memory_peers.values())
-        self.db.commit()
+    async def flush_to_db(self):
+        await self.db.save_peers(*self._memory_peers.values())
+        connections_to_save = self._connections
+        self._connections = {}
+        await self.db.save_connections(connections_to_save)
 
     def get_from_peer(self, peer):
         return self._memory_peers.get((peer.address, peer.udp_port), None)
 
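`add_peers` now allocates `peer_id` as `len(self._memory_peers) + 1`, which stays unique only because entries are never removed from the registry; updates go through `replace()` and re-insert under the same `(address, udp_port)` key. A toy version of that id scheme:

```python
registry = {}

def add(addr, port):
    key = (addr, port)
    if key not in registry:
        registry[key] = {"peer_id": len(registry) + 1}  # relies on no deletions
    return registry[key]["peer_id"]

assert add("1.2.3.4", 4444) == 1
assert add("5.6.7.8", 4444) == 2
assert add("1.2.3.4", 4444) == 1  # existing entry keeps its id
```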
@@ -167,71 +206,73 @@ class Crawler:
     def set_latency(self, peer, latency=None):
         if latency:
             self.host_latency_metric.labels(host=peer.address, port=peer.udp_port).set(latency)
-        db_peer = self.get_from_peer(peer)
-        db_peer.latency = latency
+        db_peer = replace(self.get_from_peer(peer), latency=latency)
         if not db_peer.node_id and peer.node_id:
-            db_peer.node_id = peer.node_id.hex()
+            db_peer = replace(db_peer, node_id=peer.node_id.hex())
         if db_peer.first_online and latency is None:
-            db_peer.last_churn = (datetime.datetime.utcnow() - db_peer.first_online).seconds
+            db_peer = replace(db_peer, last_churn=(datetime.datetime.utcnow() - db_peer.first_online).seconds)
         elif latency is not None and db_peer.first_online is None:
-            db_peer.first_online = datetime.datetime.utcnow()
-        db_peer.last_check = datetime.datetime.utcnow()
+            db_peer = replace(db_peer, first_online=datetime.datetime.utcnow())
+        db_peer = replace(db_peer, last_check=datetime.datetime.utcnow())
+        self._memory_peers[(db_peer.address, db_peer.udp_port)] = db_peer
 
     def inc_errors(self, peer):
         db_peer = self.get_from_peer(peer)
-        db_peer.errors = (db_peer.errors or 0) + 1
+        self._memory_peers[(peer.address, peer.node_id)] = replace(db_peer, errors=(db_peer.errors or 0) + 1)
 
-    def associate_peers(self, target_peer_id, db_peer_ids):
-        return # todo
+    def associate_peers(self, peer, other_peers):
+        self._connections[self.get_from_peer(peer).peer_id] = [
+            self.get_from_peer(other_peer).peer_id for other_peer in other_peers]
 
     async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
         async with self.semaphore:
-            peer = make_kademlia_peer(None, await resolve_host(host, port, 'udp'), port)
-            for attempt in range(3):
-                try:
-                    req_start = time.perf_counter_ns()
-                    response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
-                    latency = time.perf_counter_ns() - req_start
-                    self.set_latency(make_kademlia_peer(key, host, port), latency)
-                    return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
-                except asyncio.TimeoutError:
-                    self.set_latency(make_kademlia_peer(key, host, port), None)
-                    continue
-                except lbry.dht.error.RemoteException as e:
-                    log.info('Peer errored: %s:%d attempt #%d - %s',
-                             host, port, (attempt + 1), str(e))
-                    self.inc_errors(peer)
-                    self.set_latency(make_kademlia_peer(key, host, port), None)
-                    continue
+            peer = make_kademlia_peer(key, await resolve_host(host, port, 'udp'), port)
+            for attempt in range(3):
+                try:
+                    req_start = time.perf_counter_ns()
+                    response = await self.node.protocol.get_rpc_peer(peer).find_node(key)
+                    await asyncio.sleep(0.05)
+                    latency = time.perf_counter_ns() - req_start
+                    self.set_latency(peer, latency)
+                    return [make_kademlia_peer(*peer_tuple) for peer_tuple in response]
+                except asyncio.TimeoutError:
+                    self.set_latency(peer, None)
+                    continue
+                except lbry.dht.error.RemoteException as e:
+                    log.info('Peer errored: %s:%d attempt #%d - %s',
+                             host, port, (attempt + 1), str(e))
+                    self.inc_errors(peer)
+                    self.set_latency(peer, None)
+                    continue
         return []
 
     async def crawl_routing_table(self, host, port, node_id=None):
         start = time.time()
         log.info("querying %s:%d", host, port)
         address = await resolve_host(host, port, 'udp')
-        self.add_peers(make_kademlia_peer(None, address, port))
         key = node_id or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+        peer = make_kademlia_peer(key, address, port)
+        self.add_peers(peer)
         if not key:
             latency = None
             for _ in range(3):
                 try:
                     ping_start = time.perf_counter_ns()
                     async with self.semaphore:
-                        await self.node.protocol.get_rpc_peer(make_kademlia_peer(None, address, port)).ping()
-                        key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+                        await self.node.protocol.get_rpc_peer(peer).ping()
+                        await asyncio.sleep(0.05)
+                    key = key or self.node.protocol.peer_manager.get_node_id_for_endpoint(address, port)
+                    peer = make_kademlia_peer(key, address, port)
                     latency = time.perf_counter_ns() - ping_start
                     break
                 except asyncio.TimeoutError:
                     pass
                 except lbry.dht.error.RemoteException:
-                    self.inc_errors(make_kademlia_peer(None, address, port))
+                    self.inc_errors(peer)
                     pass
-        self.set_latency(make_kademlia_peer(key, address, port), latency if key else None)
-        if not latency or not key:
-            if latency and not key:
+        self.set_latency(peer, latency if peer.node_id else None)
+        if not latency or not peer.node_id:
+            if latency and not peer.node_id:
                 log.warning("No node id from %s:%d", host, port)
             return set()
-        node_id = key
-        distance = Distance(key)
         max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
         peers = set()
 
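Two things stand out in this hunk: the new `asyncio.sleep(0.05)` calls sit inside the timed span, so the 50 ms pacing delay is counted into the recorded latency while the semaphore is held, and `inc_errors` writes back under `(peer.address, peer.node_id)` even though every other path keys `_memory_peers` by `(address, udp_port)`, so error counts land under a different key than the rest of the peer's state. The probe-with-retries shape itself, reduced to a generic sketch (the timeout value is an assumption):

```python
import asyncio, time

async def probe(call, attempts=3):
    """Return (result, latency_ns), or (None, None) after exhausting retries."""
    for _ in range(attempts):
        try:
            start = time.perf_counter_ns()
            result = await asyncio.wait_for(call(), timeout=1.0)
            return result, time.perf_counter_ns() - start
        except asyncio.TimeoutError:
            continue  # record the miss and try again
    return None, None
```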
@@ -248,7 +289,7 @@ class Crawler:
                 next_jump = current_distance + int(max_distance // factor)  # jump closer
                 factor /= 2
                 if factor > 8 and next_jump < max_distance:
-                    key = int.from_bytes(node_id, 'big') ^ next_jump
+                    key = int.from_bytes(peer.node_id, 'big') ^ next_jump
                     if key.bit_length() > 384:
                         break
                     key = key.to_bytes(48, 'big')
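This is the keyspace walk: the crawler probes progressively smaller XOR offsets from the target's node id, discarding anything past 384 bits (48-byte ids). The arithmetic in isolation:

```python
node_id = bytes(47) + b"\x01"  # hypothetical 384-bit node id
max_distance = int.from_bytes(bytes([0xff] * 48), 'big')
key = int.from_bytes(node_id, 'big') ^ (max_distance // 2)  # jump half the keyspace
if key.bit_length() <= 384:
    key = key.to_bytes(48, 'big')  # usable as a find_node target again
```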
@@ -262,8 +303,7 @@ class Crawler:
             self.add_peers(*peers)
             if peers:
                 self.connections_found_metric.labels(host=host, port=port).set(len(peers))
-            #self.associate_peers(this_peer_id, db_peer_ids)
-            self.db.commit()
+            self.associate_peers(peer, peers)
         return peers
 
     async def process(self):
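`associate_peers` replaces the old commented-out stub: it records an adjacency list in `_connections`, which `save_connections` later flattens into `(from_peer_id, to_peer_id)` rows after deleting each source's previous edge set. The flattening, in miniature:

```python
connections_map = {1: [2, 3], 2: [1]}  # from_peer_id -> reported peer ids
rows = [(src, dst) for src, dsts in connections_map.items() for dst in dsts]
assert rows == [(1, 2), (1, 3), (2, 1)]  # shape fed to executemany
```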
@@ -271,19 +311,17 @@ class Crawler:
 
         def submit(_peer):
             f = asyncio.ensure_future(
-                self.crawl_routing_table(_peer.address, peer.udp_port, bytes.fromhex(peer.node_id)))
-            to_process[_peer] = f
-            f.add_done_callback(lambda _: to_process.pop(_peer))
+                self.crawl_routing_table(_peer.address, _peer.udp_port, bytes.fromhex(_peer.node_id)))
+            to_process[_peer.peer_id] = f
+            f.add_done_callback(lambda _: to_process.pop(_peer.peer_id))
 
         to_check = self.get_peers_needing_check()
+        last_flush = datetime.datetime.utcnow()
         while True:
-            for peer in to_check:
-                if peer not in to_process:
+            for peer in to_check[:200]:
+                if peer.peer_id not in to_process:
                     submit(peer)
-                    await asyncio.sleep(.1)
-                if len(to_process) > 100:
-                    break
-            await asyncio.sleep(.05)
+            await asyncio.sleep(0)
             self.unique_total_hosts_metric.labels("global").set(self.checked_peers_count)
             self.reachable_hosts_metric.labels("global").set(self.checked_peers_count - self.unreachable_peers_count)
 
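The crawl loop keys in-flight futures by `peer_id` and prunes them with done-callbacks, taking at most 200 candidates per pass. The tracking pattern on its own (note the default-argument trick to avoid late binding if you adapt it outside a helper function):

```python
import asyncio

async def crawl(peer_id):
    await asyncio.sleep(0.01)  # stands in for a routing-table crawl

async def main():
    in_flight = {}
    for peer_id in range(10):
        if peer_id not in in_flight:
            f = asyncio.ensure_future(crawl(peer_id))
            in_flight[peer_id] = f
            f.add_done_callback(lambda _, pid=peer_id: in_flight.pop(pid))
        await asyncio.sleep(0)  # yield so completed callbacks can run
    await asyncio.gather(*in_flight.values())

asyncio.run(main())
```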
@@ -298,7 +336,7 @@ class Crawler:
             to_check = self.get_peers_needing_check()
             if (datetime.datetime.utcnow() - last_flush).seconds > 60:
                 log.info("flushing to db")
-                self.flush_to_db()
+                await self.flush_to_db()
                 last_flush = datetime.datetime.utcnow()
             while not to_check and not to_process:
                 port = self.node.listening_port.get_extra_info('socket').getsockname()[1]
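One subtlety in the flush guard: `timedelta.seconds` is only the sub-day remainder, not the full elapsed time. For a 60-second interval that is harmless, but `total_seconds()` is the safer habit if an interval could ever exceed a day (the same applies to the `last_churn` computation in `set_latency`):

```python
import datetime

delta = datetime.timedelta(days=1, seconds=5)
print(delta.seconds)          # 5   -- the day component is dropped
print(delta.total_seconds())  # 86405.0
```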
@@ -332,10 +370,23 @@ class SimpleMetrics:
         await prom_site.start()
 
 
+def dict_row_factory(cursor, row):
+    d = {}
+    for idx, col in enumerate(cursor.description):
+        if col[0] in ('added_on', 'first_online', 'last_seen', 'last_check'):
+            d[col[0]] = datetime.datetime.fromisoformat(row[idx]) if row[idx] else None
+        else:
+            d[col[0]] = row[idx]
+    return d
+
+
 async def test():
+    asyncio.get_event_loop().set_debug(True)
     metrics = SimpleMetrics('8080')
     await metrics.start()
     crawler = Crawler("/tmp/a.db")
+    await crawler.open()
+    await crawler.flush_to_db()
     await crawler.node.start_listening()
     conf = Config()
     if crawler.active_peers_count < 100:
 
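`dict_row_factory` is a standard `sqlite3` row factory (a `(cursor, row) -> value` callable), here also reviving ISO-formatted timestamp columns into `datetime` objects so `DHTPeer(**peer)` gets proper types back from `all_peers`. A stand-alone demo of the mechanism, without the datetime handling:

```python
import sqlite3

def dict_rows(cursor, row):
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}

db = sqlite3.connect(":memory:")
db.row_factory = dict_rows
db.execute("CREATE TABLE t (a INTEGER, b TEXT)")
db.execute("INSERT INTO t VALUES (1, 'x')")
print(db.execute("SELECT * FROM t").fetchall())  # [{'a': 1, 'b': 'x'}]
```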