diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py
index 484e93122..b8a3e4796 100644
--- a/lbrynet/blob/blob_file.py
+++ b/lbrynet/blob/blob_file.py
@@ -201,7 +201,7 @@ class AbstractBlob:
 
     def get_blob_writer(self, peer_address: typing.Optional[str] = None,
                         peer_port: typing.Optional[int] = None) -> HashBlobWriter:
-        if (peer_address, peer_port) in self.writers:
+        if (peer_address, peer_port) in self.writers and not self.writers[(peer_address, peer_port)].closed():
             raise OSError(f"attempted to download blob twice from {peer_address}:{peer_port}")
         fut = asyncio.Future(loop=self.loop)
         writer = HashBlobWriter(self.blob_hash, self.get_length, fut)
diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py
index 0f58efeb3..67ca75cae 100644
--- a/lbrynet/blob_exchange/client.py
+++ b/lbrynet/blob_exchange/client.py
@@ -35,6 +35,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
             if self._response_fut and not self._response_fut.done():
                 self._response_fut.cancel()
             return
+        if not self._response_fut:
+            log.warning("Protocol received data before expected, probable race on keep alive. Closing transport.")
+            return self.close()
         if self._blob_bytes_received and not self.writer.closed():
             return self._write(data)
 
@@ -165,6 +168,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         except asyncio.CancelledError:
             self.close()
             raise
+        finally:
+            if self.writer and not self.writer.closed():
+                self.writer.close_handle()
+                self.writer = None
 
     def connection_made(self, transport: asyncio.Transport):
         self.transport = transport
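The blob_file.py and client.py hunks work together: get_blob_writer now rejects a duplicate writer for a given (address, port) pair only while the previous writer is still open, and the client protocol closes its writer handle in a finally clause so that slot is reliably freed. Below is a minimal standalone sketch of the guard; Writer and Blob are simplified stand-ins for HashBlobWriter and AbstractBlob, not the lbrynet classes:

    import typing


    class Writer:
        """Stand-in for HashBlobWriter: only tracks open/closed state."""

        def __init__(self):
            self._closed = False

        def closed(self) -> bool:
            return self._closed

        def close_handle(self):
            self._closed = True


    class Blob:
        """Stand-in for AbstractBlob, showing the patched guard."""

        def __init__(self):
            self.writers: typing.Dict[typing.Tuple[str, int], Writer] = {}

        def get_blob_writer(self, address: str, port: int) -> Writer:
            # the patched check: only a still-open writer blocks a new one
            if (address, port) in self.writers and not self.writers[(address, port)].closed():
                raise OSError(f"attempted to download blob twice from {address}:{port}")
            writer = self.writers[(address, port)] = Writer()
            return writer


    blob = Blob()
    first = blob.get_blob_writer("1.1.1.1", 1337)
    first.close_handle()
    second = blob.get_blob_writer("1.1.1.1", 1337)  # allowed now; raised OSError before the patch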
diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 24142f955..b8b854a49 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -1,7 +1,7 @@
 import asyncio
 import typing
 import logging
-from lbrynet.utils import drain_tasks, cache_concurrent
+from lbrynet.utils import cache_concurrent
 from lbrynet.blob_exchange.client import request_blob
 if typing.TYPE_CHECKING:
     from lbrynet.conf import Config
@@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
 
 
 class BlobDownloader:
-    BAN_TIME = 10.0  # fixme: when connection manager gets implemented, move it out from here
+    BAN_FACTOR = 2.0  # fixme: when connection manager gets implemented, move it out from here
 
     def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                  peer_queue: asyncio.Queue):
@@ -25,18 +25,12 @@ class BlobDownloader:
         self.active_connections: typing.Dict['KademliaPeer', asyncio.Task] = {}  # active request_blob calls
         self.ignored: typing.Dict['KademliaPeer', int] = {}
         self.scores: typing.Dict['KademliaPeer', int] = {}
+        self.failures: typing.Dict['KademliaPeer', int] = {}
         self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
-        self.time_since_last_blob = loop.time()
 
     def should_race_continue(self, blob: 'AbstractBlob'):
         if len(self.active_connections) >= self.config.max_connections_per_download:
             return False
-        # if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves
-        # the safe net side is that any failure will reset the peer score, triggering the race back
-        # TODO: this is a good idea for low bandwidth, but doesnt play nice on high bandwidth
-        # for peer, task in self.active_connections.items():
-        #     if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done():
-        #         return False
         return not (blob.get_is_verified() or not blob.is_writeable())
 
     async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer'):
@@ -49,46 +43,35 @@ class BlobDownloader:
             self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
             self.config.blob_download_timeout, connected_transport=transport
         )
-        if bytes_received == blob.get_length():
-            self.time_since_last_blob = self.loop.time()
         if not transport and peer not in self.ignored:
             self.ignored[peer] = self.loop.time()
             log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
+            self.failures[peer] = self.failures.get(peer, 0) + 1
             if peer in self.connections:
                 del self.connections[peer]
         elif transport:
             log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
+            self.failures[peer] = 0
             self.connections[peer] = transport
-        rough_speed = (bytes_received / (self.loop.time() - start)) if bytes_received else 0
-        self.scores[peer] = rough_speed
+        elapsed = self.loop.time() - start
+        self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 0
 
-    async def new_peer_or_finished(self, blob: 'AbstractBlob'):
-        async def get_and_re_add_peers():
-            try:
-                new_peers = await asyncio.wait_for(self.peer_queue.get(), timeout=1.0)
-                self.peer_queue.put_nowait(new_peers)
-            except asyncio.TimeoutError:
-                pass
-        tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
-        active_tasks = list(self.active_connections.values())
-        try:
-            await asyncio.wait(tasks + active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
-        finally:
-            drain_tasks(tasks)
+    async def new_peer_or_finished(self):
+        active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
+        await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
 
     def cleanup_active(self):
         to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
         for peer in to_remove:
             del self.active_connections[peer]
+        self.clearbanned()
 
     def clearbanned(self):
         now = self.loop.time()
-        if now - self.time_since_last_blob > 60.0:
-            return
-        forgiven = [banned_peer for banned_peer, when in self.ignored.items() if now - when > self.BAN_TIME]
-        self.peer_queue.put_nowait(forgiven)
-        for banned_peer in forgiven:
-            self.ignored.pop(banned_peer)
+        self.ignored = dict((
+            (peer, when) for (peer, when) in self.ignored.items()
+            if (now - when) < min(30.0, (self.failures.get(peer, 0) ** self.BAN_FACTOR))
+        ))
 
     @cache_concurrent
     async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'AbstractBlob':
@@ -97,43 +80,27 @@ class BlobDownloader:
             return blob
         try:
             while not blob.get_is_verified():
-                batch: typing.List['KademliaPeer'] = []
+                batch: typing.Set['KademliaPeer'] = set()
                 while not self.peer_queue.empty():
-                    batch.extend(self.peer_queue.get_nowait())
-                batch.sort(key=lambda p: self.scores.get(p, 0), reverse=True)
+                    batch.update(self.peer_queue.get_nowait())
+                if batch:
+                    self.peer_queue.put_nowait(list(batch))
                 log.debug(
                     "running, %d peers, %d ignored, %d active",
                     len(batch), len(self.ignored), len(self.active_connections)
                 )
-                for peer in batch:
+                for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True):
                     if not self.should_race_continue(blob):
                         break
                     if peer not in self.active_connections and peer not in self.ignored:
                         log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
                         t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
                         self.active_connections[peer] = t
-                await self.new_peer_or_finished(blob)
+                await self.new_peer_or_finished()
                 self.cleanup_active()
-                if batch:
-                    to_re_add = list(set(batch).difference(self.ignored))
-                    if to_re_add:
-                        self.peer_queue.put_nowait(to_re_add)
-                    else:
-                        self.clearbanned()
-                else:
-                    self.clearbanned()
             blob.close()
             log.debug("downloaded %s", blob_hash[:8])
             return blob
         finally:
-            re_add = set()
-            while self.active_connections:
-                peer, t = self.active_connections.popitem()
-                t.cancel()
-                re_add.add(peer)
-            re_add = re_add.difference(self.ignored)
-            if re_add:
-                self.peer_queue.put_nowait(list(re_add))
             blob.close()
 
     def close(self):
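The downloader now tracks consecutive failures per peer, and clearbanned() keeps a peer ignored for failures ** BAN_FACTOR seconds, capped at 30, instead of the old flat ten-second BAN_TIME; a successful connection resets the count to zero. A standalone illustration of the resulting backoff curve (plain arithmetic, independent of lbrynet):

    BAN_FACTOR = 2.0  # same constant the patch introduces


    def ban_seconds(failures: int) -> float:
        # mirrors the clearbanned() predicate: min(30.0, failures ** BAN_FACTOR)
        return min(30.0, failures ** BAN_FACTOR)


    print([ban_seconds(n) for n in range(1, 7)])  # [1.0, 4.0, 9.0, 16.0, 25.0, 30.0]

With BAN_FACTOR = 2.0 a first offence costs one second, while a persistently failing peer saturates at the 30-second cap by its sixth failure.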
diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index b722ac69b..1f86e5638 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -231,7 +231,7 @@ class ManagedStream:
             self.update_status(ManagedStream.STATUS_RUNNING)
             await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
             self.update_delayed_stop()
-        elif not os.path.isfile(self.full_path):
+        else:
             await self.save_file(file_name, download_directory)
             await self.started_writing.wait()
 
@@ -305,11 +305,11 @@ class ManagedStream:
         if not os.path.isdir(self.download_directory):
             log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
             os.mkdir(self.download_directory)
-        self._file_name = await get_next_available_file_name(
-            self.loop, self.download_directory,
-            file_name or self._file_name or self.descriptor.suggested_file_name
-        )
         if not await self.blob_manager.storage.file_exists(self.sd_hash):
+            self._file_name = await get_next_available_file_name(
+                self.loop, self.download_directory,
+                file_name or self._file_name or self.descriptor.suggested_file_name
+            )
             self.rowid = self.blob_manager.storage.save_downloaded_file(
                 self.stream_hash, self.file_name, self.download_directory, 0.0
             )
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index b7fe6734a..6fe849df5 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -397,12 +397,17 @@ class StreamManager:
                 if not stream.descriptor:
                     raise DownloadSDTimeout(stream.sd_hash)
                 raise DownloadDataTimeout(stream.sd_hash)
-            if to_replace:  # delete old stream now that the replacement has started downloading
-                await self.delete_stream(to_replace)
-            stream.set_claim(resolved, claim)
-            await self.storage.save_content_claim(stream.stream_hash, outpoint)
-            self.streams[stream.sd_hash] = stream
+            finally:
+                if stream.descriptor:
+                    if to_replace:  # delete old stream now that the replacement has started downloading
+                        await self.delete_stream(to_replace)
+                    stream.set_claim(resolved, claim)
+                    await self.storage.save_content_claim(stream.stream_hash, outpoint)
+                    self.streams[stream.sd_hash] = stream
             return stream
+        except DownloadDataTimeout as err:  # forgive data timeout, dont delete stream
+            error = err
+            raise
        except Exception as err:
             error = err
             if stream and stream.descriptor:
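The stream_manager.py change is subtle: claim bookkeeping moves into a finally block guarded by stream.descriptor, and a dedicated except DownloadDataTimeout arm re-raises the timeout without deleting the stream. The net effect, condensed below with hypothetical stand-ins (start_stream and save_claim are illustrative names, not the lbrynet API):

    class DownloadDataTimeout(Exception):
        pass


    async def start_stream(stream, save_claim):
        """Hypothetical condensation of the patched StreamManager flow."""
        try:
            try:
                await stream.setup()  # may raise DownloadDataTimeout after the sd blob arrived
            finally:
                if stream.descriptor:  # descriptor made it down: keep the stream record
                    await save_claim(stream)
            return stream
        except DownloadDataTimeout:
            raise  # forgiven upstream: the record saved above is not deleted

So a data timeout still surfaces as an error to the caller, but the partially downloaded stream survives and can resume, which is exactly what the updated integration test asserts further down.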
diff --git a/scripts/time_to_first_byte.py b/scripts/time_to_first_byte.py
index 755e1ba50..b762f44ba 100644
--- a/scripts/time_to_first_byte.py
+++ b/scripts/time_to_first_byte.py
@@ -11,7 +11,7 @@ from lbrynet.blob.blob_file import MAX_BLOB_SIZE
 from lbrynet.conf import Config
 from lbrynet.schema.uri import parse_lbry_uri
 from lbrynet.extras.daemon.client import daemon_rpc
-from lbrynet.extras import system_info, cli
+from lbrynet.extras import system_info
 
 
 def extract_uris(response):
@@ -58,7 +58,7 @@ def variance(times):
     return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3)
 
 
-async def wait_for_done(conf, uri):
+async def wait_for_done(conf, uri, timeout):
     name = uri.split("#")[0]
     last_complete = 0
     hang_count = 0
@@ -73,11 +73,11 @@ async def wait_for_done(conf, uri):
             else:
                 hang_count += 1
             await asyncio.sleep(1.0)
-            if hang_count > 10:
+            if hang_count > timeout:
                 return False, file['blobs_completed'], file['blobs_in_stream']
 
 
-async def main(uris=None, allow_fees=False):
+async def main(uris=None, cmd_args=None):
     if not uris:
         uris = await get_frontpage_uris()
     conf = Config()
@@ -93,7 +93,7 @@ async def main(uris=None, allow_fees=False):
     async def __resolve(name):
         resolved = await daemon_rpc(conf, 'resolve', urls=[name])
         if 'error' not in resolved.get(name, {}):
-            if ("fee" not in resolved[name]['claim']['value']) or allow_fees:
+            if ("fee" not in resolved[name]['claim']['value']) or cmd_args.allow_fees:
                 resolvable.append(name)
             else:
                 print(f"{name} has a fee, skipping it")
@@ -114,24 +114,29 @@ async def main(uris=None, allow_fees=False):
     for i, uri in enumerate(resolvable):
         start = time.time()
         try:
-            await daemon_rpc(conf, 'get', uri=uri)
+            await daemon_rpc(conf, 'get', uri=uri, save_file=True)
             first_byte = time.time()
             first_byte_times.append(first_byte - start)
             print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(conf, uri)
-            if downloaded:
-                download_successes.append(uri)
-            else:
-                download_failures.append(uri)
-            mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
-            download_speeds.append(mbs)
-            print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
-                  f"{mbs}mb/s")
-        except:
-            print(f"{i + 1}/{len(uris)} - failed to start {uri}")
+            if not cmd_args.head_blob_only:
+                downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
+                    conf, uri, cmd_args.stall_download_timeout
+                )
+                if downloaded:
+                    download_successes.append(uri)
+                else:
+                    download_failures.append(uri)
+                mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
+                download_speeds.append(mbs)
+                print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
+                      f"{mbs}mb/s")
+        except Exception as e:
+            print(f"{i + 1}/{len(uris)} - failed to start {uri}: {e}")
             failed_to_start.append(uri)
-            return
-        # await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+            if cmd_args.exit_on_error:
+                return
+        if cmd_args.delete_after_download or cmd_args.head_blob_only:
+            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
         await asyncio.sleep(0.1)
 
     print("**********************************************")
@@ -140,12 +145,13 @@ async def main(uris=None, allow_fees=False):
              f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
              f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \
              f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \
-             f"Variance: {variance(first_byte_times)}\n" \
-             f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \
-             f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
-             f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
-             f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
-             f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"
+             f"Variance: {variance(first_byte_times)}\n"
+    if not cmd_args.head_blob_only:
+        result += f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \
+                  f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
+                  f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
+                  f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
+                  f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"
 
     if failed_to_start:
         result += "\nFailed to start:" + "\n".join([f for f in failed_to_start])
@@ -160,9 +166,12 @@ async def main(uris=None, allow_fees=False):
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("--data_dir")
-    parser.add_argument("--wallet_dir")
-    parser.add_argument("--download_directory")
+    #parser.add_argument("--data_dir")
+    #parser.add_argument("--wallet_dir")
+    #parser.add_argument("--download_directory")
     parser.add_argument("--allow_fees", action='store_true')
-    args = parser.parse_args()
-    asyncio.run(main(allow_fees=args.allow_fees))
+    parser.add_argument("--exit_on_error", action='store_true')
+    parser.add_argument("--stall_download_timeout", default=10, type=int)
+    parser.add_argument("--delete_after_download", action='store_true')
+    parser.add_argument("--head_blob_only", action='store_true')
+    asyncio.run(main(cmd_args=parser.parse_args()))
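Since main() now receives the parsed argparse namespace wholesale, the new flags compose freely. A hypothetical local invocation, using only the flags defined in the parser above:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--allow_fees", action='store_true')
    parser.add_argument("--exit_on_error", action='store_true')
    parser.add_argument("--stall_download_timeout", default=10, type=int)
    parser.add_argument("--delete_after_download", action='store_true')
    parser.add_argument("--head_blob_only", action='store_true')

    # equivalent to: python scripts/time_to_first_byte.py --head_blob_only --stall_download_timeout 20
    cmd_args = parser.parse_args(["--head_blob_only", "--stall_download_timeout", "20"])
    assert cmd_args.head_blob_only and cmd_args.stall_download_timeout == 20
    # --head_blob_only skips wait_for_done() and the speed summary, and
    # together with --delete_after_download triggers file_delete between URIs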
f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \ + f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \ + f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \ + f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n" if failed_to_start: result += "\nFailed to start:" + "\n".join([f for f in failed_to_start]) @@ -160,9 +166,12 @@ async def main(uris=None, allow_fees=False): if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("--data_dir") - parser.add_argument("--wallet_dir") - parser.add_argument("--download_directory") + #parser.add_argument("--data_dir") + #parser.add_argument("--wallet_dir") + #parser.add_argument("--download_directory") parser.add_argument("--allow_fees", action='store_true') - args = parser.parse_args() - asyncio.run(main(allow_fees=args.allow_fees)) + parser.add_argument("--exit_on_error", action='store_true') + parser.add_argument("--stall_download_timeout", default=10, type=int) + parser.add_argument("--delete_after_download", action='store_true') + parser.add_argument("--head_blob_only", action='store_true') + asyncio.run(main(cmd_args=parser.parse_args())) diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py index 5ccde6068..419afd8cf 100644 --- a/tests/integration/test_file_commands.py +++ b/tests/integration/test_file_commands.py @@ -44,6 +44,18 @@ class FileCommands(CommandTestCase): ) self.assertEqual(file_list[0]['confirmations'], 1) + async def test_get_doesnt_touch_user_written_files_between_calls(self): + await self.stream_create('foo', '0.01', data=bytes([0] * (2 << 23))) + self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo')) + first_path = (await self.daemon.jsonrpc_get('lbry://foo', save_file=True)).full_path + await self.wait_files_to_complete() + self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo')) + with open(first_path, 'wb') as f: + f.write(b' ') + f.flush() + second_path = await self.daemon.jsonrpc_get('lbry://foo', save_file=True) + await self.wait_files_to_complete() + self.assertNotEquals(first_path, second_path) async def test_file_list_updated_metadata_on_resolve(self): await self.stream_create('foo', '0.01') @@ -67,12 +79,12 @@ class FileCommands(CommandTestCase): blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash ] await self.server.blob_manager.delete_blobs(all_except_sd) - resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2) + resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True) self.assertIn('error', resp) self.assertEqual('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error']) - await self.daemon.jsonrpc_file_delete(claim_name='foo') + self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didnt create a file") await self.server.blob_manager.delete_blobs([sd_hash]) - resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2) + resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True) self.assertIn('error', resp) self.assertEqual('Failed to download sd blob %s within timeout' % sd_hash, resp['error']) @@ -159,7 +171,7 @@ class FileCommands(CommandTestCase): self.assertEqual('finished', file_info['status']) async def test_unban_recovers_stream(self): - BlobDownloader.BAN_TIME = .5 # fixme: temporary field, will move to connection manager or a conf + 
diff --git a/tests/unit/blob/test_blob_file.py b/tests/unit/blob/test_blob_file.py
index 8b519e27e..664f7e75c 100644
--- a/tests/unit/blob/test_blob_file.py
+++ b/tests/unit/blob/test_blob_file.py
@@ -133,6 +133,22 @@ class TestBlob(AsyncioTestCase):
         await self._test_close_writers_on_finished(BlobBuffer)
         await self._test_close_writers_on_finished(BlobFile, tmp_dir)
 
+    async def test_concurrency_and_premature_closes(self):
+        blob_directory = tempfile.mkdtemp()
+        self.addCleanup(lambda: shutil.rmtree(blob_directory))
+        blob = self._get_blob(BlobBuffer, blob_directory=blob_directory)
+        writer = blob.get_blob_writer('1.1.1.1', 1337)
+        self.assertEqual(1, len(blob.writers))
+        with self.assertRaises(OSError):
+            blob.get_blob_writer('1.1.1.1', 1337)
+        writer.close_handle()
+        self.assertTrue(blob.writers[('1.1.1.1', 1337)].closed())
+        writer = blob.get_blob_writer('1.1.1.1', 1337)
+        self.assertEqual(blob.writers[('1.1.1.1', 1337)], writer)
+        writer.close_handle()
+        await asyncio.sleep(0.000000001)  # flush callbacks
+        self.assertEqual(0, len(blob.writers))
+
     async def test_delete(self):
         blob_buffer = await self._test_create_blob(BlobBuffer)
         self.assertIsInstance(blob_buffer, BlobBuffer)
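The near-zero asyncio.sleep before the final assertion is there because, assuming writer cleanup is wired through a Future done-callback (as the "flush callbacks" comment suggests), the callback only fires once control returns to the event loop. A standalone demonstration of that scheduling detail:

    import asyncio


    async def demo():
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        writers = {('1.1.1.1', 1337): object()}
        # cleanup registered the way blob writers are assumed to be: via a done-callback
        fut.add_done_callback(lambda _: writers.clear())
        fut.cancel()            # stands in for writer.close_handle()
        assert writers          # callback is only scheduled, not yet run
        await asyncio.sleep(0)  # yield once so the loop can flush callbacks
        assert not writers


    asyncio.run(demo())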