Merge pull request #2076 from lbryio/download_improv
make ttfb configurable, fix race condition on writers, ban on time scales, remove idle code for getting unstuck on long downloads, fix foreign key error and improve downloader loop checks
This commit is contained in:
commit
d61ddbb950
8 changed files with 115 additions and 99 deletions
|
@ -201,7 +201,7 @@ class AbstractBlob:
|
|||
|
||||
def get_blob_writer(self, peer_address: typing.Optional[str] = None,
|
||||
peer_port: typing.Optional[int] = None) -> HashBlobWriter:
|
||||
if (peer_address, peer_port) in self.writers:
|
||||
if (peer_address, peer_port) in self.writers and not self.writers[(peer_address, peer_port)].closed():
|
||||
raise OSError(f"attempted to download blob twice from {peer_address}:{peer_port}")
|
||||
fut = asyncio.Future(loop=self.loop)
|
||||
writer = HashBlobWriter(self.blob_hash, self.get_length, fut)
|
||||
|
|
|
@ -35,6 +35,9 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
|||
if self._response_fut and not self._response_fut.done():
|
||||
self._response_fut.cancel()
|
||||
return
|
||||
if not self._response_fut:
|
||||
log.warning("Protocol received data before expected, probable race on keep alive. Closing transport.")
|
||||
return self.close()
|
||||
if self._blob_bytes_received and not self.writer.closed():
|
||||
return self._write(data)
|
||||
|
||||
|
@ -165,6 +168,10 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
|||
except asyncio.CancelledError:
|
||||
self.close()
|
||||
raise
|
||||
finally:
|
||||
if self.writer and not self.writer.closed():
|
||||
self.writer.close_handle()
|
||||
self.writer = None
|
||||
|
||||
def connection_made(self, transport: asyncio.Transport):
|
||||
self.transport = transport
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import asyncio
|
||||
import typing
|
||||
import logging
|
||||
from lbrynet.utils import drain_tasks, cache_concurrent
|
||||
from lbrynet.utils import cache_concurrent
|
||||
from lbrynet.blob_exchange.client import request_blob
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbrynet.conf import Config
|
||||
|
@ -14,7 +14,7 @@ log = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class BlobDownloader:
|
||||
BAN_TIME = 10.0 # fixme: when connection manager gets implemented, move it out from here
|
||||
BAN_FACTOR = 2.0 # fixme: when connection manager gets implemented, move it out from here
|
||||
|
||||
def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
|
||||
peer_queue: asyncio.Queue):
|
||||
|
@ -25,18 +25,12 @@ class BlobDownloader:
|
|||
self.active_connections: typing.Dict['KademliaPeer', asyncio.Task] = {} # active request_blob calls
|
||||
self.ignored: typing.Dict['KademliaPeer', int] = {}
|
||||
self.scores: typing.Dict['KademliaPeer', int] = {}
|
||||
self.failures: typing.Dict['KademliaPeer', int] = {}
|
||||
self.connections: typing.Dict['KademliaPeer', asyncio.Transport] = {}
|
||||
self.time_since_last_blob = loop.time()
|
||||
|
||||
def should_race_continue(self, blob: 'AbstractBlob'):
|
||||
if len(self.active_connections) >= self.config.max_connections_per_download:
|
||||
return False
|
||||
# if a peer won 3 or more blob races and is active as a downloader, stop the race so bandwidth improves
|
||||
# the safe net side is that any failure will reset the peer score, triggering the race back
|
||||
# TODO: this is a good idea for low bandwidth, but doesnt play nice on high bandwidth
|
||||
# for peer, task in self.active_connections.items():
|
||||
# if self.scores.get(peer, 0) >= 0 and self.rounds_won.get(peer, 0) >= 3 and not task.done():
|
||||
# return False
|
||||
return not (blob.get_is_verified() or not blob.is_writeable())
|
||||
|
||||
async def request_blob_from_peer(self, blob: 'AbstractBlob', peer: 'KademliaPeer'):
|
||||
|
@ -49,46 +43,35 @@ class BlobDownloader:
|
|||
self.loop, blob, peer.address, peer.tcp_port, self.config.peer_connect_timeout,
|
||||
self.config.blob_download_timeout, connected_transport=transport
|
||||
)
|
||||
if bytes_received == blob.get_length():
|
||||
self.time_since_last_blob = self.loop.time()
|
||||
if not transport and peer not in self.ignored:
|
||||
self.ignored[peer] = self.loop.time()
|
||||
log.debug("drop peer %s:%i", peer.address, peer.tcp_port)
|
||||
self.failures[peer] = self.failures.get(peer, 0) + 1
|
||||
if peer in self.connections:
|
||||
del self.connections[peer]
|
||||
elif transport:
|
||||
log.debug("keep peer %s:%i", peer.address, peer.tcp_port)
|
||||
self.failures[peer] = 0
|
||||
self.connections[peer] = transport
|
||||
rough_speed = (bytes_received / (self.loop.time() - start)) if bytes_received else 0
|
||||
self.scores[peer] = rough_speed
|
||||
elapsed = self.loop.time() - start
|
||||
self.scores[peer] = bytes_received / elapsed if bytes_received and elapsed else 0
|
||||
|
||||
async def new_peer_or_finished(self, blob: 'AbstractBlob'):
|
||||
async def get_and_re_add_peers():
|
||||
try:
|
||||
new_peers = await asyncio.wait_for(self.peer_queue.get(), timeout=1.0)
|
||||
self.peer_queue.put_nowait(new_peers)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())]
|
||||
active_tasks = list(self.active_connections.values())
|
||||
try:
|
||||
await asyncio.wait(tasks + active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
|
||||
finally:
|
||||
drain_tasks(tasks)
|
||||
async def new_peer_or_finished(self):
|
||||
active_tasks = list(self.active_connections.values()) + [asyncio.sleep(1)]
|
||||
await asyncio.wait(active_tasks, loop=self.loop, return_when='FIRST_COMPLETED')
|
||||
|
||||
def cleanup_active(self):
|
||||
to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()]
|
||||
for peer in to_remove:
|
||||
del self.active_connections[peer]
|
||||
self.clearbanned()
|
||||
|
||||
def clearbanned(self):
|
||||
now = self.loop.time()
|
||||
if now - self.time_since_last_blob > 60.0:
|
||||
return
|
||||
forgiven = [banned_peer for banned_peer, when in self.ignored.items() if now - when > self.BAN_TIME]
|
||||
self.peer_queue.put_nowait(forgiven)
|
||||
for banned_peer in forgiven:
|
||||
self.ignored.pop(banned_peer)
|
||||
self.ignored = dict((
|
||||
(peer, when) for (peer, when) in self.ignored.items()
|
||||
if (now - when) < min(30.0, (self.failures.get(peer, 0) ** self.BAN_FACTOR))
|
||||
))
|
||||
|
||||
@cache_concurrent
|
||||
async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'AbstractBlob':
|
||||
|
@ -97,43 +80,27 @@ class BlobDownloader:
|
|||
return blob
|
||||
try:
|
||||
while not blob.get_is_verified():
|
||||
batch: typing.List['KademliaPeer'] = []
|
||||
batch: typing.Set['KademliaPeer'] = set()
|
||||
while not self.peer_queue.empty():
|
||||
batch.extend(self.peer_queue.get_nowait())
|
||||
batch.sort(key=lambda p: self.scores.get(p, 0), reverse=True)
|
||||
batch.update(self.peer_queue.get_nowait())
|
||||
if batch:
|
||||
self.peer_queue.put_nowait(list(batch))
|
||||
log.debug(
|
||||
"running, %d peers, %d ignored, %d active",
|
||||
len(batch), len(self.ignored), len(self.active_connections)
|
||||
)
|
||||
for peer in batch:
|
||||
for peer in sorted(batch, key=lambda peer: self.scores.get(peer, 0), reverse=True):
|
||||
if not self.should_race_continue(blob):
|
||||
break
|
||||
if peer not in self.active_connections and peer not in self.ignored:
|
||||
log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port)
|
||||
t = self.loop.create_task(self.request_blob_from_peer(blob, peer))
|
||||
self.active_connections[peer] = t
|
||||
await self.new_peer_or_finished(blob)
|
||||
await self.new_peer_or_finished()
|
||||
self.cleanup_active()
|
||||
if batch:
|
||||
to_re_add = list(set(batch).difference(self.ignored))
|
||||
if to_re_add:
|
||||
self.peer_queue.put_nowait(to_re_add)
|
||||
else:
|
||||
self.clearbanned()
|
||||
else:
|
||||
self.clearbanned()
|
||||
blob.close()
|
||||
log.debug("downloaded %s", blob_hash[:8])
|
||||
return blob
|
||||
finally:
|
||||
re_add = set()
|
||||
while self.active_connections:
|
||||
peer, t = self.active_connections.popitem()
|
||||
t.cancel()
|
||||
re_add.add(peer)
|
||||
re_add = re_add.difference(self.ignored)
|
||||
if re_add:
|
||||
self.peer_queue.put_nowait(list(re_add))
|
||||
blob.close()
|
||||
|
||||
def close(self):
|
||||
|
|
|
@ -231,7 +231,7 @@ class ManagedStream:
|
|||
self.update_status(ManagedStream.STATUS_RUNNING)
|
||||
await self.blob_manager.storage.change_file_status(self.stream_hash, ManagedStream.STATUS_RUNNING)
|
||||
self.update_delayed_stop()
|
||||
elif not os.path.isfile(self.full_path):
|
||||
else:
|
||||
await self.save_file(file_name, download_directory)
|
||||
await self.started_writing.wait()
|
||||
|
||||
|
@ -305,11 +305,11 @@ class ManagedStream:
|
|||
if not os.path.isdir(self.download_directory):
|
||||
log.warning("download directory '%s' does not exist, attempting to make it", self.download_directory)
|
||||
os.mkdir(self.download_directory)
|
||||
self._file_name = await get_next_available_file_name(
|
||||
self.loop, self.download_directory,
|
||||
file_name or self._file_name or self.descriptor.suggested_file_name
|
||||
)
|
||||
if not await self.blob_manager.storage.file_exists(self.sd_hash):
|
||||
self._file_name = await get_next_available_file_name(
|
||||
self.loop, self.download_directory,
|
||||
file_name or self._file_name or self.descriptor.suggested_file_name
|
||||
)
|
||||
self.rowid = self.blob_manager.storage.save_downloaded_file(
|
||||
self.stream_hash, self.file_name, self.download_directory, 0.0
|
||||
)
|
||||
|
|
|
@ -397,12 +397,17 @@ class StreamManager:
|
|||
if not stream.descriptor:
|
||||
raise DownloadSDTimeout(stream.sd_hash)
|
||||
raise DownloadDataTimeout(stream.sd_hash)
|
||||
if to_replace: # delete old stream now that the replacement has started downloading
|
||||
await self.delete_stream(to_replace)
|
||||
stream.set_claim(resolved, claim)
|
||||
await self.storage.save_content_claim(stream.stream_hash, outpoint)
|
||||
self.streams[stream.sd_hash] = stream
|
||||
finally:
|
||||
if stream.descriptor:
|
||||
if to_replace: # delete old stream now that the replacement has started downloading
|
||||
await self.delete_stream(to_replace)
|
||||
stream.set_claim(resolved, claim)
|
||||
await self.storage.save_content_claim(stream.stream_hash, outpoint)
|
||||
self.streams[stream.sd_hash] = stream
|
||||
return stream
|
||||
except DownloadDataTimeout as err: # forgive data timeout, dont delete stream
|
||||
error = err
|
||||
raise
|
||||
except Exception as err:
|
||||
error = err
|
||||
if stream and stream.descriptor:
|
||||
|
|
|
@ -11,7 +11,7 @@ from lbrynet.blob.blob_file import MAX_BLOB_SIZE
|
|||
from lbrynet.conf import Config
|
||||
from lbrynet.schema.uri import parse_lbry_uri
|
||||
from lbrynet.extras.daemon.client import daemon_rpc
|
||||
from lbrynet.extras import system_info, cli
|
||||
from lbrynet.extras import system_info
|
||||
|
||||
|
||||
def extract_uris(response):
|
||||
|
@ -58,7 +58,7 @@ def variance(times):
|
|||
return round(sum(((i - mean) ** 2.0 for i in times)) / (len(times) - 1), 3)
|
||||
|
||||
|
||||
async def wait_for_done(conf, uri):
|
||||
async def wait_for_done(conf, uri, timeout):
|
||||
name = uri.split("#")[0]
|
||||
last_complete = 0
|
||||
hang_count = 0
|
||||
|
@ -73,11 +73,11 @@ async def wait_for_done(conf, uri):
|
|||
else:
|
||||
hang_count += 1
|
||||
await asyncio.sleep(1.0)
|
||||
if hang_count > 10:
|
||||
if hang_count > timeout:
|
||||
return False, file['blobs_completed'], file['blobs_in_stream']
|
||||
|
||||
|
||||
async def main(uris=None, allow_fees=False):
|
||||
async def main(uris=None, cmd_args=None):
|
||||
if not uris:
|
||||
uris = await get_frontpage_uris()
|
||||
conf = Config()
|
||||
|
@ -93,7 +93,7 @@ async def main(uris=None, allow_fees=False):
|
|||
async def __resolve(name):
|
||||
resolved = await daemon_rpc(conf, 'resolve', urls=[name])
|
||||
if 'error' not in resolved.get(name, {}):
|
||||
if ("fee" not in resolved[name]['claim']['value']) or allow_fees:
|
||||
if ("fee" not in resolved[name]['claim']['value']) or cmd_args.allow_fees:
|
||||
resolvable.append(name)
|
||||
else:
|
||||
print(f"{name} has a fee, skipping it")
|
||||
|
@ -114,24 +114,29 @@ async def main(uris=None, allow_fees=False):
|
|||
for i, uri in enumerate(resolvable):
|
||||
start = time.time()
|
||||
try:
|
||||
await daemon_rpc(conf, 'get', uri=uri)
|
||||
await daemon_rpc(conf, 'get', uri=uri, save_file=True)
|
||||
first_byte = time.time()
|
||||
first_byte_times.append(first_byte - start)
|
||||
print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
|
||||
downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(conf, uri)
|
||||
if downloaded:
|
||||
download_successes.append(uri)
|
||||
else:
|
||||
download_failures.append(uri)
|
||||
mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
|
||||
download_speeds.append(mbs)
|
||||
print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
|
||||
f"{mbs}mb/s")
|
||||
except:
|
||||
print(f"{i + 1}/{len(uris)} - failed to start {uri}")
|
||||
if not cmd_args.head_blob_only:
|
||||
downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(
|
||||
conf, uri, cmd_args.stall_download_timeout
|
||||
)
|
||||
if downloaded:
|
||||
download_successes.append(uri)
|
||||
else:
|
||||
download_failures.append(uri)
|
||||
mbs = round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)
|
||||
download_speeds.append(mbs)
|
||||
print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
|
||||
f"{mbs}mb/s")
|
||||
except Exception as e:
|
||||
print(f"{i + 1}/{len(uris)} - failed to start {uri}: {e}")
|
||||
failed_to_start.append(uri)
|
||||
return
|
||||
# await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
|
||||
if cmd_args.exit_on_error:
|
||||
return
|
||||
if cmd_args.delete_after_download or cmd_args.head_blob_only:
|
||||
await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
print("**********************************************")
|
||||
|
@ -140,12 +145,13 @@ async def main(uris=None, allow_fees=False):
|
|||
f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
|
||||
f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}s\n" \
|
||||
f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}s\n" \
|
||||
f"Variance: {variance(first_byte_times)}\n" \
|
||||
f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \
|
||||
f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
|
||||
f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
|
||||
f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
|
||||
f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"
|
||||
f"Variance: {variance(first_byte_times)}\n"
|
||||
if not cmd_args.head_blob_only:
|
||||
result += f"Downloaded {len(download_successes)}/{len(resolvable)}\n" \
|
||||
f"Best stream download speed: {round(max(download_speeds), 2)}mb/s\n" \
|
||||
f"Worst stream download speed: {round(min(download_speeds), 2)}mb/s\n" \
|
||||
f"95% confidence download speed: {confidence(download_speeds, 1.984, False)}mb/s\n" \
|
||||
f"99% confidence download speed: {confidence(download_speeds, 2.626, False)}mb/s\n"
|
||||
|
||||
if failed_to_start:
|
||||
result += "\nFailed to start:" + "\n".join([f for f in failed_to_start])
|
||||
|
@ -160,9 +166,12 @@ async def main(uris=None, allow_fees=False):
|
|||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--data_dir")
|
||||
parser.add_argument("--wallet_dir")
|
||||
parser.add_argument("--download_directory")
|
||||
#parser.add_argument("--data_dir")
|
||||
#parser.add_argument("--wallet_dir")
|
||||
#parser.add_argument("--download_directory")
|
||||
parser.add_argument("--allow_fees", action='store_true')
|
||||
args = parser.parse_args()
|
||||
asyncio.run(main(allow_fees=args.allow_fees))
|
||||
parser.add_argument("--exit_on_error", action='store_true')
|
||||
parser.add_argument("--stall_download_timeout", default=10, type=int)
|
||||
parser.add_argument("--delete_after_download", action='store_true')
|
||||
parser.add_argument("--head_blob_only", action='store_true')
|
||||
asyncio.run(main(cmd_args=parser.parse_args()))
|
||||
|
|
|
@ -44,6 +44,18 @@ class FileCommands(CommandTestCase):
|
|||
)
|
||||
self.assertEqual(file_list[0]['confirmations'], 1)
|
||||
|
||||
async def test_get_doesnt_touch_user_written_files_between_calls(self):
|
||||
await self.stream_create('foo', '0.01', data=bytes([0] * (2 << 23)))
|
||||
self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'))
|
||||
first_path = (await self.daemon.jsonrpc_get('lbry://foo', save_file=True)).full_path
|
||||
await self.wait_files_to_complete()
|
||||
self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'))
|
||||
with open(first_path, 'wb') as f:
|
||||
f.write(b' ')
|
||||
f.flush()
|
||||
second_path = await self.daemon.jsonrpc_get('lbry://foo', save_file=True)
|
||||
await self.wait_files_to_complete()
|
||||
self.assertNotEquals(first_path, second_path)
|
||||
|
||||
async def test_file_list_updated_metadata_on_resolve(self):
|
||||
await self.stream_create('foo', '0.01')
|
||||
|
@ -67,12 +79,12 @@ class FileCommands(CommandTestCase):
|
|||
blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash
|
||||
]
|
||||
await self.server.blob_manager.delete_blobs(all_except_sd)
|
||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
|
||||
self.assertIn('error', resp)
|
||||
self.assertEqual('Failed to download data blobs for sd hash %s within timeout' % sd_hash, resp['error'])
|
||||
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
||||
self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didnt create a file")
|
||||
await self.server.blob_manager.delete_blobs([sd_hash])
|
||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True)
|
||||
self.assertIn('error', resp)
|
||||
self.assertEqual('Failed to download sd blob %s within timeout' % sd_hash, resp['error'])
|
||||
|
||||
|
@ -159,7 +171,7 @@ class FileCommands(CommandTestCase):
|
|||
self.assertEqual('finished', file_info['status'])
|
||||
|
||||
async def test_unban_recovers_stream(self):
|
||||
BlobDownloader.BAN_TIME = .5 # fixme: temporary field, will move to connection manager or a conf
|
||||
BlobDownloader.BAN_FACTOR = .5 # fixme: temporary field, will move to connection manager or a conf
|
||||
tx = await self.stream_create('foo', '0.01', data=bytes([0] * (1 << 23)))
|
||||
sd_hash = tx['outputs'][0]['value']['source']['sd_hash']
|
||||
missing_blob_hash = (await self.daemon.jsonrpc_blob_list(sd_hash=sd_hash))[-2]
|
||||
|
|
|
@ -133,6 +133,22 @@ class TestBlob(AsyncioTestCase):
|
|||
await self._test_close_writers_on_finished(BlobBuffer)
|
||||
await self._test_close_writers_on_finished(BlobFile, tmp_dir)
|
||||
|
||||
async def test_concurrency_and_premature_closes(self):
|
||||
blob_directory = tempfile.mkdtemp()
|
||||
self.addCleanup(lambda: shutil.rmtree(blob_directory))
|
||||
blob = self._get_blob(BlobBuffer, blob_directory=blob_directory)
|
||||
writer = blob.get_blob_writer('1.1.1.1', 1337)
|
||||
self.assertEqual(1, len(blob.writers))
|
||||
with self.assertRaises(OSError):
|
||||
blob.get_blob_writer('1.1.1.1', 1337)
|
||||
writer.close_handle()
|
||||
self.assertTrue(blob.writers[('1.1.1.1', 1337)].closed())
|
||||
writer = blob.get_blob_writer('1.1.1.1', 1337)
|
||||
self.assertEqual(blob.writers[('1.1.1.1', 1337)], writer)
|
||||
writer.close_handle()
|
||||
await asyncio.sleep(0.000000001) # flush callbacks
|
||||
self.assertEqual(0, len(blob.writers))
|
||||
|
||||
async def test_delete(self):
|
||||
blob_buffer = await self._test_create_blob(BlobBuffer)
|
||||
self.assertIsInstance(blob_buffer, BlobBuffer)
|
||||
|
|
Loading…
Reference in a new issue