unban after 10 seconds, give up after 60
This commit is contained in:
parent
71b66ab337
commit
b36c22e2f4
2 changed files with 26 additions and 9 deletions
|
@ -111,7 +111,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
return self._blob_bytes_received, self.close()
|
return self._blob_bytes_received, self.close()
|
||||||
if not blob_response or blob_response.error:
|
if not blob_response or blob_response.error:
|
||||||
log.warning("blob cant be downloaded from %s:%i", self.peer_address, self.peer_port)
|
log.warning("blob cant be downloaded from %s:%i", self.peer_address, self.peer_port)
|
||||||
return self._blob_bytes_received, self.transport
|
return self._blob_bytes_received, self.close()
|
||||||
if not blob_response.error and blob_response.blob_hash != self.blob.blob_hash:
|
if not blob_response.error and blob_response.blob_hash != self.blob.blob_hash:
|
||||||
log.warning("incoming blob hash mismatch from %s:%i", self.peer_address, self.peer_port)
|
log.warning("incoming blob hash mismatch from %s:%i", self.peer_address, self.peer_port)
|
||||||
return self._blob_bytes_received, self.close()
|
return self._blob_bytes_received, self.close()
|
||||||
|
|
|
@ -14,6 +14,7 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class BlobDownloader:
|
class BlobDownloader:
|
||||||
|
BAN_TIME = 10.0 # fixme: when connection manager gets implemented, move it out from here
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
|
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager',
|
||||||
peer_queue: asyncio.Queue):
|
peer_queue: asyncio.Queue):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
|
@ -21,10 +22,10 @@ class BlobDownloader:
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.peer_queue = peer_queue
|
self.peer_queue = peer_queue
|
||||||
self.active_connections: typing.Dict['KademliaPeer', asyncio.Task] = {} # active request_blob calls
|
self.active_connections: typing.Dict['KademliaPeer', asyncio.Task] = {} # active request_blob calls
|
||||||
self.ignored: typing.Set['KademliaPeer'] = set()
|
self.ignored: typing.Dict['KademliaPeer', int] = {}
|
||||||
self.scores: typing.Dict['KademliaPeer', int] = {}
|
self.scores: typing.Dict['KademliaPeer', int] = {}
|
||||||
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
|
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
|
||||||
self.rounds_won: typing.Dict['KademliaPeer', int] = {}
|
self.time_since_last_blob = loop.time()
|
||||||
|
|
||||||
def should_race_continue(self, blob: 'BlobFile'):
|
def should_race_continue(self, blob: 'BlobFile'):
|
||||||
if len(self.active_connections) >= self.config.max_connections_per_download:
|
if len(self.active_connections) >= self.config.max_connections_per_download:
|
||||||
|
@ -48,9 +49,9 @@ class BlobDownloader:
|
||||||
self.config.blob_download_timeout, connected_transport=transport
|
self.config.blob_download_timeout, connected_transport=transport
|
||||||
)
|
)
|
||||||
if bytes_received == blob.get_length():
|
if bytes_received == blob.get_length():
|
||||||
self.rounds_won[peer] = self.rounds_won.get(peer, 0) + 1
|
self.time_since_last_blob = self.loop.time()
|
||||||
if not transport and peer not in self.ignored:
|
if not transport and peer not in self.ignored:
|
||||||
self.ignored.add(peer)
|
self.ignored[peer] = self.loop.time()
|
||||||
log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
|
log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
|
||||||
if peer in self.connections:
|
if peer in self.connections:
|
||||||
del self.connections[peer]
|
del self.connections[peer]
|
||||||
|
@ -62,8 +63,11 @@ class BlobDownloader:
|
||||||
|
|
||||||
async def new_peer_or_finished(self, blob: 'BlobFile'):
|
async def new_peer_or_finished(self, blob: 'BlobFile'):
|
||||||
async def get_and_re_add_peers():
|
async def get_and_re_add_peers():
|
||||||
new_peers = await self.peer_queue.get()
|
try:
|
||||||
|
new_peers = await asyncio.wait_for(self.peer_queue.get(), timeout=1.0)
|
||||||
self.peer_queue.put_nowait(new_peers)
|
self.peer_queue.put_nowait(new_peers)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
|
tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
|
||||||
active_tasks = list(self.active_connections.values())
|
active_tasks = list(self.active_connections.values())
|
||||||
try:
|
try:
|
||||||
|
@ -76,6 +80,15 @@ class BlobDownloader:
|
||||||
for peer in to_remove:
|
for peer in to_remove:
|
||||||
del self.active_connections[peer]
|
del self.active_connections[peer]
|
||||||
|
|
||||||
|
def clearbanned(self):
|
||||||
|
now = self.loop.time()
|
||||||
|
if now - self.time_since_last_blob > 60.0:
|
||||||
|
return
|
||||||
|
forgiven = [banned_peer for banned_peer, when in self.ignored.items() if now - when > self.BAN_TIME]
|
||||||
|
self.peer_queue.put_nowait(forgiven)
|
||||||
|
for banned_peer in forgiven:
|
||||||
|
self.ignored.pop(banned_peer)
|
||||||
|
|
||||||
async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
||||||
blob = self.blob_manager.get_blob(blob_hash, length)
|
blob = self.blob_manager.get_blob(blob_hash, length)
|
||||||
if blob.get_is_verified():
|
if blob.get_is_verified():
|
||||||
|
@ -101,6 +114,8 @@ class BlobDownloader:
|
||||||
self.cleanup_active()
|
self.cleanup_active()
|
||||||
if batch:
|
if batch:
|
||||||
self.peer_queue.put_nowait(set(batch).difference(self.ignored))
|
self.peer_queue.put_nowait(set(batch).difference(self.ignored))
|
||||||
|
else:
|
||||||
|
self.clearbanned()
|
||||||
while self.active_connections:
|
while self.active_connections:
|
||||||
peer, task = self.active_connections.popitem()
|
peer, task = self.active_connections.popitem()
|
||||||
if task and not task.done():
|
if task and not task.done():
|
||||||
|
@ -119,6 +134,8 @@ class BlobDownloader:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
self.scores.clear()
|
||||||
|
self.ignored.clear()
|
||||||
for transport in self.connections.values():
|
for transport in self.connections.values():
|
||||||
transport.close()
|
transport.close()
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue