diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 290ed071a..882902e9b 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -194,15 +194,14 @@ class AbstractBlob: return if self.is_writeable(): self._write_blob(verified_bytes) + self.verified.set() if self.blob_completed_callback: - self.blob_completed_callback(self).add_done_callback(lambda _: self.verified.set()) - else: - self.verified.set() + self.blob_completed_callback(self) def get_blob_writer(self, peer_address: typing.Optional[str] = None, peer_port: typing.Optional[int] = None) -> HashBlobWriter: if (peer_address, peer_port) in self.writers: - log.exception("attempted to download blob twice from %s:%s", peer_address, peer_port) + raise OSError(f"attempted to download blob twice from {peer_address}:{peer_port}") fut = asyncio.Future(loop=self.loop) writer = HashBlobWriter(self.blob_hash, self.get_length, fut) self.writers[(peer_address, peer_port)] = writer diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py index 699c226d8..914680b19 100644 --- a/lbrynet/blob/writer.py +++ b/lbrynet/blob/writer.py @@ -62,6 +62,8 @@ class HashBlobWriter: self.close_handle() def close_handle(self): + if not self.finished.done(): + self.finished.cancel() if self.buffer is not None: self.buffer.close() self.buffer = None diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 11f0d1804..0f58efeb3 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -145,18 +145,17 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.buf = b'' async def download_blob(self, blob: 'AbstractBlob') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: + blob_hash = blob.blob_hash if blob.get_is_verified() or not blob.is_writeable(): return 0, self.transport try: - - self.blob, self.writer, self._blob_bytes_received = blob, blob.get_blob_writer(self.peer_address, - self.peer_port), 0 + self._blob_bytes_received = 0 + self.blob, self.writer = blob, blob.get_blob_writer(self.peer_address, self.peer_port) self._response_fut = asyncio.Future(loop=self.loop) return await self._download_blob() except OSError as e: - log.error("race happened downloading from %s:%i", self.peer_address, self.peer_port) # i'm not sure how to fix this race condition - jack - log.exception(e) + log.exception("race happened downloading %s from %s:%i", blob_hash, self.peer_address, self.peer_port) return self._blob_bytes_received, self.transport except asyncio.TimeoutError: if self._response_fut and not self._response_fut.done():