lbry-sdk/lbrynet/p2p/BlobManager.py

135 lines
5.2 KiB
Python
Raw Normal View History

import asyncio
2015-08-20 17:27:15 +02:00
import logging
import os
2018-07-28 02:31:15 +02:00
from binascii import unhexlify
from sqlite3 import IntegrityError
from twisted.internet import defer
from lbrynet.extras.compat import f2d
2017-09-13 21:46:39 +02:00
from lbrynet.blob.blob_file import BlobFile
from lbrynet.blob.creator import BlobFileCreator
2015-08-20 17:27:15 +02:00
log = logging.getLogger(__name__)


class DiskBlobManager:
    """Manage blob files stored in a directory on the local disk.

    ``BlobFile`` objects are cached in ``self.blobs`` keyed by blob hash.
    Blob state is also recorded through ``storage``. When a
    ``node_datastore`` is supplied, its ``completed_blobs`` collection is
    updated as blobs complete (``blob_completed``), are loaded at startup
    (``setup``), and are deleted (``delete_blobs``).
    """

    def __init__(self, blob_dir, storage, node_datastore=None):
        """
        This class stores blobs on the hard disk
        blob_dir - directory where blobs are stored
        storage - SQLiteStorage object
        node_datastore - optional object exposing a ``completed_blobs``
            collection that this manager keeps updated
        """
        self.storage = storage
        self.blob_dir = blob_dir
        self._node_datastore = node_datastore
        self.blob_creator_type = BlobFileCreator
        # TODO: consider using an LRU for blobs as there could potentially
        # be thousands of blobs loaded up, many stale
        self.blobs = {}
        self.blob_hashes_to_delete = {}  # {blob_hash: being_deleted (True/False)}

    async def setup(self):
        """Seed the node datastore's completed blobs from storage.

        Does nothing when no node datastore was supplied.
        """
        if self._node_datastore is not None:
            # NOTE(review): presumably these are already in the raw
            # (unhexlified) form that blob_completed() adds; confirm against
            # SQLiteStorage.get_all_finished_blobs.
            raw_blob_hashes = await self.storage.get_all_finished_blobs()
            self._node_datastore.completed_blobs.update(raw_blob_hashes)

    async def stop(self):
        """Currently a no-op."""
        pass

    def get_blob(self, blob_hash, length=None):
        """Return a blob identified by blob_hash, which may be a new blob or a
        blob that is already on the hard disk.

        A cached blob is returned as-is (``length`` is ignored in that case);
        otherwise a new ``BlobFile`` is created and cached.

        Raises:
            Exception: if ``length`` is given and is not an ``int``.
        """
        if length is not None and not isinstance(length, int):
            raise Exception("invalid length type: {} ({})".format(length, str(type(length))))
        if blob_hash in self.blobs:
            return self.blobs[blob_hash]
        return self._make_new_blob(blob_hash, length)

    def get_blob_creator(self):
        """Return a new blob creator that writes into ``self.blob_dir``."""
        return self.blob_creator_type(self.blob_dir)

    def _make_new_blob(self, blob_hash, length=None):
        """Create a ``BlobFile`` for ``blob_hash``, cache it, and return it."""
        log.debug('Making a new blob for %s', blob_hash)
        blob = BlobFile(self.blob_dir, blob_hash, length)
        self.blobs[blob_hash] = blob
        return blob

    @defer.inlineCallbacks
    def blob_completed(self, blob, should_announce=False, next_announce_time=None):
        """Record ``blob`` as completed in storage and in the node datastore.

        Returns a Deferred (via ``inlineCallbacks``) that fires once the
        storage write has finished.
        """
        yield f2d(self.storage.add_completed_blob(
            blob.blob_hash, blob.length, next_announce_time, should_announce
        ))
        if self._node_datastore is not None:
            # The datastore holds hashes as raw bytes, not hex strings.
            self._node_datastore.completed_blobs.add(unhexlify(blob.blob_hash))

    def completed_blobs(self, blobhashes_to_check):
        """Return the subset of ``blobhashes_to_check`` whose blobs are verified."""
        return self._completed_blobs(blobhashes_to_check)

    def count_should_announce_blobs(self):
        """Return a Deferred for the number of blobs flagged for announcement."""
        return f2d(self.storage.count_should_announce_blobs())

    def set_should_announce(self, blob_hash, should_announce):
        """Update the should-announce flag for ``blob_hash`` in storage.

        The timestamp passed to storage is the running asyncio loop's clock
        (``loop.time()``, a monotonic clock). A RuntimeError is raised if
        there is no running asyncio event loop.
        """
        return f2d(self.storage.set_should_announce(
            blob_hash, asyncio.get_running_loop().time(), should_announce
        ))

    def get_should_announce(self, blob_hash):
        """Return a Deferred for the stored should-announce flag of ``blob_hash``."""
        return f2d(self.storage.should_announce(blob_hash))

    def creator_finished(self, blob_creator, should_announce):
        """Register the blob produced by ``blob_creator`` as completed.

        Returns the Deferred from ``blob_completed``.

        Raises:
            Exception: if the creator has no hash or no length, or if a blob
                with the same hash is already cached in ``self.blobs``.
        """
        log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
        if blob_creator.blob_hash is None:
            raise Exception("Blob hash is None")
        if blob_creator.blob_hash in self.blobs:
            raise Exception("Creator finished for blob that is already marked as completed")
        if blob_creator.length is None:
            # The check is for a missing length, not a zero length.
            raise Exception("Blob length is None")
        new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
        self.blobs[blob_creator.blob_hash] = new_blob
        return self.blob_completed(new_blob, should_announce)

    def get_all_verified_blobs(self):
        """Return a Deferred for the stored blob hashes whose blobs are verified.

        Candidates are stored hashes whose file exists in ``blob_dir``; they
        are then filtered through ``completed_blobs``.
        """
        d = f2d(self._get_all_verified_blob_hashes())
        d.addCallback(self.completed_blobs)
        return d

    @defer.inlineCallbacks
    def delete_blobs(self, blob_hashes):
        """Delete blob files from disk and their rows from storage.

        Best-effort: a blob whose file deletion raises is logged and skipped.
        Such a blob stays in ``self.blobs`` and is not passed to the database
        delete. Falsy hashes are ignored.

        Returns a Deferred (via ``inlineCallbacks``).

        Raises:
            IntegrityError: from the database delete, unless it is a
                "FOREIGN KEY constraint failed" error, which is tolerated.
        """
        bh_to_delete_from_db = []
        for blob_hash in blob_hashes:
            if not blob_hash:
                continue
            if self._node_datastore is not None:
                # Done before the file delete is attempted, so the hash leaves
                # the datastore even if the delete below fails.
                try:
                    self._node_datastore.completed_blobs.remove(unhexlify(blob_hash))
                except KeyError:
                    pass
            try:
                blob = self.get_blob(blob_hash)
                blob.delete()
                bh_to_delete_from_db.append(blob_hash)
                del self.blobs[blob_hash]
            except Exception as e:
                log.warning("Failed to delete blob file. Reason: %s", e)
        try:
            yield f2d(self.storage.delete_blobs_from_db(bh_to_delete_from_db))
        except IntegrityError as err:
            # NOTE(review): presumably tolerated because other tables still
            # reference some of these blobs; confirm the intended semantics.
            if str(err) != "FOREIGN KEY constraint failed":
                raise

    def _completed_blobs(self, blobhashes_to_check):
        """Return those of ``blobhashes_to_check`` whose blobs are verified.

        Uses ``get_blob``, so a ``BlobFile`` is created and cached for every
        hash checked that was not already cached.
        """
        blobs = [self.get_blob(b) for b in blobhashes_to_check]
        return [b.blob_hash for b in blobs if b.verified]

    async def _get_all_verified_blob_hashes(self):
        """Return stored blob hashes whose file exists in ``blob_dir``.

        Only file existence is checked here; blob content is not verified.
        """
        blobs = await self.storage.get_all_blob_hashes()
        return [
            blob_hash
            for blob_hash in blobs
            if os.path.isfile(os.path.join(self.blob_dir, blob_hash))
        ]