Blob verification fixes (#428)

* Move the blob verification to the actual Blob object
* Remove the check on verification time
* Remove get_blob_length from BlobManager

Removed because I'm not sure what checking verification time against ctime gets us, except some protection against an accidental modification of the blob.
This commit is contained in:
Job Evers-Meltzer 2017-01-20 10:54:36 -06:00 committed by GitHub
parent 91d673a539
commit 6c571b5227
3 changed files with 68 additions and 143 deletions

View file

@ -8,7 +8,6 @@ from twisted.python.failure import Failure
from twisted.enterprise import adbapi
from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.core.Error import NoSuchBlobError
from lbrynet.core.sqlite_helpers import rerun_if_locked
@ -37,7 +36,7 @@ class BlobManager(DHTHashSupplier):
def blob_completed(self, blob, next_announce_time=None):
pass
def completed_blobs(self, blobs_to_check):
def completed_blobs(self, blobhashes_to_check):
pass
def hashes_to_announce(self):
@ -49,9 +48,6 @@ class BlobManager(DHTHashSupplier):
def delete_blob(self, blob_hash):
pass
def get_blob_length(self, blob_hash):
pass
def blob_requested(self, blob_hash):
pass
@ -78,7 +74,9 @@ class BlobManager(DHTHashSupplier):
return self.hash_announcer.immediate_announce(blob_hashes)
# TODO: Having different managers for different blobs breaks the
# abstraction of a HashBlob. Why should the management of blobs
# care what kind of Blob it has?
class DiskBlobManager(BlobManager):
"""This class stores blobs on the hard disk"""
def __init__(self, hash_announcer, blob_dir, db_dir):
@ -126,35 +124,17 @@ class DiskBlobManager(BlobManager):
log.debug('Making a new blob for %s', blob_hash)
blob = self.blob_type(self.blob_dir, blob_hash, upload_allowed, length)
self.blobs[blob_hash] = blob
d = self._completed_blobs([blob_hash])
def check_completed(completed_blobs):
def set_length(length):
blob.length = length
if len(completed_blobs) == 1 and completed_blobs[0] == blob_hash:
blob.verified = True
inner_d = self._get_blob_length(blob_hash)
inner_d.addCallback(set_length)
inner_d.addCallback(lambda _: blob)
else:
inner_d = defer.succeed(blob)
return inner_d
d.addCallback(check_completed)
return d
return defer.succeed(blob)
def blob_completed(self, blob, next_announce_time=None):
if next_announce_time is None:
next_announce_time = time.time() + self.hash_reannounce_time
d = self._add_completed_blob(blob.blob_hash, blob.length,
time.time(), next_announce_time)
d = self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time)
d.addCallback(lambda _: self._immediate_announce([blob.blob_hash]))
return d
def completed_blobs(self, blobs_to_check):
return self._completed_blobs(blobs_to_check)
def completed_blobs(self, blobhashes_to_check):
return self._completed_blobs(blobhashes_to_check)
def hashes_to_announce(self):
next_announce_time = time.time() + self.hash_reannounce_time
@ -166,7 +146,6 @@ class DiskBlobManager(BlobManager):
assert blob_creator.blob_hash not in self.blobs
assert blob_creator.length is not None
new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, True, blob_creator.length)
new_blob.verified = True
self.blobs[blob_creator.blob_hash] = new_blob
self._immediate_announce([blob_creator.blob_hash])
next_announce_time = time.time() + self.hash_reannounce_time
@ -178,17 +157,11 @@ class DiskBlobManager(BlobManager):
if not blob_hash in self.blob_hashes_to_delete:
self.blob_hashes_to_delete[blob_hash] = False
def update_all_last_verified_dates(self, timestamp):
return self._update_all_last_verified_dates(timestamp)
def immediate_announce_all_blobs(self):
d = self._get_all_verified_blob_hashes()
d.addCallback(self._immediate_announce)
return d
def get_blob_length(self, blob_hash):
return self._get_blob_length(blob_hash)
def get_all_verified_blobs(self):
d = self._get_all_verified_blob_hashes()
d.addCallback(self.completed_blobs)
@ -246,7 +219,7 @@ class DiskBlobManager(BlobManager):
for blob_hash, being_deleted in self.blob_hashes_to_delete.items():
if being_deleted is False:
self.blob_hashes_to_delete[blob_hash] = True
d = self.get_blob(blob_hash, True)
d = self.get_blob(blob_hash)
d.addCallbacks(
delete, set_not_deleting,
callbackArgs=(blob_hash,), errbackArgs=(blob_hash,))
@ -288,80 +261,21 @@ class DiskBlobManager(BlobManager):
return self.db_conn.runInteraction(create_tables)
@rerun_if_locked
def _add_completed_blob(self, blob_hash, length, timestamp, next_announce_time=None):
def _add_completed_blob(self, blob_hash, length, next_announce_time):
log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
if next_announce_time is None:
next_announce_time = timestamp
d = self.db_conn.runQuery("insert into blobs values (?, ?, ?, ?)",
(blob_hash, length, timestamp, next_announce_time))
d = self.db_conn.runQuery(
"insert into blobs (blob_hash, blob_length, next_announce_time) values (?, ?, ?)",
(blob_hash, length, next_announce_time)
)
d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
return d
@rerun_if_locked
def _completed_blobs(self, blobs_to_check):
"""Returns of the blobs_to_check, which are valid"""
blobs_to_check = filter(is_valid_blobhash, blobs_to_check)
def _get_last_verified_time(db_transaction, blob_hash):
result = db_transaction.execute(
"select last_verified_time from blobs where blob_hash = ?", (blob_hash,))
row = result.fetchone()
if row:
return row[0]
else:
return None
def _filter_blobs_in_db(db_transaction, blobs_to_check):
for b in blobs_to_check:
verified_time = _get_last_verified_time(db_transaction, b)
if verified_time:
yield (b, verified_time)
def get_blobs_in_db(db_transaction, blob_to_check):
# [(blob_hash, last_verified_time)]
return list(_filter_blobs_in_db(db_transaction, blobs_to_check))
def get_valid_blobs(blobs_in_db):
def check_blob_verified_date(b, verified_time):
file_path = os.path.join(self.blob_dir, b)
if os.path.isfile(file_path):
if verified_time > os.path.getctime(file_path):
return True
else:
log.debug('Verification time for %s is too old (%s < %s)',
file_path, verified_time, os.path.getctime(file_path))
else:
log.debug('file %s does not exist', file_path)
return False
def filter_valid_blobs(results):
assert len(blobs_in_db) == len(results)
valid_blobs = [
b for (b, verified_date), (success, result) in zip(blobs_in_db, results)
if success is True and result is True
]
log.debug('Of %s blobs, %s were valid', len(results), len(valid_blobs))
return valid_blobs
ds = [
threads.deferToThread(check_blob_verified_date, b, verified_date)
for b, verified_date in blobs_in_db
]
dl = defer.DeferredList(ds)
dl.addCallback(filter_valid_blobs)
return dl
d = self.db_conn.runInteraction(get_blobs_in_db, blobs_to_check)
d.addCallback(get_valid_blobs)
return d
@rerun_if_locked
def _get_blob_length(self, blob):
d = self.db_conn.runQuery("select blob_length from blobs where blob_hash = ?", (blob,))
d.addCallback(lambda r: r[0][0] if len(r) else Failure(NoSuchBlobError(blob)))
return d
@defer.inlineCallbacks
def _completed_blobs(self, blobhashes_to_check):
"""Returns of the blobhashes_to_check, which are valid"""
blobs = yield defer.DeferredList([self.get_blob(b, True) for b in blobhashes_to_check])
blob_hashes = [b.blob_hash for success, b in blobs if success and b.verified]
defer.returnValue(blob_hashes)
@rerun_if_locked
def _update_blob_verified_timestamp(self, blob, timestamp):
@ -384,10 +298,6 @@ class DiskBlobManager(BlobManager):
return self.db_conn.runInteraction(get_and_update)
@rerun_if_locked
def _update_all_last_verified_dates(self, timestamp):
return self.db_conn.runQuery("update blobs set last_verified_date = ?", (timestamp,))
@rerun_if_locked
def _delete_blobs_from_db(self, blob_hashes):
@ -430,6 +340,9 @@ class DiskBlobManager(BlobManager):
return d
# TODO: Having different managers for different blobs breaks the
# abstraction of a HashBlob. Why should the management of blobs
# care what kind of Blob it has?
class TempBlobManager(BlobManager):
"""This class stores blobs in memory"""
def __init__(self, hash_announcer):
@ -469,10 +382,10 @@ class TempBlobManager(BlobManager):
self.blob_next_announces[blob.blob_hash] = next_announce_time
return defer.succeed(True)
def completed_blobs(self, blobs_to_check):
def completed_blobs(self, blobhashes_to_check):
blobs = [
b.blob_hash for b in self.blobs.itervalues()
if b.blob_hash in blobs_to_check and b.is_validated()
if b.blob_hash in blobhashes_to_check and b.is_validated()
]
return defer.succeed(blobs)
@ -496,9 +409,11 @@ class TempBlobManager(BlobManager):
assert blob_creator.blob_hash not in self.blobs
assert blob_creator.length is not None
new_blob = self.blob_type(blob_creator.blob_hash, True, blob_creator.length)
new_blob.verified = True
# TODO: change this; its breaks the encapsulation of the
# blob. Maybe better would be to have the blob_creator
# produce a blob.
new_blob.data_buffer = blob_creator.data_buffer
new_blob.length = blob_creator.length
new_blob._verified = True
self.blobs[blob_creator.blob_hash] = new_blob
self._immediate_announce([blob_creator.blob_hash])
next_announce_time = time.time() + self.hash_reannounce_time
@ -511,12 +426,6 @@ class TempBlobManager(BlobManager):
if not blob_hash in self.blob_hashes_to_delete:
self.blob_hashes_to_delete[blob_hash] = False
def get_blob_length(self, blob_hash):
if blob_hash in self.blobs:
if self.blobs[blob_hash].length is not None:
return defer.succeed(self.blobs[blob_hash].length)
return defer.fail(NoSuchBlobError(blob_hash))
def immediate_announce_all_blobs(self):
if self.hash_announcer:
return self.hash_announcer.immediate_announce(self.blobs.iterkeys())

View file

@ -11,6 +11,7 @@ from zope.interface import implements
from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
from lbrynet.core.cryptoutils import get_lbry_hash_obj
from lbrynet.core.utils import is_valid_blobhash
log = logging.getLogger(__name__)
@ -48,11 +49,15 @@ class HashBlobWriter(object):
self.write_handle = write_handle
self.length_getter = length_getter
self.finished_cb = finished_cb
self.hashsum = get_lbry_hash_obj()
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
@property
def blob_hash(self):
return self._hashsum.hexdigest()
def write(self, data):
self.hashsum.update(data)
self._hashsum.update(data)
self.len_so_far += len(data)
if self.len_so_far > self.length_getter():
self.finished_cb(
@ -78,14 +83,20 @@ class HashBlob(object):
"""A chunk of data available on the network which is specified by a hashsum"""
def __init__(self, blob_hash, upload_allowed, length=None):
assert is_valid_blobhash(blob_hash)
self.blob_hash = blob_hash
self.length = length
self.writers = {} # {Peer: writer, finished_deferred}
self.finished_deferred = None
self.verified = False
self._verified = False
self.upload_allowed = upload_allowed
self.readers = 0
@property
def verified(self):
# protect verified from being modified by other classes
return self._verified
def set_length(self, length):
if self.length is not None and length == self.length:
return True
@ -100,10 +111,7 @@ class HashBlob(object):
return self.length
def is_validated(self):
if self.verified:
return True
else:
return False
return bool(self._verified)
def is_downloading(self):
if self.writers:
@ -129,7 +137,7 @@ class HashBlob(object):
def writer_finished(self, writer, err=None):
def fire_finished_deferred():
self.verified = True
self._verified = True
for p, (w, finished_deferred) in self.writers.items():
if w == writer:
finished_deferred.callback(self)
@ -151,8 +159,8 @@ class HashBlob(object):
w.cancel()
if err is None:
if writer.len_so_far == self.length and writer.hashsum.hexdigest() == self.blob_hash:
if self.verified is False:
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
if self._verified is False:
d = self._save_verified_blob(writer)
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
d.addCallback(lambda _: cancel_other_downloads())
@ -162,7 +170,7 @@ class HashBlob(object):
else:
err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
writer.hashsum.hexdigest())
writer.blob_hash)
errback_finished_deferred(Failure(InvalidDataError(err_string)))
d = defer.succeed(True)
else:
@ -206,6 +214,14 @@ class BlobFile(HashBlob):
self.file_path = os.path.join(blob_dir, self.blob_hash)
self.setting_verified_blob_lock = threading.Lock()
self.moved_verified_blob = False
if os.path.isfile(self.file_path):
self.set_length(os.path.getsize(self.file_path))
# This assumes that the hash of the blob has already been
# checked as part of the blob creation process. It might
# be worth having a function that checks the actual hash;
# its probably too expensive to have that check be part of
# this call.
self._verified = True
def open_for_writing(self, peer):
if not peer in self.writers:
@ -220,7 +236,7 @@ class BlobFile(HashBlob):
return None, None, None
def open_for_reading(self):
if self.verified is True:
if self._verified is True:
file_handle = None
try:
file_handle = open(self.file_path, 'rb')
@ -232,7 +248,7 @@ class BlobFile(HashBlob):
def delete(self):
if not self.writers and not self.readers:
self.verified = False
self._verified = False
self.moved_verified_blob = False
def delete_from_file_system():
@ -299,13 +315,13 @@ class TempBlob(HashBlob):
return None, None, None
def open_for_reading(self):
if self.verified is True:
if self._verified is True:
return StringIO(self.data_buffer)
return None
def delete(self):
if not self.writers and not self.readers:
self.verified = False
self._verified = False
self.data_buffer = ''
return defer.succeed(True)
else:
@ -333,7 +349,7 @@ class TempBlob(HashBlob):
class HashBlobCreator(object):
def __init__(self, blob_manager):
self.blob_manager = blob_manager
self.hashsum = get_lbry_hash_obj()
self._hashsum = get_lbry_hash_obj()
self.len_so_far = 0
self.blob_hash = None
self.length = None
@ -346,7 +362,7 @@ class HashBlobCreator(object):
if self.length == 0:
self.blob_hash = None
else:
self.blob_hash = self.hashsum.hexdigest()
self.blob_hash = self._hashsum.hexdigest()
d = self._close()
if self.blob_hash is not None:
d.addCallback(lambda _: self.blob_manager.creator_finished(self))
@ -356,7 +372,7 @@ class HashBlobCreator(object):
return d
def write(self, data):
self.hashsum.update(data)
self._hashsum.update(data)
self.len_so_far += len(data)
self._write(data)
@ -394,6 +410,7 @@ class BlobFileCreator(HashBlobCreator):
class TempBlobCreator(HashBlobCreator):
def __init__(self, blob_manager):
HashBlobCreator.__init__(self, blob_manager)
# TODO: use StringIO
self.data_buffer = ''
def _close(self):

View file

@ -184,12 +184,11 @@ class AlwaysSend(object):
return d
@defer.inlineCallbacks
def calculate_available_blob_size(blob_manager):
d = blob_manager.get_all_verified_blobs()
d.addCallback(
lambda blobs: defer.DeferredList([blob_manager.get_blob_length(b) for b in blobs]))
d.addCallback(lambda blob_lengths: sum(val for success, val in blob_lengths if success))
return d
blob_hashes = yield blob_manager.get_all_verified_blobs()
blobs = yield defer.DeferredList([blob_manager.get_blob(b) for b in blob_hashes])
defer.returnValue(sum(b.length for success, b in blobs if success and b.length))
class Daemon(AuthJSONRPCServer):