simplify CryptStreamCreator
This commit is contained in:
parent
90bce0b375
commit
487f2490ab
2 changed files with 63 additions and 88 deletions
|
@ -1,81 +0,0 @@
|
||||||
import logging
|
|
||||||
from twisted.internet import interfaces, defer
|
|
||||||
from zope.interface import implements
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class StreamCreator(object):
|
|
||||||
"""Classes which derive from this class create a 'stream', which can be any
|
|
||||||
collection of associated blobs and associated metadata. These classes
|
|
||||||
use the IConsumer interface to get data from an IProducer and transform
|
|
||||||
the data into a 'stream'"""
|
|
||||||
|
|
||||||
implements(interfaces.IConsumer)
|
|
||||||
|
|
||||||
def __init__(self, name):
|
|
||||||
"""
|
|
||||||
@param name: the name of the stream
|
|
||||||
"""
|
|
||||||
self.name = name
|
|
||||||
self.stopped = True
|
|
||||||
self.producer = None
|
|
||||||
self.streaming = None
|
|
||||||
self.blob_count = -1
|
|
||||||
self.current_blob = None
|
|
||||||
self.finished_deferreds = []
|
|
||||||
|
|
||||||
def _blob_finished(self, blob_info):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def registerProducer(self, producer, streaming):
|
|
||||||
|
|
||||||
from twisted.internet import reactor
|
|
||||||
|
|
||||||
self.producer = producer
|
|
||||||
self.streaming = streaming
|
|
||||||
self.stopped = False
|
|
||||||
if streaming is False:
|
|
||||||
reactor.callLater(0, self.producer.resumeProducing)
|
|
||||||
|
|
||||||
def unregisterProducer(self):
|
|
||||||
self.stopped = True
|
|
||||||
self.producer = None
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""Stop creating the stream. Create the terminating zero-length blob."""
|
|
||||||
log.debug("stop has been called for StreamCreator")
|
|
||||||
self.stopped = True
|
|
||||||
if self.current_blob is not None:
|
|
||||||
current_blob = self.current_blob
|
|
||||||
d = current_blob.close()
|
|
||||||
d.addCallback(self._blob_finished)
|
|
||||||
d.addErrback(self._error)
|
|
||||||
self.finished_deferreds.append(d)
|
|
||||||
self.current_blob = None
|
|
||||||
self._finalize()
|
|
||||||
dl = defer.DeferredList(self.finished_deferreds)
|
|
||||||
dl.addCallback(lambda _: self._finished())
|
|
||||||
dl.addErrback(self._error)
|
|
||||||
return dl
|
|
||||||
|
|
||||||
def _error(self, error):
|
|
||||||
log.error(error)
|
|
||||||
|
|
||||||
def _finalize(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _finished(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
# TODO: move the stream creation process to its own thread and
|
|
||||||
# remove the reactor from this process.
|
|
||||||
def write(self, data):
|
|
||||||
from twisted.internet import reactor
|
|
||||||
self._write(data)
|
|
||||||
if self.stopped is False and self.streaming is False:
|
|
||||||
reactor.callLater(0, self.producer.resumeProducing)
|
|
||||||
|
|
||||||
def _write(self, data):
|
|
||||||
pass
|
|
|
@ -3,25 +3,27 @@ Utility for creating Crypt Streams, which are encrypted blobs and associated met
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from twisted.internet import interfaces, defer
|
||||||
|
from zope.interface import implements
|
||||||
from Crypto import Random
|
from Crypto import Random
|
||||||
from Crypto.Cipher import AES
|
from Crypto.Cipher import AES
|
||||||
|
|
||||||
from twisted.internet import defer
|
|
||||||
from lbrynet.core.StreamCreator import StreamCreator
|
|
||||||
from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker
|
from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class CryptStreamCreator(StreamCreator):
|
class CryptStreamCreator(object):
|
||||||
"""Create a new stream with blobs encrypted by a symmetric cipher.
|
"""
|
||||||
|
Create a new stream with blobs encrypted by a symmetric cipher.
|
||||||
|
|
||||||
Each blob is encrypted with the same key, but each blob has its
|
Each blob is encrypted with the same key, but each blob has its
|
||||||
own initialization vector which is associated with the blob when
|
own initialization vector which is associated with the blob when
|
||||||
the blob is associated with the stream.
|
the blob is associated with the stream.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
implements(interfaces.IConsumer)
|
||||||
|
|
||||||
def __init__(self, blob_manager, name=None, key=None, iv_generator=None):
|
def __init__(self, blob_manager, name=None, key=None, iv_generator=None):
|
||||||
"""@param blob_manager: Object that stores and provides access to blobs.
|
"""@param blob_manager: Object that stores and provides access to blobs.
|
||||||
@type blob_manager: BlobManager
|
@type blob_manager: BlobManager
|
||||||
|
@ -39,14 +41,59 @@ class CryptStreamCreator(StreamCreator):
|
||||||
|
|
||||||
@return: None
|
@return: None
|
||||||
"""
|
"""
|
||||||
StreamCreator.__init__(self, name)
|
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
|
self.name = name
|
||||||
self.key = key
|
self.key = key
|
||||||
if iv_generator is None:
|
if iv_generator is None:
|
||||||
self.iv_generator = self.random_iv_generator()
|
self.iv_generator = self.random_iv_generator()
|
||||||
else:
|
else:
|
||||||
self.iv_generator = iv_generator
|
self.iv_generator = iv_generator
|
||||||
|
|
||||||
|
self.stopped = True
|
||||||
|
self.producer = None
|
||||||
|
self.streaming = None
|
||||||
|
self.blob_count = -1
|
||||||
|
self.current_blob = None
|
||||||
|
self.finished_deferreds = []
|
||||||
|
|
||||||
|
def registerProducer(self, producer, streaming):
|
||||||
|
from twisted.internet import reactor
|
||||||
|
|
||||||
|
self.producer = producer
|
||||||
|
self.streaming = streaming
|
||||||
|
self.stopped = False
|
||||||
|
if streaming is False:
|
||||||
|
reactor.callLater(0, self.producer.resumeProducing)
|
||||||
|
|
||||||
|
def unregisterProducer(self):
|
||||||
|
self.stopped = True
|
||||||
|
self.producer = None
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop creating the stream. Create the terminating zero-length blob."""
|
||||||
|
log.debug("stop has been called for StreamCreator")
|
||||||
|
self.stopped = True
|
||||||
|
if self.current_blob is not None:
|
||||||
|
current_blob = self.current_blob
|
||||||
|
d = current_blob.close()
|
||||||
|
d.addCallback(self._blob_finished)
|
||||||
|
d.addErrback(self._error)
|
||||||
|
self.finished_deferreds.append(d)
|
||||||
|
self.current_blob = None
|
||||||
|
self._finalize()
|
||||||
|
dl = defer.DeferredList(self.finished_deferreds)
|
||||||
|
dl.addCallback(lambda _: self._finished())
|
||||||
|
dl.addErrback(self._error)
|
||||||
|
return dl
|
||||||
|
|
||||||
|
# TODO: move the stream creation process to its own thread and
|
||||||
|
# remove the reactor from this process.
|
||||||
|
def write(self, data):
|
||||||
|
from twisted.internet import reactor
|
||||||
|
self._write(data)
|
||||||
|
if self.stopped is False and self.streaming is False:
|
||||||
|
reactor.callLater(0, self.producer.resumeProducing)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def random_iv_generator():
|
def random_iv_generator():
|
||||||
while 1:
|
while 1:
|
||||||
|
@ -96,3 +143,12 @@ class CryptStreamCreator(StreamCreator):
|
||||||
|
|
||||||
def _get_blob_maker(self, iv, blob_creator):
|
def _get_blob_maker(self, iv, blob_creator):
|
||||||
return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)
|
return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)
|
||||||
|
|
||||||
|
def _error(self, error):
|
||||||
|
log.error(error)
|
||||||
|
|
||||||
|
def _finished(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def _blob_finished(self, blob_info):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
Loading…
Reference in a new issue