lbry-sdk/lbrynet/core/HashBlob.py

415 lines
13 KiB
Python
Raw Normal View History

2017-07-24 11:25:11 +02:00
from StringIO import StringIO
from io import BytesIO
2015-08-20 17:27:15 +02:00
import logging
import os
import tempfile
import threading
import shutil
from twisted.internet import interfaces, defer, threads
from twisted.protocols.basic import FileSender
2017-07-24 09:04:15 +02:00
from twisted.web.client import FileBodyProducer
2015-08-20 17:27:15 +02:00
from twisted.python.failure import Failure
from zope.interface import implements
from lbrynet import conf
2015-08-20 17:27:15 +02:00
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
from lbrynet.core.cryptoutils import get_lbry_hash_obj
from lbrynet.core.utils import is_valid_blobhash
2015-08-20 17:27:15 +02:00
log = logging.getLogger(__name__)
2015-08-20 17:27:15 +02:00
class HashBlobReader(object):
implements(interfaces.IConsumer)
def __init__(self, write_func):
self.write_func = write_func
def registerProducer(self, producer, streaming):
from twisted.internet import reactor
self.producer = producer
self.streaming = streaming
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
def unregisterProducer(self):
pass
def write(self, data):
from twisted.internet import reactor
self.write_func(data)
if self.streaming is False:
reactor.callLater(0, self.producer.resumeProducing)
class HashBlobWriter(object):
def __init__(self, write_handle, length_getter, finished_cb):
self.write_handle = write_handle
self.length_getter = length_getter
self.finished_cb = finished_cb
self._hashsum = get_lbry_hash_obj()
2015-08-20 17:27:15 +02:00
self.len_so_far = 0
@property
def blob_hash(self):
return self._hashsum.hexdigest()
2015-08-20 17:27:15 +02:00
def write(self, data):
self._hashsum.update(data)
2015-08-20 17:27:15 +02:00
self.len_so_far += len(data)
if self.len_so_far > self.length_getter():
2016-11-30 21:20:45 +01:00
self.finished_cb(
self,
Failure(InvalidDataError("Length so far is greater than the expected length."
" %s to %s" % (self.len_so_far,
self.length_getter()))))
2015-08-20 17:27:15 +02:00
else:
if self.write_handle is None:
log.debug("Tried to write to a write_handle that was None.")
return
2015-08-20 17:27:15 +02:00
self.write_handle.write(data)
if self.len_so_far == self.length_getter():
self.finished_cb(self)
def cancel(self, reason=None):
if reason is None:
reason = Failure(DownloadCanceledError())
self.finished_cb(self, reason)
class HashBlob(object):
"""A chunk of data available on the network which is specified by a hashsum"""
def __init__(self, blob_hash, length=None):
assert is_valid_blobhash(blob_hash)
2015-08-20 17:27:15 +02:00
self.blob_hash = blob_hash
self.length = length
self.writers = {} # {Peer: writer, finished_deferred}
self.finished_deferred = None
self._verified = False
2015-08-20 17:27:15 +02:00
self.readers = 0
@property
def verified(self):
# protect verified from being modified by other classes
return self._verified
2015-08-20 17:27:15 +02:00
def set_length(self, length):
if self.length is not None and length == self.length:
return True
2017-01-17 04:23:20 +01:00
if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']:
2015-08-20 17:27:15 +02:00
self.length = length
return True
2016-11-30 21:20:45 +01:00
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
self.length, length)
2015-08-20 17:27:15 +02:00
return False
def get_length(self):
return self.length
def is_validated(self):
return bool(self._verified)
2015-08-20 17:27:15 +02:00
def is_downloading(self):
if self.writers:
return True
return False
def read(self, write_func):
def close_self(*args):
self.close_read_handle(file_handle)
return args[0]
file_sender = FileSender()
reader = HashBlobReader(write_func)
file_handle = self.open_for_reading()
if file_handle is not None:
d = file_sender.beginFileTransfer(file_handle, reader)
d.addCallback(close_self)
else:
d = defer.fail(ValueError("Could not read the blob"))
return d
def writer_finished(self, writer, err=None):
def fire_finished_deferred():
self._verified = True
2015-08-20 17:27:15 +02:00
for p, (w, finished_deferred) in self.writers.items():
if w == writer:
finished_deferred.callback(self)
del self.writers[p]
return True
2016-11-30 21:20:45 +01:00
log.warning(
"Somehow, the writer that was accepted as being valid was already removed: %s",
writer)
2015-08-20 17:27:15 +02:00
return False
def errback_finished_deferred(err):
for p, (w, finished_deferred) in self.writers.items():
if w == writer:
finished_deferred.errback(err)
del self.writers[p]
def cancel_other_downloads():
for p, (w, finished_deferred) in self.writers.items():
w.cancel()
if err is None:
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
if self._verified is False:
2015-08-20 17:27:15 +02:00
d = self._save_verified_blob(writer)
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
d.addCallback(lambda _: cancel_other_downloads())
else:
errback_finished_deferred(Failure(DownloadCanceledError()))
d = defer.succeed(True)
else:
err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
writer.blob_hash)
2015-08-20 17:27:15 +02:00
errback_finished_deferred(Failure(InvalidDataError(err_string)))
d = defer.succeed(True)
else:
errback_finished_deferred(err)
d = defer.succeed(True)
d.addBoth(lambda _: self._close_writer(writer))
return d
def open_for_writing(self, peer):
pass
def open_for_reading(self):
pass
def delete(self):
pass
def close_read_handle(self, file_handle):
pass
def _close_writer(self, writer):
pass
def _save_verified_blob(self, writer):
pass
def __str__(self):
return self.blob_hash[:16]
def __repr__(self):
return '<{}({})>'.format(self.__class__.__name__, str(self))
2015-08-20 17:27:15 +02:00
class BlobFile(HashBlob):
"""A HashBlob which will be saved to the hard disk of the downloader"""
def __init__(self, blob_dir, *args):
HashBlob.__init__(self, *args)
self.blob_dir = blob_dir
self.file_path = os.path.join(blob_dir, self.blob_hash)
self.setting_verified_blob_lock = threading.Lock()
self.moved_verified_blob = False
if os.path.isfile(self.file_path):
self.set_length(os.path.getsize(self.file_path))
# This assumes that the hash of the blob has already been
# checked as part of the blob creation process. It might
# be worth having a function that checks the actual hash;
# its probably too expensive to have that check be part of
# this call.
self._verified = True
2015-08-20 17:27:15 +02:00
def open_for_writing(self, peer):
if not peer in self.writers:
log.debug("Opening %s to be written by %s", str(self), str(peer))
2015-08-20 17:27:15 +02:00
write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
finished_deferred = defer.Deferred()
writer = HashBlobWriter(write_file, self.get_length, self.writer_finished)
self.writers[peer] = (writer, finished_deferred)
return finished_deferred, writer.write, writer.cancel
log.warning("Tried to download the same file twice simultaneously from the same peer")
2015-08-20 17:27:15 +02:00
return None, None, None
def open_for_reading(self):
if self._verified is True:
2015-08-20 17:27:15 +02:00
file_handle = None
try:
file_handle = open(self.file_path, 'rb')
self.readers += 1
return file_handle
except IOError:
2017-01-20 21:50:10 +01:00
log.exception('Failed to open %s', self.file_path)
2015-08-20 17:27:15 +02:00
self.close_read_handle(file_handle)
return None
def delete(self):
if not self.writers and not self.readers:
self._verified = False
2015-08-20 17:27:15 +02:00
self.moved_verified_blob = False
def delete_from_file_system():
if os.path.isfile(self.file_path):
os.remove(self.file_path)
d = threads.deferToThread(delete_from_file_system)
def log_error(err):
2016-11-30 21:20:45 +01:00
log.warning("An error occurred deleting %s: %s",
str(self.file_path), err.getErrorMessage())
2015-08-20 17:27:15 +02:00
return err
d.addErrback(log_error)
return d
else:
2016-11-30 21:20:45 +01:00
return defer.fail(Failure(
ValueError("File is currently being read or written and cannot be deleted")))
2015-08-20 17:27:15 +02:00
def close_read_handle(self, file_handle):
if file_handle is not None:
file_handle.close()
self.readers -= 1
def _close_writer(self, writer):
if writer.write_handle is not None:
log.debug("Closing %s", str(self))
2015-08-20 17:27:15 +02:00
name = writer.write_handle.name
writer.write_handle.close()
threads.deferToThread(os.remove, name)
2015-08-20 17:27:15 +02:00
writer.write_handle = None
def _save_verified_blob(self, writer):
def move_file():
with self.setting_verified_blob_lock:
if self.moved_verified_blob is False:
temp_file_name = writer.write_handle.name
writer.write_handle.close()
shutil.move(temp_file_name, self.file_path)
writer.write_handle = None
self.moved_verified_blob = True
return True
else:
raise DownloadCanceledError()
return threads.deferToThread(move_file)
class TempBlob(HashBlob):
"""A HashBlob which will only exist in memory"""
def __init__(self, *args):
HashBlob.__init__(self, *args)
self.data_buffer = ""
def open_for_writing(self, peer):
if not peer in self.writers:
temp_buffer = StringIO()
finished_deferred = defer.Deferred()
writer = HashBlobWriter(temp_buffer, self.get_length, self.writer_finished)
self.writers[peer] = (writer, finished_deferred)
return finished_deferred, writer.write, writer.cancel
return None, None, None
def open_for_reading(self):
if self._verified is True:
2015-08-20 17:27:15 +02:00
return StringIO(self.data_buffer)
return None
def delete(self):
if not self.writers and not self.readers:
self._verified = False
2015-08-20 17:27:15 +02:00
self.data_buffer = ''
return defer.succeed(True)
else:
2016-11-30 21:20:45 +01:00
return defer.fail(Failure(
ValueError("Blob is currently being read or written and cannot be deleted")))
2015-08-20 17:27:15 +02:00
def close_read_handle(self, file_handle):
file_handle.close()
def _close_writer(self, writer):
if writer.write_handle is not None:
writer.write_handle.close()
writer.write_handle = None
def _save_verified_blob(self, writer):
if not self.data_buffer:
self.data_buffer = writer.write_handle.getvalue()
writer.write_handle.close()
writer.write_handle = None
return defer.succeed(True)
else:
return defer.fail(Failure(DownloadCanceledError()))
class HashBlobCreator(object):
def __init__(self):
self._hashsum = get_lbry_hash_obj()
2015-08-20 17:27:15 +02:00
self.len_so_far = 0
self.blob_hash = None
self.length = None
def open(self):
pass
def close(self):
self.length = self.len_so_far
if self.length == 0:
self.blob_hash = None
else:
self.blob_hash = self._hashsum.hexdigest()
2015-08-20 17:27:15 +02:00
d = self._close()
if self.blob_hash is not None:
d.addCallback(lambda _: self.blob_hash)
else:
d.addCallback(lambda _: None)
return d
def write(self, data):
self._hashsum.update(data)
2015-08-20 17:27:15 +02:00
self.len_so_far += len(data)
self._write(data)
def _close(self):
pass
def _write(self, data):
pass
class BlobFileCreator(HashBlobCreator):
def __init__(self, blob_dir):
HashBlobCreator.__init__(self)
2015-08-20 17:27:15 +02:00
self.blob_dir = blob_dir
2017-07-24 09:04:15 +02:00
self.buffer = BytesIO()
2015-08-20 17:27:15 +02:00
def _close(self):
if self.blob_hash is not None:
2017-07-24 09:04:15 +02:00
self.buffer.seek(0)
out_path = os.path.join(self.blob_dir, self.blob_hash)
producer = FileBodyProducer(self.buffer)
return producer.startProducing(open(out_path, 'wb'))
return defer.succeed(True)
2015-08-20 17:27:15 +02:00
def _write(self, data):
self.buffer.write(data)
2015-08-20 17:27:15 +02:00
class TempBlobCreator(HashBlobCreator):
def __init__(self):
HashBlobCreator.__init__(self)
# TODO: use StringIO
2015-08-20 17:27:15 +02:00
self.data_buffer = ''
def _close(self):
return defer.succeed(True)
def _write(self, data):
2016-11-10 23:16:35 +01:00
self.data_buffer += data