lbry-sdk/lbrynet/core/StreamCreator.py

77 lines
2.2 KiB
Python
Raw Normal View History

2015-08-20 17:27:15 +02:00
import logging
from twisted.internet import interfaces, defer
from zope.interface import implements
log = logging.getLogger(__name__)
2015-08-20 17:27:15 +02:00
class StreamCreator(object):
"""Classes which derive from this class create a 'stream', which can be any
collection of associated blobs and associated metadata. These classes
use the IConsumer interface to get data from an IProducer and transform
the data into a 'stream'"""
implements(interfaces.IConsumer)
def __init__(self, name):
"""
@param name: the name of the stream
"""
self.name = name
self.stopped = True
self.producer = None
self.streaming = None
self.blob_count = -1
self.current_blob = None
self.finished_deferreds = []
def _blob_finished(self, blob_info):
pass
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
self.stopped = False
if streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
self.stopped = True
self.producer = None
def stop(self):
"""Stop creating the stream. Create the terminating zero-length blob."""
log.debug("stop has been called for StreamCreator")
2015-08-20 17:27:15 +02:00
self.stopped = True
if self.current_blob is not None:
current_blob = self.current_blob
d = current_blob.close()
d.addCallback(self._blob_finished)
self.finished_deferreds.append(d)
self.current_blob = None
self._finalize()
dl = defer.DeferredList(self.finished_deferreds)
dl.addCallback(lambda _: self._finished())
return dl
def _finalize(self):
pass
def _finished(self):
pass
2017-01-20 21:50:10 +01:00
# TODO: move the stream creation process to its own thread and
# remove the reactor from this process.
2015-08-20 17:27:15 +02:00
def write(self, data):
from twisted.internet import reactor
self._write(data)
if self.stopped is False and self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def _write(self, data):
2016-11-04 17:39:52 +01:00
pass