Merge pull request #1892 from lbryio/batch-query-files

Improve startup time
Commit 4541e754dd by Jack Robison, 2019-02-08 20:43:52 -05:00 (committed via GitHub)
7 changed files with 89 additions and 63 deletions

View file

@@ -1,4 +1,5 @@
 import os
+import re
 import asyncio
 import binascii
 import logging
@@ -17,8 +18,11 @@ from lbrynet.blob.writer import HashBlobWriter
 log = logging.getLogger(__name__)
+_hexmatch = re.compile("^[a-f,0-9]+$")
 def is_valid_hashcharacter(char: str) -> bool:
-    return char in "0123456789abcdef"
+    return len(char) == 1 and _hexmatch.match(char)
 def is_valid_blobhash(blobhash: str) -> bool:
@@ -29,7 +33,7 @@ def is_valid_blobhash(blobhash: str) -> bool:
     @return: True/False
     """
-    return len(blobhash) == blobhash_length and all(is_valid_hashcharacter(l) for l in blobhash)
+    return len(blobhash) == blobhash_length and _hexmatch.match(blobhash)
 def encrypt_blob_bytes(key: bytes, iv: bytes, unencrypted: bytes) -> typing.Tuple[bytes, str]:
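For reference, a standalone sketch of how the new precompiled pattern behaves. The 96-character blobhash_length is an assumption here (the length of a hex-encoded sha384 digest) and is not part of this diff; note also that the comma inside the character class means a literal comma would pass the character check.

import re

blobhash_length = 96  # assumed: hex-encoded sha384 digest length, not shown in this diff
_hexmatch = re.compile("^[a-f,0-9]+$")  # the comma inside [] also admits literal ','

def is_valid_blobhash(blobhash: str) -> bool:
    # mirrors the new check: one regex pass instead of a per-character loop
    return len(blobhash) == blobhash_length and bool(_hexmatch.match(blobhash))

print(is_valid_blobhash("ab" * 48))  # True: 96 lowercase hex characters
print(is_valid_blobhash("zz" * 48))  # False: 'z' is not in [a-f,0-9]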

View file

@@ -1,9 +1,10 @@
+import os
 import typing
 import asyncio
 import logging
 from sqlite3 import IntegrityError
 from lbrynet.extras.daemon.storage import SQLiteStorage
-from lbrynet.blob.blob_file import BlobFile
+from lbrynet.blob.blob_file import BlobFile, is_valid_blobhash
 from lbrynet.stream.descriptor import StreamDescriptor
 if typing.TYPE_CHECKING:
@@ -30,8 +31,11 @@ class BlobFileManager:
         self.blobs: typing.Dict[str, BlobFile] = {}
     async def setup(self) -> bool:
-        raw_blob_hashes = await self.get_all_verified_blobs()
-        self.completed_blob_hashes.update(raw_blob_hashes)
+        def initialize_blob_hashes():
+            self.completed_blob_hashes.update(
+                item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name)
+            )
+        await self.loop.run_in_executor(None, initialize_blob_hashes)
         return True
     def get_blob(self, blob_hash, length: typing.Optional[int] = None):
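The setup() change above swaps a database read for a directory scan that runs on a worker thread, so the event loop is not blocked while thousands of blob files are listed. A minimal sketch of that pattern, with a placeholder directory and length check standing in for the real manager state:

import asyncio
import os

async def load_completed_hashes(blob_dir: str) -> set:
    def scan() -> set:
        # blocking filesystem work happens in the default thread pool
        return {entry.name for entry in os.scandir(blob_dir) if len(entry.name) == 96}
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, scan)

if __name__ == "__main__":
    hashes = asyncio.run(load_completed_hashes("."))  # "." is a placeholder blob directory
    print(f"found {len(hashes)} completed blobs")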
@@ -59,10 +63,6 @@ class BlobFileManager:
         blobs = [self.get_blob(b) for b in blob_hashes]
         return [blob.blob_hash for blob in blobs if blob.get_is_verified()]
-    async def get_all_verified_blobs(self) -> typing.List[str]:
-        blob_hashes = await self.storage.get_all_blob_hashes()
-        return self.check_completed_blobs(blob_hashes)
     async def delete_blob(self, blob_hash: str):
         try:
             blob = self.get_blob(blob_hash)

View file

@@ -51,7 +51,7 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
             # fire the Future with the response to our request
             self._response_fut.set_result(response)
         if response.blob_data and self.writer and not self.writer.closed():
-            log.debug("got %i blob bytes from %s:%i", len(response.blob_data), self.peer_address, self.peer_port)
+            # log.debug("got %i blob bytes from %s:%i", len(response.blob_data), self.peer_address, self.peer_port)
             # write blob bytes if we're writing a blob and have blob bytes to write
             self._write(response.blob_data)

View file

@@ -259,8 +259,11 @@ def main(argv=None):
     logging.getLogger('aioupnp').setLevel(logging.WARNING)
     logging.getLogger('aiohttp').setLevel(logging.CRITICAL)
+    loop = asyncio.get_event_loop()
     if args.verbose:
         log.setLevel(logging.DEBUG)
+        loop.set_debug(True)
     else:
         log.setLevel(logging.INFO)
     if conf.share_usage_data:
@@ -269,7 +272,6 @@ def main(argv=None):
         log.addHandler(loggly_handler)
     daemon = Daemon(conf)
-    loop = asyncio.get_event_loop()
     try:
         loop.run_until_complete(daemon.start())
         loop.run_forever()

View file

@@ -105,14 +105,55 @@ def get_content_claim_from_outpoint(transaction: sqlite3.Connection,
     return StoredStreamClaim(*claim_fields)
-def _batched_select(transaction, query, parameters):
-    for start_index in range(0, len(parameters), 900):
-        current_batch = parameters[start_index:start_index+900]
+def _batched_select(transaction, query, parameters, batch_size=900):
+    for start_index in range(0, len(parameters), batch_size):
+        current_batch = parameters[start_index:start_index+batch_size]
         bind = "({})".format(','.join(['?'] * len(current_batch)))
         for result in transaction.execute(query.format(bind), current_batch):
             yield result
+def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
+    files = []
+    signed_claims = {}
+    for (rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key,
+         stream_name, suggested_file_name, *claim_args) in _batched_select(
+            transaction, "select file.rowid, file.*, stream.*, c.* "
+                         "from file inner join stream on file.stream_hash=stream.stream_hash "
+                         "inner join content_claim cc on file.stream_hash=cc.stream_hash "
+                         "inner join claim c on cc.claim_outpoint=c.claim_outpoint "
+                         "where file.stream_hash in {} "
+                         "order by c.rowid desc", [
+                stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file")]):
+        claim = StoredStreamClaim(stream_hash, *claim_args)
+        if claim.channel_claim_id:
+            if claim.channel_claim_id not in signed_claims:
+                signed_claims[claim.channel_claim_id] = []
+            signed_claims[claim.channel_claim_id].append(claim)
+        files.append(
+            {
+                "row_id": rowid,
+                "stream_hash": stream_hash,
+                "file_name": file_name,  # hex
+                "download_directory": download_dir,  # hex
+                "blob_data_rate": data_rate,
+                "status": status,
+                "sd_hash": sd_hash,
+                "key": stream_key,
+                "stream_name": stream_name,  # hex
+                "suggested_file_name": suggested_file_name,  # hex
+                "claim": claim
+            }
+        )
+    for claim_id in signed_claims.keys():
+        channel_name = transaction.execute("select claim_name from claim where claim_id=?", (claim_id, )).fetchone()
+        if channel_name:
+            for claim in signed_claims[claim_id]:
+                claim.channel_name = channel_name[0]
+    return files
 class SQLiteStorage(SQLiteMixin):
     CREATE_TABLES_QUERY = """
     pragma foreign_keys=on;
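_batched_select exists because SQLite limits the number of host parameters in a single statement (999 in many builds), so a long IN (...) list has to be split into chunks. A self-contained illustration of the same idea against a throwaway in-memory table (the schema below is illustrative, not lbrynet's):

import sqlite3

def batched_select(transaction, query, parameters, batch_size=900):
    # run the query once per chunk of at most batch_size bind parameters
    for start in range(0, len(parameters), batch_size):
        batch = parameters[start:start + batch_size]
        bind = "({})".format(",".join("?" * len(batch)))
        yield from transaction.execute(query.format(bind), batch)

db = sqlite3.connect(":memory:")
db.execute("create table blob (blob_hash text primary key)")
db.executemany("insert into blob values (?)", [(f"hash{i}",) for i in range(2500)])

wanted = [f"hash{i}" for i in range(2000)]
rows = list(batched_select(db, "select blob_hash from blob where blob_hash in {}", wanted))
print(len(rows))  # 2000, fetched in three statements of at most 900 parameters each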
@@ -276,16 +317,18 @@ class SQLiteStorage(SQLiteMixin):
                 r = transaction.execute(
                     "select blob_hash from blob "
                     "where blob_hash is not null and "
-                    "(should_announce=1 or single_announce=1) and next_announce_time<? and status='finished'",
-                    (timestamp,)
+                    "(should_announce=1 or single_announce=1) and next_announce_time<? and status='finished' "
+                    "order by next_announce_time asc limit ?",
+                    (timestamp, int(self.conf.concurrent_blob_announcers * 10))
                 )
             else:
                 r = transaction.execute(
                     "select blob_hash from blob where blob_hash is not null "
-                    "and next_announce_time<? and status='finished'", (timestamp,)
+                    "and next_announce_time<? and status='finished' "
+                    "order by next_announce_time asc limit ?",
+                    (timestamp, int(self.conf.concurrent_blob_announcers * 10))
                 )
-            blobs = [b[0] for b in r.fetchall()]
-            return blobs
+            return [b[0] for b in r.fetchall()]
         return self.db.run(get_and_update)
def delete_blobs_from_db(self, blob_hashes): def delete_blobs_from_db(self, blob_hashes):
@@ -407,37 +450,8 @@ class SQLiteStorage(SQLiteMixin):
             binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status)
         )
-    async def get_all_lbry_files(self) -> typing.List[typing.Dict]:
-        def _lbry_file_dict(rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key,
-                            stream_name, suggested_file_name) -> typing.Dict:
-            return {
-                "row_id": rowid,
-                "stream_hash": stream_hash,
-                "file_name": file_name,
-                "download_directory": download_dir,
-                "blob_data_rate": data_rate,
-                "status": status,
-                "sd_hash": sd_hash,
-                "key": stream_key,
-                "stream_name": stream_name,
-                "suggested_file_name": suggested_file_name
-            }
-        def _get_all_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
-            file_infos = list(map(lambda a: _lbry_file_dict(*a), transaction.execute(
-                "select file.rowid, file.*, stream.* "
-                "from file inner join stream on file.stream_hash=stream.stream_hash"
-            ).fetchall()))
-            stream_hashes = [file_info['stream_hash'] for file_info in file_infos]
-            claim_infos = get_claims_from_stream_hashes(transaction, stream_hashes)
-            for index in range(len(file_infos)):  # pylint: disable=consider-using-enumerate
-                file_infos[index]['claim'] = claim_infos.get(file_infos[index]['stream_hash'])
-            return file_infos
-        results = await self.db.run(_get_all_files)
-        if results:
-            return results
-        return []
+    def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
+        return self.db.run(get_all_lbry_files)
     def change_file_status(self, stream_hash: str, new_status: str):
         log.info("update file status %s -> %s", stream_hash, new_status)

View file

@@ -143,7 +143,9 @@ class StreamManager:
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
     async def load_streams_from_database(self):
+        log.info("Initializing stream manager from %s", self.storage._db_path)
         file_infos = await self.storage.get_all_lbry_files()
+        log.info("Initializing %i files", len(file_infos))
         await asyncio.gather(*[
             self.add_stream(
                 file_info['sd_hash'], binascii.unhexlify(file_info['file_name']).decode(),
@@ -166,7 +168,7 @@ class StreamManager:
     async def reflect_streams(self):
         while True:
-            if self.config.reflector_servers:
+            if self.config.reflect_streams and self.config.reflector_servers:
                 sd_hashes = await self.storage.get_streams_to_re_reflect()
                 streams = list(filter(lambda s: s.sd_hash in sd_hashes, self.streams))
                 batch = []
@@ -203,7 +205,7 @@ class StreamManager:
         stream = await ManagedStream.create(self.loop, self.blob_manager, file_path, key, iv_generator)
         self.streams.add(stream)
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
-        if self.config.reflector_servers:
+        if self.config.reflect_streams and self.config.reflector_servers:
             host, port = random.choice(self.config.reflector_servers)
             self.loop.create_task(stream.upload_to_reflector(host, port))
         return stream

View file

@@ -7,6 +7,7 @@ import time
 from aiohttp import ClientConnectorError
 from lbrynet import __version__
+from lbrynet.blob.blob_file import MAX_BLOB_SIZE
 from lbrynet.conf import Config
 from lbrynet.schema.uri import parse_lbry_uri
 from lbrynet.extras.daemon.client import daemon_rpc
@@ -65,7 +66,7 @@ async def wait_for_done(conf, uri):
         files = await daemon_rpc(conf, "file_list", claim_name=name)
         file = files[0]
         if file['status'] in ['finished', 'stopped']:
-            return True, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed'])
+            return True, file['blobs_completed'], file['blobs_in_stream']
         if last_complete < int(file['blobs_completed']):
             hang_count = 0
             last_complete = int(file['blobs_completed'])
@@ -73,7 +74,7 @@ async def wait_for_done(conf, uri):
             hang_count += 1
         await asyncio.sleep(1.0)
         if hang_count > 10:
-            return False, f"{file['blobs_completed']}/{file['blobs_in_stream']}", int(file['blobs_completed'])
+            return False, file['blobs_completed'], file['blobs_in_stream']
 async def main(uris=None):
@@ -111,21 +112,24 @@ async def main(uris=None):
             first_byte = time.time()
             first_byte_times.append(first_byte - start)
             print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}")
-            # downloaded, msg, blobs_in_stream = await wait_for_done(conf, uri)
-            # if downloaded:
-            #     downloaded_times.append((time.time()-start) / downloaded)
-            #     print(f"\tdownloaded {uri} @ {(time.time()-start) / blobs_in_stream} seconds per blob")
-            # else:
-            #     print(f"\tfailed to download {uri}, got {msg}")
-            #     download_failures.append(uri)
+            downloaded, amount_downloaded, blobs_in_stream = await wait_for_done(conf, uri)
+            if downloaded:
+                downloaded_times.append((time.time() - start) / downloaded)
+            else:
+                download_failures.append(uri)
+            print(f"downloaded {amount_downloaded}/{blobs_in_stream} blobs for {uri} at "
+                  f"{round((blobs_in_stream * (MAX_BLOB_SIZE - 1)) / (time.time() - start) / 1000000, 2)}mb/s\n")
         except:
-            print(f"{i + 1}/{len(uris)} - timeout in {time.time() - start} {uri}")
+            print(f"{i + 1}/{len(uris)} - failed to start {uri}")
             failures.append(uri)
-            await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
+            return
+        # await daemon_rpc(conf, 'file_delete', delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name)
         await asyncio.sleep(0.1)
     print("**********************************************")
     result = f"Tried to start downloading {len(resolvable)} streams from the front page\n" \
+             f"Worst first byte time: {round(max(first_byte_times), 2)}\n" \
+             f"Best first byte time: {round(min(first_byte_times), 2)}\n" \
             f"95% confidence time-to-first-byte: {confidence(first_byte_times, 1.984)}\n" \
             f"99% confidence time-to-first-byte: {confidence(first_byte_times, 2.626)}\n" \
             f"Variance: {variance(first_byte_times)}\n" \