forked from LBRYCommunity/lbry-sdk
dht_crawler: flush/commit only when finished
This commit is contained in:
parent
be4c62cf32
commit
c6c27925b7
1 changed files with 5 additions and 7 deletions
|
@ -74,7 +74,7 @@ class Crawler:
|
|||
self.semaphore = asyncio.Semaphore(20)
|
||||
engine = sqla.create_engine(f"sqlite:///{db_path}")
|
||||
Base.metadata.create_all(engine)
|
||||
session = sqla.orm.sessionmaker(engine)
|
||||
session = sqla.orm.sessionmaker(engine, autocommit=False, autoflush=False, expire_on_commit=False)
|
||||
self.db = session()
|
||||
|
||||
@property
|
||||
|
@ -113,7 +113,6 @@ class Crawler:
|
|||
self.db.add(db_peer)
|
||||
elif not db_peer:
|
||||
self.db.add(DHTPeer.from_kad_peer(peer))
|
||||
self.db.commit()
|
||||
|
||||
def get_from_peer(self, peer):
|
||||
return self.db.query(DHTPeer).filter(DHTPeer.address==peer.address, DHTPeer.udp_port==peer.udp_port).first()
|
||||
|
@ -129,13 +128,11 @@ class Crawler:
|
|||
db_peer.first_online = datetime.datetime.utcnow()
|
||||
db_peer.last_check = datetime.datetime.utcnow()
|
||||
self.db.add(db_peer)
|
||||
self.db.commit()
|
||||
|
||||
def inc_errors(self, peer):
|
||||
db_peer = self.get_from_peer(peer)
|
||||
db_peer.errors += 1
|
||||
self.db.add(db_peer)
|
||||
self.db.commit()
|
||||
|
||||
def count_peers(self):
|
||||
return self.db.query(DHTPeer).count()
|
||||
|
@ -149,11 +146,10 @@ class Crawler:
|
|||
for peer in peers
|
||||
}
|
||||
all_peer_ids = {peer.node_id for peer in peers if peer.node_id}
|
||||
print(self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(all_peer_ids)).update(
|
||||
{DHTPeer.last_seen: datetime.datetime.utcnow()}))
|
||||
self.db.query(DHTPeer).filter(DHTPeer.node_id.in_(all_peer_ids)).update(
|
||||
{DHTPeer.last_seen: datetime.datetime.utcnow()})
|
||||
self.db.query(DHTConnection).filter(DHTConnection.from_peer_id == db_peer.peer_id).delete()
|
||||
self.db.add_all(connections)
|
||||
self.db.commit()
|
||||
|
||||
async def request_peers(self, host, port, key) -> typing.List['KademliaPeer']:
|
||||
async with self.semaphore:
|
||||
|
@ -227,6 +223,8 @@ class Crawler:
|
|||
host, port, (time.time() - start), len(peers), i)
|
||||
self.add_peers(*peers)
|
||||
self.associate_peers(make_kademlia_peer(key, address, port), peers)
|
||||
self.db.flush()
|
||||
self.db.commit()
|
||||
return peers
|
||||
|
||||
async def process(self):
|
||||
|
|
Loading…
Reference in a new issue