blob_exchange lint

This commit is contained in:
Victor Shyba 2020-01-03 02:07:21 -03:00 committed by Lex Berezhny
parent 44f402c64e
commit 28fbb70858
3 changed files with 20 additions and 21 deletions

View file

@ -95,7 +95,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
if self._response_fut and not self._response_fut.done(): if self._response_fut and not self._response_fut.done():
self._response_fut.set_exception(err) self._response_fut.set_exception(err)
async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]: async def _download_blob(self) -> typing.Tuple[int, Optional['BlobExchangeClientProtocol']]: # pylint: disable=too-many-return-statements
""" """
:return: download success (bool), connected protocol (BlobExchangeClientProtocol) :return: download success (bool), connected protocol (BlobExchangeClientProtocol)
""" """
@ -213,11 +213,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
self.connection_manager.connection_made(f"{self.peer_address}:{self.peer_port}") self.connection_manager.connection_made(f"{self.peer_address}:{self.peer_port}")
log.debug("connection made to %s:%i", self.peer_address, self.peer_port) log.debug("connection made to %s:%i", self.peer_address, self.peer_port)
def connection_lost(self, reason): def connection_lost(self, exc):
if self.connection_manager: if self.connection_manager:
self.connection_manager.outgoing_connection_lost(f"{self.peer_address}:{self.peer_port}") self.connection_manager.outgoing_connection_lost(f"{self.peer_address}:{self.peer_port}")
log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(reason), log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(exc),
str(type(reason))) str(type(exc)))
self.close() self.close()

View file

@ -46,7 +46,7 @@ class BlobAvailabilityRequest(BlobMessage):
def __init__(self, requested_blobs: typing.List[str], lbrycrd_address: typing.Optional[bool] = True, def __init__(self, requested_blobs: typing.List[str], lbrycrd_address: typing.Optional[bool] = True,
**kwargs) -> None: **kwargs) -> None:
assert len(requested_blobs) assert len(requested_blobs) > 0
self.requested_blobs = requested_blobs self.requested_blobs = requested_blobs
self.lbrycrd_address = lbrycrd_address self.lbrycrd_address = lbrycrd_address
@ -134,9 +134,9 @@ class BlobErrorResponse(BlobMessage):
} }
blob_request_types = typing.Union[BlobPriceRequest, BlobAvailabilityRequest, BlobDownloadRequest, blob_request_types = typing.Union[BlobPriceRequest, BlobAvailabilityRequest, BlobDownloadRequest, # pylint: disable=invalid-name
BlobPaymentAddressRequest] BlobPaymentAddressRequest]
blob_response_types = typing.Union[BlobPriceResponse, BlobAvailabilityResponse, BlobDownloadResponse, blob_response_types = typing.Union[BlobPriceResponse, BlobAvailabilityResponse, BlobDownloadResponse, # pylint: disable=invalid-name
BlobErrorResponse, BlobPaymentAddressResponse] BlobErrorResponse, BlobPaymentAddressResponse]
@ -157,10 +157,10 @@ def _parse_blob_response(response_msg: bytes) -> typing.Tuple[typing.Optional[ty
except ValueError: except ValueError:
continue continue
possible_response_keys = { possible_response_keys = {
BlobPaymentAddressResponse.key, BlobPaymentAddressResponse.key,
BlobAvailabilityResponse.key, BlobAvailabilityResponse.key,
BlobPriceResponse.key, BlobPriceResponse.key,
BlobDownloadResponse.key BlobDownloadResponse.key
} }
if isinstance(response, dict) and response.keys(): if isinstance(response, dict) and response.keys():
if set(response.keys()).issubset(possible_response_keys): if set(response.keys()).issubset(possible_response_keys):
@ -179,7 +179,7 @@ class BlobRequest:
return d return d
def _get_request(self, request_type: blob_request_types): def _get_request(self, request_type: blob_request_types):
request = tuple(filter(lambda r: type(r) == request_type, self.requests)) request = tuple(filter(lambda r: type(r) == request_type, self.requests)) # pylint: disable=unidiomatic-typecheck
if request: if request:
return request[0] return request[0]
@ -235,7 +235,7 @@ class BlobResponse:
return d return d
def _get_response(self, response_type: blob_response_types): def _get_response(self, response_type: blob_response_types):
response = tuple(filter(lambda r: type(r) == response_type, self.responses)) response = tuple(filter(lambda r: type(r) == response_type, self.responses)) # pylint: disable=unidiomatic-typecheck
if response: if response:
return response[0] return response[0]
@ -280,4 +280,3 @@ class BlobResponse:
if response_type.key in response if response_type.key in response
]) ])
return cls(requests, extra) return cls(requests, extra)

View file

@ -96,21 +96,21 @@ class BlobServerProtocol(asyncio.Protocol):
incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length} incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
responses.append(BlobDownloadResponse(incoming_blob=incoming_blob)) responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
self.send_response(responses) self.send_response(responses)
bh = blob.blob_hash[:8] blob_hash = blob.blob_hash[:8]
log.debug("send %s to %s:%i", bh, peer_address, peer_port) log.debug("send %s to %s:%i", blob_hash, peer_address, peer_port)
self.started_transfer.set() self.started_transfer.set()
try: try:
sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop) sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout, loop=self.loop)
if sent and sent > 0: if sent and sent > 0:
self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent) self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent)
log.info("sent %s (%i bytes) to %s:%i", bh, sent, peer_address, peer_port) log.info("sent %s (%i bytes) to %s:%i", blob_hash, sent, peer_address, peer_port)
else: else:
log.debug("stopped sending %s to %s:%i", bh, peer_address, peer_port) log.debug("stopped sending %s to %s:%i", blob_hash, peer_address, peer_port)
except (OSError, asyncio.TimeoutError) as err: except (OSError, asyncio.TimeoutError) as err:
if isinstance(err, asyncio.TimeoutError): if isinstance(err, asyncio.TimeoutError):
log.debug("timed out sending blob %s to %s", bh, peer_address) log.debug("timed out sending blob %s to %s", blob_hash, peer_address)
else: else:
log.warning("could not read blob %s to send %s:%i", bh, peer_address, peer_port) log.warning("could not read blob %s to send %s:%i", blob_hash, peer_address, peer_port)
self.close() self.close()
finally: finally:
self.transfer_finished.set() self.transfer_finished.set()
@ -127,7 +127,7 @@ class BlobServerProtocol(asyncio.Protocol):
return return
if data: if data:
self.blob_manager.connection_manager.received_data(self.peer_address_and_port, len(data)) self.blob_manager.connection_manager.received_data(self.peer_address_and_port, len(data))
message, separator, remainder = data.rpartition(b'}') _, separator, remainder = data.rpartition(b'}')
if not separator: if not separator:
self.buf += data self.buf += data
return return