# lbry-sdk/lbrynet/core/HashBlob.py
from io import BytesIO
import logging
import os
import threading
from twisted.internet import interfaces, defer, threads
from twisted.protocols.basic import FileSender
from twisted.web.client import FileBodyProducer
from twisted.python.failure import Failure
from zope.interface import implements
from lbrynet import conf
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
from lbrynet.core.cryptoutils import get_lbry_hash_obj
from lbrynet.core.utils import is_valid_blobhash
log = logging.getLogger(__name__)
class HashBlobReader(object):
    """IConsumer that forwards every chunk it receives to ``write_func``.

    When the registered producer is a pull (non-streaming) producer, each
    chunk must be explicitly requested, so production is re-triggered via
    the reactor after registration and again after every delivered chunk.
    """
    implements(interfaces.IConsumer)

    def __init__(self, write_func):
        # callable invoked with each chunk of blob data as it arrives
        self.write_func = write_func

    def _schedule_resume(self):
        # Pull producers emit one chunk per resumeProducing() call, so ask
        # for the next chunk on the following reactor turn.
        from twisted.internet import reactor
        if self.streaming is False:
            reactor.callLater(0, self.producer.resumeProducing)

    def registerProducer(self, producer, streaming):
        self.producer = producer
        self.streaming = streaming
        self._schedule_resume()

    def unregisterProducer(self):
        # nothing to release; the producer reference is simply left behind
        pass

    def write(self, data):
        self.write_func(data)
        self._schedule_resume()
class HashBlobWriter(object):
    """File-like sink that buffers blob data in memory while hashing it.

    ``finished_cb(writer[, failure])`` fires exactly once per writer: when
    exactly the expected number of bytes has been received, when more than
    the expected number arrives, or when the writer is closed early.
    """

    def __init__(self, length_getter, finished_cb):
        # in-memory buffer holding everything successfully written
        self.write_handle = BytesIO()
        # zero-argument callable returning the expected blob length
        self.length_getter = length_getter
        self.finished_cb = finished_cb
        # result of finished_cb; doubles as an "already finished" flag
        self.finished_cb_d = None
        self._hashsum = get_lbry_hash_obj()
        self.len_so_far = 0

    @property
    def blob_hash(self):
        """Hex digest of all data written so far."""
        return self._hashsum.hexdigest()

    def write(self, data):
        if self.write_handle is None:
            log.info("writer has already been closed")
            # can this be changed to IOError?
            raise ValueError('I/O operation on closed file')

        self._hashsum.update(data)
        self.len_so_far += len(data)
        expected = self.length_getter()
        if self.len_so_far > expected:
            # more data than the blob should contain -- report failure
            self.finished_cb_d = self.finished_cb(
                self,
                Failure(InvalidDataError("Length so far is greater than the expected length."
                                         " %s to %s" % (self.len_so_far, expected))))
            return
        self.write_handle.write(data)
        if self.len_so_far == expected:
            # blob is complete -- hand ourselves back to the owner
            self.finished_cb_d = self.finished_cb(self)

    def close_handle(self):
        """Release the in-memory buffer; further write() calls will raise."""
        if self.write_handle is None:
            return
        self.write_handle.close()
        self.write_handle = None

    def close(self, reason=None):
        """Finish early with ``reason``; no-op if finished_cb already fired."""
        # if we've already called finished_cb because we either finished writing
        # or closed already, do nothing
        if self.finished_cb_d is not None:
            return
        self.finished_cb_d = self.finished_cb(
            self, Failure(DownloadCanceledError()) if reason is None else reason)
class BlobFile(object):
    """
    A chunk of data available on the network which is specified by a hashsum

    This class is used to create blobs on the local filesystem
    when we already know the blob hash before hand (i.e., when downloading blobs)
    Also can be used for reading from blobs on the local filesystem
    """

    def __str__(self):
        return self.blob_hash[:16]

    def __repr__(self):
        return '<{}({})>'.format(self.__class__.__name__, str(self))

    def __init__(self, blob_dir, blob_hash, length=None):
        # blob_dir: directory the blob file lives in (or will be written to)
        # blob_hash: expected content hash; also used as the on-disk file name
        # length: expected size in bytes, if already known
        if not is_valid_blobhash(blob_hash):
            raise InvalidBlobHashError(blob_hash)
        self.blob_hash = blob_hash
        self.length = length
        self.writers = {}  # {Peer: (writer, finished_deferred)}
        self.finished_deferred = None
        # True once the on-disk file is a complete blob (see `verified` property)
        self._verified = False
        # number of open read handles; a nonzero count blocks delete()
        self.readers = 0
        self.blob_dir = blob_dir
        self.file_path = os.path.join(blob_dir, self.blob_hash)
        # guards the one-time move of a freshly verified blob to disk
        self.setting_verified_blob_lock = threading.Lock()
        self.moved_verified_blob = False
        if os.path.isfile(self.file_path):
            self.set_length(os.path.getsize(self.file_path))
            # This assumes that the hash of the blob has already been
            # checked as part of the blob creation process. It might
            # be worth having a function that checks the actual hash;
            # its probably too expensive to have that check be part of
            # this call.
            self._verified = True

    def open_for_writing(self, peer):
        """
        open a blob file to be written by peer, supports concurrent
        writers, as long as they are from different peers.

        returns tuple of (writer, finished_deferred)

        writer - a file like object with a write() function, close() when finished
        finished_deferred - deferred that is fired when write is finished and returns
            a instance of itself as HashBlob
        """
        if not peer in self.writers:
            log.debug("Opening %s to be written by %s", str(self), str(peer))
            finished_deferred = defer.Deferred()
            # writer_finished is invoked (with self or a Failure) once the
            # writer completes, overflows, or is closed early
            writer = HashBlobWriter(self.get_length, self.writer_finished)
            self.writers[peer] = (writer, finished_deferred)
            return (writer, finished_deferred)
        log.warning("Tried to download the same file twice simultaneously from the same peer")
        return (None, None)

    def open_for_reading(self):
        """
        open blob for reading

        returns a file handle that can be read() from.
        once finished with the file handle, user must call close_read_handle()
        otherwise blob cannot be deleted.
        """
        # only verified blobs may be read; unverified ones return None
        if self._verified is True:
            file_handle = None
            try:
                file_handle = open(self.file_path, 'rb')
                self.readers += 1
                return file_handle
            except IOError:
                log.exception('Failed to open %s', self.file_path)
                # file_handle is still None here, so this is effectively a no-op
                self.close_read_handle(file_handle)
        return None

    def delete(self):
        """
        delete blob file from file system, prevent deletion
        if a blob is being read from or written to

        returns a deferred that firesback when delete is completed
        """
        if not self.writers and not self.readers:
            self._verified = False
            self.moved_verified_blob = False

            def delete_from_file_system():
                if os.path.isfile(self.file_path):
                    os.remove(self.file_path)

            d = threads.deferToThread(delete_from_file_system)

            def log_error(err):
                log.warning("An error occurred deleting %s: %s",
                            str(self.file_path), err.getErrorMessage())
                return err

            d.addErrback(log_error)
            return d
        else:
            return defer.fail(Failure(
                ValueError("File is currently being read or written and cannot be deleted")))

    @property
    def verified(self):
        """
        Protect verified from being modified by other classes.
        verified is True if a write to a blob has completed succesfully,
        or a blob has been read to have the same length as specified
        in init
        """
        return self._verified

    def set_length(self, length):
        # Accept a length only once; re-setting the same value is a no-op
        # success, anything else (mismatch or out of range) fails.
        if self.length is not None and length == self.length:
            return True
        if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']:
            self.length = length
            return True
        log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
                    self.length, length)
        return False

    def get_length(self):
        """Return the expected blob length in bytes (None if unknown)."""
        return self.length

    def is_validated(self):
        """Legacy alias for the `verified` property, returned as a bool."""
        return bool(self._verified)

    def is_downloading(self):
        # downloading iff at least one writer is currently registered
        if self.writers:
            return True
        return False

    def read(self, write_func):
        """Stream the verified blob's bytes into write_func.

        Returns a deferred that fires when the transfer completes, or fails
        with ValueError if the blob could not be opened for reading.
        """
        def close_self(*args):
            # release the read handle once the transfer is done, passing
            # the transfer result through unchanged
            self.close_read_handle(file_handle)
            return args[0]

        file_sender = FileSender()
        reader = HashBlobReader(write_func)
        file_handle = self.open_for_reading()
        if file_handle is not None:
            d = file_sender.beginFileTransfer(file_handle, reader)
            d.addCallback(close_self)
        else:
            d = defer.fail(ValueError("Could not read the blob"))
        return d

    def writer_finished(self, writer, err=None):
        """Callback invoked by a HashBlobWriter when it finishes or fails.

        On the first successful, hash-matching writer: persist the blob,
        fire that writer's finished_deferred, and cancel all other writers.
        On failure (or a late duplicate success) errback the corresponding
        finished_deferred instead. Always closes the writer's buffer.
        """
        def fire_finished_deferred():
            self._verified = True
            # NOTE(review): deleting from self.writers while iterating
            # .items() is safe on Python 2 (items() returns a list) --
            # confirm before porting to Python 3.
            for p, (w, finished_deferred) in self.writers.items():
                if w == writer:
                    del self.writers[p]
                    finished_deferred.callback(self)
                    return True
            log.warning(
                "Somehow, the writer that was accepted as being valid was already removed: %s",
                writer)
            return False

        def errback_finished_deferred(err):
            # fail the deferred that belongs to this writer, if still present
            for p, (w, finished_deferred) in self.writers.items():
                if w == writer:
                    del self.writers[p]
                    finished_deferred.errback(err)

        def cancel_other_downloads():
            # closing each writer re-enters writer_finished with a
            # DownloadCanceledError, which removes it from self.writers
            for p, (w, finished_deferred) in self.writers.items():
                w.close()

        if err is None:
            if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
                if self._verified is False:
                    # first valid writer: save to disk, then fire/cancel
                    d = self._save_verified_blob(writer)
                    d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
                    d.addCallback(lambda _: cancel_other_downloads())
                else:
                    # another writer already verified this blob
                    errback_finished_deferred(Failure(DownloadCanceledError()))
                    d = defer.succeed(True)
            else:
                err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
                err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
                                               writer.blob_hash)
                errback_finished_deferred(Failure(InvalidDataError(err_string)))
                d = defer.succeed(True)
        else:
            errback_finished_deferred(err)
            d = defer.succeed(True)
        d.addBoth(lambda _: writer.close_handle())
        return d

    def close_read_handle(self, file_handle):
        """Close a handle returned by open_for_reading() and release the slot."""
        if file_handle is not None:
            file_handle.close()
            self.readers -= 1

    @defer.inlineCallbacks
    def _save_verified_blob(self, writer):
        # The lock ensures only the first verified writer persists the blob;
        # any later one raises DownloadCanceledError instead.
        with self.setting_verified_blob_lock:
            if self.moved_verified_blob is False:
                writer.write_handle.seek(0)
                out_path = os.path.join(self.blob_dir, self.blob_hash)
                producer = FileBodyProducer(writer.write_handle)
                # NOTE(review): the output file handle is never explicitly
                # closed; CPython closes it when the refcount drops -- confirm
                # the data is flushed on other interpreters.
                yield producer.startProducing(open(out_path, 'wb'))
                self.moved_verified_blob = True
                defer.returnValue(True)
            else:
                raise DownloadCanceledError()
class BlobFileCreator(object):
    """
    This class is used to create blobs on the local filesystem
    when we do not know the blob hash beforehand (i.e, when creating
    a new stream)
    """
    def __init__(self, blob_dir):
        # blob_dir: directory the finished blob will be written into
        self.blob_dir = blob_dir
        # accumulates blob data in memory until close()
        self.buffer = BytesIO()
        self._is_open = True
        self._hashsum = get_lbry_hash_obj()
        self.len_so_far = 0
        # blob_hash and length are populated by close()
        self.blob_hash = None
        self.length = None

    @defer.inlineCallbacks
    def close(self):
        """Finalize the blob: compute its hash, persist the buffered data to
        blob_dir under that hash, and return (via deferred) the blob hash.

        Calling close() again after the blob has been written just returns
        the already-computed hash without rewriting the file.
        """
        self.length = self.len_so_far
        self.blob_hash = self._hashsum.hexdigest()
        if self.blob_hash and self._is_open:
            self.buffer.seek(0)
            out_path = os.path.join(self.blob_dir, self.blob_hash)
            producer = FileBodyProducer(self.buffer)
            # open/close the output file explicitly so the data is flushed to
            # disk even on interpreters without prompt refcount-based cleanup
            out_file = open(out_path, 'wb')
            try:
                yield producer.startProducing(out_file)
            finally:
                out_file.close()
            self._is_open = False
        defer.returnValue(self.blob_hash)

    def write(self, data):
        """Append data to the blob, updating the running hash and length.

        Raises IOError if the creator has already been closed.
        """
        if not self._is_open:
            raise IOError("I/O operation on closed blob creator")
        self._hashsum.update(data)
        self.len_so_far += len(data)
        self.buffer.write(data)