diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index ee6344b17..b628808a1 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -1,6 +1,7 @@ import asyncio import logging import typing +import binascii from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest if typing.TYPE_CHECKING: from lbrynet.blob.blob_file import BlobFile @@ -25,7 +26,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self._request_lock = asyncio.Lock(loop=self.loop) def data_received(self, data: bytes): - if self.transport.is_closing(): # TODO: is this needed? + if not self.transport or self.transport.is_closing(): + log.warning("transport closing, but got more bytes from %s:%i\n%s", self.peer_address, self.peer_port, + binascii.hexlify(data)) if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() return diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 80bea1154..00e5ae754 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -59,7 +59,7 @@ class StreamAssembler: decrypted = await self.loop.run_in_executor(None, _decrypt_and_write) if decrypted: - log.info("decrypted %s", blob.blob_hash[:8]) + log.debug("decrypted %s", blob.blob_hash[:8]) return async def assemble_decrypted_stream(self, output_dir: str, output_file_name: typing.Optional[str] = None): diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index b6ec2ff78..4687d10a7 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -29,7 +29,6 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t self.peer_timeout = peer_timeout self.peer_connect_timeout = peer_connect_timeout self.current_blob: 'BlobFile' = None - self.download_task: asyncio.Task = None self.accumulate_connections_task: asyncio.Task = None self.new_peer_event = asyncio.Event(loop=self.loop) @@ -106,13 +105,16 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t loop=self.loop) if got_new_peer and not got_new_peer.done(): got_new_peer.cancel() + async with self._lock: if self.current_blob.get_is_verified(): + # a download attempt finished if got_new_peer and not got_new_peer.done(): got_new_peer.cancel() drain_tasks(download_tasks) return self.current_blob else: + # we got a new peer, re add the other pending download attempts for task in download_tasks: if task and not task.done(): self.running_download_requests.append(task) @@ -147,14 +149,13 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t added += 1 if added: if not self.new_peer_event.is_set(): - log.info("added %i new peers", len(peers)) + log.debug("added %i new peers", len(peers)) self.new_peer_event.set() async def _accumulate_connections(self, node: 'Node'): blob_queue = asyncio.Queue(loop=self.loop) blob_queue.put_nowait(self.sd_hash) task = asyncio.create_task(self.got_descriptor.wait()) - added_peers = asyncio.Event(loop=self.loop) add_fixed_peers_timer: typing.Optional[asyncio.Handle] = None if self.fixed_peers: @@ -178,8 +179,6 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t async for peers in search_junction: if peers: self._add_peer_protocols(peers) - if not added_peers.is_set(): - added_peers.set() return except asyncio.CancelledError: pass