lbry-sdk/lbrynet/lbrylive/LiveStreamMetadataManager.py
2016-12-07 09:35:16 -05:00

390 lines
No EOL
16 KiB
Python

# pylint: skip-file
import time
import logging
from twisted.enterprise import adbapi
import os
import sqlite3
from twisted.internet import defer
from twisted.python.failure import Failure
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError
from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__)
class DBLiveStreamMetadataManager(DHTHashSupplier):
    """Store live stream metadata in a sqlite database.

    The database file is ``live_stream.db`` inside ``db_dir`` and is accessed
    through a twisted ``adbapi.ConnectionPool``. ``setup`` has to run before
    any other method touches ``self.db_conn``.

    NOTE(review): the private database methods are wrapped in
    ``rerun_if_locked``, which is defined elsewhere; presumably it retries
    the call when sqlite reports the database as locked -- confirm.
    """

    def __init__(self, db_dir, hash_announcer):
        """Remember the database directory; the pool is opened by ``setup``."""
        DHTHashSupplier.__init__(self, hash_announcer)
        self.db_dir = db_dir
        self.db_conn = None

    def setup(self):
        """Open the connection pool and create any missing tables."""
        return self._open_db()

    def stop(self):
        """Drop the reference to the connection pool.

        Returns a Deferred that has already fired with True.
        """
        self.db_conn = None
        return defer.succeed(True)

    def get_all_streams(self):
        """Return the stream hashes of every stored stream."""
        return self._get_all_streams()

    def save_stream(self, stream_hash, pub_key, file_name, key, blobs):
        """Store a stream row, then its blobs, then announce the stream.

        The returned Deferred fires with ``stream_hash`` once all three steps
        are done. The announcement is skipped when no hash announcer is set.
        """
        next_announce_time = time.time() + self.hash_reannounce_time

        def store_blobs(_):
            return self.add_blobs_to_stream(stream_hash, blobs)

        def announce(_):
            if self.hash_announcer is not None:
                self.hash_announcer.immediate_announce([stream_hash])
            return stream_hash

        pending = self._store_stream(stream_hash, pub_key, file_name, key,
                                     next_announce_time=next_announce_time)
        pending.addCallback(store_blobs)
        pending.addCallback(announce)
        return pending

    def get_stream_info(self, stream_hash):
        """Return the (public_key, key, stream_name) row of a stream."""
        return self._get_stream_info(stream_hash)

    def check_if_stream_exists(self, stream_hash):
        """Return True when a row for ``stream_hash`` exists, else False."""
        return self._check_if_stream_exists(stream_hash)

    def delete_stream(self, stream_hash):
        """Delete a stream together with its blob and descriptor rows."""
        return self._delete_stream(stream_hash)

    def add_blobs_to_stream(self, stream_hash, blobs):
        """Insert blob rows for a stream, ignoring sqlite integrity errors."""
        return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)

    def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
        """Fetch blob rows of a stream, optionally bounded by two blobs.

        ``start_blob`` and ``end_blob`` are blob hashes; only blobs whose
        position is strictly greater than the start blob's and strictly less
        than the end blob's are selected. A hash that cannot be resolved to a
        position is treated as "no bound". ``count`` limits the number of
        rows; ``reverse`` makes the limit keep the highest positions instead
        of the lowest. Rows always come back ordered by ascending position.
        """
        log.info("Getting blobs for a stream. Count is %s", str(count))

        def lookup_position(blob_hash):
            # No boundary blob given means no boundary position either.
            if blob_hash is None:
                return defer.succeed(None)
            return self._get_blob_num_by_hash(stream_hash, blob_hash)

        def extract_positions(results):
            # DeferredList yields one (success, value) pair per lookup; a
            # failed lookup leaves the corresponding bound unset.
            start_outcome, end_outcome = results[0], results[1]
            start_num = start_outcome[1] if start_outcome[0] is True else None
            end_num = end_outcome[1] if end_outcome[0] is True else None
            return start_num, end_num

        def fetch_blob_infos(positions):
            start_num, end_num = positions
            return self._get_further_blob_infos(stream_hash, start_num, end_num,
                                                count, reverse)

        start_lookup = lookup_position(start_blob)
        end_lookup = lookup_position(end_blob)
        lookups = defer.DeferredList([start_lookup, end_lookup])
        lookups.addCallback(extract_positions)
        lookups.addCallback(fetch_blob_infos)
        return lookups

    def get_stream_of_blob(self, blob_hash):
        """Return the stream hash a blob belongs to, or None if unknown."""
        return self._get_stream_of_blobhash(blob_hash)

    def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
        """Record that ``sd_blob_hash`` is a descriptor of ``stream_hash``."""
        return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)

    def get_sd_blob_hashes_for_stream(self, stream_hash):
        """Return the descriptor blob hashes recorded for a stream."""
        return self._get_sd_blob_hashes_for_stream(stream_hash)

    def hashes_to_announce(self):
        """Return the stream hashes that are due and reschedule them."""
        next_announce_time = time.time() + self.hash_reannounce_time
        return self._get_streams_to_announce(next_announce_time)

    ######### database calls #########

    def _open_db(self):
        """Create the connection pool and make sure all tables exist."""
        # check_same_thread=False is solely to quiet a spurious error that appears to be due
        # to a bug in twisted, where the connection is closed by a different thread than the
        # one that opened it. The individual connections in the pool are not used in multiple
        # threads.
        db_path = os.path.join(self.db_dir, "live_stream.db")
        self.db_conn = adbapi.ConnectionPool('sqlite3', db_path, check_same_thread=False)

        schema = (
            "create table if not exists live_streams ("
            " stream_hash text primary key, "
            " public_key text, "
            " key text, "
            " stream_name text, "
            " next_announce_time real"
            ")",
            "create table if not exists live_stream_blobs ("
            " blob_hash text, "
            " stream_hash text, "
            " position integer, "
            " revision integer, "
            " iv text, "
            " length integer, "
            " signature text, "
            " foreign key(stream_hash) references live_streams(stream_hash)"
            ")",
            "create table if not exists live_stream_descriptors ("
            " sd_blob_hash TEXT PRIMARY KEY, "
            " stream_hash TEXT, "
            " foreign key(stream_hash) references live_streams(stream_hash)"
            ")",
        )

        def create_tables(transaction):
            for statement in schema:
                transaction.execute(statement)

        return self.db_conn.runInteraction(create_tables)

    @rerun_if_locked
    def _delete_stream(self, stream_hash):
        """Delete all rows of a stream; fail with NoSuchStreamHashError if absent."""

        def require_existing(rows):
            # Returning a Failure switches the Deferred to its errback chain.
            if len(rows):
                return rows[0][0]
            return Failure(NoSuchStreamHashError(stream_hash))

        def remove_rows(transaction, s_h):
            for statement in (
                    "delete from live_streams where stream_hash = ?",
                    "delete from live_stream_blobs where stream_hash = ?",
                    "delete from live_stream_descriptors where stream_hash = ?"):
                transaction.execute(statement, (s_h,))

        def run_removal(found_hash):
            return self.db_conn.runInteraction(remove_rows, found_hash)

        lookup = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?",
                                       (stream_hash,))
        lookup.addCallback(require_existing)
        lookup.addCallback(run_removal)
        return lookup

    @rerun_if_locked
    def _store_stream(self, stream_hash, public_key, name, key, next_announce_time=None):
        """Insert a stream row; an integrity error becomes DuplicateStreamHashError."""

        def translate_duplicate(failure):
            if not failure.check(sqlite3.IntegrityError):
                return failure
            raise DuplicateStreamHashError(stream_hash)

        # Column order of live_streams: hash, public key, key, name, announce time.
        row = (stream_hash, public_key, key, name, next_announce_time)
        insertion = self.db_conn.runQuery("insert into live_streams values (?, ?, ?, ?, ?)", row)
        insertion.addErrback(translate_duplicate)
        return insertion

    @rerun_if_locked
    def _get_all_streams(self):
        """Select every stream hash and flatten the rows into a list."""

        def first_column(rows):
            return [row[0] for row in rows]

        query = self.db_conn.runQuery("select stream_hash from live_streams")
        query.addCallback(first_column)
        return query

    @rerun_if_locked
    def _get_stream_info(self, stream_hash):
        """Select (public_key, key, stream_name); fail if the stream is unknown."""

        def first_row_or_failure(rows):
            if len(rows):
                return rows[0]
            return Failure(NoSuchStreamHashError(stream_hash))

        query = self.db_conn.runQuery(
            "select public_key, key, stream_name from live_streams where stream_hash = ?",
            (stream_hash,))
        query.addCallback(first_row_or_failure)
        return query

    @rerun_if_locked
    def _check_if_stream_exists(self, stream_hash):
        """Fire with True when the stream has a row, otherwise False."""

        def has_rows(rows):
            return len(rows) > 0

        query = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?",
                                      (stream_hash,))
        query.addCallback(has_rows)
        return query

    @rerun_if_locked
    def _get_streams_to_announce(self, next_announce_time):
        """Collect the streams that are due and move them to ``next_announce_time``.

        A stream is due when its stored announce time is null or lies before
        the current time. Select and update run in one interaction.
        """

        def get_and_update(transaction):
            now = time.time()
            cursor = transaction.execute(
                "select stream_hash from live_streams where"
                " (next_announce_time is null or next_announce_time < ?) "
                " and stream_hash is not null",
                (now,))
            due_hashes = [row[0] for row in cursor.fetchall()]
            transaction.execute(
                "update live_streams set next_announce_time = ? where "
                " (next_announce_time is null or next_announce_time < ?)",
                (next_announce_time, now))
            return due_hashes

        return self.db_conn.runInteraction(get_and_update)

    @rerun_if_locked
    def _get_blob_num_by_hash(self, stream_hash, blob_hash):
        """Fire with the position of a blob within a stream, or None."""

        def position_or_none(rows):
            if len(rows):
                return rows[0][0]
            return None

        query = self.db_conn.runQuery(
            "select position from live_stream_blobs where stream_hash = ? and blob_hash = ?",
            (stream_hash, blob_hash))
        query.addCallback(position_or_none)
        return query

    @rerun_if_locked
    def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
        """Select blob rows strictly between two positions.

        Each row is (blob_hash, position, revision, iv, length, signature).
        """
        inner_clauses = [
            " select blob_hash, position, revision, iv, length, signature from live_stream_blobs ",
            " where stream_hash = ? ",
        ]
        params = [stream_hash]
        if start_num is not None:
            inner_clauses.append(" and position > ? ")
            params.append(start_num)
        if end_num is not None:
            inner_clauses.append(" and position < ? ")
            params.append(end_num)
        inner_clauses.append(" order by position ")
        if reverse is True:
            inner_clauses.append(" DESC ")
        if count is not None:
            inner_clauses.append(" limit ? ")
            params.append(count)
        # Order by position is done twice so that it always returns them from lowest position to
        # greatest, but the limit by clause can select the 'count' greatest or 'count' least
        q_string = "select * from (" + "".join(inner_clauses) + ") order by position"
        return self.db_conn.runQuery(q_string, tuple(params))

    @rerun_if_locked
    def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False):
        """Insert one live_stream_blobs row per blob info in a single interaction.

        A sqlite3.IntegrityError raised by an insert is re-raised only when
        ``ignore_duplicate_error`` is False; otherwise that blob is skipped.
        """

        def add_blobs(transaction):
            for info in blob_infos:
                row = (info.blob_hash, stream_hash, info.blob_num,
                       info.revision, info.iv, info.length,
                       info.signature)
                try:
                    transaction.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)",
                                        row)
                except sqlite3.IntegrityError:
                    if ignore_duplicate_error is False:
                        raise

        return self.db_conn.runInteraction(add_blobs)

    @rerun_if_locked
    def _get_stream_of_blobhash(self, blob_hash):
        """Fire with the stream hash of the first row matching a blob, or None."""

        def stream_hash_or_none(rows):
            if len(rows):
                return rows[0][0]
            return None

        query = self.db_conn.runQuery("select stream_hash from live_stream_blobs where blob_hash = ?",
                                      (blob_hash,))
        query.addCallback(stream_hash_or_none)
        return query

    @rerun_if_locked
    def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
        """Insert a (sd_blob_hash, stream_hash) descriptor row."""
        row = (sd_blob_hash, stream_hash)
        return self.db_conn.runQuery("insert into live_stream_descriptors values (?, ?)", row)

    @rerun_if_locked
    def _get_sd_blob_hashes_for_stream(self, stream_hash):
        """Fire with the list of descriptor blob hashes stored for a stream."""

        def first_column(rows):
            return [row[0] for row in rows]

        query = self.db_conn.runQuery(
            "select sd_blob_hash from live_stream_descriptors where stream_hash = ?",
            (stream_hash,))
        query.addCallback(first_column)
        return query
class TempLiveStreamMetadataManager(DHTHashSupplier):
    """Keep live stream metadata in memory only.

    Offers the same public method names as the database-backed manager in
    this module, but stores everything in plain dictionaries that are lost
    when the object goes away.
    """

    def __init__(self, hash_announcer):
        """Start with no streams, blobs or descriptors."""
        DHTHashSupplier.__init__(self, hash_announcer)
        # stream_hash -> {'public_key', 'stream_name', 'key', 'next_announce_time'}
        self.streams = {}
        # (stream_hash, blob_hash) -> {'blob_num', 'length', 'iv', 'revision', 'signature'}
        self.stream_blobs = {}
        # sd_blob_hash -> stream_hash
        self.stream_desc = {}

    def setup(self):
        """Nothing to set up; return a Deferred already fired with True."""
        return defer.succeed(True)

    def stop(self):
        """Nothing to tear down; return a Deferred already fired with True."""
        return defer.succeed(True)

    def get_all_streams(self):
        """Return a Deferred firing with the list of known stream hashes."""
        return defer.succeed(list(self.streams))

    def save_stream(self, stream_hash, pub_key, file_name, key, blobs):
        """Register a stream and its blobs, then announce the stream.

        The returned Deferred fires with ``stream_hash``. The announcement is
        skipped when no hash announcer is set.
        """
        next_announce_time = time.time() + self.hash_reannounce_time
        self.streams[stream_hash] = {'public_key': pub_key, 'stream_name': file_name,
                                     'key': key, 'next_announce_time': next_announce_time}
        d = self.add_blobs_to_stream(stream_hash, blobs)

        def announce_have_stream():
            if self.hash_announcer is not None:
                self.hash_announcer.immediate_announce([stream_hash])
            return stream_hash

        d.addCallback(lambda _: announce_have_stream())
        return d

    def get_stream_info(self, stream_hash):
        """Fire with [public_key, key, stream_name], or None for an unknown stream.

        NOTE(review): the database-backed manager fails with
        NoSuchStreamHashError instead of firing with None; left unchanged
        here because callers may rely on the None result.
        """
        stream_info = self.streams.get(stream_hash)
        if stream_info is None:
            return defer.succeed(None)
        return defer.succeed([stream_info['public_key'], stream_info['key'],
                              stream_info['stream_name']])

    def delete_stream(self, stream_hash):
        """Forget a stream together with its blobs and descriptors.

        Deleting an unknown stream is a no-op. Always fires with True.
        """
        if stream_hash in self.streams:
            del self.streams[stream_hash]
            # Collect the keys first: deleting while iterating over the dict
            # itself raises RuntimeError on Python 3.
            stale_blob_keys = [blob_key for blob_key in self.stream_blobs
                               if blob_key[0] == stream_hash]
            for blob_key in stale_blob_keys:
                del self.stream_blobs[blob_key]
            # The database-backed manager also removes the stream's
            # descriptors; do the same so they do not outlive the stream.
            stale_sd_hashes = [sd_hash for sd_hash, s_h in self.stream_desc.items()
                               if s_h == stream_hash]
            for sd_hash in stale_sd_hashes:
                del self.stream_desc[sd_hash]
        return defer.succeed(True)

    def add_blobs_to_stream(self, stream_hash, blobs):
        """Record the given blob infos for an already known stream.

        A blob whose (stream_hash, blob_hash) pair is already stored is
        overwritten. Fires with True.
        """
        assert stream_hash in self.streams, "Can't add blobs to a stream that isn't known"
        for blob in blobs:
            self.stream_blobs[(stream_hash, blob.blob_hash)] = {
                'blob_num': blob.blob_num,
                'length': blob.length,
                'iv': blob.iv,
                'revision': blob.revision,
                'signature': blob.signature,
            }
        return defer.succeed(True)

    def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
        """Fetch blob tuples of a stream, optionally bounded by two blobs.

        ``start_blob`` and ``end_blob`` are blob hashes; only blobs whose
        position is strictly greater than the start blob's and strictly less
        than the end blob's are selected. An unknown hash is treated as "no
        bound". See ``_get_further_blob_infos`` for ``count`` and ``reverse``.
        """
        start_num = None
        if start_blob is not None:
            start_num = self._get_blob_num_by_hash(stream_hash, start_blob)
        end_num = None
        if end_blob is not None:
            end_num = self._get_blob_num_by_hash(stream_hash, end_blob)
        return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse)

    def get_stream_of_blob(self, blob_hash):
        """Fire with the hash of a stream containing the blob, or None."""
        for (s_h, b_h) in self.stream_blobs:
            if b_h == blob_hash:
                return defer.succeed(s_h)
        return defer.succeed(None)

    def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
        """Select blob tuples strictly between two positions.

        Each tuple is (blob_hash, position, revision, iv, length, signature).
        ``count`` limits the number of tuples; ``reverse`` makes the limit
        keep the highest positions instead of the lowest. The result is
        always ordered by ascending position, matching the database-backed
        manager.
        """
        blob_infos = []
        for (s_h, b_h), info in self.stream_blobs.items():
            if s_h != stream_hash:
                continue
            position = info['blob_num']
            if start_num is not None and not position > start_num:
                continue
            if end_num is not None and not position < end_num:
                continue
            blob_infos.append((b_h, position, info['revision'], info['iv'],
                               info['length'], info['signature']))

        def by_position(blob_info):
            return blob_info[1]

        # First ordering decides which end the limit keeps ...
        blob_infos.sort(key=by_position, reverse=reverse)
        if count is not None:
            blob_infos = blob_infos[:count]
        # ... second ordering restores lowest-to-greatest for the caller.
        if reverse:
            blob_infos.sort(key=by_position)
        return defer.succeed(blob_infos)

    def _get_blob_num_by_hash(self, stream_hash, blob_hash):
        """Return the position of a blob within a stream, or None if unknown.

        Unlike the database-backed manager this returns the value directly,
        not a Deferred.
        """
        info = self.stream_blobs.get((stream_hash, blob_hash))
        if info is None:
            return None
        return info['blob_num']

    def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
        """Record that ``sd_blob_hash`` is a descriptor of ``stream_hash``."""
        self.stream_desc[sd_blob_hash] = stream_hash
        return defer.succeed(True)

    def get_sd_blob_hashes_for_stream(self, stream_hash):
        """Fire with the descriptor blob hashes recorded for a stream."""
        return defer.succeed([sd_hash for sd_hash, s_h in self.stream_desc.items()
                              if s_h == stream_hash])

    def hashes_to_announce(self):
        """Return the stream hashes that are due and reschedule them.

        A stream is due when its stored 'next_announce_time' lies before the
        current time; it is then moved ``hash_reannounce_time`` seconds ahead.

        NOTE(review): this returns a plain list, whereas the database-backed
        manager returns a Deferred; kept as-is to avoid changing the return
        type -- confirm what the hash announcer expects.
        """
        next_announce_time = time.time() + self.hash_reannounce_time
        stream_hashes = []
        current_time = time.time()
        for stream_hash, stream_info in self.streams.items():
            # save_stream stores the timestamp under 'next_announce_time';
            # reading 'announce_time' raised KeyError for every saved stream.
            if stream_info['next_announce_time'] < current_time:
                stream_info['next_announce_time'] = next_announce_time
                stream_hashes.append(stream_hash)
        return stream_hashes