forked from LBRYCommunity/lbry-sdk
Merge pull request #1845 from lbryio/stuck_peers_fixup
fix active peers not being removed after closing + parsing of data stream
This commit is contained in:
commit
c1b4a012ec
2 changed files with 20 additions and 22 deletions
|
@ -33,10 +33,14 @@ class BlobDownloader:
|
||||||
async def _request_blob():
|
async def _request_blob():
|
||||||
if blob.get_is_verified():
|
if blob.get_is_verified():
|
||||||
return
|
return
|
||||||
success, keep_connection = await request_blob(
|
try:
|
||||||
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
|
success, keep_connection = await request_blob(
|
||||||
self.config.blob_download_timeout
|
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
|
||||||
)
|
self.config.blob_download_timeout
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
if peer in self.active_connections:
|
||||||
|
self.active_connections.pop(peer)
|
||||||
if not keep_connection and peer not in self.ignored:
|
if not keep_connection and peer not in self.ignored:
|
||||||
self.ignored.add(peer)
|
self.ignored.add(peer)
|
||||||
log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
|
log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
|
||||||
|
|
|
@ -146,32 +146,26 @@ def _parse_blob_response(response_msg: bytes) -> typing.Tuple[typing.Optional[ty
|
||||||
# <blob bytes>
|
# <blob bytes>
|
||||||
# <json><blob bytes>
|
# <json><blob bytes>
|
||||||
|
|
||||||
extra_data = b''
|
|
||||||
response = None
|
|
||||||
curr_pos = 0
|
curr_pos = 0
|
||||||
while True:
|
while True:
|
||||||
next_close_paren = response_msg.find(b'}', curr_pos)
|
next_close_paren = response_msg.find(b'}', curr_pos)
|
||||||
if next_close_paren == -1:
|
if next_close_paren == -1:
|
||||||
break
|
return None, response_msg
|
||||||
curr_pos = next_close_paren + 1
|
curr_pos = next_close_paren + 1
|
||||||
try:
|
try:
|
||||||
response = json.loads(response_msg[:curr_pos])
|
response = json.loads(response_msg[:curr_pos])
|
||||||
if not isinstance(response, dict):
|
|
||||||
raise ValueError()
|
|
||||||
for key in response.keys():
|
|
||||||
if key not in [
|
|
||||||
BlobPaymentAddressResponse.key,
|
|
||||||
BlobAvailabilityResponse.key,
|
|
||||||
BlobPriceResponse.key,
|
|
||||||
BlobDownloadResponse.key]:
|
|
||||||
raise ValueError()
|
|
||||||
extra_data = response_msg[curr_pos:]
|
|
||||||
break
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
response = None
|
continue
|
||||||
if response is None:
|
possible_response_keys = {
|
||||||
extra_data = response_msg
|
BlobPaymentAddressResponse.key,
|
||||||
return response, extra_data
|
BlobAvailabilityResponse.key,
|
||||||
|
BlobPriceResponse.key,
|
||||||
|
BlobDownloadResponse.key
|
||||||
|
}
|
||||||
|
if isinstance(response, dict) and response.keys():
|
||||||
|
if set(response.keys()).issubset(possible_response_keys):
|
||||||
|
return response, response_msg[curr_pos:]
|
||||||
|
return None, response_msg
|
||||||
|
|
||||||
|
|
||||||
class BlobRequest:
|
class BlobRequest:
|
||||||
|
|
Loading…
Reference in a new issue