forked from LBRYCommunity/lbry-sdk
90 lines
No EOL
3.8 KiB
Python
90 lines
No EOL
3.8 KiB
Python
import logging
|
|
from lbrynet.core.client.StreamProgressManager import StreamProgressManager
|
|
from twisted.internet import defer
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class LiveStreamProgressManager(StreamProgressManager):
|
|
def __init__(self, finished_callback, blob_manager, download_manager, delete_blob_after_finished=False,
|
|
download_whole=True, max_before_skip_ahead=5):
|
|
self.download_whole = download_whole
|
|
self.max_before_skip_ahead = max_before_skip_ahead
|
|
StreamProgressManager.__init__(self, finished_callback, blob_manager, download_manager,
|
|
delete_blob_after_finished)
|
|
|
|
######### IProgressManager #########
|
|
|
|
def stream_position(self):
|
|
blobs = self.download_manager.blobs
|
|
if not blobs:
|
|
return 0
|
|
else:
|
|
newest_known_blobnum = max(blobs.iterkeys())
|
|
position = newest_known_blobnum
|
|
oldest_relevant_blob_num = (max(0, newest_known_blobnum - self.max_before_skip_ahead + 1))
|
|
for i in xrange(newest_known_blobnum, oldest_relevant_blob_num - 1, -1):
|
|
if i in blobs and (not blobs[i].is_validated() and not i in self.provided_blob_nums):
|
|
position = i
|
|
return position
|
|
|
|
def needed_blobs(self):
|
|
blobs = self.download_manager.blobs
|
|
stream_position = self.stream_position()
|
|
if blobs:
|
|
newest_known_blobnum = max(blobs.iterkeys())
|
|
else:
|
|
newest_known_blobnum = -1
|
|
blobs_needed = []
|
|
for i in xrange(stream_position, newest_known_blobnum + 1):
|
|
if i in blobs and not blobs[i].is_validated() and not i in self.provided_blob_nums:
|
|
blobs_needed.append(blobs[i])
|
|
return blobs_needed
|
|
|
|
######### internal #########
|
|
|
|
def _output_loop(self):
|
|
|
|
from twisted.internet import reactor
|
|
|
|
if self.stopped is True:
|
|
if self.outputting_d is not None:
|
|
self.outputting_d.callback(True)
|
|
self.outputting_d = None
|
|
return
|
|
|
|
blobs = self.download_manager.blobs
|
|
log.info("In _output_loop. last_blob_outputted: %s", str(self.last_blob_outputted))
|
|
if blobs:
|
|
log.debug("Newest blob number: %s", str(max(blobs.iterkeys())))
|
|
if self.outputting_d is None:
|
|
self.outputting_d = defer.Deferred()
|
|
|
|
current_blob_num = self.last_blob_outputted + 1
|
|
|
|
def finished_outputting_blob():
|
|
self.last_blob_outputted += 1
|
|
final_blob_num = self.download_manager.final_blob_num()
|
|
if final_blob_num is not None and final_blob_num == self.last_blob_outputted:
|
|
self._finished_outputting()
|
|
self.outputting_d.callback(True)
|
|
self.outputting_d = None
|
|
else:
|
|
reactor.callLater(0, self._output_loop)
|
|
|
|
if current_blob_num in blobs and blobs[current_blob_num].is_validated():
|
|
log.info("Outputting blob %s", str(current_blob_num))
|
|
self.provided_blob_nums.append(current_blob_num)
|
|
d = self.download_manager.handle_blob(current_blob_num)
|
|
d.addCallback(lambda _: finished_outputting_blob())
|
|
d.addCallback(lambda _: self._finished_with_blob(current_blob_num))
|
|
elif blobs and max(blobs.iterkeys()) > self.last_blob_outputted + self.max_before_skip_ahead - 1:
|
|
self.last_blob_outputted += 1
|
|
log.info("Skipping blob number %s due to knowing about blob number %s",
|
|
str(self.last_blob_outputted), str(max(blobs.iterkeys())))
|
|
self._finished_with_blob(current_blob_num)
|
|
reactor.callLater(0, self._output_loop)
|
|
else:
|
|
self.outputting_d.callback(True)
|
|
self.outputting_d = None |