forked from LBRYCommunity/lbry-sdk
Remove TempBlobManager
This commit is contained in:
parent
bf6bc02828
commit
f9b728530e
1 changed files with 1 additions and 134 deletions
|
@ -4,11 +4,9 @@ import time
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
|
||||||
from twisted.internet import threads, defer
|
from twisted.internet import threads, defer
|
||||||
from twisted.python.failure import Failure
|
|
||||||
from twisted.enterprise import adbapi
|
from twisted.enterprise import adbapi
|
||||||
from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator
|
from lbrynet.core.HashBlob import BlobFile, BlobFileCreator
|
||||||
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
|
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
|
||||||
from lbrynet.core.Error import NoSuchBlobError
|
|
||||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -293,134 +291,3 @@ class DiskBlobManager(BlobManager):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
# TODO: Having different managers for different blobs breaks the
|
|
||||||
# abstraction of a HashBlob. Why should the management of blobs
|
|
||||||
# care what kind of Blob it has?
|
|
||||||
class TempBlobManager(BlobManager):
|
|
||||||
"""This class stores blobs in memory"""
|
|
||||||
def __init__(self, hash_announcer):
|
|
||||||
BlobManager.__init__(self, hash_announcer)
|
|
||||||
self.blob_type = TempBlob
|
|
||||||
self.blob_creator_type = TempBlobCreator
|
|
||||||
self.blobs = {}
|
|
||||||
self.blob_next_announces = {}
|
|
||||||
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
|
|
||||||
self._next_manage_call = None
|
|
||||||
|
|
||||||
def setup(self):
|
|
||||||
self._manage()
|
|
||||||
return defer.succeed(True)
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
if self._next_manage_call is not None and self._next_manage_call.active():
|
|
||||||
self._next_manage_call.cancel()
|
|
||||||
self._next_manage_call = None
|
|
||||||
|
|
||||||
def get_blob(self, blob_hash, length=None):
|
|
||||||
if blob_hash in self.blobs:
|
|
||||||
return defer.succeed(self.blobs[blob_hash])
|
|
||||||
return self._make_new_blob(blob_hash, length)
|
|
||||||
|
|
||||||
def get_blob_creator(self):
|
|
||||||
return self.blob_creator_type(self)
|
|
||||||
|
|
||||||
def _make_new_blob(self, blob_hash, length=None):
|
|
||||||
blob = self.blob_type(blob_hash, length)
|
|
||||||
self.blobs[blob_hash] = blob
|
|
||||||
return defer.succeed(blob)
|
|
||||||
|
|
||||||
def blob_completed(self, blob, next_announce_time=None):
|
|
||||||
if next_announce_time is None:
|
|
||||||
next_announce_time = time.time()
|
|
||||||
self.blob_next_announces[blob.blob_hash] = next_announce_time
|
|
||||||
return defer.succeed(True)
|
|
||||||
|
|
||||||
def completed_blobs(self, blobhashes_to_check):
|
|
||||||
blobs = [
|
|
||||||
b.blob_hash for b in self.blobs.itervalues()
|
|
||||||
if b.blob_hash in blobhashes_to_check and b.is_validated()
|
|
||||||
]
|
|
||||||
return defer.succeed(blobs)
|
|
||||||
|
|
||||||
def get_all_verified_blobs(self):
|
|
||||||
d = self.completed_blobs(self.blobs)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def hashes_to_announce(self):
|
|
||||||
now = time.time()
|
|
||||||
blobs = [
|
|
||||||
blob_hash for blob_hash, announce_time in self.blob_next_announces.iteritems()
|
|
||||||
if announce_time < now
|
|
||||||
]
|
|
||||||
next_announce_time = self.get_next_announce_time(len(blobs))
|
|
||||||
for b in blobs:
|
|
||||||
self.blob_next_announces[b] = next_announce_time
|
|
||||||
return defer.succeed(blobs)
|
|
||||||
|
|
||||||
def creator_finished(self, blob_creator):
|
|
||||||
assert blob_creator.blob_hash is not None
|
|
||||||
assert blob_creator.blob_hash not in self.blobs
|
|
||||||
assert blob_creator.length is not None
|
|
||||||
new_blob = self.blob_type(blob_creator.blob_hash, blob_creator.length)
|
|
||||||
# TODO: change this; its breaks the encapsulation of the
|
|
||||||
# blob. Maybe better would be to have the blob_creator
|
|
||||||
# produce a blob.
|
|
||||||
new_blob.data_buffer = blob_creator.data_buffer
|
|
||||||
new_blob._verified = True
|
|
||||||
self.blobs[blob_creator.blob_hash] = new_blob
|
|
||||||
self._immediate_announce([blob_creator.blob_hash])
|
|
||||||
next_announce_time = self.get_next_announce_time()
|
|
||||||
d = self.blob_completed(new_blob, next_announce_time)
|
|
||||||
d.addCallback(lambda _: new_blob)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def delete_blobs(self, blob_hashes):
|
|
||||||
for blob_hash in blob_hashes:
|
|
||||||
if not blob_hash in self.blob_hashes_to_delete:
|
|
||||||
self.blob_hashes_to_delete[blob_hash] = False
|
|
||||||
|
|
||||||
def immediate_announce_all_blobs(self):
|
|
||||||
if self.hash_announcer:
|
|
||||||
return self.hash_announcer.immediate_announce(self.blobs.iterkeys())
|
|
||||||
|
|
||||||
def _manage(self):
|
|
||||||
from twisted.internet import reactor
|
|
||||||
|
|
||||||
d = self._delete_blobs_marked_for_deletion()
|
|
||||||
|
|
||||||
def set_next_manage_call():
|
|
||||||
log.info("Setting the next manage call in %s", str(self))
|
|
||||||
self._next_manage_call = reactor.callLater(1, self._manage)
|
|
||||||
|
|
||||||
d.addCallback(lambda _: set_next_manage_call())
|
|
||||||
|
|
||||||
def _delete_blobs_marked_for_deletion(self):
|
|
||||||
def remove_from_list(b_h):
|
|
||||||
del self.blob_hashes_to_delete[b_h]
|
|
||||||
log.info("Deleted blob %s", blob_hash)
|
|
||||||
return b_h
|
|
||||||
|
|
||||||
def set_not_deleting(err, b_h):
|
|
||||||
log.warning("Failed to delete blob %s. Reason: %s", str(b_h), err.getErrorMessage())
|
|
||||||
self.blob_hashes_to_delete[b_h] = False
|
|
||||||
return b_h
|
|
||||||
|
|
||||||
ds = []
|
|
||||||
for blob_hash, being_deleted in self.blob_hashes_to_delete.items():
|
|
||||||
if being_deleted is False:
|
|
||||||
if blob_hash in self.blobs:
|
|
||||||
self.blob_hashes_to_delete[blob_hash] = True
|
|
||||||
log.info("Found a blob marked for deletion: %s", blob_hash)
|
|
||||||
blob = self.blobs[blob_hash]
|
|
||||||
d = blob.delete()
|
|
||||||
|
|
||||||
d.addCallbacks(lambda _: remove_from_list(blob_hash), set_not_deleting,
|
|
||||||
errbackArgs=(blob_hash,))
|
|
||||||
|
|
||||||
ds.append(d)
|
|
||||||
else:
|
|
||||||
remove_from_list(blob_hash)
|
|
||||||
d = defer.fail(Failure(NoSuchBlobError(blob_hash)))
|
|
||||||
log.warning("Blob %s cannot be deleted because it is unknown")
|
|
||||||
ds.append(d)
|
|
||||||
return defer.DeferredList(ds)
|
|
||||||
|
|
Loading…
Reference in a new issue