diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index e641fda55..2ca4c3d20 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -4,11 +4,9 @@ import time import sqlite3 from twisted.internet import threads, defer -from twisted.python.failure import Failure from twisted.enterprise import adbapi -from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator +from lbrynet.core.HashBlob import BlobFile, BlobFileCreator from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier -from lbrynet.core.Error import NoSuchBlobError from lbrynet.core.sqlite_helpers import rerun_if_locked log = logging.getLogger(__name__) @@ -293,134 +291,3 @@ class DiskBlobManager(BlobManager): return d -# TODO: Having different managers for different blobs breaks the -# abstraction of a HashBlob. Why should the management of blobs -# care what kind of Blob it has? -class TempBlobManager(BlobManager): - """This class stores blobs in memory""" - def __init__(self, hash_announcer): - BlobManager.__init__(self, hash_announcer) - self.blob_type = TempBlob - self.blob_creator_type = TempBlobCreator - self.blobs = {} - self.blob_next_announces = {} - self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} - self._next_manage_call = None - - def setup(self): - self._manage() - return defer.succeed(True) - - def stop(self): - if self._next_manage_call is not None and self._next_manage_call.active(): - self._next_manage_call.cancel() - self._next_manage_call = None - - def get_blob(self, blob_hash, length=None): - if blob_hash in self.blobs: - return defer.succeed(self.blobs[blob_hash]) - return self._make_new_blob(blob_hash, length) - - def get_blob_creator(self): - return self.blob_creator_type(self) - - def _make_new_blob(self, blob_hash, length=None): - blob = self.blob_type(blob_hash, length) - self.blobs[blob_hash] = blob - return defer.succeed(blob) - - def blob_completed(self, blob, next_announce_time=None): - if next_announce_time is None: - next_announce_time = time.time() - self.blob_next_announces[blob.blob_hash] = next_announce_time - return defer.succeed(True) - - def completed_blobs(self, blobhashes_to_check): - blobs = [ - b.blob_hash for b in self.blobs.itervalues() - if b.blob_hash in blobhashes_to_check and b.is_validated() - ] - return defer.succeed(blobs) - - def get_all_verified_blobs(self): - d = self.completed_blobs(self.blobs) - return d - - def hashes_to_announce(self): - now = time.time() - blobs = [ - blob_hash for blob_hash, announce_time in self.blob_next_announces.iteritems() - if announce_time < now - ] - next_announce_time = self.get_next_announce_time(len(blobs)) - for b in blobs: - self.blob_next_announces[b] = next_announce_time - return defer.succeed(blobs) - - def creator_finished(self, blob_creator): - assert blob_creator.blob_hash is not None - assert blob_creator.blob_hash not in self.blobs - assert blob_creator.length is not None - new_blob = self.blob_type(blob_creator.blob_hash, blob_creator.length) - # TODO: change this; its breaks the encapsulation of the - # blob. Maybe better would be to have the blob_creator - # produce a blob. - new_blob.data_buffer = blob_creator.data_buffer - new_blob._verified = True - self.blobs[blob_creator.blob_hash] = new_blob - self._immediate_announce([blob_creator.blob_hash]) - next_announce_time = self.get_next_announce_time() - d = self.blob_completed(new_blob, next_announce_time) - d.addCallback(lambda _: new_blob) - return d - - def delete_blobs(self, blob_hashes): - for blob_hash in blob_hashes: - if not blob_hash in self.blob_hashes_to_delete: - self.blob_hashes_to_delete[blob_hash] = False - - def immediate_announce_all_blobs(self): - if self.hash_announcer: - return self.hash_announcer.immediate_announce(self.blobs.iterkeys()) - - def _manage(self): - from twisted.internet import reactor - - d = self._delete_blobs_marked_for_deletion() - - def set_next_manage_call(): - log.info("Setting the next manage call in %s", str(self)) - self._next_manage_call = reactor.callLater(1, self._manage) - - d.addCallback(lambda _: set_next_manage_call()) - - def _delete_blobs_marked_for_deletion(self): - def remove_from_list(b_h): - del self.blob_hashes_to_delete[b_h] - log.info("Deleted blob %s", blob_hash) - return b_h - - def set_not_deleting(err, b_h): - log.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage()) - self.blob_hashes_to_delete[b_h] = False - return b_h - - ds = [] - for blob_hash, being_deleted in self.blob_hashes_to_delete.items(): - if being_deleted is False: - if blob_hash in self.blobs: - self.blob_hashes_to_delete[blob_hash] = True - log.info("Found a blob marked for deletion: %s", blob_hash) - blob = self.blobs[blob_hash] - d = blob.delete() - - d.addCallbacks(lambda _: remove_from_list(blob_hash), set_not_deleting, - errbackArgs=(blob_hash,)) - - ds.append(d) - else: - remove_from_list(blob_hash) - d = defer.fail(Failure(NoSuchBlobError(blob_hash))) - log.warning("Blob %s cannot be deleted because it is unknown") - ds.append(d) - return defer.DeferredList(ds)