forked from LBRYCommunity/lbry-sdk
disable infinite peer search, cleanup logging, tune scores to slow connections
This commit is contained in:
parent d024433d1b
commit b91d2190f4
3 changed files with 4 additions and 11 deletions
@@ -37,7 +37,6 @@ class BlobDownloader:
     async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer', connection_id: int = 0):
         if blob.get_is_verified():
             return
-        self.scores[peer] = self.scores.get(peer, 0) - 1  # starts losing score, to account for cancelled ones
         transport = self.connections.get(peer)
         start = self.loop.time()
         bytes_received, transport = await request_blob(
@@ -55,14 +54,14 @@ class BlobDownloader:
             self.failures[peer] = 0
             self.connections[peer] = transport
             elapsed = self.loop.time() - start
-            self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 0
+            self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 1

     async def new_peer_or_finished(self):
         active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
         await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')

     def cleanup_active(self):
-        if not self.active_connections:
+        if not self.active_connections and not self.connections:
             self.clearbanned()
         to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
         for peer in to_remove:
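The two BlobDownloader hunks above are the "tune scores to slow connections" part of the commit: the pre-emptive score penalty on every request attempt is removed, and a completed request with no measurable throughput now falls back to a score of 1 instead of 0, so a slow but still responsive peer is less likely to rank below a never-tried one. (The cleanup_active condition is also tightened so the ban list is only cleared when there are neither active nor idle connections.) A minimal standalone sketch of the resulting scoring rule; the score_peer helper and the sample numbers are illustrative, not part of the repository:

def score_peer(bytes_received: int, elapsed: float) -> float:
    # Throughput-based score: bytes per second when a rate can be measured,
    # otherwise fall back to 1 (this commit; previously the fallback was 0).
    return bytes_received / elapsed if bytes_received and elapsed else 1

# A slow connection that still delivered data keeps a meaningful score...
print(score_peer(512 * 1024, 8.0))  # 65536.0 bytes/s
# ...and a request with no measured rate no longer zeroes the peer out.
print(score_peer(0, 8.0))           # 1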
@@ -208,12 +208,8 @@ class Node:
                 task.cancel()

     async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue):
-        for interval in range(1000):
-            log.info("Searching %s", blob_hash[:8])
-            async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
-                result_queue.put_nowait(results)
-            log.info("Search expired %s", blob_hash[:8])
-            await asyncio.sleep(interval ** 2)
+        async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
+            result_queue.put_nowait(results)

     def accumulate_peers(self, search_queue: asyncio.Queue,
                          peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
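The Node hunk is the "disable infinite peer search" part. _value_producer previously wrapped the iterative value finder in for interval in range(1000) with an await asyncio.sleep(interval ** 2) between passes, logging each pass, so a search for an unannounced blob kept running with ever longer pauses; the new body does a single pass and lets the coroutine end. A quick standalone look at the back-off schedule that was removed (plain arithmetic, not repository code):

# Sleep schedule of the removed retry loop: interval ** 2 seconds per pass.
removed_sleeps = [interval ** 2 for interval in range(1000)]
print(removed_sleeps[:5])            # [0, 1, 4, 9, 16]
print(removed_sleeps[-1] / 86400)    # ~11.55 days for the final pause alone
print(sum(removed_sleeps) / 86400)   # ~3852 days of sleeping if all 1000 passes ran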
@@ -311,7 +311,6 @@ class KademliaProtocol(DatagramProtocol):
         return args, {}

     async def _add_peer(self, peer: 'KademliaPeer'):
-        log.debug("Trying to add %s:%d", peer.address, peer.udp_port)
         for p in self.routing_table.get_peers():
             if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id:
                 self.routing_table.remove_peer(p)
@@ -391,7 +390,6 @@ class KademliaProtocol(DatagramProtocol):
         while self._to_remove:
             async with self._split_lock:
                 peer = self._to_remove.pop()
-                log.debug("Trying to remove %s:%d", peer.address, peer.udp_port)
                 self.routing_table.remove_peer(peer)
                 self.routing_table.join_buckets()
         while self._to_add:
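The two KademliaProtocol hunks are the "cleanup logging" part: they only drop per-peer log.debug lines from the routing-table maintenance paths, leaving the add/remove logic unchanged. If that visibility is ever needed again, the usual route is to raise the module logger level rather than reinstating inline messages; a hedged example, where the exact logger name is an assumption about the package layout:

import logging

# Assumption: the DHT protocol module creates its logger with
# logging.getLogger(__name__); adjust the name to the actual module path.
logging.getLogger("lbry.dht.protocol.protocol").setLevel(logging.DEBUG)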