2015-08-20 11:27:15 -04:00
|
|
|
import logging
|
|
|
|
import os
|
2018-07-27 20:31:15 -04:00
|
|
|
from binascii import unhexlify
|
2018-02-12 13:43:36 -05:00
|
|
|
from sqlite3 import IntegrityError
|
2018-08-02 14:32:08 -04:00
|
|
|
from twisted.internet import threads, defer
|
2017-09-13 15:46:39 -04:00
|
|
|
from lbrynet.blob.blob_file import BlobFile
|
|
|
|
from lbrynet.blob.creator import BlobFileCreator
|
2015-08-20 11:27:15 -04:00
|
|
|
|
2015-09-08 15:42:56 -04:00
|
|
|
# Module-level logger used by DiskBlobManager for debug and warning messages.
log = logging.getLogger(__name__)
|
|
|
|
|
2017-08-10 13:49:43 -04:00
|
|
|
|
2018-07-21 18:34:59 -04:00
|
|
|
class DiskBlobManager:
    def __init__(self, blob_dir, storage, node_datastore=None):
        """
        This class stores blobs on the hard disk

        blob_dir - directory where blobs are stored
        storage - SQLiteStorage object
        node_datastore - optional datastore whose `completed_blobs` collection
                         is kept in sync with the blobs completed and deleted
                         here; its entries are unhexlified (raw bytes) hashes
        """
        self.storage = storage
        self.blob_dir = blob_dir
        self._node_datastore = node_datastore
        self.blob_creator_type = BlobFileCreator
        # TODO: consider using an LRU for blobs as there could potentially
        # be thousands of blobs loaded up, many stale
        self.blobs = {}  # {blob_hash: BlobFile}, filled by get_blob / creator_finished
        self.blob_hashes_to_delete = {}  # {blob_hash: being_deleted (True/False)}

    @defer.inlineCallbacks
    def setup(self):
        """Seed the node datastore with every finished blob hash from storage.

        When no node datastore was given this does nothing. Returns a
        Deferred that fires with True.
        """
        if self._node_datastore is not None:
            raw_blob_hashes = yield self.storage.get_all_finished_blobs()
            self._node_datastore.completed_blobs.update(raw_blob_hashes)
        defer.returnValue(True)

    def stop(self):
        """Nothing to tear down; return an already-fired Deferred (True)."""
        return defer.succeed(True)

    def get_blob(self, blob_hash, length=None):
        """Return a blob identified by blob_hash, which may be a new blob or a
        blob that is already on the hard disk

        Returns a Deferred firing with the cached BlobFile for blob_hash, or
        with a newly created (and cached) BlobFile when none is cached yet.
        `length` is only used when a new BlobFile has to be created.

        Raises Exception (synchronously, not via the Deferred) if length is
        neither None nor an int.
        """
        if length is not None and not isinstance(length, int):
            raise Exception("invalid length type: %s (%s)" % (length, str(type(length))))
        if blob_hash in self.blobs:
            return defer.succeed(self.blobs[blob_hash])
        return self._make_new_blob(blob_hash, length)

    def get_blob_creator(self):
        """Return a new blob creator that writes into blob_dir."""
        return self.blob_creator_type(self.blob_dir)

    def _make_new_blob(self, blob_hash, length=None):
        """Create a BlobFile for blob_hash, cache it, and return it in a fired Deferred."""
        log.debug('Making a new blob for %s', blob_hash)
        blob = BlobFile(self.blob_dir, blob_hash, length)
        self.blobs[blob_hash] = blob
        return defer.succeed(blob)

    @defer.inlineCallbacks
    def blob_completed(self, blob, should_announce=False, next_announce_time=None):
        """Record `blob` as completed in storage, then add it to the node datastore."""
        yield self.storage.add_completed_blob(
            blob.blob_hash, blob.length, next_announce_time, should_announce
        )
        if self._node_datastore is not None:
            # the datastore holds unhexlified (raw bytes) hashes
            self._node_datastore.completed_blobs.add(unhexlify(blob.blob_hash))

    def completed_blobs(self, blobhashes_to_check):
        """Return a Deferred firing with the hashes in blobhashes_to_check whose blobs are verified."""
        return self._completed_blobs(blobhashes_to_check)

    def count_should_announce_blobs(self):
        """Delegate to storage.count_should_announce_blobs()."""
        return self.storage.count_should_announce_blobs()

    def set_should_announce(self, blob_hash, should_announce):
        """Update the should_announce flag for blob_hash, stamped with storage's clock."""
        now = self.storage.clock.seconds()
        return self.storage.set_should_announce(blob_hash, now, should_announce)

    def get_should_announce(self, blob_hash):
        """Delegate to storage.should_announce(blob_hash)."""
        return self.storage.should_announce(blob_hash)

    def creator_finished(self, blob_creator, should_announce):
        """Register the blob produced by a finished creator and mark it completed.

        Raises Exception if the creator has no hash or no length, or if a
        BlobFile for the same hash is already cached. Returns the Deferred
        from blob_completed.
        """
        log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
        if blob_creator.blob_hash is None:
            raise Exception("Blob hash is None")
        if blob_creator.blob_hash in self.blobs:
            raise Exception("Creator finished for blob that is already marked as completed")
        if blob_creator.length is None:
            # the condition is a missing length, not a zero-length blob
            raise Exception("Blob length is None")
        new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
        self.blobs[blob_creator.blob_hash] = new_blob
        return self.blob_completed(new_blob, should_announce)

    def get_all_verified_blobs(self):
        """Return a Deferred firing with the hashes of stored blobs that exist on disk and are verified."""
        d = self._get_all_verified_blob_hashes()
        d.addCallback(self.completed_blobs)
        return d

    @defer.inlineCallbacks
    def delete_blobs(self, blob_hashes):
        """Delete the given blobs' files and their storage rows.

        Falsy hashes are skipped. Deletion is best-effort per blob: a failure
        for one hash is logged and the remaining hashes are still processed.
        Only hashes whose file deletion succeeded are dropped from the cache,
        the node datastore and the database. A "FOREIGN KEY constraint
        failed" IntegrityError from the database delete is ignored; any other
        IntegrityError propagates.
        """
        bh_to_delete_from_db = []
        for blob_hash in blob_hashes:
            if not blob_hash:
                continue
            try:
                blob = yield self.get_blob(blob_hash)
                yield blob.delete()
                bh_to_delete_from_db.append(blob_hash)
                del self.blobs[blob_hash]
                # only stop advertising the blob once it is really gone; doing
                # it inside the try also keeps a malformed hash (unhexlify
                # error) from aborting the rest of the batch
                self._forget_completed_blob(blob_hash)
            except Exception as e:
                log.warning("Failed to delete blob file. Reason: %s", e)
        try:
            yield self.storage.delete_blobs_from_db(bh_to_delete_from_db)
        except IntegrityError as err:
            if str(err) != "FOREIGN KEY constraint failed":
                raise

    def _forget_completed_blob(self, blob_hash):
        """Remove hex blob_hash from the node datastore's completed set, if it is tracked."""
        if self._node_datastore is None:
            return
        try:
            self._node_datastore.completed_blobs.remove(unhexlify(blob_hash))
        except KeyError:
            pass

    @defer.inlineCallbacks
    def _completed_blobs(self, blobhashes_to_check):
        """Returns of the blobhashes_to_check, which are valid"""
        # maybeDeferred converts a synchronous exception raised by get_blob
        # (or BlobFile construction) into a failed Deferred, so a single bad
        # hash is skipped instead of aborting the whole check; consumeErrors
        # keeps those expected failures from being logged as unhandled.
        results = yield defer.DeferredList(
            [defer.maybeDeferred(self.get_blob, b) for b in blobhashes_to_check],
            consumeErrors=True,
        )
        blob_hashes = [blob.blob_hash for success, blob in results if success and blob.verified]
        defer.returnValue(blob_hashes)

    def _get_all_verified_blob_hashes(self):
        """Return a Deferred firing with the stored blob hashes that have a file in blob_dir.

        "Verified" here only means the file exists; the filesystem checks
        run off the reactor thread via threads.deferToThread.
        """
        d = self.storage.get_all_blob_hashes()

        def get_verified_blobs(blobs):
            return [
                blob_hash for blob_hash in blobs
                if os.path.isfile(os.path.join(self.blob_dir, blob_hash))
            ]

        d.addCallback(lambda blobs: threads.deferToThread(get_verified_blobs, blobs))
        return d
|