diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
new file mode 100644
index 000000000..1c8557418
--- /dev/null
+++ b/lbrynet/database/storage.py
@@ -0,0 +1,645 @@
+import logging
+import os
+import time
+import sqlite3
+import traceback
+from decimal import Decimal
+from twisted.internet import defer, task, reactor, threads
+from twisted.enterprise import adbapi
+
+from lbryschema.claim import ClaimDict
+from lbryschema.decode import smart_decode
+from lbrynet import conf
+from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
+from lbryum.constants import COIN
+
+log = logging.getLogger(__name__)
+
+
+def _get_next_available_file_name(download_directory, file_name):
+    # os.path.splitext returns the extension with its leading dot included,
+    # so it can be reused directly when numbering conflicting file names
+    base_name, ext = os.path.splitext(file_name or "_")
+    i = 0
+    while os.path.isfile(os.path.join(download_directory, file_name)):
+        i += 1
+        file_name = "%s_%i%s" % (base_name, i, ext)
+    return os.path.join(download_directory, file_name)
+
+
+def _open_file_for_writing(download_directory, suggested_file_name):
+    file_path = _get_next_available_file_name(download_directory, suggested_file_name)
+    try:
+        file_handle = open(file_path, 'wb')
+        file_handle.close()
+    except IOError:
+        log.error(traceback.format_exc())
+        raise ValueError(
+            "Failed to open %s. Make sure you have permission to save files to that location." % file_path
+        )
+    return os.path.basename(file_path)
+
+
+def open_file_for_writing(download_directory, suggested_file_name):
+    """
+    Used to touch the path of a file to be downloaded
+
+    :param download_directory: (str)
+    :param suggested_file_name: (str)
+    :return: (str) basename
+    """
+    return threads.deferToThread(_open_file_for_writing, download_directory, suggested_file_name)
+
+
+def get_next_announce_time(hash_announcer, num_hashes_to_announce=1, min_reannounce_time=60*60,
+                           single_announce_duration=5):
+    """
+    Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME,
+    unless we are announcing a lot of hashes at once, which could cause the
+    announce queue to pile up. To prevent pile up, reannounce
+    only after a conservative estimate of when it will finish
+    announcing all the hashes.
+
+    Args:
+        num_hashes_to_announce: number of hashes that will be added to the queue
+    Returns:
+        timestamp for next announce time
+    """
+    queue_size = hash_announcer.hash_queue_size() + num_hashes_to_announce
+    reannounce = max(min_reannounce_time,
+                     queue_size * single_announce_duration)
+    return time.time() + reannounce
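+
+# Worked example of the estimate above (illustrative numbers): with 600 hashes
+# already queued and 100 more to announce, queue_size is 700 and
+# max(60 * 60, 700 * 5) = 3600, so the one-hour floor applies; with a
+# queue_size of 1200 the conservative estimate 1200 * 5 = 6000 seconds wins
+# and the reannounce is pushed out accordingly.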
+
+
+def rerun_if_locked(f):
+    max_attempts = 3
+
+    def rerun(err, rerun_count, *args, **kwargs):
+        log.debug("Failed to execute (%s): %s", err, args)
+        if err.check(sqlite3.OperationalError) and err.value.message == "database is locked":
+            log.warning("database was locked. rerunning %s with args %s, kwargs %s",
+                        str(f), str(args), str(kwargs))
+            if rerun_count < max_attempts:
+                return task.deferLater(reactor, 0, inner_wrapper, rerun_count + 1, *args, **kwargs)
+        # not a lock error, or the retries are exhausted: re-raise the failure
+        raise err
+
+    def inner_wrapper(rerun_count, *args, **kwargs):
+        d = f(*args, **kwargs)
+        d.addErrback(rerun, rerun_count, *args, **kwargs)
+        return d
+
+    def wrapper(*args, **kwargs):
+        return inner_wrapper(0, *args, **kwargs)
+
+    return wrapper
+
+
+class SqliteConnection(adbapi.ConnectionPool):
+    def __init__(self, db_path):
+        adbapi.ConnectionPool.__init__(self, 'sqlite3', db_path, check_same_thread=False)
+
+    @rerun_if_locked
+    def runInteraction(self, interaction, *args, **kw):
+        return adbapi.ConnectionPool.runInteraction(self, interaction, *args, **kw)
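+
+# Usage sketch (illustrative; the daemon wires this up elsewhere): adbapi
+# routes runQuery and runOperation through the decorated runInteraction, so a
+# "database is locked" error is retried up to three times before failing, e.g.
+#
+#   db = SqliteConnection("/tmp/lbrynet.sqlite")
+#
+#   def check(transaction):
+#       transaction.execute("select 1")
+#       return transaction.fetchone()
+#
+#   d = db.runInteraction(check)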
+
+
+class SQLiteStorage(object):
+    CREATE_TABLES_QUERY = """
+            pragma foreign_keys=on;
+            pragma journal_mode=WAL;
+
+            create table if not exists blob (
+                blob_hash char(96) primary key not null,
+                blob_length integer not null,
+                next_announce_time integer not null,
+                should_announce integer not null default 0,
+                status text not null
+            );
+
+            create table if not exists stream (
+                stream_hash char(96) not null primary key,
+                sd_hash char(96) not null,
+                stream_key text not null,
+                stream_name text not null,
+                suggested_filename text not null,
+                foreign key(sd_hash) references blob(blob_hash)
+            );
+
+            create table if not exists stream_blob (
+                stream_hash char(96) not null,
+                blob_hash char(96),
+                position integer not null,
+                iv char(32) not null,
+                primary key (stream_hash, blob_hash),
+                foreign key(stream_hash) references stream(stream_hash),
+                foreign key (blob_hash) references blob(blob_hash)
+            );
+
+            create table if not exists claim (
+                claim_outpoint text not null primary key,
+                claim_id char(40) not null,
+                claim_name text not null,
+                amount integer not null,
+                height integer not null,
+                serialized_metadata blob not null,
+                channel_claim_id text,
+                address text not null,
+                claim_sequence integer not null
+            );
+
+            create table if not exists file (
+                stream_hash text primary key not null,
+                file_name text not null,
+                download_directory text not null,
+                blob_data_rate real not null,
+                status text not null,
+                foreign key(stream_hash) references stream(stream_hash)
+            );
+
+            create table if not exists content_claim (
+                stream_hash text unique not null,
+                claim_outpoint text not null,
+                primary key (stream_hash, claim_outpoint),
+                foreign key (claim_outpoint) references claim(claim_outpoint),
+                foreign key(stream_hash) references file(stream_hash)
+            );
+
+            create table if not exists support (
+                support_outpoint text not null primary key,
+                claim_id text not null,
+                amount integer not null,
+                address text not null
+            );
+    """
+
+    def __init__(self, db_dir):
+        self.db_dir = db_dir
+        self._db_path = os.path.join(db_dir, "lbrynet.sqlite")
+        log.info("connecting to database: %s", self._db_path)
+        self.db = SqliteConnection(self._db_path)
+
+    def setup(self):
+        def _create_tables(transaction):
+            transaction.executescript(self.CREATE_TABLES_QUERY)
+        return self.db.runInteraction(_create_tables)
+
+    @defer.inlineCallbacks
+    def run_and_return_one_or_none(self, query, *args):
+        result = yield self.db.runQuery(query, args)
+        if result:
+            defer.returnValue(result[0][0])
+        else:
+            defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def run_and_return_list(self, query, *args):
+        result = yield self.db.runQuery(query, args)
+        if result:
+            defer.returnValue([i[0] for i in result])
+        else:
+            defer.returnValue([])
+
+    def stop(self):
+        self.db.close()
+        return defer.succeed(True)
+
+    # # # # # # # # # blob functions # # # # # # # # #
+
+    @defer.inlineCallbacks
+    def add_completed_blob(self, blob_hash, length, next_announce_time, should_announce):
+        log.debug("Adding a completed blob. blob_hash=%s, length=%i", blob_hash, length)
+        yield self.add_known_blob(blob_hash, length)
+        yield self.set_blob_status(blob_hash, "finished")
+        yield self.set_should_announce(blob_hash, next_announce_time, should_announce)
+        yield self.db.runOperation(
+            "update blob set blob_length=? where blob_hash=?", (length, blob_hash)
+        )
+
+    def set_should_announce(self, blob_hash, next_announce_time, should_announce):
+        should_announce = 1 if should_announce else 0
+        return self.db.runOperation(
+            "update blob set next_announce_time=?, should_announce=? where blob_hash=?",
+            (next_announce_time, should_announce, blob_hash)
+        )
+
+    def set_blob_status(self, blob_hash, status):
+        return self.db.runOperation(
+            "update blob set status=? where blob_hash=?", (status, blob_hash)
+        )
+
+    def get_blob_status(self, blob_hash):
+        return self.run_and_return_one_or_none(
+            "select status from blob where blob_hash=?", blob_hash
+        )
+
+    @defer.inlineCallbacks
+    def add_known_blob(self, blob_hash, length):
+        status = yield self.get_blob_status(blob_hash)
+        if status is None:
+            status = "pending"
+            # only insert when the blob is not yet known; blob_hash is the primary key
+            yield self.db.runOperation("insert into blob values (?, ?, ?, ?, ?)",
+                                       (blob_hash, length, 0, 0, status))
+        defer.returnValue(status)
+
+    def should_announce(self, blob_hash):
+        return self.run_and_return_one_or_none(
+            "select should_announce from blob where blob_hash=?", blob_hash
+        )
+
+    def count_should_announce_blobs(self):
+        return self.run_and_return_one_or_none(
+            "select count(*) from blob where should_announce=1 and status=?", "finished"
+        )
+
+    def get_all_should_announce_blobs(self):
+        return self.run_and_return_list(
+            "select blob_hash from blob where should_announce=1 and status=?", "finished"
+        )
+
+    def get_blobs_to_announce(self, hash_announcer):
+        def get_and_update(transaction):
+            timestamp = time.time()
+            if conf.settings['announce_head_blobs_only']:
+                r = transaction.execute(
+                    "select blob_hash from blob "
+                    "where blob_hash is not null and should_announce=1 and next_announce_time