Merge pull request #1926 from lbryio/fix_unfair_ban

Fix p2p ignoring good peers
This commit is contained in:
Jack Robison 2019-02-15 20:39:49 -05:00 committed by GitHub
commit eaacf7e034
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 17 deletions

View file

@ -185,9 +185,6 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s
Returns [<downloaded blob>, <keep connection>] Returns [<downloaded blob>, <keep connection>]
""" """
if blob.get_is_verified() or blob.file_exists:
# file exists but not verified means someone is writing right now, give it time, come back later
return 0, connected_transport
protocol = BlobExchangeClientProtocol(loop, blob_download_timeout) protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
if connected_transport and not connected_transport.is_closing(): if connected_transport and not connected_transport.is_closing():
connected_transport.set_protocol(protocol) connected_transport.set_protocol(protocol)
@ -199,6 +196,9 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s
if not connected_transport: if not connected_transport:
await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port),
peer_connect_timeout, loop=loop) peer_connect_timeout, loop=loop)
if blob.get_is_verified() or blob.file_exists:
# file exists but not verified means someone is writing right now, give it time, come back later
return 0, connected_transport
return await protocol.download_blob(blob) return await protocol.download_blob(blob)
except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError):
return 0, None return 0, None

View file

@ -26,7 +26,7 @@ class BlobDownloader:
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
self.rounds_won: typing.Dict['KademliaPeer', int] = {} self.rounds_won: typing.Dict['KademliaPeer', int] = {}
def should_race_continue(self): def should_race_continue(self, blob: 'BlobFile'):
if len(self.active_connections) >= self.config.max_connections_per_download: if len(self.active_connections) >= self.config.max_connections_per_download:
return False return False
# if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves # if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves
@ -35,7 +35,7 @@ class BlobDownloader:
# for peer, task in self.active_connections.items(): # for peer, task in self.active_connections.items():
# if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done(): # if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done():
# return False # return False
return True return not (blob.get_is_verified() or blob.file_exists)
async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'):
if blob.get_is_verified(): if blob.get_is_verified():
@ -91,7 +91,7 @@ class BlobDownloader:
len(batch), len(self.ignored), len(self.active_connections) len(batch), len(self.ignored), len(self.active_connections)
) )
for peer in batch: for peer in batch:
if not self.should_race_continue(): if not self.should_race_continue(blob):
break break
if peer not in self.active_connections and peer not in self.ignored: if peer not in self.active_connections and peer not in self.ignored:
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)

View file

@ -74,19 +74,21 @@ class StreamDownloader(StreamAssembler):
def add_fixed_peers(self): def add_fixed_peers(self):
async def _add_fixed_peers(): async def _add_fixed_peers():
self.peer_queue.put_nowait([ addresses = [
KademliaPeer(self.loop, address=(await resolve_host(url, port + 1, proto='tcp')), tcp_port=port + 1) (await resolve_host(url, port + 1, proto='tcp'), port)
for url, port in self.config.reflector_servers for url, port in self.config.reflector_servers
]) ]
delay = self.config.fixed_peer_delay if (
'dht' not in self.config.components_to_skip
and self.node and len(self.node.protocol.routing_table.get_peers())
) else 0.0
self.loop.call_later(delay, lambda:
self.peer_queue.put_nowait([
KademliaPeer(self.loop, address=address, tcp_port=port + 1)
for address, port in addresses
]))
if self.config.reflector_servers: if self.config.reflector_servers:
self.fixed_peers_handle = self.loop.call_later( self.loop.create_task(_add_fixed_peers())
self.config.fixed_peer_delay if (
'dht' not in self.config.components_to_skip
and self.node
and len(self.node.protocol.routing_table.get_peers())
) else 0.0,
lambda: self.loop.create_task(_add_fixed_peers())
)
def download(self, node: typing.Optional['Node'] = None): def download(self, node: typing.Optional['Node'] = None):
self.node = node self.node = node