score peers by speed

This commit is contained in:
Victor Shyba 2019-02-08 03:05:53 -03:00
parent 141d68a2cd
commit 3352e0e4f4
2 changed files with 28 additions and 23 deletions

View file

@ -74,7 +74,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
if self._response_fut and not self._response_fut.done(): if self._response_fut and not self._response_fut.done():
self._response_fut.set_exception(err) self._response_fut.set_exception(err)
async def _download_blob(self) -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: async def _download_blob(self) -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
""" """
:return: download success (bool), keep connection (bool) :return: download success (bool), keep connection (bool)
""" """
@ -92,24 +92,24 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address, log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address,
self.peer_port) self.peer_port)
log.warning(response.to_dict()) log.warning(response.to_dict())
return False, self.close() return self._blob_bytes_received, self.close()
elif availability_response.available_blobs and \ elif availability_response.available_blobs and \
availability_response.available_blobs != [self.blob.blob_hash]: availability_response.available_blobs != [self.blob.blob_hash]:
log.warning("blob availability response doesn't match our request from %s:%i", log.warning("blob availability response doesn't match our request from %s:%i",
self.peer_address, self.peer_port) self.peer_address, self.peer_port)
return False, self.close() return self._blob_bytes_received, self.close()
if not price_response or price_response.blob_data_payment_rate != 'RATE_ACCEPTED': if not price_response or price_response.blob_data_payment_rate != 'RATE_ACCEPTED':
log.warning("data rate rejected by %s:%i", self.peer_address, self.peer_port) log.warning("data rate rejected by %s:%i", self.peer_address, self.peer_port)
return False, self.close() return self._blob_bytes_received, self.close()
if not blob_response or blob_response.error: if not blob_response or blob_response.error:
log.warning("blob cant be downloaded from %s:%i", self.peer_address, self.peer_port) log.warning("blob cant be downloaded from %s:%i", self.peer_address, self.peer_port)
return False, self.transport return self._blob_bytes_received, self.transport
if not blob_response.error and blob_response.blob_hash != self.blob.blob_hash: if not blob_response.error and blob_response.blob_hash != self.blob.blob_hash:
log.warning("incoming blob hash mismatch from %s:%i", self.peer_address, self.peer_port) log.warning("incoming blob hash mismatch from %s:%i", self.peer_address, self.peer_port)
return False, self.close() return self._blob_bytes_received, self.close()
if self.blob.length is not None and self.blob.length != blob_response.length: if self.blob.length is not None and self.blob.length != blob_response.length:
log.warning("incoming blob unexpected length from %s:%i", self.peer_address, self.peer_port) log.warning("incoming blob unexpected length from %s:%i", self.peer_address, self.peer_port)
return False, self.close() return self._blob_bytes_received, self.close()
msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \ msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \
f" timeout in {self.peer_timeout}" f" timeout in {self.peer_timeout}"
log.debug(msg) log.debug(msg)
@ -117,12 +117,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
log.info(msg) log.info(msg)
await self.blob.finished_writing.wait() await self.blob.finished_writing.wait()
return True, self.transport return self._blob_bytes_received, self.transport
except asyncio.TimeoutError: except asyncio.TimeoutError:
return False, self.close() return self._blob_bytes_received, self.close()
except (InvalidBlobHashError, InvalidDataError): except (InvalidBlobHashError, InvalidDataError):
log.warning("invalid blob from %s:%i", self.peer_address, self.peer_port) log.warning("invalid blob from %s:%i", self.peer_address, self.peer_port)
return False, self.close() return self._blob_bytes_received, self.close()
def close(self): def close(self):
if self._response_fut and not self._response_fut.done(): if self._response_fut and not self._response_fut.done():
@ -136,9 +136,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.transport.close() self.transport.close()
self.transport = None self.transport = None
async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
if blob.get_is_verified() or blob.file_exists: if blob.get_is_verified() or blob.file_exists:
return False, self.transport return 0, self.transport
try: try:
self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0 self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0
self._response_fut = asyncio.Future(loop=self.loop) self._response_fut = asyncio.Future(loop=self.loop)
@ -146,12 +146,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
except OSError: except OSError:
log.error("race happened downloading from %s:%i", self.peer_address, self.peer_port) log.error("race happened downloading from %s:%i", self.peer_address, self.peer_port)
# i'm not sure how to fix this race condition - jack # i'm not sure how to fix this race condition - jack
return False, self.transport return self._blob_bytes_received, self.transport
except asyncio.TimeoutError: except asyncio.TimeoutError:
if self._response_fut and not self._response_fut.done(): if self._response_fut and not self._response_fut.done():
self._response_fut.cancel() self._response_fut.cancel()
self.close() self.close()
return False, None return self._blob_bytes_received, None
except asyncio.CancelledError: except asyncio.CancelledError:
self.close() self.close()
raise raise
@ -170,14 +170,14 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: str, tcp_port: int, async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: str, tcp_port: int,
peer_connect_timeout: float, blob_download_timeout: float, peer_connect_timeout: float, blob_download_timeout: float,
connected_transport: asyncio.Transport = None)\ connected_transport: asyncio.Transport = None)\
-> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: -> typing.Tuple[int, typing.Optional[asyncio.Transport]]:
""" """
Returns [<downloaded blob>, <keep connection>] Returns [<downloaded blob>, <keep connection>]
""" """
if blob.get_is_verified() or blob.file_exists: if blob.get_is_verified() or blob.file_exists:
# file exists but not verified means someone is writing right now, give it time, come back later # file exists but not verified means someone is writing right now, give it time, come back later
return False, connected_transport return 0, connected_transport
protocol = BlobExchangeClientProtocol(loop, blob_download_timeout) protocol = BlobExchangeClientProtocol(loop, blob_download_timeout)
if connected_transport and not connected_transport.is_closing(): if connected_transport and not connected_transport.is_closing():
connected_transport.set_protocol(protocol) connected_transport.set_protocol(protocol)
@ -190,4 +190,4 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s
peer_connect_timeout, loop=loop) peer_connect_timeout, loop=loop)
return await protocol.download_blob(blob) return await protocol.download_blob(blob)
except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError):
return False, None return 0, None

View file

@ -24,14 +24,15 @@ class BlobDownloader:
self.ignored: typing.Set['KademliaPeer'] = set() self.ignored: typing.Set['KademliaPeer'] = set()
self.scores: typing.Dict['KademliaPeer', int] = {} self.scores: typing.Dict['KademliaPeer', int] = {}
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
self.rounds_won: typing.Dict['KademliaPeer', int] = {}
def should_race_continue(self): def should_race_continue(self):
if len(self.active_connections) >= self.config.max_connections_per_download: if len(self.active_connections) >= self.config.max_connections_per_download:
return False return False
# if a peer won 2 or more blob races and is active as a downloader, stop the race so bandwidth improves # if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves
# the safe net side is that any failure will reset the peer score, triggering the race back # the safe net side is that any failure will reset the peer score, triggering the race back
for peer, task in self.active_connections.items(): for peer, task in self.active_connections.items():
if self.scores.get(peer, 0) >= 2 and not task.done(): if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done():
return False return False
return True return True
@ -40,10 +41,13 @@ class BlobDownloader:
return return
self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones
transport = self.connections.get(peer) transport = self.connections.get(peer)
success, transport = await request_blob( start = self.loop.time()
bytes_received, transport = await request_blob(
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
self.config.blob_download_timeout, connected_transport=transport self.config.blob_download_timeout, connected_transport=transport
) )
if bytes_received == blob.get_length():
self.rounds_won[peer] = self.rounds_won.get(peer, 0) + 1
if not transport and peer not in self.ignored: if not transport and peer not in self.ignored:
self.ignored.add(peer) self.ignored.add(peer)
log.debug("drop peer %s:%i", peer.address, peer.tcp_port) log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
@ -52,7 +56,8 @@ class BlobDownloader:
elif transport: elif transport:
log.debug("keep peer %s:%i", peer.address, peer.tcp_port) log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
self.connections[peer] = transport self.connections[peer] = transport
self.scores[peer] = (self.scores.get(peer, 0) + 2) if success else 0 rough_speed = (bytes_received / (self.loop.time() - start)) if bytes_received else 0
self.scores[peer] = rough_speed
async def new_peer_or_finished(self, blob: 'BlobFile'): async def new_peer_or_finished(self, blob: 'BlobFile'):
async def get_and_re_add_peers(): async def get_and_re_add_peers():
@ -113,8 +118,8 @@ class BlobDownloader:
raise e raise e
def close(self): def close(self):
for transport in self.connections.values(): for transport in self.connections.values():
transport.close() transport.close()
async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node', async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node',