diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py
index 9d6246d09..f99dbe768 100644
--- a/lbrynet/blob/blob_file.py
+++ b/lbrynet/blob/blob_file.py
@@ -1,4 +1,5 @@
 import os
+import re
 import asyncio
 import binascii
 import logging
@@ -17,8 +18,11 @@ from lbrynet.blob.writer import HashBlobWriter
 log = logging.getLogger(__name__)
 
 
+_hexmatch = re.compile("^[a-f,0-9]+$")
+
+
 def is_valid_hashcharacter(char: str) -> bool:
-    return char in "0123456789abcdef"
+    return len(char) == 1 and _hexmatch.match(char)
 
 
 def is_valid_blobhash(blobhash: str) -> bool:
@@ -29,7 +33,7 @@ def is_valid_blobhash(blobhash: str) -> bool:
 
     @return: True/False
     """
-    return len(blobhash) == blobhash_length and all(is_valid_hashcharacter(l) for l in blobhash)
+    return len(blobhash) == blobhash_length and _hexmatch.match(blobhash)
 
 
 def encrypt_blob_bytes(key: bytes, iv: bytes, unencrypted: bytes) -> typing.Tuple[bytes, str]:
diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py
index f079ccb8f..73c2b0573 100644
--- a/lbrynet/blob/blob_manager.py
+++ b/lbrynet/blob/blob_manager.py
@@ -1,9 +1,10 @@
+import os
 import typing
 import asyncio
 import logging
 from sqlite3 import IntegrityError
 from lbrynet.extras.daemon.storage import SQLiteStorage
-from lbrynet.blob.blob_file import BlobFile
+from lbrynet.blob.blob_file import BlobFile, is_valid_blobhash
 from lbrynet.stream.descriptor import StreamDescriptor
 
 if typing.TYPE_CHECKING:
@@ -30,8 +31,11 @@ class BlobFileManager:
         self.blobs: typing.Dict[str, BlobFile] = {}
 
     async def setup(self) -> bool:
-        raw_blob_hashes = await self.get_all_verified_blobs()
-        self.completed_blob_hashes.update(raw_blob_hashes)
+        def initialize_blob_hashes():
+            self.completed_blob_hashes.update(
+                item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name)
+            )
+        await self.loop.run_in_executor(None, initialize_blob_hashes)
         return True
 
     def get_blob(self, blob_hash, length: typing.Optional[int] = None):
@@ -59,10 +63,6 @@ class BlobFileManager:
         blobs = [self.get_blob(b) for b in blob_hashes]
         return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
 
-    async def get_all_verified_blobs(self) -> typing.List[str]:
-        blob_hashes = await self.storage.get_all_blob_hashes()
-        return self.check_completed_blobs(blob_hashes)
-
     async def delete_blob(self, blob_hash: str):
         try:
             blob = self.get_blob(blob_hash)
diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py
index 80d78fb22..fc972bc98 100644
--- a/lbrynet/blob_exchange/client.py
+++ b/lbrynet/blob_exchange/client.py
@@ -51,7 +51,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
             # fire the Future with the response to our request
             self._response_fut.set_result(response)
         if response.blob_data and self.writer and not self.writer.closed():
-            log.debug("got %i blob bytes from %s:%i", len(response.blob_data), self.peer_address, self.peer_port)
+            # log.debug("got %i blob bytes from %s:%i", len(response.blob_data), self.peer_address, self.peer_port)
             # write blob bytes if we're writing a blob and have blob bytes to write
             self._write(response.blob_data)
 
diff --git a/lbrynet/extras/cli.py b/lbrynet/extras/cli.py
index 08ab4b3a5..e5aa3edcd 100644
--- a/lbrynet/extras/cli.py
+++ b/lbrynet/extras/cli.py
@@ -259,8 +259,11 @@ def main(argv=None):
         logging.getLogger('aioupnp').setLevel(logging.WARNING)
         logging.getLogger('aiohttp').setLevel(logging.CRITICAL)
 
+        loop = asyncio.get_event_loop()
+
         if args.verbose:
             log.setLevel(logging.DEBUG)
+            loop.set_debug(True)
         else:
             log.setLevel(logging.INFO)
 
         if conf.share_usage_data:
@@ -269,7 +272,6 @@
             log.addHandler(loggly_handler)
 
         daemon = Daemon(conf)
-        loop = asyncio.get_event_loop()
         try:
             loop.run_until_complete(daemon.start())
             loop.run_forever()
diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py
index abdacfe6f..b37909240 100644
--- a/lbrynet/extras/daemon/storage.py
+++ b/lbrynet/extras/daemon/storage.py
@@ -105,14 +105,55 @@ def get_content_claim_from_outpoint(transaction: sqlite3.Connection,
     return StoredStreamClaim(*claim_fields)
 
 
-def _batched_select(transaction, query, parameters):
-    for start_index in range(0, len(parameters), 900):
-        current_batch = parameters[start_index:start_index+900]
+def _batched_select(transaction, query, parameters, batch_size=900):
+    for start_index in range(0, len(parameters), batch_size):
+        current_batch = parameters[start_index:start_index+batch_size]
         bind = "({})".format(','.join(['?'] * len(current_batch)))
         for result in transaction.execute(query.format(bind), current_batch):
             yield result
 
 
+def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
+    files = []
+    signed_claims = {}
+    for (rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key,
+         stream_name, suggested_file_name, *claim_args) in _batched_select(
+            transaction, "select file.rowid, file.*, stream.*, c.* "
+                         "from file inner join stream on file.stream_hash=stream.stream_hash "
+                         "inner join content_claim cc on file.stream_hash=cc.stream_hash "
+                         "inner join claim c on cc.claim_outpoint=c.claim_outpoint "
+                         "where file.stream_hash in {} "
+                         "order by c.rowid desc", [
+                stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file")]):
+
+        claim = StoredStreamClaim(stream_hash, *claim_args)
+        if claim.channel_claim_id:
+            if claim.channel_claim_id not in signed_claims:
+                signed_claims[claim.channel_claim_id] = []
+            signed_claims[claim.channel_claim_id].append(claim)
+        files.append(
+            {
+                "row_id": rowid,
+                "stream_hash": stream_hash,
+                "file_name": file_name,  # hex
+                "download_directory": download_dir,  # hex
+                "blob_data_rate": data_rate,
+                "status": status,
+                "sd_hash": sd_hash,
+                "key": stream_key,
+                "stream_name": stream_name,  # hex
+                "suggested_file_name": suggested_file_name,  # hex
+                "claim": claim
+            }
+        )
+    for claim_id in signed_claims.keys():
+        channel_name = transaction.execute("select claim_name from claim where claim_id=?", (claim_id, )).fetchone()
+        if channel_name:
+            for claim in signed_claims[claim_id]:
+                claim.channel_name = channel_name[0]
+    return files
+
+
 class SQLiteStorage(SQLiteMixin):
     CREATE_TABLES_QUERY = """
             pragma foreign_keys=on;
@@ -276,16 +317,18 @@ class SQLiteStorage(SQLiteMixin):
             r = transaction.execute(
                 "select blob_hash from blob "
                 "where blob_hash is not null and "
-                "(should_announce=1 or single_announce=1) and next_announce_time<?
@@ ... @@
-    async def get_all_lbry_files(self) -> typing.List[typing.Dict]:
-        def _lbry_file_dict(rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key,
-                            stream_name, suggested_file_name) -> typing.Dict:
-            return {
-                "row_id": rowid,
-                "stream_hash": stream_hash,
-                "file_name": file_name,
-                "download_directory": download_dir,
-                "blob_data_rate": data_rate,
-                "status": status,
-                "sd_hash": sd_hash,
-                "key": stream_key,
-                "stream_name": stream_name,
-                "suggested_file_name": suggested_file_name
-            }
-
-        def _get_all_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
-            file_infos = list(map(lambda a: _lbry_file_dict(*a), transaction.execute(
-                "select file.rowid, file.*, stream.* "
-                "from file inner join stream on file.stream_hash=stream.stream_hash"
-            ).fetchall()))
-            stream_hashes = [file_info['stream_hash'] for file_info in file_infos]
-            claim_infos = get_claims_from_stream_hashes(transaction, stream_hashes)
-            for index in range(len(file_infos)):  # pylint: disable=consider-using-enumerate
-                file_infos[index]['claim'] = claim_infos.get(file_infos[index]['stream_hash'])
-            return file_infos
-
-        results = await self.db.run(_get_all_files)
-        if results:
-            return results
-        return []
+    def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
+        return self.db.run(get_all_lbry_files)
 
     def change_file_status(self, stream_hash: str, new_status: str):
         log.info("update file status %s -> %s", stream_hash, new_status)
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index e587b6e07..b31882793 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -143,7 +143,9 @@ class StreamManager:
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
 
     async def load_streams_from_database(self):
+        log.info("Initializing stream manager from %s", self.storage._db_path)
         file_infos = await self.storage.get_all_lbry_files()
+        log.info("Initializing %i files", len(file_infos))
         await asyncio.gather(*[
             self.add_stream(
                 file_info['sd_hash'], binascii.unhexlify(file_info['file_name']).decode(),
@@ -166,7 +168,7 @@ class StreamManager:
 
     async def reflect_streams(self):
         while True:
-            if self.config.reflector_servers:
+            if self.config.reflect_streams and self.config.reflector_servers:
                 sd_hashes = await self.storage.get_streams_to_re_reflect()
                 streams = list(filter(lambda s: s.sd_hash in sd_hashes, self.streams))
                 batch = []
@@ -203,7 +205,7 @@ class StreamManager:
         stream = await ManagedStream.create(self.loop, self.blob_manager, file_path, key, iv_generator)
         self.streams.add(stream)
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
-        if self.config.reflector_servers:
+        if self.config.reflect_streams and self.config.reflector_servers:
             host, port = random.choice(self.config.reflector_servers)
             self.loop.create_task(stream.upload_to_reflector(host, port))
         return stream
diff --git a/scripts/time_to_first_byte.py b/scripts/time_to_first_byte.py
index ac45a57d8..6f6260254 100644
--- a/scripts/time_to_first_byte.py
+++ b/scripts/time_to_first_byte.py
@@ -7,6 +7,7 @@ import time
 from aiohttp import ClientConnectorError
 
 from lbrynet import __version__
+from lbrynet.blob.blob_file import MAX_BLOB_SIZE
 from lbrynet.conf import Config
 from lbrynet.schema.uri import parse_lbry_uri
 from lbrynet.extras.daemon.client import daemon_rpc
@@ -65,7 +66,7 @@ async def wait_for_done(conf, uri):
         files = await daemon_rpc(conf, "file_list", claim_name=name)
         file = files[0]
         if file['status'] in ['finished', 'stopped']:
-            return True, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed'])
+            return True, file['blobs_completed'], file['blobs_in_stream']
         if last_complete < int(file['blobs_completed']):
             hang_count = 0
             last_complete = int(file['blobs_completed'])
@@ -73,7 +74,7 @@ async def wait_for_done(conf, uri):
             hang_count += 1
         await asyncio.sleep(1.0)
         if hang_count > 10:
-            return False, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed'])
+            return False, file['blobs_completed'], file['blobs_in_stream']
 
 
 async def main(uris=None):
@@ -111,21 +112,24 @@ async def main(uris=None):
             first_byte = time.time()
             first_byte_times.append(first_byte - start)
             print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            # downloaded, msg, blobs_in_stream = await wait_for_done(conf, uri)
-            # if downloaded:
-            #     downloaded_times.append((time.time()-start) / downloaded)
-            #     print(f"\tdownloaded {uri} @ {(time.time()-start) / blobs_in_stream} seconds per blob")
-            # else:
-            #     print(f"\tfailed to download {uri}, got {msg}")
-            #     download_failures.append(uri)
+            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(conf, uri)
+            if downloaded:
+                downloaded_times.append((time.time() - start) / downloaded)
+            else:
+                download_failures.append(uri)
+            print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
+                  f"{round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)}mb/s\n")
         except:
-            print(f"{i + 1}/{len(uris)} - timeout in {time.time() - start} {uri}")
+            print(f"{i + 1}/{len(uris)} - failed to start {uri}")
             failures.append(uri)
-        await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+            return
+        # await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
         await asyncio.sleep(0.1)
 
     print("**********************************************")
     result = f"Tried to start downloading {len(resolvable)} streams from the front page\n" \
+             f"Worst first byte time: {round(max(first_byte_times), 2)}\n" \
+             f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
              f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}\n" \
              f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}\n" \
              f"Variance: {variance(first_byte_times)}\n" \