import logging import os import sqlite3 import traceback from binascii import hexlify, unhexlify from decimal import Decimal from twisted.internet import defer, task, threads from twisted.enterprise import adbapi from lbryschema.claim import ClaimDict from lbryschema.decode import smart_decode from lbrynet import conf from lbrynet.cryptstream.CryptBlob import CryptBlobInfo from lbrynet.dht.constants import dataExpireTimeout from torba.constants import COIN log = logging.getLogger(__name__) def _get_next_available_file_name(download_directory, file_name): base_name, ext = os.path.splitext(file_name) i = 0 while os.path.isfile(os.path.join(download_directory, file_name)): i += 1 file_name = "%s_%i%s" % (base_name, i, ext) return os.path.join(download_directory, file_name) def _open_file_for_writing(download_directory, suggested_file_name): file_path = _get_next_available_file_name(download_directory, suggested_file_name) try: file_handle = open(file_path, 'wb') file_handle.close() except IOError: log.error(traceback.format_exc()) raise ValueError( "Failed to open %s. Make sure you have permission to save files to that location." % file_path ) return os.path.basename(file_path) def open_file_for_writing(download_directory, suggested_file_name): """ Used to touch the path of a file to be downloaded :param download_directory: (str) :param suggested_file_name: (str) :return: (str) basename """ return threads.deferToThread(_open_file_for_writing, download_directory, suggested_file_name) def rerun_if_locked(f): max_attempts = 5 def rerun(err, rerun_count, *args, **kwargs): connection = args[0] reactor = connection.reactor log.debug("Failed to execute (%s): %s", err, args) if err.check(sqlite3.OperationalError) and "database is locked" in str(err.value): log.warning("database was locked. rerunning %s with args %s, kwargs %s", str(f), str(args), str(kwargs)) if rerun_count < max_attempts: delay = 2**rerun_count return task.deferLater(reactor, delay, inner_wrapper, rerun_count + 1, *args, **kwargs) raise err def check_needed_rerun(result, rerun_count): if rerun_count: log.info("successfully reran database query") return result def inner_wrapper(rerun_count, *args, **kwargs): d = f(*args, **kwargs) d.addCallback(check_needed_rerun, rerun_count) d.addErrback(rerun, rerun_count, *args, **kwargs) return d def wrapper(*args, **kwargs): return inner_wrapper(0, *args, **kwargs) return wrapper class SqliteConnection(adbapi.ConnectionPool): def __init__(self, db_path): super().__init__('sqlite3', db_path, check_same_thread=False) @rerun_if_locked def runInteraction(self, interaction, *args, **kw): return super().runInteraction(interaction, *args, **kw) @classmethod def set_reactor(cls, reactor): cls.reactor = reactor class SQLiteStorage: CREATE_TABLES_QUERY = """ pragma foreign_keys=on; pragma journal_mode=WAL; create table if not exists blob ( blob_hash char(96) primary key not null, blob_length integer not null, next_announce_time integer not null, should_announce integer not null default 0, status text not null, last_announced_time integer, single_announce integer ); create table if not exists stream ( stream_hash char(96) not null primary key, sd_hash char(96) not null references blob, stream_key text not null, stream_name text not null, suggested_filename text not null ); create table if not exists stream_blob ( stream_hash char(96) not null references stream, blob_hash char(96) references blob, position integer not null, iv char(32) not null, primary key (stream_hash, blob_hash) ); create table if not exists claim ( claim_outpoint text not null primary key, claim_id char(40) not null, claim_name text not null, amount integer not null, height integer not null, serialized_metadata blob not null, channel_claim_id text, address text not null, claim_sequence integer not null ); create table if not exists file ( stream_hash text primary key not null references stream, file_name text not null, download_directory text not null, blob_data_rate real not null, status text not null ); create table if not exists content_claim ( stream_hash text unique not null references file, claim_outpoint text not null references claim, primary key (stream_hash, claim_outpoint) ); create table if not exists support ( support_outpoint text not null primary key, claim_id text not null, amount integer not null, address text not null ); create table if not exists reflected_stream ( sd_hash text not null, reflector_address text not null, timestamp integer, primary key (sd_hash, reflector_address) ); """ def __init__(self, db_dir, reactor=None): if not reactor: from twisted.internet import reactor self.db_dir = db_dir self._db_path = os.path.join(db_dir, "lbrynet.sqlite") log.info("connecting to database: %s", self._db_path) self.db = SqliteConnection(self._db_path) self.db.set_reactor(reactor) self.clock = reactor # used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a # change to the associated content claim occurs. these are added by the file manager # when it loads each file self.content_claim_callbacks = {} # {: } self.check_should_announce_lc = None if 'reflector' not in conf.settings['components_to_skip']: self.check_should_announce_lc = task.LoopingCall(self.verify_will_announce_all_head_and_sd_blobs) @defer.inlineCallbacks def setup(self): def _create_tables(transaction): transaction.executescript(self.CREATE_TABLES_QUERY) yield self.db.runInteraction(_create_tables) if self.check_should_announce_lc and not self.check_should_announce_lc.running: self.check_should_announce_lc.start(600) defer.returnValue(None) @defer.inlineCallbacks def run_and_return_one_or_none(self, query, *args): result = yield self.db.runQuery(query, args) if result: defer.returnValue(result[0][0]) else: defer.returnValue(None) @defer.inlineCallbacks def run_and_return_list(self, query, *args): result = yield self.db.runQuery(query, args) if result: defer.returnValue([i[0] for i in result]) else: defer.returnValue([]) def run_and_return_id(self, query, *args): def do_save(t): t.execute(query, args) return t.lastrowid return self.db.runInteraction(do_save) def stop(self): if self.check_should_announce_lc and self.check_should_announce_lc.running: self.check_should_announce_lc.stop() self.db.close() return defer.succeed(True) # # # # # # # # # blob functions # # # # # # # # # def add_completed_blob(self, blob_hash, length, next_announce_time, should_announce, status="finished"): log.debug("Adding a completed blob. blob_hash=%s, length=%i", blob_hash, length) values = (blob_hash, length, next_announce_time or 0, int(bool(should_announce)), status, 0, 0) return self.db.runOperation("insert or replace into blob values (?, ?, ?, ?, ?, ?, ?)", values) def set_should_announce(self, blob_hash, next_announce_time, should_announce): return self.db.runOperation( "update blob set next_announce_time=?, should_announce=? where blob_hash=?", (next_announce_time or 0, int(bool(should_announce)), blob_hash) ) def get_blob_status(self, blob_hash): return self.run_and_return_one_or_none( "select status from blob where blob_hash=?", blob_hash ) @defer.inlineCallbacks def add_known_blob(self, blob_hash, length): yield self.db.runOperation( "insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)", (blob_hash, length, 0, 0, "pending", 0, 0) ) def should_announce(self, blob_hash): return self.run_and_return_one_or_none( "select should_announce from blob where blob_hash=?", blob_hash ) def count_should_announce_blobs(self): return self.run_and_return_one_or_none( "select count(*) from blob where should_announce=1 and status='finished'" ) def get_all_should_announce_blobs(self): return self.run_and_return_list( "select blob_hash from blob where should_announce=1 and status='finished'" ) @defer.inlineCallbacks def get_all_finished_blobs(self): blob_hashes = yield self.run_and_return_list( "select blob_hash from blob where status='finished'" ) defer.returnValue([unhexlify(blob_hash) for blob_hash in blob_hashes]) def count_finished_blobs(self): return self.run_and_return_one_or_none( "select count(*) from blob where status='finished'" ) def update_last_announced_blob(self, blob_hash, last_announced): return self.db.runOperation( "update blob set next_announce_time=?, last_announced_time=?, single_announce=0 where blob_hash=?", (int(last_announced + (dataExpireTimeout / 2)), int(last_announced), blob_hash) ) def should_single_announce_blobs(self, blob_hashes, immediate=False): def set_single_announce(transaction): now = self.clock.seconds() for blob_hash in blob_hashes: if immediate: transaction.execute( "update blob set single_announce=1, next_announce_time=? " "where blob_hash=? and status='finished'", (int(now), blob_hash) ) else: transaction.execute( "update blob set single_announce=1 where blob_hash=? and status='finished'", (blob_hash, ) ) return self.db.runInteraction(set_single_announce) def get_blobs_to_announce(self): def get_and_update(transaction): timestamp = self.clock.seconds() if conf.settings['announce_head_blobs_only']: r = transaction.execute( "select blob_hash from blob " "where blob_hash is not null and " "(should_announce=1 or single_announce=1) and next_announce_time