From 3cf5c536c0d0bbe7891a37c15f6d6b46dc8bc2d4 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 30 Apr 2019 17:56:29 -0300
Subject: [PATCH 01/18] improve ban timing and downloader loop friction

---
 lbrynet/blob_exchange/downloader.py     | 28 +++++++++++--------------
 tests/integration/test_file_commands.py |  2 +-
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 24142f955..7010e54a7 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
 
 
 class BlobDownloader:
-    BAN_TIME = 10.0  # fixme: when connection manager gets implemented, move it out from here
+    BAN_FACTOR = 2.0  # fixme: when connection manager gets implemented, move it out from here
 
     def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
                  peer_queue: asyncio.Queue):
@@ -25,6 +25,7 @@ class BlobDownloader:
         self.active_connections: typing.Dict['KademliaPeer', asyncio.Task] = {}  # active request_blob calls
         self.ignored: typing.Dict['KademliaPeer', int] = {}
         self.scores: typing.Dict['KademliaPeer', int] = {}
+        self.failures: typing.Dict['KademliaPeer', int] = {}
         self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
         self.time_since_last_blob = loop.time()
@@ -54,27 +55,20 @@ class BlobDownloader:
         if not transport and peer not in self.ignored:
             self.ignored[peer] = self.loop.time()
             log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
+            self.failures[peer] = self.failures.get(peer, 0) + 1
             if peer in self.connections:
                 del self.connections[peer]
         elif transport:
             log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
+            if bytes_received:
+                self.failures[peer] = 0
             self.connections[peer] = transport
         rough_speed = (bytes_received / (self.loop.time() - start)) if bytes_received else 0
         self.scores[peer] = rough_speed
 
-    async def new_peer_or_finished(self, blob: 'AbstractBlob'):
-        async def get_and_re_add_peers():
-            try:
-                new_peers = await asyncio.wait_for(self.peer_queue.get(), timeout=1.0)
-                self.peer_queue.put_nowait(new_peers)
-            except asyncio.TimeoutError:
-                pass
-        tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
-        active_tasks = list(self.active_connections.values())
-        try:
-            await asyncio.wait(tasks + active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
-        finally:
-            drain_tasks(tasks)
+    async def new_peer_or_finished(self):
+        active_tasks = list(self.active_connections.values()) + [asyncio.sleep(0.2)]
+        await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
 
     def cleanup_active(self):
         to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
@@ -85,7 +79,9 @@ class BlobDownloader:
         now = self.loop.time()
         if now - self.time_since_last_blob > 60.0:
             return
-        forgiven = [banned_peer for banned_peer, when in self.ignored.items() if now - when > self.BAN_TIME]
+        timeout_for = lambda peer: (self.failures.get(peer, 0) ** self.BAN_FACTOR) - 1.0
+        forgiven = [banned_peer for banned_peer, when in self.ignored.items() if now - when > timeout_for(banned_peer)]
+        log.warning([(timeout_for(peer), when) for peer, when in self.ignored.items()])
         self.peer_queue.put_nowait(forgiven)
         for banned_peer in forgiven:
             self.ignored.pop(banned_peer)
@@ -112,7 +108,7 @@ class BlobDownloader:
                         log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
                         t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
                         self.active_connections[peer] = t
-                await self.new_peer_or_finished(blob)
+                await self.new_peer_or_finished()
                 self.cleanup_active()
                 if batch:
                     to_re_add = list(set(batch).difference(self.ignored))
diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py
index 5ccde6068..a2a0b0928 100644
--- a/tests/integration/test_file_commands.py
+++ b/tests/integration/test_file_commands.py
@@ -159,7 +159,7 @@ class FileCommands(CommandTestCase):
         self.assertEqual('finished', file_info['status'])
 
     async def test_unban_recovers_stream(self):
-        BlobDownloader.BAN_TIME = .5  # fixme: temporary field, will move to connection manager or a conf
+        BlobDownloader.BAN_FACTOR = .5  # fixme: temporary field, will move to connection manager or a conf
         tx = await self.stream_create('foo', '0.01', data=bytes([0] * (1 << 23)))
         sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
         missing_blob_hash = (await self.daemon.jsonrpc_blob_list(sd_hash=sd_hash))[-2]
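Patch 01 replaces the fixed ten-second BAN_TIME with a per-peer failure count raised to BAN_FACTOR, so repeat offenders wait progressively longer before being retried. A standalone sketch of that policy, separate from the patch (the peer key and the use of time.monotonic() here are illustrative):

```python
import time

BAN_FACTOR = 2.0

failures = {}  # peer -> consecutive failure count
ignored = {}   # peer -> time the peer was banned

def ban(peer: str) -> None:
    failures[peer] = failures.get(peer, 0) + 1
    ignored[peer] = time.monotonic()

def forgive() -> list:
    # A peer banned after n consecutive failures is ignored for n ** BAN_FACTOR - 1
    # seconds: the first failure is forgiven almost immediately, the fourth for 15s.
    now = time.monotonic()
    timeout_for = lambda peer: (failures.get(peer, 0) ** BAN_FACTOR) - 1.0
    forgiven = [peer for peer, when in ignored.items() if now - when > timeout_for(peer)]
    for peer in forgiven:
        ignored.pop(peer)
    return forgiven

ban("1.2.3.4:3333")   # first failure: window is 1 ** 2 - 1 == 0 seconds
time.sleep(0.01)
print(forgive())      # ['1.2.3.4:3333']
```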
From ac6835fa60d1e7c0b19b742aef9234c75c1f4620 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 30 Apr 2019 19:51:02 -0300
Subject: [PATCH 02/18] don't get stuck

---
 lbrynet/blob_exchange/downloader.py | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 7010e54a7..70bb1c601 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -27,7 +27,6 @@ class BlobDownloader:
         self.scores: typing.Dict['KademliaPeer', int] = {}
         self.failures: typing.Dict['KademliaPeer', int] = {}
         self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
-        self.time_since_last_blob = loop.time()
 
     def should_race_continue(self, blob: 'AbstractBlob'):
         if len(self.active_connections) >= self.config.max_connections_per_download:
@@ -50,8 +49,6 @@ class BlobDownloader:
             self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
             self.config.blob_download_timeout, connected_transport=transport
         )
-        if bytes_received == blob.get_length():
-            self.time_since_last_blob = self.loop.time()
         if not transport and peer not in self.ignored:
             self.ignored[peer] = self.loop.time()
             log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
@@ -77,11 +74,8 @@ class BlobDownloader:
 
     def clearbanned(self):
         now = self.loop.time()
-        if now - self.time_since_last_blob > 60.0:
-            return
-        timeout_for = lambda peer: (self.failures.get(peer, 0) ** self.BAN_FACTOR) - 1.0
+        timeout_for = lambda peer: min(30.0, (self.failures.get(peer, 0) ** self.BAN_FACTOR) - 1.0)
         forgiven = [banned_peer for banned_peer, when in self.ignored.items() if now - when > timeout_for(banned_peer)]
-        log.warning([(timeout_for(peer), when) for peer, when in self.ignored.items()])
        self.peer_queue.put_nowait(forgiven)
         for banned_peer in forgiven:
             self.ignored.pop(banned_peer)
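Patch 02 drops the global 60-second gate and the debug log.warning, and clamps the per-peer window at 30 seconds so a badly failing peer can never be locked out indefinitely. A quick self-contained check of what min(30.0, failures ** BAN_FACTOR - 1.0) yields per consecutive-failure count:

```python
# Backoff windows produced by the clamped formula from patch 02.
BAN_FACTOR = 2.0
for failures in range(1, 8):
    print(failures, min(30.0, failures ** BAN_FACTOR - 1.0))
# 1 0.0 | 2 3.0 | 3 8.0 | 4 15.0 | 5 24.0 | 6 30.0 | 7 30.0
```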
From 7ffce1040c0958a5b461d57d0e8e674bfc4d549b Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 30 Apr 2019 19:51:19 -0300
Subject: [PATCH 03/18] fix foreign key error

---
 lbrynet/stream/managed_stream.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index b722ac69b..716146723 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -231,7 +231,7 @@ class ManagedStream:
                 self.update_status(ManagedStream.STATUS_RUNNING)
                 await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
                 self.update_delayed_stop()
-        elif not os.path.isfile(self.full_path):
+        else:
             await self.save_file(file_name, download_directory)
             await self.started_writing.wait()

From 63bd983012e59b54654d68f8330a0ee4e3ea036d Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 00:42:51 -0300
Subject: [PATCH 04/18] pylint

---
 lbrynet/blob_exchange/downloader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 70bb1c601..128f720f7 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -1,7 +1,7 @@
 import asyncio
 import typing
 import logging
-from lbrynet.utils import drain_tasks, cache_concurrent
+from lbrynet.utils import cache_concurrent
 from lbrynet.blob_exchange.client import request_blob
 if typing.TYPE_CHECKING:
     from lbrynet.conf import Config

From 60f7766cf73e9c1d306ebb24988d4de632340c68 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 02:55:51 -0300
Subject: [PATCH 05/18] small adjustment to new_peer_or_finished

---
 lbrynet/blob_exchange/downloader.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 128f720f7..85cf3c38b 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -57,14 +57,13 @@ class BlobDownloader:
                 del self.connections[peer]
         elif transport:
             log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
-            if bytes_received:
-                self.failures[peer] = 0
+            self.failures[peer] = 0
             self.connections[peer] = transport
         rough_speed = (bytes_received / (self.loop.time() - start)) if bytes_received else 0
         self.scores[peer] = rough_speed
 
     async def new_peer_or_finished(self):
-        active_tasks = list(self.active_connections.values()) + [asyncio.sleep(0.2)]
+        active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
         await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
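Patch 05 keeps new_peer_or_finished() as a bounded wait: return as soon as any in-flight request finishes, or after a one-second tick so the download loop can re-poll the peer queue without busy-waiting. An isolated sketch of the pattern (the patch passes the bare asyncio.sleep(1) coroutine, which the asyncio of that era wrapped implicitly; an explicit task is used here to stay compatible with current Python):

```python
import asyncio

async def new_peer_or_finished(active_tasks):
    tick = asyncio.create_task(asyncio.sleep(1))  # upper bound on the wait
    await asyncio.wait(active_tasks + [tick], return_when=asyncio.FIRST_COMPLETED)
    tick.cancel()

async def demo():
    fast = asyncio.create_task(asyncio.sleep(0.1))
    slow = asyncio.create_task(asyncio.sleep(5))
    await new_peer_or_finished([fast, slow])  # returns after ~0.1s, not 1s or 5s
    slow.cancel()

asyncio.run(demo())
```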
From 771fe935f4b5afdeecc3fe5af364e4d3322c497d Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 13:16:32 -0300
Subject: [PATCH 06/18] test get-delete-get respects existing files

---
 tests/integration/test_file_commands.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py
index a2a0b0928..2876140a7 100644
--- a/tests/integration/test_file_commands.py
+++ b/tests/integration/test_file_commands.py
@@ -44,6 +44,18 @@ class FileCommands(CommandTestCase):
         )
         self.assertEqual(file_list[0]['confirmations'], 1)
 
+    async def test_get_doesnt_touch_user_written_files_between_calls(self):
+        await self.stream_create('foo', '0.01', data=bytes([0] * (2 << 23)))
+        self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'))
+        first_path = (await self.daemon.jsonrpc_get('lbry://foo', save_file=True)).full_path
+        await self.wait_files_to_complete()
+        self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'))
+        with open(first_path, 'wb') as f:
+            f.write(b' ')
+            f.flush()
+        second_path = await self.daemon.jsonrpc_get('lbry://foo', save_file=True)
+        await self.wait_files_to_complete()
+        self.assertNotEquals(first_path, second_path)
 
     async def test_file_list_updated_metadata_on_resolve(self):
         await self.stream_create('foo', '0.01')

From ae2121c5c49edddde88982036f81bd28fc256e5e Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 13:16:45 -0300
Subject: [PATCH 07/18] fix tests and new test

---
 lbrynet/stream/managed_stream.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index 716146723..1f86e5638 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -305,11 +305,11 @@ class ManagedStream:
         if not os.path.isdir(self.download_directory):
             log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
             os.mkdir(self.download_directory)
-        self._file_name = await get_next_available_file_name(
-            self.loop, self.download_directory,
-            file_name or self._file_name or self.descriptor.suggested_file_name
-        )
         if not await self.blob_manager.storage.file_exists(self.sd_hash):
+            self._file_name = await get_next_available_file_name(
+                self.loop, self.download_directory,
+                file_name or self._file_name or self.descriptor.suggested_file_name
+            )
             self.rowid = self.blob_manager.storage.save_downloaded_file(
                 self.stream_hash, self.file_name, self.download_directory, 0.0
             )

From 41951d8ec259b1634fd761b99118a65955104649 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 13:17:08 -0300
Subject: [PATCH 08/18] improve ttfb script confs

---
 scripts/time_to_first_byte.py | 34 ++++++++++++++++++++--------------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git a/scripts/time_to_first_byte.py b/scripts/time_to_first_byte.py
index 755e1ba50..6ff2e7b55 100644
--- a/scripts/time_to_first_byte.py
+++ b/scripts/time_to_first_byte.py
@@ -11,7 +11,7 @@ from lbrynet.blob.blob_file import MAX_BLOB_SIZE
 from lbrynet.conf import Config
 from lbrynet.schema.uri import parse_lbry_uri
 from lbrynet.extras.daemon.client import daemon_rpc
-from lbrynet.extras import system_info, cli
+from lbrynet.extras import system_info
 
 
 def extract_uris(response):
@@ -58,7 +58,7 @@ def variance(times):
     return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3)
 
 
-async def wait_for_done(conf, uri):
+async def wait_for_done(conf, uri, timeout):
     name = uri.split("#")[0]
     last_complete = 0
     hang_count = 0
@@ -73,11 +73,11 @@ async def wait_for_done(conf, uri):
         else:
             hang_count += 1
             await asyncio.sleep(1.0)
-        if hang_count > 10:
+        if hang_count > timeout:
             return False, file['blobs_completed'], file['blobs_in_stream']
 
 
-async def main(uris=None, allow_fees=False):
+async def main(uris=None, cmd_args=None):
     if not uris:
         uris = await get_frontpage_uris()
     conf = Config()
@@ -93,7 +93,7 @@ async def main(uris=None, cmd_args=None):
     async def __resolve(name):
        resolved = await daemon_rpc(conf, 'resolve', urls=[name])
         if 'error' not in resolved.get(name, {}):
-            if ("fee" not in resolved[name]['claim']['value']) or allow_fees:
+            if ("fee" not in resolved[name]['claim']['value']) or cmd_args.allow_fees:
                 resolvable.append(name)
             else:
                 print(f"{name} has a fee, skipping it")
@@ -114,11 +114,13 @@ async def main(uris=None, cmd_args=None):
     for i, uri in enumerate(resolvable):
         start = time.time()
         try:
-            await daemon_rpc(conf, 'get', uri=uri)
+            await daemon_rpc(conf, 'get', uri=uri, save_file=True)
             first_byte = time.time()
             first_byte_times.append(first_byte - start)
             print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(conf, uri)
+            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
+                conf, uri, cmd_args.stall_download_timeout
+            )
             if downloaded:
                 download_successes.append(uri)
             else:
@@ -130,8 +132,10 @@ async def main(uris=None, allow_fees=False):
         except:
             print(f"{i + 1}/{len(uris)} - failed to start {uri}")
             failed_to_start.append(uri)
-            return
-        # await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+            if cmd_args.exit_on_error:
+                return
+        if cmd_args.delete_after_download:
+            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
         await asyncio.sleep(0.1)
 
     print("**********************************************")
@@ -160,9 +164,11 @@ async def main(uris=None, allow_fees=False):
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("--data_dir")
-    parser.add_argument("--wallet_dir")
-    parser.add_argument("--download_directory")
+    #parser.add_argument("--data_dir")
+    #parser.add_argument("--wallet_dir")
+    #parser.add_argument("--download_directory")
     parser.add_argument("--allow_fees", action='store_true')
-    args = parser.parse_args()
-    asyncio.run(main(allow_fees=args.allow_fees))
+    parser.add_argument("--exit_on_error", action='store_true')
+    parser.add_argument("--stall_download_timeout", default=10)
+    parser.add_argument("--delete_after_download", action='store_true')
+    asyncio.run(main(cmd_args=parser.parse_args()))
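Patch 08 threads the whole argparse namespace into main() instead of forwarding individual keyword arguments, so a new flag only needs one parser.add_argument line. A minimal sketch of that shape (only a subset of the script's flags shown):

```python
import argparse
import asyncio

async def main(cmd_args=None):
    # every option is reachable without changing main()'s signature again
    print(cmd_args.allow_fees, cmd_args.stall_download_timeout)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--allow_fees", action='store_true')
    parser.add_argument("--stall_download_timeout", default=10, type=int)
    asyncio.run(main(cmd_args=parser.parse_args()))
```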
From 895b3992c9f41902af82d3add6daa0209fdb5340 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 13:26:51 -0300
Subject: [PATCH 09/18] set type for stall download timeout

---
 scripts/time_to_first_byte.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/scripts/time_to_first_byte.py b/scripts/time_to_first_byte.py
index 6ff2e7b55..15511e856 100644
--- a/scripts/time_to_first_byte.py
+++ b/scripts/time_to_first_byte.py
@@ -129,8 +129,8 @@ async def main(uris=None, cmd_args=None):
             download_speeds.append(mbs)
             print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
                   f"{mbs}mb/s")
-        except:
-            print(f"{i + 1}/{len(uris)} - failed to start {uri}")
+        except Exception as e:
+            print(f"{i + 1}/{len(uris)} - failed to start {uri}: {e}")
             failed_to_start.append(uri)
             if cmd_args.exit_on_error:
                 return
@@ -169,6 +169,6 @@ if __name__ == "__main__":
     #parser.add_argument("--download_directory")
     parser.add_argument("--allow_fees", action='store_true')
     parser.add_argument("--exit_on_error", action='store_true')
-    parser.add_argument("--stall_download_timeout", default=10)
+    parser.add_argument("--stall_download_timeout", default=10, type=int)
     parser.add_argument("--delete_after_download", action='store_true')
     asyncio.run(main(cmd_args=parser.parse_args()))

From fc88261cc655d90d51c4fafa143e1463d976a166 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 13:59:45 -0300
Subject: [PATCH 10/18] add head_blob_only to ttfb

---
 scripts/time_to_first_byte.py | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/scripts/time_to_first_byte.py b/scripts/time_to_first_byte.py
index 15511e856..1da21a0b2 100644
--- a/scripts/time_to_first_byte.py
+++ b/scripts/time_to_first_byte.py
@@ -118,23 +118,24 @@ async def main(uris=None, cmd_args=None):
             first_byte = time.time()
             first_byte_times.append(first_byte - start)
             print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
-                conf, uri, cmd_args.stall_download_timeout
-            )
-            if downloaded:
-                download_successes.append(uri)
-            else:
-                download_failures.append(uri)
-            mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
-            download_speeds.append(mbs)
-            print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
-                  f"{mbs}mb/s")
+            if not cmd_args.head_blob_only:
+                downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
+                    conf, uri, cmd_args.stall_download_timeout
+                )
+                if downloaded:
+                    download_successes.append(uri)
+                else:
+                    download_failures.append(uri)
+                mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
+                download_speeds.append(mbs)
+                print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
+                      f"{mbs}mb/s")
         except Exception as e:
             print(f"{i + 1}/{len(uris)} - failed to start {uri}: {e}")
             failed_to_start.append(uri)
             if cmd_args.exit_on_error:
                 return
-        if cmd_args.delete_after_download:
+        if cmd_args.delete_after_download or cmd_args.head_blob_only:
             await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
         await asyncio.sleep(0.1)
@@ -171,4 +172,5 @@ if __name__ == "__main__":
     parser.add_argument("--exit_on_error", action='store_true')
     parser.add_argument("--stall_download_timeout", default=10, type=int)
     parser.add_argument("--delete_after_download", action='store_true')
+    parser.add_argument("--head_blob_only", action='store_true')
     asyncio.run(main(cmd_args=parser.parse_args()))
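For the speeds these patches report, stream size is approximated as blobs_in_stream * (MAX_BLOB_SIZE - 1) bytes over wall-clock time. A standalone version of that arithmetic; the 2 MiB value is hardcoded here on the assumption that it matches lbrynet.blob.blob_file.MAX_BLOB_SIZE:

```python
MAX_BLOB_SIZE = 2 * 1024 * 1024  # assumed value of lbrynet.blob.blob_file.MAX_BLOB_SIZE

def estimated_mbs(blobs_in_stream: int, elapsed_seconds: float) -> float:
    # decimal megabytes per second, rounded the way the script prints it
    return round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / elapsed_seconds / 1000000, 2)

print(estimated_mbs(10, 4.0))  # 5.24
```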
From 971f2a1cef4e05ce359616bd04180fdd109f1998 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 14:31:11 -0300
Subject: [PATCH 11/18] log download speed only if downloading full streams

---
 scripts/time_to_first_byte.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/scripts/time_to_first_byte.py b/scripts/time_to_first_byte.py
index 1da21a0b2..b762f44ba 100644
--- a/scripts/time_to_first_byte.py
+++ b/scripts/time_to_first_byte.py
@@ -145,12 +145,13 @@ async def main(uris=None, cmd_args=None):
              f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
              f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \
              f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \
-             f"Variance: {variance(first_byte_times)}\n" \
-             f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \
-             f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
-             f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
-             f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
-             f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"
+             f"Variance: {variance(first_byte_times)}\n"
+    if not cmd_args.head_blob_only:
+        result += f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \
+                  f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
+                  f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
+                  f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
+                  f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"
 
     if failed_to_start:
         result += "\nFailed to start:" + "\n".join([f for f in failed_to_start])

From 6594b7c50c1e678683fe77eb97e4152f3dd8c963 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 15:08:47 -0300
Subject: [PATCH 12/18] test concurrency through races on getting a writer

---
 tests/unit/blob/test_blob_file.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/tests/unit/blob/test_blob_file.py b/tests/unit/blob/test_blob_file.py
index 8b519e27e..664f7e75c 100644
--- a/tests/unit/blob/test_blob_file.py
+++ b/tests/unit/blob/test_blob_file.py
@@ -133,6 +133,22 @@ class TestBlob(AsyncioTestCase):
         await self._test_close_writers_on_finished(BlobBuffer)
         await self._test_close_writers_on_finished(BlobFile, tmp_dir)
 
+    async def test_concurrency_and_premature_closes(self):
+        blob_directory = tempfile.mkdtemp()
+        self.addCleanup(lambda: shutil.rmtree(blob_directory))
+        blob = self._get_blob(BlobBuffer, blob_directory=blob_directory)
+        writer = blob.get_blob_writer('1.1.1.1', 1337)
+        self.assertEqual(1, len(blob.writers))
+        with self.assertRaises(OSError):
+            blob.get_blob_writer('1.1.1.1', 1337)
+        writer.close_handle()
+        self.assertTrue(blob.writers[('1.1.1.1', 1337)].closed())
+        writer = blob.get_blob_writer('1.1.1.1', 1337)
+        self.assertEqual(blob.writers[('1.1.1.1', 1337)], writer)
+        writer.close_handle()
+        await asyncio.sleep(0.000000001)  # flush callbacks
+        self.assertEqual(0, len(blob.writers))
+
     async def test_delete(self):
         blob_buffer = await self._test_create_blob(BlobBuffer)
         self.assertIsInstance(blob_buffer, BlobBuffer)
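The test added in patch 12 pins down the writer contract that patch 13 below implements: one writer per (address, port) while it is open, OSError on a duplicate, and reuse permitted once the previous writer has closed. A toy registry illustrating just that contract (ToyWriter and WriterRegistry are stand-ins for the real HashBlobWriter and AbstractBlob):

```python
class ToyWriter:
    def __init__(self):
        self._closed = False

    def closed(self) -> bool:
        return self._closed

    def close_handle(self) -> None:
        self._closed = True

class WriterRegistry:
    def __init__(self):
        self.writers = {}  # (address, port) -> ToyWriter

    def get_blob_writer(self, address: str, port: int) -> ToyWriter:
        key = (address, port)
        if key in self.writers and not self.writers[key].closed():
            raise OSError(f"attempted to download blob twice from {address}:{port}")
        writer = ToyWriter()
        self.writers[key] = writer
        return writer

registry = WriterRegistry()
writer = registry.get_blob_writer('1.1.1.1', 1337)
try:
    registry.get_blob_writer('1.1.1.1', 1337)  # duplicate while open: rejected
except OSError:
    pass
writer.close_handle()
registry.get_blob_writer('1.1.1.1', 1337)      # fine once the first writer closed
```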
From e43b29fcd151883a22a07847d652e3e94fbec096 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 15:09:11 -0300
Subject: [PATCH 13/18] if the current writer is closed, it's fine to open a new one

---
 lbrynet/blob/blob_file.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py
index 484e93122..b8a3e4796 100644
--- a/lbrynet/blob/blob_file.py
+++ b/lbrynet/blob/blob_file.py
@@ -201,7 +201,7 @@ class AbstractBlob:
 
     def get_blob_writer(self, peer_address: typing.Optional[str] = None,
                         peer_port: typing.Optional[int] = None) -> HashBlobWriter:
-        if (peer_address, peer_port) in self.writers:
+        if (peer_address, peer_port) in self.writers and not self.writers[(peer_address, peer_port)].closed():
             raise OSError(f"attempted to download blob twice from {peer_address}:{peer_port}")
         fut = asyncio.Future(loop=self.loop)
         writer = HashBlobWriter(self.blob_hash, self.get_length, fut)

From 609cf42868a61a4f0280c7329fce1c9fcc050ef6 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 1 May 2019 18:04:45 -0300
Subject: [PATCH 14/18] cleanup downloader code

---
 lbrynet/blob_exchange/downloader.py | 42 +++++++----------------
 1 file changed, 10 insertions(+), 32 deletions(-)

diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 85cf3c38b..3367e7ee6 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -31,12 +31,6 @@ class BlobDownloader:
     def should_race_continue(self, blob: 'AbstractBlob'):
         if len(self.active_connections) >= self.config.max_connections_per_download:
             return False
-        # if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves
-        # the safe net side is that any failure will reset the peer score, triggering the race back
-        # TODO: this is a good idea for low bandwidth, but doesnt play nice on high bandwidth
-        # for peer, task in self.active_connections.items():
-        #     if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done():
-        #         return False
         return not (blob.get_is_verified() or not blob.is_writeable())
 
     async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer'):
@@ -70,14 +64,14 @@ class BlobDownloader:
         to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
         for peer in to_remove:
             del self.active_connections[peer]
+        self.clearbanned()
 
     def clearbanned(self):
         now = self.loop.time()
-        timeout_for = lambda peer: min(30.0, (self.failures.get(peer, 0) ** self.BAN_FACTOR) - 1.0)
-        forgiven = [banned_peer for banned_peer, when in self.ignored.items() if now - when > timeout_for(banned_peer)]
-        self.peer_queue.put_nowait(forgiven)
-        for banned_peer in forgiven:
-            self.ignored.pop(banned_peer)
+        self.ignored = dict((
+            (peer, when) for (peer, when) in self.ignored.items()
+            if (now - when) < min(30.0, (self.failures.get(peer, 0) ** self.BAN_FACTOR))
+        ))
 
     @cache_concurrent
     async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'AbstractBlob':
@@ -86,15 +80,16 @@ class BlobDownloader:
             return blob
         try:
             while not blob.get_is_verified():
-                batch: typing.List['KademliaPeer'] = []
+                batch: typing.Set['KademliaPeer'] = set()
                 while not self.peer_queue.empty():
-                    batch.extend(self.peer_queue.get_nowait())
-                batch.sort(key=lambda p: self.scores.get(p, 0), reverse=True)
+                    batch.update(self.peer_queue.get_nowait())
+                if batch:
+                    self.peer_queue.put_nowait(list(batch))
                 log.debug(
                     "running, %d peers, %d ignored, %d active",
                     len(batch), len(self.ignored), len(self.active_connections)
                 )
-                for peer in batch:
+                for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True):
                     if not self.should_race_continue(blob):
                         break
                     if peer not in self.active_connections and peer not in self.ignored:
@@ -103,26 +98,9 @@ class BlobDownloader:
                         log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
                         t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
                         self.active_connections[peer] = t
                 await self.new_peer_or_finished()
                 self.cleanup_active()
-                if batch:
-                    to_re_add = list(set(batch).difference(self.ignored))
-                    if to_re_add:
-                        self.peer_queue.put_nowait(to_re_add)
-                    else:
-                        self.clearbanned()
-                else:
-                    self.clearbanned()
-                blob.close()
             log.debug("downloaded %s", blob_hash[:8])
             return blob
         finally:
-            re_add = set()
-            while self.active_connections:
-                peer, t = self.active_connections.popitem()
-                t.cancel()
-                re_add.add(peer)
-            re_add = re_add.difference(self.ignored)
-            if re_add:
-                self.peer_queue.put_nowait(list(re_add))
             blob.close()
 
     def close(self):
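The clearbanned() rewrite in patch 14 no longer re-queues forgiven peers; it simply prunes every entry whose clamped window has elapsed (note the formula also drops the earlier -1.0 term). The pruning step in isolation, with toy timestamps standing in for loop.time():

```python
BAN_FACTOR = 2.0
failures = {'a': 1, 'b': 4}          # consecutive failures per peer
ignored = {'a': 100.0, 'b': 100.0}   # peer -> time it was banned
now = 105.0                          # 5 seconds later

ignored = dict(
    (peer, when) for (peer, when) in ignored.items()
    if (now - when) < min(30.0, failures.get(peer, 0) ** BAN_FACTOR)
)
print(ignored)  # {'b': 100.0}: 'a' (1s window) is forgiven, 'b' (16s window) is not
```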
From ac69fcbc5ff3b71e203a5ecacecebe3226402140 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Thu, 2 May 2019 11:52:09 -0300
Subject: [PATCH 15/18] fix possible division by zero

---
 lbrynet/blob_exchange/downloader.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 3367e7ee6..7aff3bfaa 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -53,8 +53,8 @@ class BlobDownloader:
             log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
             self.failures[peer] = 0
             self.connections[peer] = transport
-        rough_speed = (bytes_received / (self.loop.time() - start)) if bytes_received else 0
-        self.scores[peer] = rough_speed
+        elapsed = self.loop.time - start
+        self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 0

From ff2985cc80bb09f9ef14aec84c4e26257dafa345 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Thu, 2 May 2019 12:13:33 -0300
Subject: [PATCH 16/18] close protocol if transport reuse brings responses before requests

---
 lbrynet/blob_exchange/client.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py
index 0f58efeb3..52c9dd537 100644
--- a/lbrynet/blob_exchange/client.py
+++ b/lbrynet/blob_exchange/client.py
@@ -35,6 +35,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
             if self._response_fut and not self._response_fut.done():
                 self._response_fut.cancel()
             return
+        if not self._response_fut:
+            log.warning("Protocol received data before expected, probable race on keep alive. Closing transport.")
+            return self.close()
         if self._blob_bytes_received and not self.writer.closed():
             return self._write(data)
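Patch 15's scoring guard, reduced to a pure function: score a peer by bytes per second, but fall back to 0 when no bytes arrived or no time elapsed (the elapsed == 0 case is the division by zero being fixed). Note that the patch itself writes `self.loop.time - start` without calling time(); patch 18 further down corrects that:

```python
def score(bytes_received: int, started: float, now: float) -> float:
    elapsed = now - started
    # `bytes_received and elapsed` guards both the no-data and the zero-elapsed case
    return bytes_received / elapsed if bytes_received and elapsed else 0

assert score(2_097_151, 10.0, 12.0) == 1_048_575.5
assert score(0, 10.0, 12.0) == 0          # no data, no score
assert score(2_097_151, 10.0, 10.0) == 0  # previously a ZeroDivisionError
```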
From 68d7328918ff529cddd8485766f905df0d15885f Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Thu, 2 May 2019 12:20:49 -0300
Subject: [PATCH 17/18] always close the write handle

---
 lbrynet/blob_exchange/client.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py
index 52c9dd537..67ca75cae 100644
--- a/lbrynet/blob_exchange/client.py
+++ b/lbrynet/blob_exchange/client.py
@@ -168,6 +168,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
         except asyncio.CancelledError:
             self.close()
             raise
+        finally:
+            if self.writer and not self.writer.closed():
+                self.writer.close_handle()
+                self.writer = None
 
     def connection_made(self, transport: asyncio.Transport):
         self.transport = transport
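Patch 17's point is that the write handle must be released on every exit path, not only on cancellation. The shape of that cleanup, isolated (ToyWriter again stands in for the real HashBlobWriter):

```python
import asyncio

class ToyWriter:
    def __init__(self):
        self._closed = False

    def closed(self) -> bool:
        return self._closed

    def close_handle(self) -> None:
        self._closed = True

async def download(writer, fail: bool):
    try:
        if fail:
            raise ConnectionError("peer went away")
    finally:
        # runs on success, error and cancellation alike
        if writer and not writer.closed():
            writer.close_handle()

async def demo():
    writer = ToyWriter()
    try:
        await download(writer, fail=True)
    except ConnectionError:
        pass
    assert writer.closed()  # handle released despite the error

asyncio.run(demo())
```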
From cf7bb6a3912210be32c8345dddd90b21660da838 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Thu, 2 May 2019 16:41:48 -0300
Subject: [PATCH 18/18] fix tests and stream being deleted on data timeout

---
 lbrynet/blob_exchange/downloader.py     |  2 +-
 lbrynet/stream/stream_manager.py        | 15 ++++++++++-----
 tests/integration/test_file_commands.py |  6 +++---
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 7aff3bfaa..b8b854a49 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -53,7 +53,7 @@ class BlobDownloader:
             log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
             self.failures[peer] = 0
             self.connections[peer] = transport
-        elapsed = self.loop.time - start
+        elapsed = self.loop.time() - start
         self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 0
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index b7fe6734a..6fe849df5 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -397,12 +397,17 @@ class StreamManager:
                 if not stream.descriptor:
                     raise DownloadSDTimeout(stream.sd_hash)
                 raise DownloadDataTimeout(stream.sd_hash)
-            if to_replace:  # delete old stream now that the replacement has started downloading
-                await self.delete_stream(to_replace)
-            stream.set_claim(resolved, claim)
-            await self.storage.save_content_claim(stream.stream_hash, outpoint)
-            self.streams[stream.sd_hash] = stream
+            finally:
+                if stream.descriptor:
+                    if to_replace:  # delete old stream now that the replacement has started downloading
+                        await self.delete_stream(to_replace)
+                    stream.set_claim(resolved, claim)
+                    await self.storage.save_content_claim(stream.stream_hash, outpoint)
+                    self.streams[stream.sd_hash] = stream
             return stream
+        except DownloadDataTimeout as err:  # forgive data timeout, dont delete stream
+            error = err
+            raise
         except Exception as err:
             error = err
             if stream and stream.descriptor:
diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py
index 2876140a7..419afd8cf 100644
--- a/tests/integration/test_file_commands.py
+++ b/tests/integration/test_file_commands.py
@@ -79,12 +79,12 @@ class FileCommands(CommandTestCase):
             blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash
         ]
         await self.server.blob_manager.delete_blobs(all_except_sd)
-        resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
+        resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
         self.assertIn('error', resp)
         self.assertEqual('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error'])
-        await self.daemon.jsonrpc_file_delete(claim_name='foo')
+        self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didnt create a file")
         await self.server.blob_manager.delete_blobs([sd_hash])
-        resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
+        resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
         self.assertIn('error', resp)
         self.assertEqual('Failed to download sd blob %s within timeout' % sd_hash, resp['error'])
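The stream_manager change in patch 18 has two parts: claim bookkeeping moves into a `finally` that only runs once a descriptor was actually fetched, and a dedicated `except` re-raises DownloadDataTimeout without tearing the stream down, while any other error still deletes it. The control-flow skeleton, with simplified stand-ins for lbrynet's exception types and stream object:

```python
class DownloadDataTimeout(Exception):
    pass

def download_from_claim(stream, attempt):
    try:
        try:
            attempt()  # may raise DownloadDataTimeout after the descriptor arrived
        finally:
            if stream.descriptor:   # persist claim info whenever we got that far
                stream.save_claim()
        return stream
    except DownloadDataTimeout:     # forgive data timeout, don't delete the stream
        raise
    except Exception:
        if stream.descriptor:
            stream.delete()         # anything else tears the failed stream down
        raise
```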