From 43ac928f0b7212b6354dd62ef0407d47335884f1 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Thu, 7 Feb 2019 20:09:48 -0300 Subject: [PATCH 01/14] remove dht locks --- lbrynet/dht/node.py | 2 +- lbrynet/dht/protocol/protocol.py | 79 +++++++++++++++----------------- 2 files changed, 38 insertions(+), 43 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 2cb4c3cf3..45d73cb53 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -64,7 +64,7 @@ class Node: # ping the set of peers; upon success/failure the routing able and last replied/failed time will be updated to_ping = [peer for peer in set(total_peers) if self.protocol.peer_manager.peer_is_good(peer) is not True] if to_ping: - await self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0) + self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0) fut = asyncio.Future(loop=self.loop) self.loop.call_later(constants.refresh_interval, fut.set_result, None) diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index ec3cd8dd0..913cac83b 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -192,23 +192,21 @@ class PingQueue: self._process_task: asyncio.Task = None self._next_task: asyncio.Future = None self._next_timer: asyncio.TimerHandle = None - self._lock = asyncio.Lock() self._running = False @property def running(self): return self._running - async def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None): + def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None): delay = constants.check_refresh_interval if delay is None else delay - async with self._lock: - for peer in peers: - if delay and peer not in self._enqueued_contacts: - self._pending_contacts[peer] = self._loop.time() + delay - elif peer not in self._enqueued_contacts: - self._enqueued_contacts.append(peer) - if peer in self._pending_contacts: - del self._pending_contacts[peer] + for peer in peers: + if delay and peer not in self._enqueued_contacts: + self._pending_contacts[peer] = self._loop.time() + delay + elif peer not in self._enqueued_contacts: + self._enqueued_contacts.append(peer) + if peer in self._pending_contacts: + del self._pending_contacts[peer] async def _process(self): async def _ping(p: 'KademliaPeer'): @@ -223,17 +221,16 @@ class PingQueue: while True: tasks = [] - async with self._lock: - if self._enqueued_contacts or self._pending_contacts: - now = self._loop.time() - scheduled = [k for k, d in self._pending_contacts.items() if now >= d] - for k in scheduled: - del self._pending_contacts[k] - if k not in self._enqueued_contacts: - self._enqueued_contacts.append(k) - while self._enqueued_contacts: - peer = self._enqueued_contacts.pop() - tasks.append(self._loop.create_task(_ping(peer))) + if self._enqueued_contacts or self._pending_contacts: + now = self._loop.time() + scheduled = [k for k, d in self._pending_contacts.items() if now >= d] + for k in scheduled: + del self._pending_contacts[k] + if k not in self._enqueued_contacts: + self._enqueued_contacts.append(k) + while self._enqueued_contacts: + peer = self._enqueued_contacts.pop() + tasks.append(self._loop.create_task(_ping(peer))) if tasks: await asyncio.wait(tasks, loop=self._loop) @@ -282,7 +279,6 @@ class KademliaProtocol(DatagramProtocol): self.data_store = DictDataStore(self.loop, self.peer_manager) self.ping_queue = PingQueue(self.loop, self) self.node_rpc = KademliaRPC(self, self.loop, self.peer_port) 
- self.lock = asyncio.Lock(loop=self.loop) self.rpc_timeout = rpc_timeout self._split_lock = asyncio.Lock(loop=self.loop) @@ -424,7 +420,7 @@ class KademliaProtocol(DatagramProtocol): # will be added to our routing table if successful is_good = self.peer_manager.peer_is_good(peer) if is_good is None: - await self.ping_queue.enqueue_maybe_ping(peer) + self.ping_queue.enqueue_maybe_ping(peer) elif is_good is True: await self.add_peer(peer) @@ -553,26 +549,25 @@ class KademliaProtocol(DatagramProtocol): if message.rpc_id in self.sent_messages: self.sent_messages.pop(message.rpc_id) - async with self.lock: + if isinstance(message, RequestDatagram): + response_fut = self.loop.create_future() + response_fut.add_done_callback(pop_from_sent_messages) + self.sent_messages[message.rpc_id] = (peer, response_fut, message) + try: + self.transport.sendto(data, (peer.address, peer.udp_port)) + except OSError as err: + # TODO: handle ENETUNREACH + if err.errno == socket.EWOULDBLOCK: + # i'm scared this may swallow important errors, but i get a million of these + # on Linux and it doesn't seem to affect anything -grin + log.warning("Can't send data to dht: EWOULDBLOCK") + else: + log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)", + len(data), peer.address, peer.udp_port, str(err), err.errno) if isinstance(message, RequestDatagram): - response_fut = self.loop.create_future() - response_fut.add_done_callback(pop_from_sent_messages) - self.sent_messages[message.rpc_id] = (peer, response_fut, message) - try: - self.transport.sendto(data, (peer.address, peer.udp_port)) - except OSError as err: - # TODO: handle ENETUNREACH - if err.errno == socket.EWOULDBLOCK: - # i'm scared this may swallow important errors, but i get a million of these - # on Linux and it doesn't seem to affect anything -grin - log.warning("Can't send data to dht: EWOULDBLOCK") - else: - log.error("DHT socket error sending %i bytes to %s:%i - %s (code %i)", - len(data), peer.address, peer.udp_port, str(err), err.errno) - if isinstance(message, RequestDatagram): - self.sent_messages[message.rpc_id][1].set_exception(err) - else: - raise err + self.sent_messages[message.rpc_id][1].set_exception(err) + else: + raise err if isinstance(message, RequestDatagram): self.peer_manager.report_last_sent(peer.address, peer.udp_port) elif isinstance(message, ErrorDatagram): From a616582733c85891a7cdcfe07b2a0da3528edcb5 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Thu, 7 Feb 2019 20:11:28 -0300 Subject: [PATCH 02/14] wait on connection tasks --- lbrynet/blob_exchange/downloader.py | 32 ++++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 3bcdc97f0..5b9467742 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -13,11 +13,6 @@ if typing.TYPE_CHECKING: log = logging.getLogger(__name__) -def drain_into(a: list, b: list): - while a: - b.append(a.pop()) - - class BlobDownloader: def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager', peer_queue: asyncio.Queue): @@ -52,11 +47,17 @@ class BlobDownloader: self.peer_queue.put_nowait(new_peers) tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())] try: - await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED') + done, pending = await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED') + 
drain_tasks(pending) except asyncio.CancelledError: drain_tasks(tasks) raise + def cleanup_active(self): + to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()] + for peer in to_remove: + del self.active_connections[peer] + async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': blob = self.blob_manager.get_blob(blob_hash, length) if blob.get_is_verified(): @@ -65,7 +66,11 @@ class BlobDownloader: while not blob.get_is_verified(): batch: typing.List['KademliaPeer'] = [] while not self.peer_queue.empty(): - batch.extend(await self.peer_queue.get()) + batch.extend(self.peer_queue.get_nowait()) + log.debug( + "running, %d peers, %d ignored, %d active", + len(batch), len(self.ignored), len(self.active_connections) + ) for peer in batch: if len(self.active_connections) >= self.config.max_connections_per_download: break @@ -73,11 +78,11 @@ class BlobDownloader: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) t = self.loop.create_task(self.request_blob_from_peer(blob, peer)) self.active_connections[peer] = t - t.add_done_callback( - lambda _: - None if peer not in self.active_connections else self.active_connections.pop(peer) - ) - await self.new_peer_or_finished(blob) + if self.active_connections: + await asyncio.wait(self.active_connections.values(), return_when='FIRST_COMPLETED') + self.cleanup_active() + else: + await self.new_peer_or_finished(blob) to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch))) to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True) if to_re_add: @@ -95,6 +100,9 @@ class BlobDownloader: if task and not task.done(): task.cancel() raise + except (OSError, Exception) as e: + log.exception(e) + raise e async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node', From 5586a226c2bddf3cc7c1cd6aa2ab04391eea85e2 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Thu, 7 Feb 2019 20:20:39 -0300 Subject: [PATCH 03/14] bypass parser during download --- lbrynet/blob_exchange/client.py | 40 +++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index b630d74b2..ace3c98b0 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -32,6 +32,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() return + if self._blob_bytes_received and not self.writer.closed(): + return self._write(data) response = BlobResponse.deserialize(data) @@ -51,23 +53,27 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if response.blob_data and self.writer and not self.writer.closed(): log.debug("got %i blob bytes from %s:%i", len(response.blob_data), self.peer_address, self.peer_port) # write blob bytes if we're writing a blob and have blob bytes to write - if len(response.blob_data) > (self.blob.get_length() - self._blob_bytes_received): - data = response.blob_data[:(self.blob.get_length() - self._blob_bytes_received)] - log.warning("got more than asked from %s:%d, probable sendfile bug", self.peer_address, self.peer_port) - else: - data = response.blob_data - self._blob_bytes_received += len(data) - try: - self.writer.write(data) - return - except IOError as err: - log.error("error downloading blob from %s:%i: %s", self.peer_address, self.peer_port, err) - if self._response_fut and not 
self._response_fut.done(): - self._response_fut.set_exception(err) - except (asyncio.CancelledError, asyncio.TimeoutError) as err: # TODO: is this needed? - log.error("%s downloading blob from %s:%i", str(err), self.peer_address, self.peer_port) - if self._response_fut and not self._response_fut.done(): - self._response_fut.set_exception(err) + self._write(response.blob_data) + + + def _write(self, data): + if len(data) > (self.blob.get_length() - self._blob_bytes_received): + data = data[:(self.blob.get_length() - self._blob_bytes_received)] + log.warning("got more than asked from %s:%d, probable sendfile bug", self.peer_address, self.peer_port) + else: + data = data + self._blob_bytes_received += len(data) + try: + self.writer.write(data) + return + except IOError as err: + log.error("error downloading blob from %s:%i: %s", self.peer_address, self.peer_port, err) + if self._response_fut and not self._response_fut.done(): + self._response_fut.set_exception(err) + except (asyncio.CancelledError, asyncio.TimeoutError) as err: # TODO: is this needed? + log.error("%s downloading blob from %s:%i", str(err), self.peer_address, self.peer_port) + if self._response_fut and not self._response_fut.done(): + self._response_fut.set_exception(err) async def _download_blob(self) -> typing.Tuple[bool, bool]: """ From ad03f91d2481f4f575998cfd08bbcd099a3898c6 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 01:03:13 -0300 Subject: [PATCH 04/14] let cancelled errors flow --- lbrynet/blob_exchange/client.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index ace3c98b0..09e14f507 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -65,12 +65,11 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self._blob_bytes_received += len(data) try: self.writer.write(data) - return except IOError as err: log.error("error downloading blob from %s:%i: %s", self.peer_address, self.peer_port, err) if self._response_fut and not self._response_fut.done(): self._response_fut.set_exception(err) - except (asyncio.CancelledError, asyncio.TimeoutError) as err: # TODO: is this needed? + except (asyncio.TimeoutError) as err: # TODO: is this needed? 
log.error("%s downloading blob from %s:%i", str(err), self.peer_address, self.peer_port) if self._response_fut and not self._response_fut.done(): self._response_fut.set_exception(err) @@ -119,8 +118,6 @@ class BlobExchangeClientProtocol(asyncio.Protocol): log.info(msg) await self.blob.finished_writing.wait() return True, True - except asyncio.CancelledError: - return False, True except asyncio.TimeoutError: return False, False except (InvalidBlobHashError, InvalidDataError): @@ -159,7 +156,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): except asyncio.CancelledError: if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() - return False, True + raise def connection_made(self, transport: asyncio.Transport): self.transport = transport @@ -186,7 +183,7 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), peer_connect_timeout, loop=loop) return await protocol.download_blob(blob) - except (asyncio.TimeoutError, asyncio.CancelledError, ConnectionRefusedError, ConnectionAbortedError, OSError): + except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): return False, False finally: await protocol.close() From 2d7eb835180159f4239f2401c8014df8f79cd472 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 01:04:38 -0300 Subject: [PATCH 05/14] change score calculation, wait for active peers too, simplify peer sorting/keeping --- lbrynet/blob_exchange/downloader.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 5b9467742..9f6b5ab99 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -27,6 +27,7 @@ class BlobDownloader: async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): if blob.get_is_verified(): return + self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones success, keep_connection = await request_blob( self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.config.blob_download_timeout @@ -36,22 +37,18 @@ class BlobDownloader: log.debug("drop peer %s:%i", peer.address, peer.tcp_port) elif keep_connection: log.debug("keep peer %s:%i", peer.address, peer.tcp_port) - if success: - self.scores[peer] = self.scores.get(peer, 0) + 2 - else: - self.scores[peer] = self.scores.get(peer, 0) - 1 + self.scores[peer] = self.scores.get(peer, 0) + 2 if success else 0 async def new_peer_or_finished(self, blob: 'BlobFile'): async def get_and_re_add_peers(): new_peers = await self.peer_queue.get() self.peer_queue.put_nowait(new_peers) tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())] + active_tasks = list(self.active_connections.values()) try: - done, pending = await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED') - drain_tasks(pending) - except asyncio.CancelledError: + await asyncio.wait(tasks + active_tasks, loop=self.loop, return_when='FIRST_COMPLETED') + finally: drain_tasks(tasks) - raise def cleanup_active(self): to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()] @@ -67,6 +64,7 @@ class BlobDownloader: batch: typing.List['KademliaPeer'] = [] while not self.peer_queue.empty(): batch.extend(self.peer_queue.get_nowait()) + 
batch.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True) log.debug( "running, %d peers, %d ignored, %d active", len(batch), len(self.ignored), len(self.active_connections) @@ -78,15 +76,10 @@ class BlobDownloader: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) t = self.loop.create_task(self.request_blob_from_peer(blob, peer)) self.active_connections[peer] = t - if self.active_connections: - await asyncio.wait(self.active_connections.values(), return_when='FIRST_COMPLETED') - self.cleanup_active() - else: - await self.new_peer_or_finished(blob) - to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch))) - to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True) - if to_re_add: - self.peer_queue.put_nowait(to_re_add) + await self.new_peer_or_finished(blob) + self.cleanup_active() + if batch: + self.peer_queue.put_nowait(set(batch).difference(self.ignored)) while self.active_connections: peer, task = self.active_connections.popitem() if task and not task.done(): From 73da223ae1fca3f573a5e96d14ab71989564c7e6 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 01:05:58 -0300 Subject: [PATCH 06/14] conf: give up first contact earlier, but let the transfer go longer --- lbrynet/conf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 2f2eab69e..9c90cdeba 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -475,8 +475,8 @@ class Config(CLIConfig): # protocol timeouts download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0) - blob_download_timeout = Float("Timeout to download a blob from a peer", 20.0) - peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 3.0) + blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0) + peer_connect_timeout = Float("Timeout to establish a TCP connection to a peer", 2.0) node_rpc_timeout = Float("Timeout when making a DHT request", constants.rpc_timeout) # blob announcement and download From 1be5dce30ebf6b3f5cbdae82b5b57b9e566108a4 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 01:28:03 -0300 Subject: [PATCH 07/14] stop racing during long streams --- lbrynet/blob_exchange/downloader.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 9f6b5ab99..86e50798d 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -24,6 +24,16 @@ class BlobDownloader: self.ignored: typing.Set['KademliaPeer'] = set() self.scores: typing.Dict['KademliaPeer', int] = {} + def should_race_continue(self): + if len(self.active_connections) >= self.config.max_connections_per_download: + return False + # if a peer won 2 or more blob races and is active as a downloader, stop the race so bandwidth improves + # the safe net side is that any failure will reset the peer score, triggering the race back + for peer, task in self.active_connections.items(): + if self.scores.get(peer, 0) >= 2 and not task.done(): + return False + return True + async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): if blob.get_is_verified(): return @@ -37,7 +47,7 @@ class BlobDownloader: log.debug("drop peer %s:%i", peer.address, peer.tcp_port) elif keep_connection: log.debug("keep peer %s:%i", peer.address, peer.tcp_port) - 
self.scores[peer] = self.scores.get(peer, 0) + 2 if success else 0 + self.scores[peer] = (self.scores.get(peer, 0) + 2) if success else 0 async def new_peer_or_finished(self, blob: 'BlobFile'): async def get_and_re_add_peers(): @@ -70,7 +80,7 @@ class BlobDownloader: len(batch), len(self.ignored), len(self.active_connections) ) for peer in batch: - if len(self.active_connections) >= self.config.max_connections_per_download: + if not self.should_race_continue(): break if peer not in self.active_connections and peer not in self.ignored: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) From c06ec6cd6999fb13d422c10186a0c51c6de94b98 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 02:27:58 -0300 Subject: [PATCH 08/14] manage connections, reusing them when possible --- lbrynet/blob_exchange/client.py | 61 +++++++++++++++-------------- lbrynet/blob_exchange/downloader.py | 18 +++++++-- lbrynet/stream/downloader.py | 1 + 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 09e14f507..59ed7b56f 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -74,7 +74,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if self._response_fut and not self._response_fut.done(): self._response_fut.set_exception(err) - async def _download_blob(self) -> typing.Tuple[bool, bool]: + async def _download_blob(self) -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: """ :return: download success (bool), keep connection (bool) """ @@ -92,24 +92,24 @@ class BlobExchangeClientProtocol(asyncio.Protocol): log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address, self.peer_port) log.warning(response.to_dict()) - return False, False + return False, self.close() elif availability_response.available_blobs and \ availability_response.available_blobs != [self.blob.blob_hash]: log.warning("blob availability response doesn't match our request from %s:%i", self.peer_address, self.peer_port) - return False, False + return False, self.close() if not price_response or price_response.blob_data_payment_rate != 'RATE_ACCEPTED': log.warning("data rate rejected by %s:%i", self.peer_address, self.peer_port) - return False, False + return False, self.close() if not blob_response or blob_response.error: log.warning("blob cant be downloaded from %s:%i", self.peer_address, self.peer_port) - return False, True + return False, self.transport if not blob_response.error and blob_response.blob_hash != self.blob.blob_hash: log.warning("incoming blob hash mismatch from %s:%i", self.peer_address, self.peer_port) - return False, False + return False, self.close() if self.blob.length is not None and self.blob.length != blob_response.length: log.warning("incoming blob unexpected length from %s:%i", self.peer_address, self.peer_port) - return False, False + return False, self.close() msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \ f" timeout in {self.peer_timeout}" log.debug(msg) @@ -117,16 +117,14 @@ class BlobExchangeClientProtocol(asyncio.Protocol): await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) log.info(msg) await self.blob.finished_writing.wait() - return True, True + return True, self.transport except asyncio.TimeoutError: - return False, False + return False, self.close() except (InvalidBlobHashError, InvalidDataError): log.warning("invalid 
blob from %s:%i", self.peer_address, self.peer_port) - return False, False - finally: - await self.close() + return False, self.close() - async def close(self): + def close(self): if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() if self.writer and not self.writer.closed(): @@ -138,9 +136,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.transport.close() self.transport = None - async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[bool, bool]: + async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: if blob.get_is_verified(): - return False, True + return False, self.transport try: self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0 self._response_fut = asyncio.Future(loop=self.loop) @@ -148,14 +146,14 @@ class BlobExchangeClientProtocol(asyncio.Protocol): except OSError: log.error("race happened downloading from %s:%i", self.peer_address, self.peer_port) # i'm not sure how to fix this race condition - jack - return False, True + return False, self.transport except asyncio.TimeoutError: if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() - return False, False + self.close() + return False, None except asyncio.CancelledError: - if self._response_fut and not self._response_fut.done(): - self._response_fut.cancel() + self.close() raise def connection_made(self, transport: asyncio.Transport): @@ -166,24 +164,29 @@ class BlobExchangeClientProtocol(asyncio.Protocol): def connection_lost(self, reason): log.debug("connection lost to %s:%i (reason: %s, %s)", self.peer_address, self.peer_port, str(reason), str(type(reason))) - self.transport = None - self.loop.create_task(self.close()) + self.close() async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: str, tcp_port: int, - peer_connect_timeout: float, blob_download_timeout: float) -> typing.Tuple[bool, bool]: + peer_connect_timeout: float, blob_download_timeout: float, + connected_transport: asyncio.Transport = None)\ + -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: """ Returns [<downloaded blob>, <keep connection>] """ - protocol = BlobExchangeClientProtocol(loop, blob_download_timeout) if blob.get_is_verified(): - return False, True + return False, connected_transport + protocol = BlobExchangeClientProtocol(loop, blob_download_timeout) + if connected_transport and not connected_transport.is_closing(): + connected_transport.set_protocol(protocol) + protocol.connection_made(connected_transport) + else: + connected_transport = None try: - await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), - peer_connect_timeout, loop=loop) + if not connected_transport: + await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), + peer_connect_timeout, loop=loop) return await protocol.download_blob(blob) except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): - return False, False - finally: - await protocol.close() + return False, None diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 86e50798d..7c62b1989 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -23,6 +23,7 @@ class BlobDownloader: self.active_connections: typing.Dict['KademliaPeer', asyncio.Task] = {} # active request_blob calls self.ignored: typing.Set['KademliaPeer'] = set() self.scores: typing.Dict['KademliaPeer', int] 
= {} + self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} def should_race_continue(self): if len(self.active_connections) >= self.config.max_connections_per_download: @@ -38,15 +39,19 @@ class BlobDownloader: if blob.get_is_verified(): return self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones - success, keep_connection = await request_blob( + transport = self.connections.get(peer) + success, transport = await request_blob( self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, - self.config.blob_download_timeout + self.config.blob_download_timeout, connected_transport=transport ) - if not keep_connection and peer not in self.ignored: + if not transport and peer not in self.ignored: self.ignored.add(peer) log.debug("drop peer %s:%i", peer.address, peer.tcp_port) - elif keep_connection: + if peer in self.connections: + del self.connections[peer] + elif transport: log.debug("keep peer %s:%i", peer.address, peer.tcp_port) + self.connections[peer] = transport self.scores[peer] = (self.scores.get(peer, 0) + 2) if success else 0 async def new_peer_or_finished(self, blob: 'BlobFile'): @@ -107,6 +112,10 @@ class BlobDownloader: log.exception(e) raise e + def close(self): + for transport in self.connections.values(): + transport.close() + async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node', blob_hash: str) -> 'BlobFile': @@ -119,3 +128,4 @@ async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', finally: if accumulate_task and not accumulate_task.done(): accumulate_task.cancel() + downloader.close() diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index e9142c2c4..c0408f0ce 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -51,6 +51,7 @@ class StreamDownloader(StreamAssembler): async def after_finished(self): log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path) await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished') + self.blob_downloader.close() def stop(self): if self.accumulate_task: From 141d68a2cd69a79ed1f59b7d1b4d93ab96027e71 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 02:38:27 -0300 Subject: [PATCH 09/14] fix write race condition --- lbrynet/blob/blob_file.py | 8 ++++++-- lbrynet/blob_exchange/client.py | 5 +++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index fb298d32f..9d6246d09 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -66,7 +66,7 @@ class BlobFile: self.verified: asyncio.Event = asyncio.Event(loop=self.loop) self.finished_writing = asyncio.Event(loop=loop) self.blob_write_lock = asyncio.Lock(loop=loop) - if os.path.isfile(os.path.join(blob_dir, blob_hash)): + if self.file_exists: length = int(os.stat(os.path.join(blob_dir, blob_hash)).st_size) self.length = length self.verified.set() @@ -74,6 +74,10 @@ class BlobFile: self.saved_verified_blob = False self.blob_completed_callback = blob_completed_callback + @property + def file_exists(self): + return os.path.isfile(self.file_path) + def writer_finished(self, writer: HashBlobWriter): def callback(finished: asyncio.Future): try: @@ -116,7 +120,7 @@ class BlobFile: self.verified.set() def open_for_writing(self) -> HashBlobWriter: - if os.path.exists(self.file_path): + if self.file_exists: raise OSError(f"File already exists 
'{self.file_path}'") fut = asyncio.Future(loop=self.loop) writer = HashBlobWriter(self.blob_hash, self.get_length, fut) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 59ed7b56f..0dda55e06 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -137,7 +137,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.transport = None async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: - if blob.get_is_verified(): + if blob.get_is_verified() or blob.file_exists: return False, self.transport try: self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0 @@ -175,7 +175,8 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s Returns [<downloaded blob>, <keep connection>] """ - if blob.get_is_verified(): + if blob.get_is_verified() or blob.file_exists: + # file exists but not verified means someone is writing right now, give it time, come back later return False, connected_transport protocol = BlobExchangeClientProtocol(loop, blob_download_timeout) if connected_transport and not connected_transport.is_closing(): From 3352e0e4f4443c1f8111f0a0ff2a8b9962174f90 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 03:05:53 -0300 Subject: [PATCH 10/14] score peers by speed --- lbrynet/blob_exchange/client.py | 34 ++++++++++++++--------------- lbrynet/blob_exchange/downloader.py | 17 ++++++++++----- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 0dda55e06..a6e86f0b4 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -74,7 +74,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): if self._response_fut and not self._response_fut.done(): self._response_fut.set_exception(err) - async def _download_blob(self) -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: + async def _download_blob(self) -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: """ :return: download success (bool), keep connection (bool) """ @@ -92,24 +92,24 @@ class BlobExchangeClientProtocol(asyncio.Protocol): log.warning("%s not in availability response from %s:%i", self.blob.blob_hash, self.peer_address, self.peer_port) log.warning(response.to_dict()) - return False, self.close() + return self._blob_bytes_received, self.close() elif availability_response.available_blobs and \ availability_response.available_blobs != [self.blob.blob_hash]: log.warning("blob availability response doesn't match our request from %s:%i", self.peer_address, self.peer_port) - return False, self.close() + return self._blob_bytes_received, self.close() if not price_response or price_response.blob_data_payment_rate != 'RATE_ACCEPTED': log.warning("data rate rejected by %s:%i", self.peer_address, self.peer_port) - return False, self.close() + return self._blob_bytes_received, self.close() if not blob_response or blob_response.error: log.warning("blob cant be downloaded from %s:%i", self.peer_address, self.peer_port) - return False, self.transport + return self._blob_bytes_received, self.transport if not blob_response.error and blob_response.blob_hash != self.blob.blob_hash: log.warning("incoming blob hash mismatch from %s:%i", self.peer_address, self.peer_port) - return False, self.close() + return self._blob_bytes_received, self.close() if self.blob.length is not None and self.blob.length != blob_response.length: 
log.warning("incoming blob unexpected length from %s:%i", self.peer_address, self.peer_port) - return False, self.close() + return self._blob_bytes_received, self.close() msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \ f" timeout in {self.peer_timeout}" log.debug(msg) @@ -117,12 +117,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol): await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) log.info(msg) await self.blob.finished_writing.wait() - return True, self.transport + return self._blob_bytes_received, self.transport except asyncio.TimeoutError: - return False, self.close() + return self._blob_bytes_received, self.close() except (InvalidBlobHashError, InvalidDataError): log.warning("invalid blob from %s:%i", self.peer_address, self.peer_port) - return False, self.close() + return self._blob_bytes_received, self.close() def close(self): if self._response_fut and not self._response_fut.done(): @@ -136,9 +136,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.transport.close() self.transport = None - async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: + async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: if blob.get_is_verified() or blob.file_exists: - return False, self.transport + return 0, self.transport try: self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0 self._response_fut = asyncio.Future(loop=self.loop) @@ -146,12 +146,12 @@ class BlobExchangeClientProtocol(asyncio.Protocol): except OSError: log.error("race happened downloading from %s:%i", self.peer_address, self.peer_port) # i'm not sure how to fix this race condition - jack - return False, self.transport + return self._blob_bytes_received, self.transport except asyncio.TimeoutError: if self._response_fut and not self._response_fut.done(): self._response_fut.cancel() self.close() - return False, None + return self._blob_bytes_received, None except asyncio.CancelledError: self.close() raise @@ -170,14 +170,14 @@ class BlobExchangeClientProtocol(asyncio.Protocol): async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: str, tcp_port: int, peer_connect_timeout: float, blob_download_timeout: float, connected_transport: asyncio.Transport = None)\ - -> typing.Tuple[bool, typing.Optional[asyncio.Transport]]: + -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: """ Returns [<downloaded blob>, <keep connection>] """ if blob.get_is_verified() or blob.file_exists: # file exists but not verified means someone is writing right now, give it time, come back later - return False, connected_transport + return 0, connected_transport protocol = BlobExchangeClientProtocol(loop, blob_download_timeout) if connected_transport and not connected_transport.is_closing(): connected_transport.set_protocol(protocol) @@ -190,4 +190,4 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s peer_connect_timeout, loop=loop) return await protocol.download_blob(blob) except (asyncio.TimeoutError, ConnectionRefusedError, ConnectionAbortedError, OSError): - return False, None + return 0, None diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 7c62b1989..adf181d8d 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -24,14 +24,15 @@ class BlobDownloader: self.ignored: typing.Set['KademliaPeer'] = set() 
self.scores: typing.Dict['KademliaPeer', int] = {} self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {} + self.rounds_won: typing.Dict['KademliaPeer', int] = {} def should_race_continue(self): if len(self.active_connections) >= self.config.max_connections_per_download: return False - # if a peer won 2 or more blob races and is active as a downloader, stop the race so bandwidth improves + # if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves # the safe net side is that any failure will reset the peer score, triggering the race back for peer, task in self.active_connections.items(): - if self.scores.get(peer, 0) >= 2 and not task.done(): + if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done(): return False return True @@ -40,10 +41,13 @@ class BlobDownloader: return self.scores[peer] = self.scores.get(peer, 0) - 1 # starts losing score, to account for cancelled ones transport = self.connections.get(peer) - success, transport = await request_blob( + start = self.loop.time() + bytes_received, transport = await request_blob( self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout, self.config.blob_download_timeout, connected_transport=transport ) + if bytes_received == blob.get_length(): + self.rounds_won[peer] = self.rounds_won.get(peer, 0) + 1 if not transport and peer not in self.ignored: self.ignored.add(peer) log.debug("drop peer %s:%i", peer.address, peer.tcp_port) @@ -52,7 +56,8 @@ class BlobDownloader: elif transport: log.debug("keep peer %s:%i", peer.address, peer.tcp_port) self.connections[peer] = transport - self.scores[peer] = (self.scores.get(peer, 0) + 2) if success else 0 + rough_speed = (bytes_received / (self.loop.time() - start)) if bytes_received else 0 + self.scores[peer] = rough_speed async def new_peer_or_finished(self, blob: 'BlobFile'): async def get_and_re_add_peers(): @@ -113,8 +118,8 @@ class BlobDownloader: raise e def close(self): - for transport in self.connections.values(): - transport.close() + for transport in self.connections.values(): + transport.close() async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node', From af1619ebfb80aa3627c3b5e0884e5f4c41b81074 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 03:32:38 -0300 Subject: [PATCH 11/14] disable race limiting for now --- lbrynet/blob_exchange/downloader.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index adf181d8d..59a5b0ec5 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -31,9 +31,10 @@ class BlobDownloader: return False # if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves # the safe net side is that any failure will reset the peer score, triggering the race back - for peer, task in self.active_connections.items(): - if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done(): - return False + # TODO: this is a good idea for low bandwidth, but doesnt play nice on high bandwidth + # for peer, task in self.active_connections.items(): + # if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done(): + # return False return True async def request_blob_from_peer(self, blob: 'BlobFile', peer: 'KademliaPeer'): From 
e2b06677b5f6eafd3210f687ef163526314512b8 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 03:42:31 -0300 Subject: [PATCH 12/14] defer db insert during downloads --- lbrynet/stream/assembler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index b5923c032..913d538ba 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -86,6 +86,7 @@ class StreamAssembler: ) await self.blob_manager.blob_completed(self.sd_blob) written_blobs = None + save_tasks = [] try: with open(self.output_path, 'wb') as stream_handle: self.stream_handle = stream_handle @@ -101,7 +102,7 @@ class StreamAssembler: await self.blob_manager.delete_blobs([blob_info.blob_hash]) continue if await self._decrypt_blob(blob, blob_info, self.descriptor.key): - await self.blob_manager.blob_completed(blob) + save_tasks.append(asyncio.ensure_future(self.blob_manager.blob_completed(blob))) written_blobs = i if not self.wrote_bytes_event.is_set(): self.wrote_bytes_event.set() @@ -115,6 +116,8 @@ class StreamAssembler: self.descriptor.sd_hash) continue finally: + if save_tasks: + await asyncio.wait(save_tasks) if written_blobs == len(self.descriptor.blobs) - 2: log.debug("finished decrypting and assembling stream") await self.after_finished() From 8b25acff6e7bd2097b9e6b0957088695767b2a73 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 04:12:53 -0300 Subject: [PATCH 13/14] do not wait blob to be written, log errors from races --- lbrynet/blob_exchange/client.py | 6 ++++-- tests/unit/blob_exchange/test_transfer_blob.py | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index a6e86f0b4..40b197707 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -116,7 +116,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}" await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) log.info(msg) - await self.blob.finished_writing.wait() + # await self.blob.finished_writing.wait() not necessary, but a dangerous change. TODO: is it needed? 
return self._blob_bytes_received, self.transport except asyncio.TimeoutError: return self._blob_bytes_received, self.close() @@ -143,9 +143,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0 self._response_fut = asyncio.Future(loop=self.loop) return await self._download_blob() - except OSError: + except OSError as e: log.error("race happened downloading from %s:%i", self.peer_address, self.peer_port) # i'm not sure how to fix this race condition - jack + log.exception(e) return self._blob_bytes_received, self.transport except asyncio.TimeoutError: if self._response_fut and not self._response_fut.done(): @@ -182,6 +183,7 @@ async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', address: s if connected_transport and not connected_transport.is_closing(): connected_transport.set_protocol(protocol) protocol.connection_made(connected_transport) + log.debug("reusing connection for %s:%d", address, tcp_port) else: connected_transport = None try: diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py index 6dacba381..482781ab2 100644 --- a/tests/unit/blob_exchange/test_transfer_blob.py +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -70,6 +70,7 @@ class TestBlobExchange(BlobExchangeTestBase): # download the blob downloaded = await request_blob(self.loop, client_blob, self.server_from_client.address, self.server_from_client.tcp_port, 2, 3) + await client_blob.finished_writing.wait() self.assertEqual(client_blob.get_is_verified(), True) self.assertTrue(downloaded) @@ -111,6 +112,7 @@ class TestBlobExchange(BlobExchangeTestBase): ), self._test_transfer_blob(blob_hash) ) + await second_client_blob.finished_writing.wait() self.assertEqual(second_client_blob.get_is_verified(), True) async def test_host_different_blobs_to_multiple_peers_at_once(self): @@ -140,7 +142,8 @@ class TestBlobExchange(BlobExchangeTestBase): self.loop, second_client_blob, server_from_second_client.address, server_from_second_client.tcp_port, 2, 3 ), - self._test_transfer_blob(sd_hash) + self._test_transfer_blob(sd_hash), + second_client_blob.finished_writing.wait() ) self.assertEqual(second_client_blob.get_is_verified(), True) From 9ecec524129cd40c718b5b69c6ff11ec26deb334 Mon Sep 17 00:00:00 2001 From: Victor Shyba <victor1984@riseup.net> Date: Fri, 8 Feb 2019 04:39:28 -0300 Subject: [PATCH 14/14] check lock acquired to prevent races --- lbrynet/blob_exchange/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py index 40b197707..80d78fb22 100644 --- a/lbrynet/blob_exchange/client.py +++ b/lbrynet/blob_exchange/client.py @@ -137,7 +137,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol): self.transport = None async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[int, typing.Optional[asyncio.Transport]]: - if blob.get_is_verified() or blob.file_exists: + if blob.get_is_verified() or blob.file_exists or blob.blob_write_lock.locked(): return 0, self.transport try: self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0
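
The series above repeatedly leans on one asyncio idiom: keep a peer's TCP transport open and hand it to a fresh protocol instance via transport.set_protocol() followed by a manual protocol.connection_made(transport), which is what request_blob() does with connected_transport in the "manage connections, reusing them when possible" patch. The standalone sketch below only illustrates that rebinding pattern against a local echo server; the OneShotClient/echo/main names are illustrative assumptions (Python 3.7+), not part of the patches themselves.

import asyncio


class OneShotClient(asyncio.Protocol):
    """Sends a single payload and resolves a future with the first reply."""

    def __init__(self, loop: asyncio.AbstractEventLoop, payload: bytes):
        self.payload = payload
        self.response: asyncio.Future = loop.create_future()

    def connection_made(self, transport: asyncio.Transport):
        transport.write(self.payload)

    def data_received(self, data: bytes):
        if not self.response.done():
            self.response.set_result(data)


async def main():
    loop = asyncio.get_running_loop()

    async def echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        # echo every chunk back until the client hangs up
        while True:
            data = await reader.read(64)
            if not data:
                break
            writer.write(data)
            await writer.drain()
        writer.close()

    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    first = OneShotClient(loop, b"blob request 1")
    transport, _ = await loop.create_connection(lambda: first, "127.0.0.1", port)
    try:
        print(await first.response)
        # reuse the same TCP connection: swap in a new protocol object and tell
        # it about the existing transport, the same move the patches make when
        # a connected_transport is passed back into request_blob()
        second = OneShotClient(loop, b"blob request 2")
        transport.set_protocol(second)
        second.connection_made(transport)
        print(await second.response)
    finally:
        transport.close()
        server.close()
        await server.wait_closed()


if __name__ == "__main__":
    asyncio.run(main())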