delete live lbrylive livestreaming code
This commit is contained in:
parent
4c122f4ab0
commit
a2eb0cad33
16 changed files with 0 additions and 2112 deletions
|
@ -1,24 +0,0 @@
|
|||
# pylint: skip-file
|
||||
from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker, CryptBlobInfo
|
||||
import binascii
|
||||
|
||||
|
||||
class LiveBlobInfo(CryptBlobInfo):
|
||||
def __init__(self, blob_hash, blob_num, length, iv, revision, signature):
|
||||
CryptBlobInfo.__init__(self, blob_hash, blob_num, length, iv)
|
||||
self.revision = revision
|
||||
self.signature = signature
|
||||
|
||||
|
||||
class LiveStreamBlobMaker(CryptStreamBlobMaker):
|
||||
def __init__(self, key, iv, blob_num, blob):
|
||||
CryptStreamBlobMaker.__init__(self, key, iv, blob_num, blob)
|
||||
# The following is a placeholder for a currently unimplemented feature.
|
||||
# In the future it may be possible for the live stream creator to overwrite a blob
|
||||
# with a newer revision. If that happens, the 0 will be incremented to the
|
||||
# actual revision count
|
||||
self.revision = 0
|
||||
|
||||
def _return_info(self, blob_hash):
|
||||
return LiveBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv),
|
||||
self.revision, None)
|
|
@ -1,176 +0,0 @@
|
|||
# pylint: skip-file
|
||||
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter
|
||||
from lbrynet.lbrylive.StreamDescriptor import get_sd_info
|
||||
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
|
||||
from lbrynet.lbrylive.LiveBlob import LiveStreamBlobMaker
|
||||
from lbrynet.core.cryptoutils import get_lbry_hash_obj, get_pub_key, sign_with_pass_phrase
|
||||
from Crypto import Random
|
||||
import binascii
|
||||
import logging
|
||||
from lbrynet import conf
|
||||
from twisted.internet import interfaces, defer
|
||||
from twisted.protocols.basic import FileSender
|
||||
from zope.interface import implements
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LiveStreamCreator(CryptStreamCreator):
|
||||
def __init__(self, blob_manager, stream_info_manager, name=None, key=None, iv_generator=None,
|
||||
delete_after_num=None, secret_pass_phrase=None):
|
||||
CryptStreamCreator.__init__(self, blob_manager, name, key, iv_generator)
|
||||
self.stream_hash = None
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.delete_after_num = delete_after_num
|
||||
self.secret_pass_phrase = secret_pass_phrase
|
||||
self.file_extension = conf.settings['CRYPTSD_FILE_EXTENSION']
|
||||
self.finished_blob_hashes = {}
|
||||
|
||||
def _save_stream(self):
|
||||
d = self.stream_info_manager.save_stream(self.stream_hash, get_pub_key(self.secret_pass_phrase),
|
||||
binascii.hexlify(self.name), binascii.hexlify(self.key),
|
||||
[])
|
||||
return d
|
||||
|
||||
def _blob_finished(self, blob_info):
|
||||
log.debug("In blob_finished")
|
||||
log.debug("length: %s", str(blob_info.length))
|
||||
sig_hash = get_lbry_hash_obj()
|
||||
sig_hash.update(self.stream_hash)
|
||||
if blob_info.length != 0:
|
||||
sig_hash.update(blob_info.blob_hash)
|
||||
sig_hash.update(str(blob_info.blob_num))
|
||||
sig_hash.update(str(blob_info.revision))
|
||||
sig_hash.update(blob_info.iv)
|
||||
sig_hash.update(str(blob_info.length))
|
||||
signature = sign_with_pass_phrase(sig_hash.digest(), self.secret_pass_phrase)
|
||||
blob_info.signature = signature
|
||||
self.finished_blob_hashes[blob_info.blob_num] = blob_info.blob_hash
|
||||
if self.delete_after_num is not None:
|
||||
self._delete_old_blobs(blob_info.blob_num)
|
||||
d = self.stream_info_manager.add_blobs_to_stream(self.stream_hash, [blob_info])
|
||||
|
||||
def log_add_error(err):
|
||||
log.error("An error occurred adding a blob info to the stream info manager: %s", err.getErrorMessage())
|
||||
return err
|
||||
|
||||
d.addErrback(log_add_error)
|
||||
log.debug("returning from blob_finished")
|
||||
return d
|
||||
|
||||
def setup(self):
|
||||
"""Create the secret pass phrase if it wasn't provided, compute the stream hash,
|
||||
save the stream to the stream info manager, and return the stream hash
|
||||
"""
|
||||
if self.secret_pass_phrase is None:
|
||||
self.secret_pass_phrase = Random.new().read(512)
|
||||
|
||||
d = CryptStreamCreator.setup(self)
|
||||
|
||||
def make_stream_hash():
|
||||
hashsum = get_lbry_hash_obj()
|
||||
hashsum.update(binascii.hexlify(self.name))
|
||||
hashsum.update(get_pub_key(self.secret_pass_phrase))
|
||||
hashsum.update(binascii.hexlify(self.key))
|
||||
self.stream_hash = hashsum.hexdigest()
|
||||
return self.stream_hash
|
||||
|
||||
d.addCallback(lambda _: make_stream_hash())
|
||||
d.addCallback(lambda _: self._save_stream())
|
||||
d.addCallback(lambda _: self.stream_hash)
|
||||
return d
|
||||
|
||||
def publish_stream_descriptor(self):
|
||||
descriptor_writer = BlobStreamDescriptorWriter(self.blob_manager)
|
||||
d = get_sd_info(self.stream_info_manager, self.stream_hash, False)
|
||||
d.addCallback(descriptor_writer.create_descriptor)
|
||||
return d
|
||||
|
||||
def _delete_old_blobs(self, newest_blob_num):
|
||||
assert self.delete_after_num is not None, "_delete_old_blobs called with delete_after_num=None"
|
||||
oldest_to_keep = newest_blob_num - self.delete_after_num + 1
|
||||
nums_to_delete = [num for num in self.finished_blob_hashes.iterkeys() if num < oldest_to_keep]
|
||||
for num in nums_to_delete:
|
||||
self.blob_manager.delete_blobs([self.finished_blob_hashes[num]])
|
||||
del self.finished_blob_hashes[num]
|
||||
|
||||
def _get_blob_maker(self, iv, blob_creator):
|
||||
return LiveStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)
|
||||
|
||||
|
||||
class StdOutLiveStreamCreator(LiveStreamCreator):
|
||||
def __init__(self, stream_name, blob_manager, stream_info_manager):
|
||||
LiveStreamCreator.__init__(self, blob_manager, stream_info_manager, stream_name,
|
||||
delete_after_num=20)
|
||||
|
||||
def start_streaming(self):
|
||||
stdin_producer = StdinStreamProducer(self)
|
||||
d = stdin_producer.begin_producing()
|
||||
|
||||
def stop_stream():
|
||||
d = self.stop()
|
||||
return d
|
||||
|
||||
d.addCallback(lambda _: stop_stream())
|
||||
return d
|
||||
|
||||
|
||||
class FileLiveStreamCreator(LiveStreamCreator):
|
||||
def __init__(self, blob_manager, stream_info_manager, file_name, file_handle,
|
||||
secret_pass_phrase=None, key=None, iv_generator=None, stream_name=None):
|
||||
if stream_name is None:
|
||||
stream_name = file_name
|
||||
LiveStreamCreator.__init__(self, blob_manager, stream_info_manager, stream_name,
|
||||
secret_pass_phrase, key, iv_generator)
|
||||
self.file_name = file_name
|
||||
self.file_handle = file_handle
|
||||
|
||||
def start_streaming(self):
|
||||
file_sender = FileSender()
|
||||
d = file_sender.beginFileTransfer(self.file_handle, self)
|
||||
|
||||
def stop_stream():
|
||||
d = self.stop()
|
||||
return d
|
||||
|
||||
d.addCallback(lambda _: stop_stream())
|
||||
return d
|
||||
|
||||
|
||||
class StdinStreamProducer(object):
|
||||
"""This class reads data from standard in and sends it to a stream creator"""
|
||||
|
||||
implements(interfaces.IPushProducer)
|
||||
|
||||
def __init__(self, consumer):
|
||||
self.consumer = consumer
|
||||
self.reader = None
|
||||
self.finished_deferred = None
|
||||
|
||||
def begin_producing(self):
|
||||
|
||||
self.finished_deferred = defer.Deferred()
|
||||
self.consumer.registerProducer(self, True)
|
||||
self.resumeProducing()
|
||||
return self.finished_deferred
|
||||
|
||||
def resumeProducing(self):
|
||||
if self.reader is not None:
|
||||
self.reader.resumeProducing()
|
||||
|
||||
def stopProducing(self):
|
||||
if self.reader is not None:
|
||||
self.reader.stopReading()
|
||||
self.consumer.unregisterProducer()
|
||||
self.finished_deferred.callback(True)
|
||||
|
||||
def pauseProducing(self):
|
||||
if self.reader is not None:
|
||||
self.reader.pauseProducing()
|
||||
|
||||
def childDataReceived(self, fd, data):
|
||||
self.consumer.write(data)
|
||||
|
||||
def childConnectionLost(self, fd, reason):
|
||||
self.stopProducing()
|
|
@ -1,390 +0,0 @@
|
|||
# pylint: skip-file
|
||||
import time
|
||||
import logging
|
||||
from twisted.enterprise import adbapi
|
||||
import os
|
||||
import sqlite3
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
|
||||
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash
|
||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DBLiveStreamMetadataManager(DHTHashSupplier):
|
||||
"""This class stores all stream info in a leveldb database stored in the same directory as the blobfiles"""
|
||||
|
||||
def __init__(self, db_dir, hash_announcer):
|
||||
DHTHashSupplier.__init__(self, hash_announcer)
|
||||
self.db_dir = db_dir
|
||||
self.db_conn = None
|
||||
|
||||
def setup(self):
|
||||
return self._open_db()
|
||||
|
||||
def stop(self):
|
||||
self.db_conn = None
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_all_streams(self):
|
||||
return self._get_all_streams()
|
||||
|
||||
def save_stream(self, stream_hash, pub_key, file_name, key, blobs):
|
||||
next_announce_time = time.time() + self.hash_reannounce_time
|
||||
d = self._store_stream(stream_hash, pub_key, file_name, key,
|
||||
next_announce_time=next_announce_time)
|
||||
|
||||
def save_blobs():
|
||||
return self.add_blobs_to_stream(stream_hash, blobs)
|
||||
|
||||
def announce_have_stream():
|
||||
if self.hash_announcer is not None:
|
||||
self.hash_announcer.immediate_announce([stream_hash])
|
||||
return stream_hash
|
||||
|
||||
d.addCallback(lambda _: save_blobs())
|
||||
d.addCallback(lambda _: announce_have_stream())
|
||||
return d
|
||||
|
||||
def get_stream_info(self, stream_hash):
|
||||
return self._get_stream_info(stream_hash)
|
||||
|
||||
def check_if_stream_exists(self, stream_hash):
|
||||
return self._check_if_stream_exists(stream_hash)
|
||||
|
||||
def delete_stream(self, stream_hash):
|
||||
return self._delete_stream(stream_hash)
|
||||
|
||||
def add_blobs_to_stream(self, stream_hash, blobs):
|
||||
return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
|
||||
|
||||
def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
|
||||
log.info("Getting blobs for a stream. Count is %s", str(count))
|
||||
|
||||
def get_positions_of_start_and_end():
|
||||
if start_blob is not None:
|
||||
d1 = self._get_blob_num_by_hash(stream_hash, start_blob)
|
||||
else:
|
||||
d1 = defer.succeed(None)
|
||||
if end_blob is not None:
|
||||
d2 = self._get_blob_num_by_hash(stream_hash, end_blob)
|
||||
else:
|
||||
d2 = defer.succeed(None)
|
||||
|
||||
dl = defer.DeferredList([d1, d2])
|
||||
|
||||
def get_positions(results):
|
||||
start_num = None
|
||||
end_num = None
|
||||
if results[0][0] is True:
|
||||
start_num = results[0][1]
|
||||
if results[1][0] is True:
|
||||
end_num = results[1][1]
|
||||
return start_num, end_num
|
||||
|
||||
dl.addCallback(get_positions)
|
||||
return dl
|
||||
|
||||
def get_blob_infos(nums):
|
||||
start_num, end_num = nums
|
||||
return self._get_further_blob_infos(stream_hash, start_num, end_num,
|
||||
count, reverse)
|
||||
|
||||
d = get_positions_of_start_and_end()
|
||||
d.addCallback(get_blob_infos)
|
||||
return d
|
||||
|
||||
def get_stream_of_blob(self, blob_hash):
|
||||
return self._get_stream_of_blobhash(blob_hash)
|
||||
|
||||
def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
|
||||
return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
|
||||
|
||||
def get_sd_blob_hashes_for_stream(self, stream_hash):
|
||||
return self._get_sd_blob_hashes_for_stream(stream_hash)
|
||||
|
||||
def hashes_to_announce(self):
|
||||
next_announce_time = time.time() + self.hash_reannounce_time
|
||||
return self._get_streams_to_announce(next_announce_time)
|
||||
|
||||
######### database calls #########
|
||||
|
||||
def _open_db(self):
|
||||
# check_same_thread=False is solely to quiet a spurious error that appears to be due
|
||||
# to a bug in twisted, where the connection is closed by a different thread than the
|
||||
# one that opened it. The individual connections in the pool are not used in multiple
|
||||
# threads.
|
||||
self.db_conn = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "live_stream.db"),
|
||||
check_same_thread=False)
|
||||
|
||||
def create_tables(transaction):
|
||||
transaction.execute("create table if not exists live_streams (" +
|
||||
" stream_hash text primary key, " +
|
||||
" public_key text, " +
|
||||
" key text, " +
|
||||
" stream_name text, " +
|
||||
" next_announce_time real" +
|
||||
")")
|
||||
transaction.execute("create table if not exists live_stream_blobs (" +
|
||||
" blob_hash text, " +
|
||||
" stream_hash text, " +
|
||||
" position integer, " +
|
||||
" revision integer, " +
|
||||
" iv text, " +
|
||||
" length integer, " +
|
||||
" signature text, " +
|
||||
" foreign key(stream_hash) references live_streams(stream_hash)" +
|
||||
")")
|
||||
transaction.execute("create table if not exists live_stream_descriptors (" +
|
||||
" sd_blob_hash TEXT PRIMARY KEY, " +
|
||||
" stream_hash TEXT, " +
|
||||
" foreign key(stream_hash) references live_streams(stream_hash)" +
|
||||
")")
|
||||
|
||||
return self.db_conn.runInteraction(create_tables)
|
||||
|
||||
@rerun_if_locked
|
||||
def _delete_stream(self, stream_hash):
|
||||
|
||||
d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,))
|
||||
d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHash(stream_hash)))
|
||||
|
||||
def do_delete(transaction, s_h):
|
||||
transaction.execute("delete from live_streams where stream_hash = ?", (s_h,))
|
||||
transaction.execute("delete from live_stream_blobs where stream_hash = ?", (s_h,))
|
||||
transaction.execute("delete from live_stream_descriptors where stream_hash = ?", (s_h,))
|
||||
|
||||
d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _store_stream(self, stream_hash, public_key, name, key, next_announce_time=None):
|
||||
d = self.db_conn.runQuery("insert into live_streams values (?, ?, ?, ?, ?)",
|
||||
(stream_hash, public_key, key, name, next_announce_time))
|
||||
|
||||
def check_duplicate(err):
|
||||
if err.check(sqlite3.IntegrityError):
|
||||
raise DuplicateStreamHashError(stream_hash)
|
||||
return err
|
||||
|
||||
d.addErrback(check_duplicate)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_streams(self):
|
||||
d = self.db_conn.runQuery("select stream_hash from live_streams")
|
||||
d.addCallback(lambda results: [r[0] for r in results])
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_stream_info(self, stream_hash):
|
||||
d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHash(stream_hash)))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _check_if_stream_exists(self, stream_hash):
|
||||
d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,))
|
||||
d.addCallback(lambda r: True if len(r) else False)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_streams_to_announce(self, next_announce_time):
|
||||
|
||||
def get_and_update(transaction):
|
||||
timestamp = time.time()
|
||||
r = transaction.execute("select stream_hash from live_streams where" +
|
||||
" (next_announce_time is null or next_announce_time < ?) " +
|
||||
" and stream_hash is not null", (timestamp, ))
|
||||
s_hs = [s_h for s_h, in r.fetchall()]
|
||||
transaction.execute("update live_streams set next_announce_time = ? where " +
|
||||
" (next_announce_time is null or next_announce_time < ?)",
|
||||
(next_announce_time, timestamp))
|
||||
return s_hs
|
||||
|
||||
return self.db_conn.runInteraction(get_and_update)
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_blob_num_by_hash(self, stream_hash, blob_hash):
|
||||
d = self.db_conn.runQuery("select position from live_stream_blobs where stream_hash = ? and blob_hash = ?",
|
||||
(stream_hash, blob_hash))
|
||||
d.addCallback(lambda r: r[0][0] if len(r) else None)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
|
||||
params = []
|
||||
q_string = "select * from ("
|
||||
q_string += " select blob_hash, position, revision, iv, length, signature from live_stream_blobs "
|
||||
q_string += " where stream_hash = ? "
|
||||
params.append(stream_hash)
|
||||
if start_num is not None:
|
||||
q_string += " and position > ? "
|
||||
params.append(start_num)
|
||||
if end_num is not None:
|
||||
q_string += " and position < ? "
|
||||
params.append(end_num)
|
||||
q_string += " order by position "
|
||||
if reverse is True:
|
||||
q_string += " DESC "
|
||||
if count is not None:
|
||||
q_string += " limit ? "
|
||||
params.append(count)
|
||||
q_string += ") order by position"
|
||||
# Order by position is done twice so that it always returns them from lowest position to
|
||||
# greatest, but the limit by clause can select the 'count' greatest or 'count' least
|
||||
return self.db_conn.runQuery(q_string, tuple(params))
|
||||
|
||||
@rerun_if_locked
|
||||
def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False):
|
||||
|
||||
def add_blobs(transaction):
|
||||
for blob_info in blob_infos:
|
||||
try:
|
||||
transaction.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)",
|
||||
(blob_info.blob_hash, stream_hash, blob_info.blob_num,
|
||||
blob_info.revision, blob_info.iv, blob_info.length,
|
||||
blob_info.signature))
|
||||
except sqlite3.IntegrityError:
|
||||
if ignore_duplicate_error is False:
|
||||
raise
|
||||
|
||||
return self.db_conn.runInteraction(add_blobs)
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_stream_of_blobhash(self, blob_hash):
|
||||
d = self.db_conn.runQuery("select stream_hash from live_stream_blobs where blob_hash = ?",
|
||||
(blob_hash,))
|
||||
d.addCallback(lambda r: r[0][0] if len(r) else None)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
|
||||
return self.db_conn.runQuery("insert into live_stream_descriptors values (?, ?)",
|
||||
(sd_blob_hash, stream_hash))
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_sd_blob_hashes_for_stream(self, stream_hash):
|
||||
d = self.db_conn.runQuery("select sd_blob_hash from live_stream_descriptors where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda results: [r[0] for r in results])
|
||||
return d
|
||||
|
||||
|
||||
class TempLiveStreamMetadataManager(DHTHashSupplier):
|
||||
|
||||
def __init__(self, hash_announcer):
|
||||
DHTHashSupplier.__init__(self, hash_announcer)
|
||||
self.streams = {}
|
||||
self.stream_blobs = {}
|
||||
self.stream_desc = {}
|
||||
|
||||
def setup(self):
|
||||
return defer.succeed(True)
|
||||
|
||||
def stop(self):
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_all_streams(self):
|
||||
return defer.succeed(self.streams.keys())
|
||||
|
||||
def save_stream(self, stream_hash, pub_key, file_name, key, blobs):
|
||||
next_announce_time = time.time() + self.hash_reannounce_time
|
||||
self.streams[stream_hash] = {'public_key': pub_key, 'stream_name': file_name,
|
||||
'key': key, 'next_announce_time': next_announce_time}
|
||||
d = self.add_blobs_to_stream(stream_hash, blobs)
|
||||
|
||||
def announce_have_stream():
|
||||
if self.hash_announcer is not None:
|
||||
self.hash_announcer.immediate_announce([stream_hash])
|
||||
return stream_hash
|
||||
|
||||
d.addCallback(lambda _: announce_have_stream())
|
||||
return d
|
||||
|
||||
def get_stream_info(self, stream_hash):
|
||||
if stream_hash in self.streams:
|
||||
stream_info = self.streams[stream_hash]
|
||||
return defer.succeed([stream_info['public_key'], stream_info['key'], stream_info['stream_name']])
|
||||
return defer.succeed(None)
|
||||
|
||||
def delete_stream(self, stream_hash):
|
||||
if stream_hash in self.streams:
|
||||
del self.streams[stream_hash]
|
||||
for (s_h, b_h) in self.stream_blobs.keys():
|
||||
if s_h == stream_hash:
|
||||
del self.stream_blobs[(s_h, b_h)]
|
||||
return defer.succeed(True)
|
||||
|
||||
def add_blobs_to_stream(self, stream_hash, blobs):
|
||||
assert stream_hash in self.streams, "Can't add blobs to a stream that isn't known"
|
||||
for blob in blobs:
|
||||
info = {}
|
||||
info['blob_num'] = blob.blob_num
|
||||
info['length'] = blob.length
|
||||
info['iv'] = blob.iv
|
||||
info['revision'] = blob.revision
|
||||
info['signature'] = blob.signature
|
||||
self.stream_blobs[(stream_hash, blob.blob_hash)] = info
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
|
||||
|
||||
if start_blob is not None:
|
||||
start_num = self._get_blob_num_by_hash(stream_hash, start_blob)
|
||||
else:
|
||||
start_num = None
|
||||
if end_blob is not None:
|
||||
end_num = self._get_blob_num_by_hash(stream_hash, end_blob)
|
||||
else:
|
||||
end_num = None
|
||||
return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse)
|
||||
|
||||
def get_stream_of_blob(self, blob_hash):
|
||||
for (s_h, b_h) in self.stream_blobs.iterkeys():
|
||||
if b_h == blob_hash:
|
||||
return defer.succeed(s_h)
|
||||
return defer.succeed(None)
|
||||
|
||||
def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
|
||||
blob_infos = []
|
||||
for (s_h, b_h), info in self.stream_blobs.iteritems():
|
||||
if stream_hash == s_h:
|
||||
position = info['blob_num']
|
||||
length = info['length']
|
||||
iv = info['iv']
|
||||
revision = info['revision']
|
||||
signature = info['signature']
|
||||
if (start_num is None) or (position > start_num):
|
||||
if (end_num is None) or (position < end_num):
|
||||
blob_infos.append((b_h, position, revision, iv, length, signature))
|
||||
blob_infos.sort(key=lambda i: i[1], reverse=reverse)
|
||||
if count is not None:
|
||||
blob_infos = blob_infos[:count]
|
||||
return defer.succeed(blob_infos)
|
||||
|
||||
def _get_blob_num_by_hash(self, stream_hash, blob_hash):
|
||||
if (stream_hash, blob_hash) in self.stream_blobs:
|
||||
return self.stream_blobs[(stream_hash, blob_hash)]['blob_num']
|
||||
|
||||
def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
|
||||
self.stream_desc[sd_blob_hash] = stream_hash
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_sd_blob_hashes_for_stream(self, stream_hash):
|
||||
return defer.succeed([sd_hash for sd_hash, s_h in self.stream_desc.iteritems() if s_h == stream_hash])
|
||||
|
||||
def hashes_to_announce(self):
|
||||
next_announce_time = time.time() + self.hash_reannounce_time
|
||||
stream_hashes = []
|
||||
current_time = time.time()
|
||||
for stream_hash, stream_info in self.streams.iteritems():
|
||||
announce_time = stream_info['announce_time']
|
||||
if announce_time < current_time:
|
||||
self.streams[stream_hash]['announce_time'] = next_announce_time
|
||||
stream_hashes.append(stream_hash)
|
||||
return stream_hashes
|
|
@ -1,53 +0,0 @@
|
|||
# pylint: skip-file
|
||||
class BaseLiveStreamPaymentRateManager(object):
|
||||
def __init__(self, blob_info_rate, blob_data_rate=None):
|
||||
self.min_live_blob_info_payment_rate = blob_info_rate
|
||||
self.min_blob_data_payment_rate = blob_data_rate
|
||||
|
||||
|
||||
class LiveStreamPaymentRateManager(object):
|
||||
def __init__(self, base_live_stream_payment_rate_manager, payment_rate_manager,
|
||||
blob_info_rate=None, blob_data_rate=None):
|
||||
self._base_live_stream_payment_rate_manager = base_live_stream_payment_rate_manager
|
||||
self._payment_rate_manager = payment_rate_manager
|
||||
self.min_live_blob_info_payment_rate = blob_info_rate
|
||||
self.min_blob_data_payment_rate = blob_data_rate
|
||||
self.points_paid = 0.0
|
||||
|
||||
def get_rate_live_blob_info(self, peer):
|
||||
return self.get_effective_min_live_blob_info_payment_rate()
|
||||
|
||||
def accept_rate_live_blob_info(self, peer, payment_rate):
|
||||
return payment_rate >= self.get_effective_min_live_blob_info_payment_rate()
|
||||
|
||||
def get_rate_blob_data(self, peer, blobs):
|
||||
response = self._payment_rate_manager.strategy.make_offer(peer, blobs)
|
||||
return response.rate
|
||||
|
||||
def accept_rate_blob_data(self, peer, blobs, offer):
|
||||
response = self._payment_rate_manager.strategy.respond_to_offer(offer, peer, blobs)
|
||||
return response.accepted
|
||||
|
||||
def reply_to_offer(self, peer, blobs, offer):
|
||||
reply = self._payment_rate_manager.strategy.respond_to_offer(offer, peer, blobs)
|
||||
self._payment_rate_manager.strategy.offer_accepted(peer, reply)
|
||||
return reply
|
||||
|
||||
def get_effective_min_blob_data_payment_rate(self):
|
||||
rate = self.min_blob_data_payment_rate
|
||||
if rate is None:
|
||||
rate = self._payment_rate_manager.min_blob_data_payment_rate
|
||||
if rate is None:
|
||||
rate = self._base_live_stream_payment_rate_manager.min_blob_data_payment_rate
|
||||
if rate is None:
|
||||
rate = self._payment_rate_manager.get_effective_min_blob_data_payment_rate()
|
||||
return rate
|
||||
|
||||
def get_effective_min_live_blob_info_payment_rate(self):
|
||||
rate = self.min_live_blob_info_payment_rate
|
||||
if rate is None:
|
||||
rate = self._base_live_stream_payment_rate_manager.min_live_blob_info_payment_rate
|
||||
return rate
|
||||
|
||||
def record_points_paid(self, amount):
|
||||
self.points_paid += amount
|
|
@ -1,122 +0,0 @@
|
|||
# pylint: skip-file
|
||||
# pylint: skip-file
|
||||
# This file is not maintained, but might be used in the future
|
||||
#
|
||||
import logging
|
||||
import sys
|
||||
from lbrynet.lbrylive.LiveStreamCreator import StdOutLiveStreamCreator
|
||||
from lbrynet.core.BlobManager import TempBlobManager
|
||||
from lbrynet.core.Session import Session
|
||||
from lbrynet.core.server.BlobAvailabilityHandler import BlobAvailabilityHandlerFactory
|
||||
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
|
||||
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
|
||||
from lbrynet.lbrylive.PaymentRateManager import BaseLiveStreamPaymentRateManager
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
|
||||
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
|
||||
from lbrynet.dht.node import Node
|
||||
from twisted.internet import defer, task
|
||||
|
||||
|
||||
class StdinUploader():
|
||||
"""This class reads from standard in, creates a stream, and makes it available on the network."""
|
||||
def __init__(self, peer_port, dht_node_port, known_dht_nodes,
|
||||
stream_info_manager_class=DBLiveStreamMetadataManager, blob_manager_class=TempBlobManager):
|
||||
"""
|
||||
@param peer_port: the network port on which to listen for peers
|
||||
|
||||
@param dht_node_port: the network port on which to listen for nodes in the DHT
|
||||
|
||||
@param known_dht_nodes: a list of (ip_address, dht_port) which will be used to join the DHT network
|
||||
"""
|
||||
self.peer_port = peer_port
|
||||
self.lbry_server_port = None
|
||||
self.session = Session(blob_manager_class=blob_manager_class,
|
||||
stream_info_manager_class=stream_info_manager_class,
|
||||
dht_node_class=Node, dht_node_port=dht_node_port,
|
||||
known_dht_nodes=known_dht_nodes, peer_port=self.peer_port,
|
||||
use_upnp=False)
|
||||
self.payment_rate_manager = BaseLiveStreamPaymentRateManager()
|
||||
|
||||
def start(self):
|
||||
"""Initialize the session and start listening on the peer port"""
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: self._start())
|
||||
|
||||
return d
|
||||
|
||||
def _start(self):
|
||||
self._start_server()
|
||||
return True
|
||||
|
||||
def _start_server(self):
|
||||
query_handler_factories = [
|
||||
CryptBlobInfoQueryHandlerFactory(self.stream_info_manager, self.session.wallet,
|
||||
self.payment_rate_manager),
|
||||
BlobAvailabilityHandlerFactory(self.session.blob_manager),
|
||||
BlobRequestHandlerFactory(self.session.blob_manager, self.session.wallet,
|
||||
self.payment_rate_manager),
|
||||
self.session.wallet.get_wallet_info_query_handler_factory()
|
||||
]
|
||||
|
||||
self.server_factory = ServerProtocolFactory(self.session.rate_limiter,
|
||||
query_handler_factories,
|
||||
self.session.peer_manager)
|
||||
from twisted.internet import reactor
|
||||
self.lbry_server_port = reactor.listenTCP(self.peer_port, self.server_factory)
|
||||
|
||||
def start_live_stream(self, stream_name):
|
||||
"""Create the stream and start reading from stdin
|
||||
|
||||
@param stream_name: a string, the suggested name of this stream
|
||||
"""
|
||||
stream_creator_helper = StdOutLiveStreamCreator(stream_name, self.session.blob_manager,
|
||||
self.stream_info_manager)
|
||||
d = stream_creator_helper.create_and_publish_stream_descriptor()
|
||||
|
||||
def print_sd_hash(sd_hash):
|
||||
print "Stream descriptor hash:", sd_hash
|
||||
|
||||
d.addCallback(print_sd_hash)
|
||||
d.addCallback(lambda _: stream_creator_helper.start_streaming())
|
||||
return d
|
||||
|
||||
def shut_down(self):
|
||||
"""End the session and stop listening on the server port"""
|
||||
d = self.session.shut_down()
|
||||
d.addCallback(lambda _: self._shut_down())
|
||||
return d
|
||||
|
||||
def _shut_down(self):
|
||||
if self.lbry_server_port is not None:
|
||||
d = defer.maybeDeferred(self.lbry_server_port.stopListening)
|
||||
else:
|
||||
d = defer.succeed(True)
|
||||
return d
|
||||
|
||||
|
||||
def launch_stdin_uploader():
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
logging.basicConfig(level=logging.WARNING, filename="ul.log")
|
||||
if len(sys.argv) == 4:
|
||||
uploader = StdinUploader(int(sys.argv[2]), int(sys.argv[3]), [])
|
||||
elif len(sys.argv) == 6:
|
||||
uploader = StdinUploader(int(sys.argv[2]), int(sys.argv[3]), [(sys.argv[4], int(sys.argv[5]))])
|
||||
else:
|
||||
print "Usage: lbrynet-stdin-uploader <stream_name> <peer_port> <dht_node_port>" \
|
||||
" [<dht_bootstrap_host> <dht_bootstrap port>]"
|
||||
sys.exit(1)
|
||||
|
||||
def start_stdin_uploader():
|
||||
return uploader.start_live_stream(sys.argv[1])
|
||||
|
||||
def shut_down():
|
||||
logging.debug("Telling the reactor to stop in 60 seconds")
|
||||
reactor.callLater(60, reactor.stop)
|
||||
|
||||
d = task.deferLater(reactor, 0, uploader.start)
|
||||
d.addCallback(lambda _: start_stdin_uploader())
|
||||
d.addCallback(lambda _: shut_down())
|
||||
reactor.addSystemEventTrigger('before', 'shutdown', uploader.shut_down)
|
||||
reactor.run()
|
|
@ -1,102 +0,0 @@
|
|||
# pylint: skip-file
|
||||
# pylint: skip-file
|
||||
# This file is not maintained, but might be used in the future
|
||||
#
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from lbrynet.lbrylive.client.LiveStreamDownloader import LiveStreamDownloader
|
||||
from lbrynet.core.BlobManager import TempBlobManager
|
||||
from lbrynet.core.Session import Session
|
||||
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
|
||||
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
|
||||
from lbrynet.lbrylive.PaymentRateManager import BaseLiveStreamPaymentRateManager
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
|
||||
from lbrynet.lbrylive.StreamDescriptor import save_sd_info
|
||||
from lbrynet.dht.node import Node
|
||||
from twisted.internet import task
|
||||
|
||||
|
||||
class StdoutDownloader():
|
||||
"""This class downloads a live stream from the network and outputs it to standard out."""
|
||||
def __init__(self, dht_node_port, known_dht_nodes,
|
||||
stream_info_manager_class=DBLiveStreamMetadataManager, blob_manager_class=TempBlobManager):
|
||||
"""
|
||||
@param dht_node_port: the network port on which to listen for DHT node requests
|
||||
|
||||
@param known_dht_nodes: a list of (ip_address, dht_port) which will be used to join the DHT network
|
||||
|
||||
"""
|
||||
|
||||
self.session = Session(blob_manager_class=blob_manager_class,
|
||||
stream_info_manager_class=stream_info_manager_class,
|
||||
dht_node_class=Node, dht_node_port=dht_node_port, known_dht_nodes=known_dht_nodes,
|
||||
use_upnp=False)
|
||||
self.payment_rate_manager = BaseLiveStreamPaymentRateManager()
|
||||
|
||||
def start(self):
|
||||
"""Initialize the session"""
|
||||
d = self.session.setup()
|
||||
return d
|
||||
|
||||
def read_sd_file(self, sd_blob):
|
||||
reader = BlobStreamDescriptorReader(sd_blob)
|
||||
return save_sd_info(self.stream_info_manager, reader, ignore_duplicate=True)
|
||||
|
||||
def download_sd_file_from_hash(self, sd_hash):
|
||||
downloader = StandaloneBlobDownloader(sd_hash, self.session.blob_manager,
|
||||
self.session.peer_finder, self.session.rate_limiter,
|
||||
self.session.wallet)
|
||||
d = downloader.download()
|
||||
return d
|
||||
|
||||
def start_download(self, sd_hash):
|
||||
"""Start downloading the stream from the network and outputting it to standard out"""
|
||||
d = self.download_sd_file_from_hash(sd_hash)
|
||||
d.addCallbacks(self.read_sd_file)
|
||||
|
||||
def start_stream(stream_hash):
|
||||
consumer = LiveStreamDownloader(stream_hash, self.session.peer_finder,
|
||||
self.session.rate_limiter, self.session.blob_manager,
|
||||
self.stream_info_manager, self.payment_rate_manager,
|
||||
self.session.wallet)
|
||||
return consumer.start()
|
||||
|
||||
d.addCallback(start_stream)
|
||||
return d
|
||||
|
||||
def shut_down(self):
|
||||
"""End the session"""
|
||||
d = self.session.shut_down()
|
||||
return d
|
||||
|
||||
|
||||
def launch_stdout_downloader():
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
logging.basicConfig(level=logging.WARNING, filename="dl.log")
|
||||
if len(sys.argv) == 3:
|
||||
downloader = StdoutDownloader(int(sys.argv[2]), [])
|
||||
elif len(sys.argv) == 5:
|
||||
downloader = StdoutDownloader(int(sys.argv[2]), [(sys.argv[3], int(sys.argv[4]))])
|
||||
else:
|
||||
print "Usage: lbrynet-stdout-downloader <sd_hash> <peer_port> <dht_node_port>" \
|
||||
" [<dht_bootstrap_host> <dht_bootstrap port>]"
|
||||
sys.exit(1)
|
||||
|
||||
def start_stdout_downloader():
|
||||
return downloader.start_download(sys.argv[1])
|
||||
|
||||
def print_error(err):
|
||||
logging.warning(err.getErrorMessage())
|
||||
|
||||
def shut_down():
|
||||
reactor.stop()
|
||||
|
||||
d = task.deferLater(reactor, 0, downloader.start)
|
||||
d.addCallback(lambda _: start_stdout_downloader())
|
||||
d.addErrback(print_error)
|
||||
d.addCallback(lambda _: shut_down())
|
||||
reactor.addSystemEventTrigger('before', 'shutdown', downloader.shut_down)
|
||||
reactor.run()
|
|
@ -1,138 +0,0 @@
|
|||
# pylint: skip-file
|
||||
import binascii
|
||||
import logging
|
||||
from lbrynet.core.cryptoutils import get_lbry_hash_obj, verify_signature
|
||||
from twisted.internet import defer, threads
|
||||
from lbrynet.core.Error import DuplicateStreamHashError, InvalidStreamDescriptorError
|
||||
from lbrynet.lbrylive.LiveBlob import LiveBlobInfo
|
||||
from lbrynet.interfaces import IStreamDescriptorValidator
|
||||
from zope.interface import implements
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
LiveStreamType = "lbrylive"
|
||||
|
||||
|
||||
def save_sd_info(stream_info_manager, sd_info, ignore_duplicate=False):
|
||||
log.debug("Saving info for %s", str(sd_info['stream_name']))
|
||||
hex_stream_name = sd_info['stream_name']
|
||||
public_key = sd_info['public_key']
|
||||
key = sd_info['key']
|
||||
stream_hash = sd_info['stream_hash']
|
||||
raw_blobs = sd_info['blobs']
|
||||
crypt_blobs = []
|
||||
for blob in raw_blobs:
|
||||
length = blob['length']
|
||||
if length != 0:
|
||||
blob_hash = blob['blob_hash']
|
||||
else:
|
||||
blob_hash = None
|
||||
blob_num = blob['blob_num']
|
||||
revision = blob['revision']
|
||||
iv = blob['iv']
|
||||
signature = blob['signature']
|
||||
crypt_blobs.append(LiveBlobInfo(blob_hash, blob_num, length, iv, revision, signature))
|
||||
log.debug("Trying to save stream info for %s", str(hex_stream_name))
|
||||
d = stream_info_manager.save_stream(stream_hash, public_key, hex_stream_name,
|
||||
key, crypt_blobs)
|
||||
|
||||
def check_if_duplicate(err):
|
||||
if ignore_duplicate is True:
|
||||
err.trap(DuplicateStreamHashError)
|
||||
|
||||
d.addErrback(check_if_duplicate)
|
||||
|
||||
d.addCallback(lambda _: stream_hash)
|
||||
return d
|
||||
|
||||
|
||||
def get_sd_info(stream_info_manager, stream_hash, include_blobs):
|
||||
d = stream_info_manager.get_stream_info(stream_hash)
|
||||
|
||||
def format_info(stream_info):
|
||||
fields = {}
|
||||
fields['stream_type'] = LiveStreamType
|
||||
fields['stream_name'] = stream_info[2]
|
||||
fields['public_key'] = stream_info[0]
|
||||
fields['key'] = stream_info[1]
|
||||
fields['stream_hash'] = stream_hash
|
||||
|
||||
def format_blobs(blobs):
|
||||
formatted_blobs = []
|
||||
for blob_hash, blob_num, revision, iv, length, signature in blobs:
|
||||
blob = {}
|
||||
if length != 0:
|
||||
blob['blob_hash'] = blob_hash
|
||||
blob['blob_num'] = blob_num
|
||||
blob['revision'] = revision
|
||||
blob['iv'] = iv
|
||||
blob['length'] = length
|
||||
blob['signature'] = signature
|
||||
formatted_blobs.append(blob)
|
||||
fields['blobs'] = formatted_blobs
|
||||
return fields
|
||||
|
||||
if include_blobs is True:
|
||||
d = stream_info_manager.get_blobs_for_stream(stream_hash)
|
||||
else:
|
||||
d = defer.succeed([])
|
||||
d.addCallback(format_blobs)
|
||||
return d
|
||||
|
||||
d.addCallback(format_info)
|
||||
return d
|
||||
|
||||
|
||||
class LiveStreamDescriptorValidator(object):
|
||||
implements(IStreamDescriptorValidator)
|
||||
|
||||
def __init__(self, raw_info):
|
||||
self.raw_info = raw_info
|
||||
|
||||
def validate(self):
|
||||
log.debug("Trying to validate stream descriptor for %s", str(self.raw_info['stream_name']))
|
||||
hex_stream_name = self.raw_info['stream_name']
|
||||
public_key = self.raw_info['public_key']
|
||||
key = self.raw_info['key']
|
||||
stream_hash = self.raw_info['stream_hash']
|
||||
h = get_lbry_hash_obj()
|
||||
h.update(hex_stream_name)
|
||||
h.update(public_key)
|
||||
h.update(key)
|
||||
if h.hexdigest() != stream_hash:
|
||||
raise InvalidStreamDescriptorError("Stream hash does not match stream metadata")
|
||||
blobs = self.raw_info['blobs']
|
||||
|
||||
def check_blob_signatures():
|
||||
for blob in blobs:
|
||||
length = blob['length']
|
||||
if length != 0:
|
||||
blob_hash = blob['blob_hash']
|
||||
else:
|
||||
blob_hash = None
|
||||
blob_num = blob['blob_num']
|
||||
revision = blob['revision']
|
||||
iv = blob['iv']
|
||||
signature = blob['signature']
|
||||
hashsum = get_lbry_hash_obj()
|
||||
hashsum.update(stream_hash)
|
||||
if length != 0:
|
||||
hashsum.update(blob_hash)
|
||||
hashsum.update(str(blob_num))
|
||||
hashsum.update(str(revision))
|
||||
hashsum.update(iv)
|
||||
hashsum.update(str(length))
|
||||
if not verify_signature(hashsum.digest(), signature, public_key):
|
||||
raise InvalidStreamDescriptorError("Invalid signature in stream descriptor")
|
||||
|
||||
return threads.deferToThread(check_blob_signatures)
|
||||
|
||||
def info_to_show(self):
|
||||
info = []
|
||||
info.append(("stream_name", binascii.unhexlify(self.raw_info.get("stream_name"))))
|
||||
return info
|
||||
|
||||
def get_length_of_stream(self):
|
||||
return None
|
|
@ -1,180 +0,0 @@
|
|||
# pylint: skip-file
|
||||
import binascii
|
||||
from lbrynet.core.StreamDescriptor import StreamMetadata
|
||||
from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader
|
||||
from zope.interface import implements
|
||||
from lbrynet.lbrylive.client.LiveStreamMetadataHandler import LiveStreamMetadataHandler
|
||||
from lbrynet.lbrylive.client.LiveStreamProgressManager import LiveStreamProgressManager
|
||||
import os
|
||||
from lbrynet.lbrylive.StreamDescriptor import save_sd_info
|
||||
from lbrynet.lbrylive.PaymentRateManager import LiveStreamPaymentRateManager
|
||||
from twisted.internet import defer, threads # , process
|
||||
from lbrynet.interfaces import IStreamDownloaderFactory
|
||||
from lbrynet.lbrylive.StreamDescriptor import LiveStreamType
|
||||
|
||||
|
||||
class _LiveStreamDownloader(CryptStreamDownloader):
|
||||
|
||||
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet):
|
||||
CryptStreamDownloader.__init__(self, peer_finder, rate_limiter, blob_manager,
|
||||
payment_rate_manager, wallet)
|
||||
self.stream_hash = stream_hash
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.public_key = None
|
||||
|
||||
def set_stream_info(self):
|
||||
if self.public_key is None and self.key is None:
|
||||
|
||||
d = self.stream_info_manager.get_stream_info(self.stream_hash)
|
||||
|
||||
def set_stream_info(stream_info):
|
||||
public_key, key, stream_name = stream_info
|
||||
self.public_key = public_key
|
||||
self.key = binascii.unhexlify(key)
|
||||
self.stream_name = binascii.unhexlify(stream_name)
|
||||
|
||||
d.addCallback(set_stream_info)
|
||||
return d
|
||||
else:
|
||||
return defer.succeed(True)
|
||||
|
||||
|
||||
class LiveStreamDownloader(_LiveStreamDownloader):
|
||||
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet):
|
||||
_LiveStreamDownloader.__init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
|
||||
stream_info_manager, payment_rate_manager, wallet)
|
||||
|
||||
|
||||
def _get_metadata_handler(self, download_manager):
|
||||
return LiveStreamMetadataHandler(self.stream_hash, self.stream_info_manager,
|
||||
self.peer_finder, self.public_key, False,
|
||||
self.payment_rate_manager, self.wallet, download_manager, 10)
|
||||
|
||||
def _get_progress_manager(self, download_manager):
|
||||
return LiveStreamProgressManager(self._finished_downloading, self.blob_manager, download_manager,
|
||||
delete_blob_after_finished=True, download_whole=False,
|
||||
max_before_skip_ahead=10)
|
||||
|
||||
def _get_write_func(self):
|
||||
def write_func(data):
|
||||
if self.stopped is False:
|
||||
pass
|
||||
return write_func
|
||||
|
||||
|
||||
class FullLiveStreamDownloader(_LiveStreamDownloader):
|
||||
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet):
|
||||
_LiveStreamDownloader.__init__(self, stream_hash, peer_finder, rate_limiter,
|
||||
blob_manager, stream_info_manager, payment_rate_manager,
|
||||
wallet)
|
||||
self.file_handle = None
|
||||
self.file_name = None
|
||||
|
||||
def set_stream_info(self):
|
||||
d = _LiveStreamDownloader.set_stream_info(self)
|
||||
|
||||
def set_file_name_if_unset():
|
||||
if not self.file_name:
|
||||
if not self.stream_name:
|
||||
self.stream_name = "_"
|
||||
self.file_name = os.path.basename(self.stream_name)
|
||||
|
||||
d.addCallback(lambda _: set_file_name_if_unset())
|
||||
return d
|
||||
|
||||
def stop(self, err=None):
|
||||
d = self._close_file()
|
||||
d.addBoth(lambda _: _LiveStreamDownloader.stop(self, err))
|
||||
return d
|
||||
|
||||
def _start(self):
|
||||
if self.file_handle is None:
|
||||
d = self._open_file()
|
||||
else:
|
||||
d = defer.succeed(True)
|
||||
d.addCallback(lambda _: _LiveStreamDownloader._start(self))
|
||||
return d
|
||||
|
||||
def _open_file(self):
|
||||
def open_file():
|
||||
self.file_handle = open(self.file_name, 'wb')
|
||||
return threads.deferToThread(open_file)
|
||||
|
||||
def _get_metadata_handler(self, download_manager):
|
||||
return LiveStreamMetadataHandler(self.stream_hash, self.stream_info_manager,
|
||||
self.peer_finder, self.public_key, True,
|
||||
self.payment_rate_manager, self.wallet, download_manager)
|
||||
|
||||
def _get_primary_request_creators(self, download_manager):
|
||||
return [download_manager.blob_requester, download_manager.blob_info_finder]
|
||||
|
||||
def _get_write_func(self):
|
||||
def write_func(data):
|
||||
if self.stopped is False:
|
||||
self.file_handle.write(data)
|
||||
return write_func
|
||||
|
||||
def _close_file(self):
|
||||
def close_file():
|
||||
if self.file_handle is not None:
|
||||
self.file_handle.close()
|
||||
self.file_handle = None
|
||||
return threads.deferToThread(close_file)
|
||||
|
||||
|
||||
class FullLiveStreamDownloaderFactory(object):
|
||||
|
||||
implements(IStreamDownloaderFactory)
|
||||
|
||||
def __init__(self, peer_finder, rate_limiter, blob_manager, stream_info_manager, wallet,
|
||||
default_payment_rate_manager):
|
||||
self.peer_finder = peer_finder
|
||||
self.rate_limiter = rate_limiter
|
||||
self.blob_manager = blob_manager
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.wallet = wallet
|
||||
self.default_payment_rate_manager = default_payment_rate_manager
|
||||
|
||||
def can_download(self, sd_validator):
|
||||
return True
|
||||
|
||||
def make_downloader(self, metadata, options, payment_rate_manager):
|
||||
# TODO: check options for payment rate manager parameters
|
||||
prm = LiveStreamPaymentRateManager(self.default_payment_rate_manager,
|
||||
payment_rate_manager)
|
||||
|
||||
def save_source_if_blob(stream_hash):
|
||||
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
|
||||
d = self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, metadata.source_blob_hash)
|
||||
else:
|
||||
d = defer.succeed(True)
|
||||
d.addCallback(lambda _: stream_hash)
|
||||
return d
|
||||
|
||||
d = save_sd_info(self.stream_info_manager, metadata.validator.raw_info)
|
||||
d.addCallback(save_source_if_blob)
|
||||
|
||||
def create_downloader(stream_hash):
|
||||
stream_downloader = FullLiveStreamDownloader(stream_hash, self.peer_finder, self.rate_limiter,
|
||||
self.blob_manager, self.stream_info_manager,
|
||||
prm, self.wallet)
|
||||
d = stream_downloader.set_stream_info()
|
||||
d.addCallback(lambda _: stream_downloader)
|
||||
return d
|
||||
|
||||
d.addCallback(create_downloader)
|
||||
return d
|
||||
|
||||
|
||||
def add_full_live_stream_downloader_to_sd_identifier(session, stream_info_manager, sd_identifier,
|
||||
base_live_stream_payment_rate_manager):
|
||||
downloader_factory = FullLiveStreamDownloaderFactory(session.peer_finder,
|
||||
session.rate_limiter,
|
||||
session.blob_manager,
|
||||
stream_info_manager,
|
||||
session.wallet,
|
||||
base_live_stream_payment_rate_manager)
|
||||
sd_identifier.add_stream_downloader_factory(LiveStreamType, downloader_factory)
|
|
@ -1,347 +0,0 @@
|
|||
# pylint: skip-file
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet import conf
|
||||
from lbrynet.core.client.ClientRequest import ClientRequest, ClientPaidRequest
|
||||
from lbrynet.lbrylive.LiveBlob import LiveBlobInfo
|
||||
from lbrynet.core.cryptoutils import get_lbry_hash_obj, verify_signature
|
||||
from lbrynet.interfaces import IRequestCreator, IMetadataHandler
|
||||
from lbrynet.core.Error import InsufficientFundsError, InvalidResponseError, RequestCanceledError
|
||||
from lbrynet.core.Error import NoResponseError, ConnectionClosedBeforeResponseError
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LiveStreamMetadataHandler(object):
|
||||
implements(IRequestCreator, IMetadataHandler)
|
||||
|
||||
def __init__(self, stream_hash, stream_info_manager, peer_finder, stream_pub_key, download_whole,
|
||||
payment_rate_manager, wallet, download_manager, max_before_skip_ahead=None):
|
||||
self.stream_hash = stream_hash
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.payment_rate_manager = payment_rate_manager
|
||||
self.wallet = wallet
|
||||
self.peer_finder = peer_finder
|
||||
self.stream_pub_key = stream_pub_key
|
||||
self.download_whole = download_whole
|
||||
self.max_before_skip_ahead = max_before_skip_ahead
|
||||
if self.download_whole is False:
|
||||
assert self.max_before_skip_ahead is not None, \
|
||||
"If download whole is False, max_before_skip_ahead must be set"
|
||||
self.download_manager = download_manager
|
||||
self._peers = defaultdict(int) # {Peer: score}
|
||||
self._protocol_prices = {}
|
||||
self._final_blob_num = None
|
||||
self._price_disagreements = [] # [Peer]
|
||||
self._incompatible_peers = [] # [Peer]
|
||||
|
||||
######### IMetadataHandler #########
|
||||
|
||||
def get_initial_blobs(self):
|
||||
d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
|
||||
d.addCallback(self._format_initial_blobs_for_download_manager)
|
||||
return d
|
||||
|
||||
def final_blob_num(self):
|
||||
return self._final_blob_num
|
||||
|
||||
######## IRequestCreator #########
|
||||
|
||||
def send_next_request(self, peer, protocol):
|
||||
if self._finished_discovery() is False and self._should_send_request_to(peer) is True:
|
||||
p_r = None
|
||||
if not self._price_settled(protocol):
|
||||
p_r = self._get_price_request(peer, protocol)
|
||||
d_r = self._get_discover_request(peer)
|
||||
reserved_points = self._reserve_points(peer, protocol, d_r.max_pay_units)
|
||||
if reserved_points is not None:
|
||||
d1 = protocol.add_request(d_r)
|
||||
d1.addCallback(self._handle_discover_response, peer, d_r)
|
||||
d1.addBoth(self._pay_or_cancel_payment, protocol, reserved_points)
|
||||
d1.addErrback(self._request_failed, peer)
|
||||
if p_r is not None:
|
||||
d2 = protocol.add_request(p_r)
|
||||
d2.addCallback(self._handle_price_response, peer, p_r, protocol)
|
||||
d2.addErrback(self._request_failed, peer)
|
||||
return defer.succeed(True)
|
||||
else:
|
||||
return defer.fail(InsufficientFundsError())
|
||||
return defer.succeed(False)
|
||||
|
||||
def get_new_peers(self):
|
||||
d = self._get_hash_for_peer_search()
|
||||
d.addCallback(self._find_peers_for_hash)
|
||||
return d
|
||||
|
||||
######### internal calls #########
|
||||
|
||||
def _get_hash_for_peer_search(self):
|
||||
r = None
|
||||
if self._finished_discovery() is False:
|
||||
r = self.stream_hash
|
||||
log.debug("Info finder peer search response for stream %s: %s", str(self.stream_hash), str(r))
|
||||
return defer.succeed(r)
|
||||
|
||||
def _find_peers_for_hash(self, h):
|
||||
if h is None:
|
||||
return None
|
||||
else:
|
||||
d = self.peer_finder.find_peers_for_blob(h)
|
||||
|
||||
def choose_best_peers(peers):
|
||||
bad_peers = self._get_bad_peers()
|
||||
return [p for p in peers if not p in bad_peers]
|
||||
|
||||
d.addCallback(choose_best_peers)
|
||||
return d
|
||||
|
||||
def _format_initial_blobs_for_download_manager(self, blob_infos):
|
||||
infos = []
|
||||
for blob_hash, blob_num, revision, iv, length, signature in blob_infos:
|
||||
if blob_hash is not None:
|
||||
infos.append(LiveBlobInfo(blob_hash, blob_num, length, iv, revision, signature))
|
||||
else:
|
||||
log.debug("Setting _final_blob_num to %s", str(blob_num - 1))
|
||||
self._final_blob_num = blob_num - 1
|
||||
return infos
|
||||
|
||||
def _should_send_request_to(self, peer):
|
||||
if self._peers[peer] < -5.0:
|
||||
return False
|
||||
if peer in self._price_disagreements:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _get_bad_peers(self):
|
||||
return [p for p in self._peers.iterkeys() if not self._should_send_request_to(p)]
|
||||
|
||||
def _finished_discovery(self):
|
||||
if self._get_discovery_params() is None:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _get_discover_request(self, peer):
|
||||
discovery_params = self._get_discovery_params()
|
||||
if discovery_params:
|
||||
further_blobs_request = {}
|
||||
reference, start, end, count = discovery_params
|
||||
further_blobs_request['reference'] = reference
|
||||
if start is not None:
|
||||
further_blobs_request['start'] = start
|
||||
if end is not None:
|
||||
further_blobs_request['end'] = end
|
||||
if count is not None:
|
||||
further_blobs_request['count'] = count
|
||||
else:
|
||||
further_blobs_request['count'] = conf.settings['MAX_BLOB_INFOS_TO_REQUEST']
|
||||
log.debug("Requesting %s blob infos from %s", str(further_blobs_request['count']), str(peer))
|
||||
r_dict = {'further_blobs': further_blobs_request}
|
||||
response_identifier = 'further_blobs'
|
||||
request = ClientPaidRequest(r_dict, response_identifier, further_blobs_request['count'])
|
||||
return request
|
||||
return None
|
||||
|
||||
def _get_discovery_params(self):
|
||||
log.debug("In _get_discovery_params")
|
||||
stream_position = self.download_manager.stream_position()
|
||||
blobs = self.download_manager.blobs
|
||||
if blobs:
|
||||
last_blob_num = max(blobs.iterkeys())
|
||||
else:
|
||||
last_blob_num = -1
|
||||
final_blob_num = self.final_blob_num()
|
||||
if final_blob_num is not None:
|
||||
last_blob_num = final_blob_num
|
||||
if self.download_whole is False:
|
||||
log.debug("download_whole is False")
|
||||
if final_blob_num is not None:
|
||||
for i in xrange(stream_position, final_blob_num + 1):
|
||||
if not i in blobs:
|
||||
count = min(self.max_before_skip_ahead, (final_blob_num - i + 1))
|
||||
return self.stream_hash, None, 'end', count
|
||||
return None
|
||||
else:
|
||||
if blobs:
|
||||
for i in xrange(stream_position, last_blob_num + 1):
|
||||
if not i in blobs:
|
||||
if i == 0:
|
||||
return self.stream_hash, 'beginning', 'end', -1 * self.max_before_skip_ahead
|
||||
else:
|
||||
return self.stream_hash, blobs[i-1].blob_hash, 'end', -1 * self.max_before_skip_ahead
|
||||
return self.stream_hash, blobs[last_blob_num].blob_hash, 'end', -1 * self.max_before_skip_ahead
|
||||
else:
|
||||
return self.stream_hash, None, 'end', -1 * self.max_before_skip_ahead
|
||||
log.debug("download_whole is True")
|
||||
beginning = None
|
||||
end = None
|
||||
for i in xrange(stream_position, last_blob_num + 1):
|
||||
if not i in blobs:
|
||||
if beginning is None:
|
||||
if i == 0:
|
||||
beginning = 'beginning'
|
||||
else:
|
||||
beginning = blobs[i-1].blob_hash
|
||||
else:
|
||||
if beginning is not None:
|
||||
end = blobs[i].blob_hash
|
||||
break
|
||||
if beginning is None:
|
||||
if final_blob_num is not None:
|
||||
log.debug("Discovery is finished. stream_position: %s, last_blob_num + 1: %s", str(stream_position),
|
||||
str(last_blob_num + 1))
|
||||
return None
|
||||
else:
|
||||
log.debug("Discovery is not finished. final blob num is unknown.")
|
||||
if last_blob_num != -1:
|
||||
return self.stream_hash, blobs[last_blob_num].blob_hash, None, None
|
||||
else:
|
||||
return self.stream_hash, 'beginning', None, None
|
||||
else:
|
||||
log.info("Discovery is not finished. Not all blobs are known.")
|
||||
return self.stream_hash, beginning, end, None
|
||||
|
||||
def _price_settled(self, protocol):
|
||||
if protocol in self._protocol_prices:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _update_local_score(self, peer, amount):
|
||||
self._peers[peer] += amount
|
||||
|
||||
def _reserve_points(self, peer, protocol, max_infos):
|
||||
assert protocol in self._protocol_prices
|
||||
point_amount = 1.0 * max_infos * self._protocol_prices[protocol] / 1000.0
|
||||
return self.wallet.reserve_points(peer, point_amount)
|
||||
|
||||
def _pay_or_cancel_payment(self, arg, protocol, reserved_points):
|
||||
if isinstance(arg, Failure) or arg == 0:
|
||||
self._cancel_points(reserved_points)
|
||||
else:
|
||||
self._pay_peer(protocol, arg, reserved_points)
|
||||
return arg
|
||||
|
||||
def _pay_peer(self, protocol, num_infos, reserved_points):
|
||||
assert num_infos != 0
|
||||
assert protocol in self._protocol_prices
|
||||
point_amount = 1.0 * num_infos * self._protocol_prices[protocol] / 1000.0
|
||||
self.wallet.send_points(reserved_points, point_amount)
|
||||
self.payment_rate_manager.record_points_paid(point_amount)
|
||||
|
||||
def _cancel_points(self, reserved_points):
|
||||
return self.wallet.cancel_point_reservation(reserved_points)
|
||||
|
||||
def _get_price_request(self, peer, protocol):
|
||||
self._protocol_prices[protocol] = self.payment_rate_manager.get_rate_live_blob_info(peer)
|
||||
request_dict = {'blob_info_payment_rate': self._protocol_prices[protocol]}
|
||||
request = ClientRequest(request_dict, 'blob_info_payment_rate')
|
||||
return request
|
||||
|
||||
def _handle_price_response(self, response_dict, peer, request, protocol):
|
||||
if not request.response_identifier in response_dict:
|
||||
return InvalidResponseError("response identifier not in response")
|
||||
assert protocol in self._protocol_prices
|
||||
response = response_dict[request.response_identifier]
|
||||
if response == "RATE_ACCEPTED":
|
||||
return True
|
||||
else:
|
||||
log.info("Rate offer has been rejected by %s", str(peer))
|
||||
del self._protocol_prices[protocol]
|
||||
self._price_disagreements.append(peer)
|
||||
return True
|
||||
|
||||
def _handle_discover_response(self, response_dict, peer, request):
|
||||
if not request.response_identifier in response_dict:
|
||||
return InvalidResponseError("response identifier not in response")
|
||||
response = response_dict[request.response_identifier]
|
||||
blob_infos = []
|
||||
if 'error' in response:
|
||||
if response['error'] == 'RATE_UNSET':
|
||||
return defer.succeed(0)
|
||||
else:
|
||||
return InvalidResponseError("Got an unknown error from the peer: %s" %
|
||||
(response['error'],))
|
||||
if not 'blob_infos' in response:
|
||||
return InvalidResponseError("Missing the required field 'blob_infos'")
|
||||
raw_blob_infos = response['blob_infos']
|
||||
log.info("Handling %s further blobs from %s", str(len(raw_blob_infos)), str(peer))
|
||||
log.debug("blobs: %s", str(raw_blob_infos))
|
||||
for raw_blob_info in raw_blob_infos:
|
||||
length = raw_blob_info['length']
|
||||
if length != 0:
|
||||
blob_hash = raw_blob_info['blob_hash']
|
||||
else:
|
||||
blob_hash = None
|
||||
num = raw_blob_info['blob_num']
|
||||
revision = raw_blob_info['revision']
|
||||
iv = raw_blob_info['iv']
|
||||
signature = raw_blob_info['signature']
|
||||
blob_info = LiveBlobInfo(blob_hash, num, length, iv, revision, signature)
|
||||
log.debug("Learned about a potential blob: %s", str(blob_hash))
|
||||
if self._verify_blob(blob_info):
|
||||
if blob_hash is None:
|
||||
log.info("Setting _final_blob_num to %s", str(num - 1))
|
||||
self._final_blob_num = num - 1
|
||||
else:
|
||||
blob_infos.append(blob_info)
|
||||
else:
|
||||
raise ValueError("Peer sent an invalid blob info")
|
||||
d = self.stream_info_manager.add_blobs_to_stream(self.stream_hash, blob_infos)
|
||||
|
||||
def add_blobs_to_download_manager():
|
||||
blob_nums = [b.blob_num for b in blob_infos]
|
||||
log.info("Adding the following blob nums to the download manager: %s", str(blob_nums))
|
||||
self.download_manager.add_blobs_to_download(blob_infos)
|
||||
|
||||
d.addCallback(lambda _: add_blobs_to_download_manager())
|
||||
|
||||
def pay_or_penalize_peer():
|
||||
if len(blob_infos):
|
||||
self._update_local_score(peer, len(blob_infos))
|
||||
peer.update_stats('downloaded_crypt_blob_infos', len(blob_infos))
|
||||
peer.update_score(len(blob_infos))
|
||||
else:
|
||||
self._update_local_score(peer, -.0001)
|
||||
return len(blob_infos)
|
||||
|
||||
d.addCallback(lambda _: pay_or_penalize_peer())
|
||||
|
||||
return d
|
||||
|
||||
def _verify_blob(self, blob):
|
||||
log.debug("Got an unverified blob to check:")
|
||||
log.debug("blob_hash: %s", blob.blob_hash)
|
||||
log.debug("blob_num: %s", str(blob.blob_num))
|
||||
log.debug("revision: %s", str(blob.revision))
|
||||
log.debug("iv: %s", blob.iv)
|
||||
log.debug("length: %s", str(blob.length))
|
||||
hashsum = get_lbry_hash_obj()
|
||||
hashsum.update(self.stream_hash)
|
||||
if blob.length != 0:
|
||||
hashsum.update(blob.blob_hash)
|
||||
hashsum.update(str(blob.blob_num))
|
||||
hashsum.update(str(blob.revision))
|
||||
hashsum.update(blob.iv)
|
||||
hashsum.update(str(blob.length))
|
||||
log.debug("hexdigest to be verified: %s", hashsum.hexdigest())
|
||||
if verify_signature(hashsum.digest(), blob.signature, self.stream_pub_key):
|
||||
log.debug("Blob info is valid")
|
||||
return True
|
||||
else:
|
||||
log.debug("The blob info is invalid")
|
||||
return False
|
||||
|
||||
def _request_failed(self, reason, peer):
|
||||
if reason.check(RequestCanceledError):
|
||||
return
|
||||
if reason.check(NoResponseError):
|
||||
self._incompatible_peers.append(peer)
|
||||
log.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage())
|
||||
self._update_local_score(peer, -5.0)
|
||||
peer.update_score(-10.0)
|
||||
if reason.check(ConnectionClosedBeforeResponseError):
|
||||
return
|
||||
return reason
|
|
@ -1,74 +0,0 @@
|
|||
# pylint: skip-file
|
||||
from lbrynet.lbrylive.StreamDescriptor import LiveStreamType, LiveStreamDescriptorValidator
|
||||
from lbrynet.core.DownloadOption import DownloadOption, DownloadOptionChoice
|
||||
|
||||
|
||||
def add_live_stream_to_sd_identifier(sd_identifier, base_live_stream_payment_rate_manager):
|
||||
sd_identifier.add_stream_type(LiveStreamType, LiveStreamDescriptorValidator,
|
||||
LiveStreamOptions(base_live_stream_payment_rate_manager))
|
||||
|
||||
|
||||
class LiveStreamOptions(object):
|
||||
def __init__(self, base_live_stream_payment_rate_manager):
|
||||
self.base_live_stream_prm = base_live_stream_payment_rate_manager
|
||||
|
||||
def get_downloader_options(self, sd_validator, payment_rate_manager):
|
||||
prm = payment_rate_manager
|
||||
|
||||
def get_default_data_rate_description():
|
||||
if prm.min_blob_data_payment_rate is None:
|
||||
return "Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)
|
||||
else:
|
||||
return "%f LBC/MB" % prm.min_blob_data_payment_rate
|
||||
|
||||
options = [
|
||||
DownloadOption(
|
||||
[
|
||||
DownloadOptionChoice(None,
|
||||
"No change",
|
||||
"No change"),
|
||||
DownloadOptionChoice(None,
|
||||
"Application default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate),
|
||||
"Default (%s LBC/MB)" % str(prm.base.min_blob_data_payment_rate)),
|
||||
DownloadOptionChoice(float,
|
||||
"Rate in LBC/MB",
|
||||
"Rate in LBC/MB")
|
||||
],
|
||||
"rate which will be paid for data",
|
||||
"data payment rate",
|
||||
prm.min_blob_data_payment_rate,
|
||||
get_default_data_rate_description()
|
||||
),
|
||||
DownloadOption(
|
||||
[
|
||||
DownloadOptionChoice(None,
|
||||
"No change",
|
||||
"No change"),
|
||||
DownloadOptionChoice(None,
|
||||
"Application default (%s LBC/MB)" % str(self.base_live_stream_prm.min_live_blob_info_payment_rate),
|
||||
"Default (%s LBC/MB)" % str(self.base_live_stream_prm.min_live_blob_info_payment_rate)),
|
||||
DownloadOptionChoice(float,
|
||||
"Rate in LBC/MB",
|
||||
"Rate in LBC/MB")
|
||||
],
|
||||
"rate which will be paid for metadata",
|
||||
"metadata payment rate",
|
||||
None,
|
||||
"Application default (%s LBC/MB)" % str(self.base_live_stream_prm.min_live_blob_info_payment_rate)
|
||||
),
|
||||
DownloadOption(
|
||||
[
|
||||
DownloadOptionChoice(True,
|
||||
"Allow reuploading data downloaded for this file",
|
||||
"Allow reuploading"),
|
||||
DownloadOptionChoice(False,
|
||||
"Disallow reuploading data downloaded for this file",
|
||||
"Disallow reuploading")
|
||||
],
|
||||
"allow reuploading data downloaded for this file",
|
||||
"allow upload",
|
||||
True,
|
||||
"Allow"
|
||||
),
|
||||
]
|
||||
return options
|
|
@ -1,91 +0,0 @@
|
|||
# pylint: skip-file
|
||||
import logging
|
||||
from lbrynet.core.client.StreamProgressManager import StreamProgressManager
|
||||
from twisted.internet import defer
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LiveStreamProgressManager(StreamProgressManager):
|
||||
def __init__(self, finished_callback, blob_manager, download_manager, delete_blob_after_finished=False,
|
||||
download_whole=True, max_before_skip_ahead=5):
|
||||
self.download_whole = download_whole
|
||||
self.max_before_skip_ahead = max_before_skip_ahead
|
||||
StreamProgressManager.__init__(self, finished_callback, blob_manager, download_manager,
|
||||
delete_blob_after_finished)
|
||||
|
||||
######### IProgressManager #########
|
||||
|
||||
def stream_position(self):
|
||||
blobs = self.download_manager.blobs
|
||||
if not blobs:
|
||||
return 0
|
||||
else:
|
||||
newest_known_blobnum = max(blobs.iterkeys())
|
||||
position = newest_known_blobnum
|
||||
oldest_relevant_blob_num = (max(0, newest_known_blobnum - self.max_before_skip_ahead + 1))
|
||||
for i in xrange(newest_known_blobnum, oldest_relevant_blob_num - 1, -1):
|
||||
if i in blobs and (not blobs[i].is_validated() and not i in self.provided_blob_nums):
|
||||
position = i
|
||||
return position
|
||||
|
||||
def needed_blobs(self):
|
||||
blobs = self.download_manager.blobs
|
||||
stream_position = self.stream_position()
|
||||
if blobs:
|
||||
newest_known_blobnum = max(blobs.iterkeys())
|
||||
else:
|
||||
newest_known_blobnum = -1
|
||||
blobs_needed = []
|
||||
for i in xrange(stream_position, newest_known_blobnum + 1):
|
||||
if i in blobs and not blobs[i].is_validated() and not i in self.provided_blob_nums:
|
||||
blobs_needed.append(blobs[i])
|
||||
return blobs_needed
|
||||
|
||||
######### internal #########
|
||||
|
||||
def _output_loop(self):
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
if self.stopped is True:
|
||||
if self.outputting_d is not None:
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
||||
return
|
||||
|
||||
blobs = self.download_manager.blobs
|
||||
log.info("In _output_loop. last_blob_outputted: %s", str(self.last_blob_outputted))
|
||||
if blobs:
|
||||
log.debug("Newest blob number: %s", str(max(blobs.iterkeys())))
|
||||
if self.outputting_d is None:
|
||||
self.outputting_d = defer.Deferred()
|
||||
|
||||
current_blob_num = self.last_blob_outputted + 1
|
||||
|
||||
def finished_outputting_blob():
|
||||
self.last_blob_outputted += 1
|
||||
final_blob_num = self.download_manager.final_blob_num()
|
||||
if final_blob_num is not None and final_blob_num == self.last_blob_outputted:
|
||||
self._finished_outputting()
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
||||
else:
|
||||
reactor.callLater(0, self._output_loop)
|
||||
|
||||
if current_blob_num in blobs and blobs[current_blob_num].is_validated():
|
||||
log.info("Outputting blob %s", str(current_blob_num))
|
||||
self.provided_blob_nums.append(current_blob_num)
|
||||
d = self.download_manager.handle_blob(current_blob_num)
|
||||
d.addCallback(lambda _: finished_outputting_blob())
|
||||
d.addCallback(lambda _: self._finished_with_blob(current_blob_num))
|
||||
elif blobs and max(blobs.iterkeys()) > self.last_blob_outputted + self.max_before_skip_ahead - 1:
|
||||
self.last_blob_outputted += 1
|
||||
log.info("Skipping blob number %s due to knowing about blob number %s",
|
||||
str(self.last_blob_outputted), str(max(blobs.iterkeys())))
|
||||
self._finished_with_blob(current_blob_num)
|
||||
reactor.callLater(0, self._output_loop)
|
||||
else:
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
|
@ -1,184 +0,0 @@
|
|||
# pylint: skip-file
|
||||
import logging
|
||||
from twisted.internet import defer
|
||||
from zope.interface import implements
|
||||
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CryptBlobInfoQueryHandlerFactory(object):
|
||||
implements(IQueryHandlerFactory)
|
||||
|
||||
def __init__(self, stream_info_manager, wallet, payment_rate_manager):
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.wallet = wallet
|
||||
self.payment_rate_manager = payment_rate_manager
|
||||
|
||||
######### IQueryHandlerFactory #########
|
||||
|
||||
def build_query_handler(self):
|
||||
q_h = CryptBlobInfoQueryHandler(self.stream_info_manager, self.wallet, self.payment_rate_manager)
|
||||
return q_h
|
||||
|
||||
def get_primary_query_identifier(self):
|
||||
return 'further_blobs'
|
||||
|
||||
def get_description(self):
|
||||
return ("Stream Blob Information - blob hashes that are associated with streams,"
|
||||
" and the blobs' associated metadata")
|
||||
|
||||
|
||||
class CryptBlobInfoQueryHandler(object):
|
||||
implements(IQueryHandler)
|
||||
|
||||
def __init__(self, stream_info_manager, wallet, payment_rate_manager):
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.wallet = wallet
|
||||
self.payment_rate_manager = payment_rate_manager
|
||||
self.query_identifiers = ['blob_info_payment_rate', 'further_blobs']
|
||||
self.blob_info_payment_rate = None
|
||||
self.peer = None
|
||||
|
||||
######### IQueryHandler #########
|
||||
|
||||
def register_with_request_handler(self, request_handler, peer):
|
||||
self.peer = peer
|
||||
request_handler.register_query_handler(self, self.query_identifiers)
|
||||
|
||||
def handle_queries(self, queries):
|
||||
response = {}
|
||||
|
||||
if self.query_identifiers[0] in queries:
|
||||
if not self.handle_blob_info_payment_rate(queries[self.query_identifiers[0]]):
|
||||
return defer.succeed({'blob_info_payment_rate': 'RATE_TOO_LOW'})
|
||||
else:
|
||||
response['blob_info_payment_rate'] = "RATE_ACCEPTED"
|
||||
|
||||
if self.query_identifiers[1] in queries:
|
||||
further_blobs_request = queries[self.query_identifiers[1]]
|
||||
log.debug("Received the client's request for additional blob information")
|
||||
|
||||
if self.blob_info_payment_rate is None:
|
||||
response['further_blobs'] = {'error': 'RATE_UNSET'}
|
||||
return defer.succeed(response)
|
||||
|
||||
def count_and_charge(blob_infos):
|
||||
if len(blob_infos) != 0:
|
||||
log.info("Responding with %s infos", str(len(blob_infos)))
|
||||
expected_payment = 1.0 * len(blob_infos) * self.blob_info_payment_rate / 1000.0
|
||||
self.wallet.add_expected_payment(self.peer, expected_payment)
|
||||
self.peer.update_stats('uploaded_crypt_blob_infos', len(blob_infos))
|
||||
return blob_infos
|
||||
|
||||
def set_field(further_blobs):
|
||||
response['further_blobs'] = {'blob_infos': further_blobs}
|
||||
return response
|
||||
|
||||
def get_further_blobs(stream_hash):
|
||||
if stream_hash is None:
|
||||
response['further_blobs'] = {'error': 'REFERENCE_HASH_UNKNOWN'}
|
||||
return defer.succeed(response)
|
||||
start = further_blobs_request.get("start")
|
||||
end = further_blobs_request.get("end")
|
||||
count = further_blobs_request.get("count")
|
||||
if count is not None:
|
||||
try:
|
||||
count = int(count)
|
||||
except ValueError:
|
||||
response['further_blobs'] = {'error': 'COUNT_NON_INTEGER'}
|
||||
return defer.succeed(response)
|
||||
|
||||
if len([x for x in [start, end, count] if x is not None]) < 2:
|
||||
response['further_blobs'] = {'error': 'TOO_FEW_PARAMETERS'}
|
||||
return defer.succeed(response)
|
||||
|
||||
inner_d = self.get_further_blobs(stream_hash, start, end, count)
|
||||
|
||||
inner_d.addCallback(count_and_charge)
|
||||
inner_d.addCallback(self.format_blob_infos)
|
||||
inner_d.addCallback(set_field)
|
||||
return inner_d
|
||||
|
||||
if 'reference' in further_blobs_request:
|
||||
d = self.get_stream_hash_from_reference(further_blobs_request['reference'])
|
||||
d.addCallback(get_further_blobs)
|
||||
return d
|
||||
else:
|
||||
response['further_blobs'] = {'error': 'NO_REFERENCE_SENT'}
|
||||
return defer.succeed(response)
|
||||
else:
|
||||
return defer.succeed({})
|
||||
|
||||
######### internal #########
|
||||
|
||||
def handle_blob_info_payment_rate(self, requested_payment_rate):
|
||||
if not self.payment_rate_manager.accept_rate_live_blob_info(self.peer, requested_payment_rate):
|
||||
return False
|
||||
else:
|
||||
self.blob_info_payment_rate = requested_payment_rate
|
||||
return True
|
||||
|
||||
def format_blob_infos(self, blobs):
|
||||
blob_infos = []
|
||||
for blob_hash, blob_num, revision, iv, length, signature in blobs:
|
||||
blob_info = {}
|
||||
if length != 0:
|
||||
blob_info['blob_hash'] = blob_hash
|
||||
blob_info['blob_num'] = blob_num
|
||||
blob_info['revision'] = revision
|
||||
blob_info['iv'] = iv
|
||||
blob_info['length'] = length
|
||||
blob_info['signature'] = signature
|
||||
blob_infos.append(blob_info)
|
||||
return blob_infos
|
||||
|
||||
def get_stream_hash_from_reference(self, reference):
|
||||
d = self.stream_info_manager.check_if_stream_exists(reference)
|
||||
|
||||
def check_if_stream_found(result):
|
||||
if result is True:
|
||||
return reference
|
||||
else:
|
||||
return self.stream_info_manager.get_stream_of_blob(reference)
|
||||
|
||||
d.addCallback(check_if_stream_found)
|
||||
return d
|
||||
|
||||
def get_further_blobs(self, stream_hash, start, end, count):
|
||||
ds = []
|
||||
if start is not None and start != "beginning":
|
||||
ds.append(self.stream_info_manager.get_stream_of_blob(start))
|
||||
if end is not None and end != 'end':
|
||||
ds.append(self.stream_info_manager.get_stream_of_blob(end))
|
||||
dl = defer.DeferredList(ds, fireOnOneErrback=True)
|
||||
|
||||
def ensure_streams_match(results):
|
||||
for success, stream_of_blob in results:
|
||||
if stream_of_blob != stream_hash:
|
||||
raise ValueError("Blob does not match stream")
|
||||
return True
|
||||
|
||||
def get_blob_infos():
|
||||
reverse = False
|
||||
count_to_use = count
|
||||
if start is None:
|
||||
reverse = True
|
||||
elif end is not None and count_to_use is not None and count_to_use < 0:
|
||||
reverse = True
|
||||
if count_to_use is not None and count_to_use < 0:
|
||||
count_to_use *= -1
|
||||
if start == "beginning" or start is None:
|
||||
s = None
|
||||
else:
|
||||
s = start
|
||||
if end == "end" or end is None:
|
||||
e = None
|
||||
else:
|
||||
e = end
|
||||
return self.stream_info_manager.get_blobs_for_stream(stream_hash, s, e, count_to_use, reverse)
|
||||
|
||||
dl.addCallback(ensure_streams_match)
|
||||
dl.addCallback(lambda _: get_blob_infos())
|
||||
return dl
|
|
@ -12,15 +12,9 @@ from Crypto.PublicKey import RSA
|
|||
from Crypto import Random
|
||||
from Crypto.Hash import MD5
|
||||
from lbrynet import conf
|
||||
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager, \
|
||||
DBEncryptedFileMetadataManager
|
||||
from lbrynet import analytics
|
||||
from lbrynet.lbrylive.LiveStreamCreator import FileLiveStreamCreator
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import DBLiveStreamMetadataManager
|
||||
from lbrynet.lbrylive.LiveStreamMetadataManager import TempLiveStreamMetadataManager
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager
|
||||
from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
|
||||
from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager
|
||||
|
@ -44,10 +38,6 @@ from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
|
|||
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
|
||||
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
|
||||
|
||||
from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory
|
||||
from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier
|
||||
from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier
|
||||
|
||||
from tests import mocks
|
||||
|
||||
|
||||
|
@ -308,132 +298,6 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
|
|||
reactor.run()
|
||||
|
||||
|
||||
def start_live_server(sd_hash_queue, kill_event, dead_event):
|
||||
use_epoll_on_linux()
|
||||
from twisted.internet import reactor
|
||||
|
||||
logging.debug("In start_server.")
|
||||
|
||||
Random.atfork()
|
||||
|
||||
r = random.Random()
|
||||
r.seed("start_live_server")
|
||||
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 1)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = DummyRateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
db_dir = "server"
|
||||
os.mkdir(db_dir)
|
||||
|
||||
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553,
|
||||
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker,
|
||||
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1])
|
||||
stream_info_manager = DBLiveStreamMetadataManager(session.db_dir, hash_announcer)
|
||||
|
||||
logging.debug("Created the session")
|
||||
|
||||
server_port = []
|
||||
|
||||
def start_listening():
|
||||
logging.debug("Starting the server protocol")
|
||||
query_handler_factories = {
|
||||
1: CryptBlobInfoQueryHandlerFactory(stream_info_manager, session.wallet,
|
||||
session.payment_rate_manager),
|
||||
2: BlobRequestHandlerFactory(session.blob_manager, session.wallet,
|
||||
session.payment_rate_manager,
|
||||
analytics.Track()),
|
||||
3: session.wallet.get_wallet_info_query_handler_factory()
|
||||
}
|
||||
|
||||
server_factory = ServerProtocolFactory(session.rate_limiter,
|
||||
query_handler_factories,
|
||||
session.peer_manager)
|
||||
server_port.append(reactor.listenTCP(5553, server_factory))
|
||||
logging.debug("Server protocol has started")
|
||||
|
||||
def create_stream():
|
||||
logging.debug("Making the live stream")
|
||||
test_file = GenFile(5209343, b''.join([chr(i + 2) for i in xrange(0, 64, 6)]))
|
||||
stream_creator_helper = FileLiveStreamCreator(session.blob_manager, stream_info_manager,
|
||||
"test_file", test_file)
|
||||
d = stream_creator_helper.setup()
|
||||
d.addCallback(lambda _: stream_creator_helper.publish_stream_descriptor())
|
||||
d.addCallback(put_sd_hash_on_queue)
|
||||
d.addCallback(lambda _: stream_creator_helper.start_streaming())
|
||||
return d
|
||||
|
||||
def put_sd_hash_on_queue(sd_hash):
|
||||
logging.debug("Telling the client to start running. Stream hash: %s", str(sd_hash))
|
||||
sd_hash_queue.put(sd_hash)
|
||||
logging.debug("sd hash has been added to the queue")
|
||||
|
||||
def set_dead_event():
|
||||
logging.debug("Setting the dead event")
|
||||
dead_event.set()
|
||||
|
||||
def print_error(err):
|
||||
logging.debug("An error occurred during shutdown: %s", err.getTraceback())
|
||||
|
||||
def stop_reactor():
|
||||
logging.debug("Server is stopping its reactor")
|
||||
reactor.stop()
|
||||
|
||||
def shut_down(arg):
|
||||
logging.debug("Shutting down")
|
||||
if isinstance(arg, Failure):
|
||||
logging.error("Shut down is due to an error: %s", arg.getTraceback())
|
||||
d = defer.maybeDeferred(server_port[0].stopListening)
|
||||
d.addErrback(print_error)
|
||||
d.addCallback(lambda _: session.shut_down())
|
||||
d.addCallback(lambda _: stream_info_manager.stop())
|
||||
d.addErrback(print_error)
|
||||
d.addCallback(lambda _: set_dead_event())
|
||||
d.addErrback(print_error)
|
||||
d.addCallback(lambda _: reactor.callLater(0, stop_reactor))
|
||||
d.addErrback(print_error)
|
||||
return d
|
||||
|
||||
def wait_for_kill_event():
|
||||
|
||||
d = defer.Deferred()
|
||||
|
||||
def check_for_kill():
|
||||
if kill_event.is_set():
|
||||
logging.debug("Kill event has been found set")
|
||||
kill_check.stop()
|
||||
d.callback(True)
|
||||
|
||||
kill_check = task.LoopingCall(check_for_kill)
|
||||
kill_check.start(1.0)
|
||||
|
||||
return d
|
||||
|
||||
def enable_live_stream():
|
||||
add_live_stream_to_sd_identifier(sd_identifier, session.base_payment_rate_manager)
|
||||
add_full_live_stream_downloader_to_sd_identifier(session, stream_info_manager, sd_identifier,
|
||||
session.base_payment_rate_manager)
|
||||
|
||||
def run_server():
|
||||
d = session.setup()
|
||||
d.addCallback(lambda _: stream_info_manager.setup())
|
||||
d.addCallback(lambda _: enable_live_stream())
|
||||
d.addCallback(lambda _: start_listening())
|
||||
d.addCallback(lambda _: create_stream())
|
||||
d.addCallback(lambda _: wait_for_kill_event())
|
||||
d.addBoth(shut_down)
|
||||
return d
|
||||
|
||||
reactor.callLater(1, run_server)
|
||||
if not reactor.running:
|
||||
reactor.run()
|
||||
|
||||
|
||||
def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_generous=False):
|
||||
use_epoll_on_linux()
|
||||
from twisted.internet import reactor
|
||||
|
@ -704,101 +568,6 @@ class TestTransfer(TestCase):
|
|||
|
||||
return d
|
||||
|
||||
@unittest.skip("Sadly skipping failing test instead of fixing it")
|
||||
def test_live_transfer(self):
|
||||
|
||||
sd_hash_queue = Queue()
|
||||
kill_event = Event()
|
||||
dead_event = Event()
|
||||
server_args = (sd_hash_queue, kill_event, dead_event)
|
||||
server = Process(target=start_live_server, args=server_args)
|
||||
server.start()
|
||||
self.server_processes.append(server)
|
||||
|
||||
wallet = FakeWallet()
|
||||
peer_manager = PeerManager()
|
||||
peer_finder = FakePeerFinder(5553, peer_manager, 1)
|
||||
hash_announcer = FakeAnnouncer()
|
||||
rate_limiter = DummyRateLimiter()
|
||||
sd_identifier = StreamDescriptorIdentifier()
|
||||
|
||||
db_dir = "client"
|
||||
os.mkdir(db_dir)
|
||||
|
||||
self.session = Session(
|
||||
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, lbryid="abcd",
|
||||
peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=None,
|
||||
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
|
||||
blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node
|
||||
)
|
||||
|
||||
self.stream_info_manager = TempLiveStreamMetadataManager(hash_announcer)
|
||||
|
||||
d = self.wait_for_hash_from_queue(sd_hash_queue)
|
||||
|
||||
def create_downloader(metadata, prm):
|
||||
info_validator = metadata.validator
|
||||
options = metadata.options
|
||||
factories = metadata.factories
|
||||
chosen_options = [
|
||||
o.default_value for o in options.get_downloader_options(info_validator, prm)]
|
||||
return factories[0].make_downloader(metadata, chosen_options, prm)
|
||||
|
||||
def start_lbry_file(lbry_file):
|
||||
lbry_file = lbry_file
|
||||
return lbry_file.start()
|
||||
|
||||
def download_stream(sd_blob_hash):
|
||||
prm = self.session.payment_rate_manager
|
||||
d = download_sd_blob(self.session, sd_blob_hash, prm)
|
||||
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
|
||||
d.addCallback(create_downloader, prm)
|
||||
d.addCallback(start_lbry_file)
|
||||
return d
|
||||
|
||||
def do_download(sd_blob_hash):
|
||||
logging.debug("Starting the download")
|
||||
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: enable_live_stream())
|
||||
d.addCallback(lambda _: download_stream(sd_blob_hash))
|
||||
return d
|
||||
|
||||
def enable_live_stream():
|
||||
add_live_stream_to_sd_identifier(sd_identifier, self.session.payment_rate_manager)
|
||||
add_full_live_stream_downloader_to_sd_identifier(self.session, self.stream_info_manager,
|
||||
sd_identifier,
|
||||
self.session.payment_rate_manager)
|
||||
|
||||
d.addCallback(do_download)
|
||||
|
||||
def check_md5_sum():
|
||||
f = open('test_file')
|
||||
hashsum = MD5.new()
|
||||
hashsum.update(f.read())
|
||||
self.assertEqual(hashsum.hexdigest(), "215b177db8eed86d028b37e5cbad55c7")
|
||||
|
||||
d.addCallback(lambda _: check_md5_sum())
|
||||
|
||||
def stop(arg):
|
||||
if isinstance(arg, Failure):
|
||||
logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback())
|
||||
else:
|
||||
logging.debug("Client is stopping normally.")
|
||||
kill_event.set()
|
||||
logging.debug("Set the kill event")
|
||||
d = self.wait_for_event(dead_event, 15)
|
||||
|
||||
def print_shutting_down():
|
||||
logging.info("Client is shutting down")
|
||||
|
||||
d.addCallback(lambda _: print_shutting_down())
|
||||
d.addCallback(lambda _: arg)
|
||||
return d
|
||||
|
||||
d.addBoth(stop)
|
||||
return d
|
||||
|
||||
def test_last_blob_retrieval(self):
|
||||
kill_event = Event()
|
||||
dead_event_1 = Event()
|
||||
|
|
Loading…
Reference in a new issue