Merge pull request #1852 from lbryio/close_protocol_reflector

refactor reflect to close protocol
This commit is contained in:
Jack Robison 2019-02-04 14:42:18 -05:00 committed by GitHub
commit 7d7d4c35c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -176,15 +176,7 @@ class ManagedStream:
protocol = StreamReflectorClient(self.blob_manager, self.descriptor) protocol = StreamReflectorClient(self.blob_manager, self.descriptor)
try: try:
await self.loop.create_connection(lambda: protocol, host, port) await self.loop.create_connection(lambda: protocol, host, port)
except ConnectionRefusedError:
return sent
try:
await protocol.send_handshake() await protocol.send_handshake()
except (asyncio.CancelledError, asyncio.TimeoutError, ValueError):
if protocol.transport:
protocol.transport.close()
return sent
try:
sent_sd, needed = await protocol.send_descriptor() sent_sd, needed = await protocol.send_descriptor()
if sent_sd: if sent_sd:
sent.append(self.sd_hash) sent.append(self.sd_hash)
@ -193,20 +185,16 @@ class ManagedStream:
self.fully_reflected.set() self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
return [] return []
except (asyncio.CancelledError, asyncio.TimeoutError, ValueError): for blob_hash in needed:
if protocol.transport:
protocol.transport.close()
return sent
for blob_hash in needed:
try:
await protocol.send_blob(blob_hash) await protocol.send_blob(blob_hash)
sent.append(blob_hash) sent.append(blob_hash)
except (asyncio.CancelledError, asyncio.TimeoutError, ValueError): except (asyncio.CancelledError, asyncio.TimeoutError, ValueError):
if protocol.transport: return sent
protocol.transport.close() except ConnectionRefusedError:
return sent return sent
if protocol.transport: finally:
protocol.transport.close() if protocol.transport:
protocol.transport.close()
if not self.fully_reflected.is_set(): if not self.fully_reflected.is_set():
self.fully_reflected.set() self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")