import time import logging from twisted.enterprise import adbapi import os import sqlite3 from twisted.internet import defer from twisted.python.failure import Failure from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError from lbrynet.core.sqlite_helpers import rerun_if_locked log = logging.getLogger(__name__) class DBLiveStreamMetadataManager(DHTHashSupplier): """This class stores all stream info in a leveldb database stored in the same directory as the blobfiles""" def __init__(self, db_dir, hash_announcer): DHTHashSupplier.__init__(self, hash_announcer) self.db_dir = db_dir self.db_conn = None def setup(self): return self._open_db() def stop(self): self.db_conn = None return defer.succeed(True) def get_all_streams(self): return self._get_all_streams() def save_stream(self, stream_hash, pub_key, file_name, key, blobs): next_announce_time = time.time() + self.hash_reannounce_time d = self._store_stream(stream_hash, pub_key, file_name, key, next_announce_time=next_announce_time) def save_blobs(): return self.add_blobs_to_stream(stream_hash, blobs) def announce_have_stream(): if self.hash_announcer is not None: self.hash_announcer.immediate_announce([stream_hash]) return stream_hash d.addCallback(lambda _: save_blobs()) d.addCallback(lambda _: announce_have_stream()) return d def get_stream_info(self, stream_hash): return self._get_stream_info(stream_hash) def check_if_stream_exists(self, stream_hash): return self._check_if_stream_exists(stream_hash) def delete_stream(self, stream_hash): return self._delete_stream(stream_hash) def add_blobs_to_stream(self, stream_hash, blobs): return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True) def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): log.info("Getting blobs for a stream. Count is %s", str(count)) def get_positions_of_start_and_end(): if start_blob is not None: d1 = self._get_blob_num_by_hash(stream_hash, start_blob) else: d1 = defer.succeed(None) if end_blob is not None: d2 = self._get_blob_num_by_hash(stream_hash, end_blob) else: d2 = defer.succeed(None) dl = defer.DeferredList([d1, d2]) def get_positions(results): start_num = None end_num = None if results[0][0] is True: start_num = results[0][1] if results[1][0] is True: end_num = results[1][1] return start_num, end_num dl.addCallback(get_positions) return dl def get_blob_infos(nums): start_num, end_num = nums return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse) d = get_positions_of_start_and_end() d.addCallback(get_blob_infos) return d def get_stream_of_blob(self, blob_hash): return self._get_stream_of_blobhash(blob_hash) def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash) def get_sd_blob_hashes_for_stream(self, stream_hash): return self._get_sd_blob_hashes_for_stream(stream_hash) def hashes_to_announce(self): next_announce_time = time.time() + self.hash_reannounce_time return self._get_streams_to_announce(next_announce_time) ######### database calls ######### def _open_db(self): # check_same_thread=False is solely to quiet a spurious error that appears to be due # to a bug in twisted, where the connection is closed by a different thread than the # one that opened it. The individual connections in the pool are not used in multiple # threads. self.db_conn = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "live_stream.db"), check_same_thread=False) def create_tables(transaction): transaction.execute("create table if not exists live_streams (" + " stream_hash text primary key, " + " public_key text, " + " key text, " + " stream_name text, " + " next_announce_time real" + ")") transaction.execute("create table if not exists live_stream_blobs (" + " blob_hash text, " + " stream_hash text, " + " position integer, " + " revision integer, " + " iv text, " + " length integer, " + " signature text, " + " foreign key(stream_hash) references live_streams(stream_hash)" + ")") transaction.execute("create table if not exists live_stream_descriptors (" + " sd_blob_hash TEXT PRIMARY KEY, " + " stream_hash TEXT, " + " foreign key(stream_hash) references live_streams(stream_hash)" + ")") return self.db_conn.runInteraction(create_tables) @rerun_if_locked def _delete_stream(self, stream_hash): d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) def do_delete(transaction, s_h): transaction.execute("delete from live_streams where stream_hash = ?", (s_h,)) transaction.execute("delete from live_stream_blobs where stream_hash = ?", (s_h,)) transaction.execute("delete from live_stream_descriptors where stream_hash = ?", (s_h,)) d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h)) return d @rerun_if_locked def _store_stream(self, stream_hash, public_key, name, key, next_announce_time=None): d = self.db_conn.runQuery("insert into live_streams values (?, ?, ?, ?, ?)", (stream_hash, public_key, key, name, next_announce_time)) def check_duplicate(err): if err.check(sqlite3.IntegrityError): raise DuplicateStreamHashError(stream_hash) return err d.addErrback(check_duplicate) return d @rerun_if_locked def _get_all_streams(self): d = self.db_conn.runQuery("select stream_hash from live_streams") d.addCallback(lambda results: [r[0] for r in results]) return d @rerun_if_locked def _get_stream_info(self, stream_hash): d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?", (stream_hash,)) d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash))) return d @rerun_if_locked def _check_if_stream_exists(self, stream_hash): d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,)) d.addCallback(lambda r: True if len(r) else False) return d @rerun_if_locked def _get_streams_to_announce(self, next_announce_time): def get_and_update(transaction): timestamp = time.time() r = transaction.execute("select stream_hash from live_streams where" + " (next_announce_time is null or next_announce_time < ?) " + " and stream_hash is not null", (timestamp, )) s_hs = [s_h for s_h, in r.fetchall()] transaction.execute("update live_streams set next_announce_time = ? where " + " (next_announce_time is null or next_announce_time < ?)", (next_announce_time, timestamp)) return s_hs return self.db_conn.runInteraction(get_and_update) @rerun_if_locked def _get_blob_num_by_hash(self, stream_hash, blob_hash): d = self.db_conn.runQuery("select position from live_stream_blobs where stream_hash = ? and blob_hash = ?", (stream_hash, blob_hash)) d.addCallback(lambda r: r[0][0] if len(r) else None) return d @rerun_if_locked def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): params = [] q_string = "select * from (" q_string += " select blob_hash, position, revision, iv, length, signature from live_stream_blobs " q_string += " where stream_hash = ? " params.append(stream_hash) if start_num is not None: q_string += " and position > ? " params.append(start_num) if end_num is not None: q_string += " and position < ? " params.append(end_num) q_string += " order by position " if reverse is True: q_string += " DESC " if count is not None: q_string += " limit ? " params.append(count) q_string += ") order by position" # Order by position is done twice so that it always returns them from lowest position to # greatest, but the limit by clause can select the 'count' greatest or 'count' least return self.db_conn.runQuery(q_string, tuple(params)) @rerun_if_locked def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False): def add_blobs(transaction): for blob_info in blob_infos: try: transaction.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)", (blob_info.blob_hash, stream_hash, blob_info.blob_num, blob_info.revision, blob_info.iv, blob_info.length, blob_info.signature)) except sqlite3.IntegrityError: if ignore_duplicate_error is False: raise return self.db_conn.runInteraction(add_blobs) @rerun_if_locked def _get_stream_of_blobhash(self, blob_hash): d = self.db_conn.runQuery("select stream_hash from live_stream_blobs where blob_hash = ?", (blob_hash,)) d.addCallback(lambda r: r[0][0] if len(r) else None) return d @rerun_if_locked def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): return self.db_conn.runQuery("insert into live_stream_descriptors values (?, ?)", (sd_blob_hash, stream_hash)) @rerun_if_locked def _get_sd_blob_hashes_for_stream(self, stream_hash): d = self.db_conn.runQuery("select sd_blob_hash from live_stream_descriptors where stream_hash = ?", (stream_hash,)) d.addCallback(lambda results: [r[0] for r in results]) return d class TempLiveStreamMetadataManager(DHTHashSupplier): def __init__(self, hash_announcer): DHTHashSupplier.__init__(self, hash_announcer) self.streams = {} self.stream_blobs = {} self.stream_desc = {} def setup(self): return defer.succeed(True) def stop(self): return defer.succeed(True) def get_all_streams(self): return defer.succeed(self.streams.keys()) def save_stream(self, stream_hash, pub_key, file_name, key, blobs): next_announce_time = time.time() + self.hash_reannounce_time self.streams[stream_hash] = {'public_key': pub_key, 'stream_name': file_name, 'key': key, 'next_announce_time': next_announce_time} d = self.add_blobs_to_stream(stream_hash, blobs) def announce_have_stream(): if self.hash_announcer is not None: self.hash_announcer.immediate_announce([stream_hash]) return stream_hash d.addCallback(lambda _: announce_have_stream()) return d def get_stream_info(self, stream_hash): if stream_hash in self.streams: stream_info = self.streams[stream_hash] return defer.succeed([stream_info['public_key'], stream_info['key'], stream_info['stream_name']]) return defer.succeed(None) def delete_stream(self, stream_hash): if stream_hash in self.streams: del self.streams[stream_hash] for (s_h, b_h) in self.stream_blobs.keys(): if s_h == stream_hash: del self.stream_blobs[(s_h, b_h)] return defer.succeed(True) def add_blobs_to_stream(self, stream_hash, blobs): assert stream_hash in self.streams, "Can't add blobs to a stream that isn't known" for blob in blobs: info = {} info['blob_num'] = blob.blob_num info['length'] = blob.length info['iv'] = blob.iv info['revision'] = blob.revision info['signature'] = blob.signature self.stream_blobs[(stream_hash, blob.blob_hash)] = info return defer.succeed(True) def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): if start_blob is not None: start_num = self._get_blob_num_by_hash(stream_hash, start_blob) else: start_num = None if end_blob is not None: end_num = self._get_blob_num_by_hash(stream_hash, end_blob) else: end_num = None return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse) def get_stream_of_blob(self, blob_hash): for (s_h, b_h) in self.stream_blobs.iterkeys(): if b_h == blob_hash: return defer.succeed(s_h) return defer.succeed(None) def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): blob_infos = [] for (s_h, b_h), info in self.stream_blobs.iteritems(): if stream_hash == s_h: position = info['blob_num'] length = info['length'] iv = info['iv'] revision = info['revision'] signature = info['signature'] if (start_num is None) or (position > start_num): if (end_num is None) or (position < end_num): blob_infos.append((b_h, position, revision, iv, length, signature)) blob_infos.sort(key=lambda i: i[1], reverse=reverse) if count is not None: blob_infos = blob_infos[:count] return defer.succeed(blob_infos) def _get_blob_num_by_hash(self, stream_hash, blob_hash): if (stream_hash, blob_hash) in self.stream_blobs: return self.stream_blobs[(stream_hash, blob_hash)]['blob_num'] def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): self.stream_desc[sd_blob_hash] = stream_hash return defer.succeed(True) def get_sd_blob_hashes_for_stream(self, stream_hash): return defer.succeed([sd_hash for sd_hash, s_h in self.stream_desc.iteritems() if s_h == stream_hash]) def hashes_to_announce(self): next_announce_time = time.time() + self.hash_reannounce_time stream_hashes = [] current_time = time.time() for stream_hash, stream_info in self.streams.iteritems(): announce_time = stream_info['announce_time'] if announce_time < current_time: self.streams[stream_hash]['announce_time'] = next_announce_time stream_hashes.append(stream_hash) return stream_hashes