From 487f2490ab520c136e558cea30fec847471cd2b6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 15:38:47 -0400 Subject: [PATCH] simplify CryptStreamCreator --- lbrynet/core/StreamCreator.py | 81 ----------------------- lbrynet/cryptstream/CryptStreamCreator.py | 70 ++++++++++++++++++-- 2 files changed, 63 insertions(+), 88 deletions(-) delete mode 100644 lbrynet/core/StreamCreator.py diff --git a/lbrynet/core/StreamCreator.py b/lbrynet/core/StreamCreator.py deleted file mode 100644 index 5093651be..000000000 --- a/lbrynet/core/StreamCreator.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -from twisted.internet import interfaces, defer -from zope.interface import implements - - -log = logging.getLogger(__name__) - - -class StreamCreator(object): - """Classes which derive from this class create a 'stream', which can be any - collection of associated blobs and associated metadata. These classes - use the IConsumer interface to get data from an IProducer and transform - the data into a 'stream'""" - - implements(interfaces.IConsumer) - - def __init__(self, name): - """ - @param name: the name of the stream - """ - self.name = name - self.stopped = True - self.producer = None - self.streaming = None - self.blob_count = -1 - self.current_blob = None - self.finished_deferreds = [] - - def _blob_finished(self, blob_info): - raise NotImplementedError() - - def registerProducer(self, producer, streaming): - - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - self.stopped = False - if streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - self.stopped = True - self.producer = None - - def stop(self): - """Stop creating the stream. Create the terminating zero-length blob.""" - log.debug("stop has been called for StreamCreator") - self.stopped = True - if self.current_blob is not None: - current_blob = self.current_blob - d = current_blob.close() - d.addCallback(self._blob_finished) - d.addErrback(self._error) - self.finished_deferreds.append(d) - self.current_blob = None - self._finalize() - dl = defer.DeferredList(self.finished_deferreds) - dl.addCallback(lambda _: self._finished()) - dl.addErrback(self._error) - return dl - - def _error(self, error): - log.error(error) - - def _finalize(self): - pass - - def _finished(self): - pass - - # TODO: move the stream creation process to its own thread and - # remove the reactor from this process. - def write(self, data): - from twisted.internet import reactor - self._write(data) - if self.stopped is False and self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def _write(self, data): - pass diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index 446c29951..9c94ad476 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -3,25 +3,27 @@ Utility for creating Crypt Streams, which are encrypted blobs and associated met """ import logging - +from twisted.internet import interfaces, defer +from zope.interface import implements from Crypto import Random from Crypto.Cipher import AES - -from twisted.internet import defer -from lbrynet.core.StreamCreator import StreamCreator from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker log = logging.getLogger(__name__) -class CryptStreamCreator(StreamCreator): - """Create a new stream with blobs encrypted by a symmetric cipher. +class CryptStreamCreator(object): + """ + Create a new stream with blobs encrypted by a symmetric cipher. Each blob is encrypted with the same key, but each blob has its own initialization vector which is associated with the blob when the blob is associated with the stream. """ + + implements(interfaces.IConsumer) + def __init__(self, blob_manager, name=None, key=None, iv_generator=None): """@param blob_manager: Object that stores and provides access to blobs. @type blob_manager: BlobManager @@ -39,14 +41,59 @@ class CryptStreamCreator(StreamCreator): @return: None """ - StreamCreator.__init__(self, name) self.blob_manager = blob_manager + self.name = name self.key = key if iv_generator is None: self.iv_generator = self.random_iv_generator() else: self.iv_generator = iv_generator + self.stopped = True + self.producer = None + self.streaming = None + self.blob_count = -1 + self.current_blob = None + self.finished_deferreds = [] + + def registerProducer(self, producer, streaming): + from twisted.internet import reactor + + self.producer = producer + self.streaming = streaming + self.stopped = False + if streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + self.stopped = True + self.producer = None + + def stop(self): + """Stop creating the stream. Create the terminating zero-length blob.""" + log.debug("stop has been called for StreamCreator") + self.stopped = True + if self.current_blob is not None: + current_blob = self.current_blob + d = current_blob.close() + d.addCallback(self._blob_finished) + d.addErrback(self._error) + self.finished_deferreds.append(d) + self.current_blob = None + self._finalize() + dl = defer.DeferredList(self.finished_deferreds) + dl.addCallback(lambda _: self._finished()) + dl.addErrback(self._error) + return dl + + # TODO: move the stream creation process to its own thread and + # remove the reactor from this process. + def write(self, data): + from twisted.internet import reactor + self._write(data) + if self.stopped is False and self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + @staticmethod def random_iv_generator(): while 1: @@ -96,3 +143,12 @@ class CryptStreamCreator(StreamCreator): def _get_blob_maker(self, iv, blob_creator): return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator) + + def _error(self, error): + log.error(error) + + def _finished(self): + raise NotImplementedError() + + def _blob_finished(self, blob_info): + raise NotImplementedError()