From 84bd4fdc3efdd8b4e05ec4c5e23f04448c0bf70f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 17 Jul 2017 02:11:36 -0300 Subject: [PATCH 01/54] fix encryption script --- scripts/encrypt_blob.py | 45 ++++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/scripts/encrypt_blob.py b/scripts/encrypt_blob.py index 440ea7a8d..a30fbcc98 100644 --- a/scripts/encrypt_blob.py +++ b/scripts/encrypt_blob.py @@ -1,17 +1,16 @@ """Encrypt a single file using the given key and iv""" import argparse -import binascii import logging -import StringIO import sys from twisted.internet import defer from twisted.internet import reactor from lbrynet import conf -from lbrynet.cryptstream import CryptBlob from lbrynet.core import log_support -from lbrynet.core import cryptoutils +from lbrynet.core.HashAnnouncer import DummyHashAnnouncer +from lbrynet.core.BlobManager import DiskBlobManager +from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator log = logging.getLogger('decrypt_blob') @@ -26,7 +25,7 @@ def main(): args = parser.parse_args() log_support.configure_console(level='DEBUG') - d = run(args) + run(args) reactor.run() @@ -40,29 +39,25 @@ def run(args): reactor.callLater(0, reactor.stop) +@defer.inlineCallbacks def encrypt_blob(filename, key, iv): - blob = Blob() - blob_maker = CryptBlob.CryptStreamBlobMaker( - binascii.unhexlify(key), binascii.unhexlify(iv), 0, blob) - with open(filename) as fin: - blob_maker.write(fin.read()) - blob_maker.close() + dummy_announcer = DummyHashAnnouncer() + manager = DiskBlobManager(dummy_announcer, '.', '.') + yield manager.setup() + creator = CryptStreamCreator(manager, filename, key, iv_generator(iv)) + with open(filename, 'r') as infile: + data = infile.read(2**14) + while data: + yield creator.write(data) + data = infile.read(2**14) + yield creator.stop() -class Blob(object): - def __init__(self): - self.data = StringIO.StringIO() - - def write(self, data): - self.data.write(data) - - def close(self): - hashsum = cryptoutils.get_lbry_hash_obj() - buffer = self.data.getvalue() - hashsum.update(buffer) - with open(hashsum.hexdigest(), 'w') as fout: - fout.write(buffer) - return defer.succeed(True) +def iv_generator(iv): + iv = int(iv, 16) + while 1: + iv += 1 + yield ("%016d" % iv)[-16:] if __name__ == '__main__': From 5bbb29fd792fdaa4cacc4f23534630b55051ac04 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 17 Jul 2017 02:14:01 -0300 Subject: [PATCH 02/54] add error handling to blobs creation --- lbrynet/core/StreamCreator.py | 5 +++++ lbrynet/cryptstream/CryptStreamCreator.py | 1 + 2 files changed, 6 insertions(+) diff --git a/lbrynet/core/StreamCreator.py b/lbrynet/core/StreamCreator.py index 4aa0ae542..3b7877f91 100644 --- a/lbrynet/core/StreamCreator.py +++ b/lbrynet/core/StreamCreator.py @@ -51,13 +51,18 @@ class StreamCreator(object): current_blob = self.current_blob d = current_blob.close() d.addCallback(self._blob_finished) + d.addErrback(self._error) self.finished_deferreds.append(d) self.current_blob = None self._finalize() dl = defer.DeferredList(self.finished_deferreds) dl.addCallback(lambda _: self._finished()) + dl.addErrback(self._error) return dl + def _error(self, error): + log.error(error) + def _finalize(self): pass diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index e5b3c8bf3..782eda0f1 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -80,6 +80,7 @@ class 
CryptStreamCreator(StreamCreator): def close_blob(blob): d = blob.close() d.addCallback(self._blob_finished) + d.addErrback(self._error) self.finished_deferreds.append(d) while len(data) > 0: From 5d69e74010fdf643279cc1c0ed4453d04a64dafc Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 18 Jul 2017 04:29:43 -0300 Subject: [PATCH 03/54] use WAL on sqlite3 --- lbrynet/core/BlobManager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index b5407604c..cb7d738f9 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -137,6 +137,7 @@ class DiskBlobManager(DHTHashSupplier): # threads. def create_tables(transaction): + transaction.execute('PRAGMA journal_mode=WAL') transaction.execute("create table if not exists blobs (" + " blob_hash text primary key, " + " blob_length integer, " + From 7262e8415067145f9af6e2920c829cf9c3245c75 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 18 Jul 2017 04:30:21 -0300 Subject: [PATCH 04/54] use a buffer to wait for blob completion --- lbrynet/core/HashBlob.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index d80cede96..4b51280b1 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -385,19 +385,21 @@ class BlobFileCreator(HashBlobCreator): def __init__(self, blob_dir): HashBlobCreator.__init__(self) self.blob_dir = blob_dir - self.out_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir) + self.buffer = StringIO() def _close(self): - temp_file_name = self.out_file.name - self.out_file.close() if self.blob_hash is not None: - shutil.move(temp_file_name, os.path.join(self.blob_dir, self.blob_hash)) - else: - os.remove(temp_file_name) + def _twrite(data, blob_dir, blob_hash): + with open(os.path.join(blob_dir, blob_hash), 'w') as out_file: + out_file.write(data.getvalue()) + d = threads.deferToThread(_twrite, self.buffer, + self.blob_dir, self.blob_hash) + del self.buffer + return d return defer.succeed(True) def _write(self, data): - self.out_file.write(data) + self.buffer.write(data) class TempBlobCreator(HashBlobCreator): From a3e5c61b506cf13af5ccbda532e16fa44f632fa0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 18 Jul 2017 04:31:35 -0300 Subject: [PATCH 05/54] use FileBodyProducer for encrypt_blob script --- scripts/encrypt_blob.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/encrypt_blob.py b/scripts/encrypt_blob.py index a30fbcc98..3d3552f48 100644 --- a/scripts/encrypt_blob.py +++ b/scripts/encrypt_blob.py @@ -5,6 +5,8 @@ import sys from twisted.internet import defer from twisted.internet import reactor +from twisted.protocols import basic +from twisted.web.client import FileBodyProducer from lbrynet import conf from lbrynet.core import log_support @@ -46,10 +48,8 @@ def encrypt_blob(filename, key, iv): yield manager.setup() creator = CryptStreamCreator(manager, filename, key, iv_generator(iv)) with open(filename, 'r') as infile: - data = infile.read(2**14) - while data: - yield creator.write(data) - data = infile.read(2**14) + producer = FileBodyProducer(infile, readSize=2**22) + yield producer.startProducing(creator) yield creator.stop() From a6b8327fcf005388f251b5d6951b33b6c1e842cf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 24 Jul 2017 04:04:15 -0300 Subject: [PATCH 06/54] save using a FBP instead of thread --- lbrynet/core/HashBlob.py | 16 +++++++--------- 1 file changed, 7 
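A note on the pattern patches 05 and 06 converge on: twisted.web.client.FileBodyProducer reads from any file-like object and pushes the chunks into any object with a write() method, pacing itself through the reactor instead of blocking a thread. Below is a minimal, self-contained sketch of that idea, assuming only stock Twisted; SinkToDisk and the paths are illustrative names, not part of this codebase:

    from io import BytesIO
    from twisted.internet import reactor
    from twisted.web.client import FileBodyProducer

    class SinkToDisk(object):
        # bare-minimum consumer: FileBodyProducer only ever calls write()
        def __init__(self, path):
            self._fd = open(path, 'wb')

        def write(self, data):
            self._fd.write(data)

        def close(self):
            self._fd.close()

    sink = SinkToDisk('/tmp/out.blob')
    d = FileBodyProducer(BytesIO(b'x' * 2 ** 20), readSize=2 ** 14).startProducing(sink)
    d.addCallback(lambda _: sink.close())
    d.addCallback(lambda _: reactor.stop())
    reactor.run()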
insertions(+), 9 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 4b51280b1..52ef6c645 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -1,4 +1,4 @@ -from StringIO import StringIO +from io import BytesIO, StringIO import logging import os import tempfile @@ -6,6 +6,7 @@ import threading import shutil from twisted.internet import interfaces, defer, threads from twisted.protocols.basic import FileSender +from twisted.web.client import FileBodyProducer from twisted.python.failure import Failure from zope.interface import implements from lbrynet import conf @@ -385,17 +386,14 @@ class BlobFileCreator(HashBlobCreator): def __init__(self, blob_dir): HashBlobCreator.__init__(self) self.blob_dir = blob_dir - self.buffer = StringIO() + self.buffer = BytesIO() def _close(self): if self.blob_hash is not None: - def _twrite(data, blob_dir, blob_hash): - with open(os.path.join(blob_dir, blob_hash), 'w') as out_file: - out_file.write(data.getvalue()) - d = threads.deferToThread(_twrite, self.buffer, - self.blob_dir, self.blob_hash) - del self.buffer - return d + self.buffer.seek(0) + out_path = os.path.join(self.blob_dir, self.blob_hash) + producer = FileBodyProducer(self.buffer) + return producer.startProducing(open(out_path, 'wb')) return defer.succeed(True) def _write(self, data): From d2fc1daf26b880a21b5e0ffcfe7c52c3b4e9086e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 24 Jul 2017 04:04:49 -0300 Subject: [PATCH 07/54] use hashlib --- lbrynet/core/cryptoutils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbrynet/core/cryptoutils.py b/lbrynet/core/cryptoutils.py index 7c0c5c40c..2528c7e69 100644 --- a/lbrynet/core/cryptoutils.py +++ b/lbrynet/core/cryptoutils.py @@ -1,9 +1,9 @@ -from Crypto.Hash import SHA384 import seccure +import hashlib def get_lbry_hash_obj(): - return SHA384.new() + return hashlib.sha384() def get_pub_key(pass_phrase): From e69ba6470724a29b44299753568baa6218375afe Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 24 Jul 2017 04:05:40 -0300 Subject: [PATCH 08/54] pycrypt -> cryptography + remove manual padding, use lib --- lbrynet/cryptstream/CryptBlob.py | 53 +++++++++++++++----------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/lbrynet/cryptstream/CryptBlob.py b/lbrynet/cryptstream/CryptBlob.py index 0b26d6f89..71bfa7020 100644 --- a/lbrynet/cryptstream/CryptBlob.py +++ b/lbrynet/cryptstream/CryptBlob.py @@ -1,11 +1,15 @@ import binascii import logging -from Crypto.Cipher import AES +from cryptography.hazmat.primitives.ciphers import Cipher, modes +from cryptography.hazmat.primitives.ciphers.algorithms import AES +from cryptography.hazmat.primitives.padding import PKCS7 +from cryptography.hazmat.backends import default_backend from lbrynet import conf from lbrynet.core.BlobInfo import BlobInfo log = logging.getLogger(__name__) +backend = default_backend() class CryptBlobInfo(BlobInfo): @@ -31,7 +35,9 @@ class StreamBlobDecryptor(object): self.length = length self.buff = b'' self.len_read = 0 - self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv) + cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend) + self.unpadder = PKCS7(AES.block_size).unpadder() + self.cipher = cipher.decryptor() def decrypt(self, write_func): """ @@ -42,22 +48,19 @@ class StreamBlobDecryptor(object): """ def remove_padding(data): - pad_len = ord(data[-1]) - data, padding = data[:-1 * pad_len], data[-1 * pad_len:] - for c in padding: - assert ord(c) == pad_len - return 
data + return self.unpadder.update(data) + self.unpadder.finalize() def write_bytes(): if self.len_read < self.length: - num_bytes_to_decrypt = greatest_multiple(len(self.buff), self.cipher.block_size) + num_bytes_to_decrypt = greatest_multiple(len(self.buff), (AES.block_size / 8)) data_to_decrypt, self.buff = split(self.buff, num_bytes_to_decrypt) - write_func(self.cipher.decrypt(data_to_decrypt)) + write_func(self.cipher.update(data_to_decrypt)) def finish_decrypt(): - assert len(self.buff) % self.cipher.block_size == 0 + assert len(self.buff) % (AES.block_size / 8) == 0 data_to_decrypt, self.buff = self.buff, b'' - write_func(remove_padding(self.cipher.decrypt(data_to_decrypt))) + last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize() + write_func(remove_padding(last_chunk)) def decrypt_bytes(data): self.buff += data @@ -84,8 +87,9 @@ class CryptStreamBlobMaker(object): self.iv = iv self.blob_num = blob_num self.blob = blob - self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv) - self.buff = b'' + cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend) + self.padder = PKCS7(AES.block_size).padder() + self.cipher = cipher.encryptor() self.length = 0 def write(self, data): @@ -104,10 +108,11 @@ class CryptStreamBlobMaker(object): done = True else: num_bytes_to_write = len(data) - self.length += num_bytes_to_write data_to_write = data[:num_bytes_to_write] - self.buff += data_to_write - self._write_buffer() + self.length += len(data_to_write) + padded_data = self.padder.update(data_to_write) + encrypted_data = self.cipher.update(padded_data) + self.blob.write(encrypted_data) return done, num_bytes_to_write def close(self): @@ -119,20 +124,10 @@ class CryptStreamBlobMaker(object): log.debug("called the finished_callback from CryptStreamBlobMaker.close") return d - def _write_buffer(self): - num_bytes_to_encrypt = (len(self.buff) // AES.block_size) * AES.block_size - data_to_encrypt, self.buff = split(self.buff, num_bytes_to_encrypt) - encrypted_data = self.cipher.encrypt(data_to_encrypt) - self.blob.write(encrypted_data) - def _close_buffer(self): - data_to_encrypt, self.buff = self.buff, b'' - assert len(data_to_encrypt) < AES.block_size - pad_len = AES.block_size - len(data_to_encrypt) - padded_data = data_to_encrypt + chr(pad_len) * pad_len - self.length += pad_len - assert len(padded_data) == AES.block_size - encrypted_data = self.cipher.encrypt(padded_data) + self.length += (AES.block_size / 8) - (self.length % (AES.block_size / 8)) + padded_data = self.padder.finalize() + encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize() self.blob.write(encrypted_data) def _return_info(self, blob_hash): From d03aded31dae455912110c7f641e0a0b0b1479dc Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 24 Jul 2017 04:51:13 -0300 Subject: [PATCH 09/54] add cryptography as a direct requirement --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 1f3bda1d7..192b6a0a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ Twisted==16.6.0 +cryptography==1.9 appdirs==1.4.3 argparse==1.2.1 docopt==0.6.2 From 6022aa925b833cc5e63c5a6393be385531983177 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 24 Jul 2017 06:25:11 -0300 Subject: [PATCH 10/54] fix import --- lbrynet/core/HashBlob.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 52ef6c645..0a90dea2f 100644 --- a/lbrynet/core/HashBlob.py +++ 
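Patch 08 above replaces PyCrypto and its hand-rolled padding with the cryptography library's AES-CBC primitives plus PKCS7. The round trip below is a self-contained sketch of the API the patch wires in; note that PKCS7 takes the block size in bits (AES.block_size is 128), which is why the patch divides by 8 wherever it needs bytes, and that PKCS7 appends a full 16-byte pad block when the plaintext is already block-aligned:

    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import Cipher, modes
    from cryptography.hazmat.primitives.ciphers.algorithms import AES
    from cryptography.hazmat.primitives.padding import PKCS7

    backend = default_backend()
    key, iv = b'k' * 32, b'i' * 16          # demo values, not real key material

    padder = PKCS7(AES.block_size).padder()
    encryptor = Cipher(AES(key), modes.CBC(iv), backend=backend).encryptor()
    ciphertext = encryptor.update(padder.update(b'some plaintext') + padder.finalize())
    ciphertext += encryptor.finalize()

    unpadder = PKCS7(AES.block_size).unpadder()
    decryptor = Cipher(AES(key), modes.CBC(iv), backend=backend).decryptor()
    recovered = unpadder.update(decryptor.update(ciphertext) + decryptor.finalize())
    recovered += unpadder.finalize()
    assert recovered == b'some plaintext'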
b/lbrynet/core/HashBlob.py @@ -1,4 +1,5 @@ -from io import BytesIO, StringIO +from StringIO import StringIO +from io import BytesIO import logging import os import tempfile From c53a18960534898c59ca31edb48bbea22a19a4b7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 11:53:02 -0400 Subject: [PATCH 11/54] remove TempBlob, clean up blob classes, use FBP and BytesIO in BlobFile --- lbrynet/core/HashBlob.py | 168 +++++------------------ scripts/download_blob_from_peer.py | 5 +- tests/unit/cryptstream/test_cryptblob.py | 1 - 3 files changed, 40 insertions(+), 134 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 0a90dea2f..ac13c6d61 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -1,10 +1,7 @@ -from StringIO import StringIO from io import BytesIO import logging import os -import tempfile import threading -import shutil from twisted.internet import interfaces, defer, threads from twisted.protocols.basic import FileSender from twisted.web.client import FileBodyProducer @@ -120,7 +117,6 @@ class HashBlob(object): return False def read(self, write_func): - def close_self(*args): self.close_read_handle(file_handle) return args[0] @@ -182,22 +178,22 @@ class HashBlob(object): return d def open_for_writing(self, peer): - pass + raise NotImplementedError() def open_for_reading(self): - pass + raise NotImplementedError() def delete(self): - pass + raise NotImplementedError() def close_read_handle(self, file_handle): - pass + raise NotImplementedError() def _close_writer(self, writer): - pass + raise NotImplementedError() def _save_verified_blob(self, writer): - pass + raise NotImplementedError() def __str__(self): return self.blob_hash[:16] @@ -209,12 +205,13 @@ class HashBlob(object): class BlobFile(HashBlob): """A HashBlob which will be saved to the hard disk of the downloader""" - def __init__(self, blob_dir, *args): - HashBlob.__init__(self, *args) + def __init__(self, blob_dir, blob_hash, length=None): + HashBlob.__init__(self, blob_hash, length) self.blob_dir = blob_dir self.file_path = os.path.join(blob_dir, self.blob_hash) self.setting_verified_blob_lock = threading.Lock() self.moved_verified_blob = False + self.buffer = BytesIO() if os.path.isfile(self.file_path): self.set_length(os.path.getsize(self.file_path)) # This assumes that the hash of the blob has already been @@ -227,10 +224,8 @@ class BlobFile(HashBlob): def open_for_writing(self, peer): if not peer in self.writers: log.debug("Opening %s to be written by %s", str(self), str(peer)) - write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir) finished_deferred = defer.Deferred() - writer = HashBlobWriter(write_file, self.get_length, self.writer_finished) - + writer = HashBlobWriter(self.buffer, self.get_length, self.writer_finished) self.writers[peer] = (writer, finished_deferred) return finished_deferred, writer.write, writer.cancel log.warning("Tried to download the same file twice simultaneously from the same peer") @@ -278,137 +273,48 @@ class BlobFile(HashBlob): def _close_writer(self, writer): if writer.write_handle is not None: log.debug("Closing %s", str(self)) - name = writer.write_handle.name writer.write_handle.close() - threads.deferToThread(os.remove, name) writer.write_handle = None + @defer.inlineCallbacks def _save_verified_blob(self, writer): - - def move_file(): - with self.setting_verified_blob_lock: - if self.moved_verified_blob is False: - temp_file_name = writer.write_handle.name - writer.write_handle.close() - 
shutil.move(temp_file_name, self.file_path) - writer.write_handle = None - self.moved_verified_blob = True - return True - else: - raise DownloadCanceledError() - - return threads.deferToThread(move_file) + with self.setting_verified_blob_lock: + if self.moved_verified_blob is False: + self.buffer.seek(0) + out_path = os.path.join(self.blob_dir, self.blob_hash) + producer = FileBodyProducer(self.buffer) + yield producer.startProducing(open(out_path, 'wb')) + self.moved_verified_blob = True + defer.returnValue(True) + else: + raise DownloadCanceledError() -class TempBlob(HashBlob): - """A HashBlob which will only exist in memory""" - def __init__(self, *args): - HashBlob.__init__(self, *args) - self.data_buffer = "" - - def open_for_writing(self, peer): - if not peer in self.writers: - temp_buffer = StringIO() - finished_deferred = defer.Deferred() - writer = HashBlobWriter(temp_buffer, self.get_length, self.writer_finished) - - self.writers[peer] = (writer, finished_deferred) - return finished_deferred, writer.write, writer.cancel - return None, None, None - - def open_for_reading(self): - if self._verified is True: - return StringIO(self.data_buffer) - return None - - def delete(self): - if not self.writers and not self.readers: - self._verified = False - self.data_buffer = '' - return defer.succeed(True) - else: - return defer.fail(Failure( - ValueError("Blob is currently being read or written and cannot be deleted"))) - - def close_read_handle(self, file_handle): - file_handle.close() - - def _close_writer(self, writer): - if writer.write_handle is not None: - writer.write_handle.close() - writer.write_handle = None - - def _save_verified_blob(self, writer): - if not self.data_buffer: - self.data_buffer = writer.write_handle.getvalue() - writer.write_handle.close() - writer.write_handle = None - return defer.succeed(True) - else: - return defer.fail(Failure(DownloadCanceledError())) - - -class HashBlobCreator(object): - def __init__(self): +class BlobFileCreator(object): + def __init__(self, blob_dir): + self.blob_dir = blob_dir + self.buffer = BytesIO() + self._is_open = True self._hashsum = get_lbry_hash_obj() self.len_so_far = 0 self.blob_hash = None self.length = None - def open(self): - pass - + @defer.inlineCallbacks def close(self): self.length = self.len_so_far - if self.length == 0: - self.blob_hash = None - else: - self.blob_hash = self._hashsum.hexdigest() - d = self._close() - if self.blob_hash is not None: - d.addCallback(lambda _: self.blob_hash) - else: - d.addCallback(lambda _: None) - return d - - def write(self, data): - self._hashsum.update(data) - self.len_so_far += len(data) - self._write(data) - - def _close(self): - pass - - def _write(self, data): - pass - - -class BlobFileCreator(HashBlobCreator): - def __init__(self, blob_dir): - HashBlobCreator.__init__(self) - self.blob_dir = blob_dir - self.buffer = BytesIO() - - def _close(self): - if self.blob_hash is not None: + self.blob_hash = self._hashsum.hexdigest() + if self.blob_hash and self._is_open: self.buffer.seek(0) out_path = os.path.join(self.blob_dir, self.blob_hash) producer = FileBodyProducer(self.buffer) - return producer.startProducing(open(out_path, 'wb')) - return defer.succeed(True) + yield producer.startProducing(open(out_path, 'wb')) + self._is_open = False + defer.returnValue(self.blob_hash) - def _write(self, data): + def write(self, data): + if not self._is_open: + raise IOError + self._hashsum.update(data) + self.len_so_far += len(data) self.buffer.write(data) - - -class 
TempBlobCreator(HashBlobCreator): - def __init__(self): - HashBlobCreator.__init__(self) - # TODO: use StringIO - self.data_buffer = '' - - def _close(self): - return defer.succeed(True) - - def _write(self, data): - self.data_buffer += data diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py index fffc44c9a..02d9a1693 100644 --- a/scripts/download_blob_from_peer.py +++ b/scripts/download_blob_from_peer.py @@ -2,7 +2,7 @@ import argparse import logging import sys -import tempfile +import os from twisted.internet import defer from twisted.internet import reactor @@ -31,13 +31,14 @@ def main(args=None): parser.add_argument('--timeout', type=int, default=30) parser.add_argument('peer') parser.add_argument('blob_hash') + parser.add_argument('directory', type=str, default=os.getcwd()) args = parser.parse_args(args) log_support.configure_console(level='DEBUG') announcer = HashAnnouncer.DummyHashAnnouncer() blob_manager = MyBlobManager(announcer) - blob = HashBlob.TempBlob(args.blob_hash, False) + blob = HashBlob.BlobFile(args.directory, args.blob_hash) download_manager = SingleBlobDownloadManager(blob) peer = Peer.Peer(*conf.server_port(args.peer)) payment_rate_manager = DumbPaymentRateManager() diff --git a/tests/unit/cryptstream/test_cryptblob.py b/tests/unit/cryptstream/test_cryptblob.py index cb1110d77..624f1a747 100644 --- a/tests/unit/cryptstream/test_cryptblob.py +++ b/tests/unit/cryptstream/test_cryptblob.py @@ -1,7 +1,6 @@ from twisted.trial import unittest from twisted.internet import defer from lbrynet.cryptstream import CryptBlob -from lbrynet.core.HashBlob import TempBlobCreator from lbrynet import conf from tests.mocks import mock_conf_settings From 1148a533bfd919feb18341628603fa2b9486fa2e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 11:53:58 -0400 Subject: [PATCH 12/54] raise NotImplementedError on StreamCreator._blob_finished (sanity check) --- lbrynet/core/StreamCreator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/core/StreamCreator.py b/lbrynet/core/StreamCreator.py index 3b7877f91..5093651be 100644 --- a/lbrynet/core/StreamCreator.py +++ b/lbrynet/core/StreamCreator.py @@ -27,7 +27,7 @@ class StreamCreator(object): self.finished_deferreds = [] def _blob_finished(self, blob_info): - pass + raise NotImplementedError() def registerProducer(self, producer, streaming): From a72fef07c062be5763d8fdb0f0a97cffcbbe426e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 11:54:39 -0400 Subject: [PATCH 13/54] inlinecallbacks --- lbrynet/cryptstream/CryptBlob.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/lbrynet/cryptstream/CryptBlob.py b/lbrynet/cryptstream/CryptBlob.py index 71bfa7020..54552b588 100644 --- a/lbrynet/cryptstream/CryptBlob.py +++ b/lbrynet/cryptstream/CryptBlob.py @@ -1,5 +1,6 @@ import binascii import logging +from twisted.internet import defer from cryptography.hazmat.primitives.ciphers import Cipher, modes from cryptography.hazmat.primitives.ciphers.algorithms import AES from cryptography.hazmat.primitives.padding import PKCS7 @@ -115,23 +116,19 @@ class CryptStreamBlobMaker(object): self.blob.write(encrypted_data) return done, num_bytes_to_write + @defer.inlineCallbacks def close(self): log.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length)) if self.length != 0: - self._close_buffer() - d = self.blob.close() - d.addCallback(self._return_info) + self.length += (AES.block_size / 8) - 
(self.length % (AES.block_size / 8)) + padded_data = self.padder.finalize() + encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize() + self.blob.write(encrypted_data) + + blob_hash = yield self.blob.close() log.debug("called the finished_callback from CryptStreamBlobMaker.close") - return d - - def _close_buffer(self): - self.length += (AES.block_size / 8) - (self.length % (AES.block_size / 8)) - padded_data = self.padder.finalize() - encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize() - self.blob.write(encrypted_data) - - def _return_info(self, blob_hash): - return CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv)) + blob = CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv)) + defer.returnValue(blob) def greatest_multiple(a, b): From 632fd764faad7289da0162883ce956008eb5169d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 11:55:18 -0400 Subject: [PATCH 14/54] show more information for a blob padding error --- lbrynet/cryptstream/CryptBlob.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lbrynet/cryptstream/CryptBlob.py b/lbrynet/cryptstream/CryptBlob.py index 54552b588..a7303a588 100644 --- a/lbrynet/cryptstream/CryptBlob.py +++ b/lbrynet/cryptstream/CryptBlob.py @@ -58,7 +58,11 @@ class StreamBlobDecryptor(object): write_func(self.cipher.update(data_to_decrypt)) def finish_decrypt(): - assert len(self.buff) % (AES.block_size / 8) == 0 + bytes_left = len(self.buff) % (AES.block_size / 8) + if bytes_left != 0: + log.warning(self.buff[-1 * (AES.block_size / 8):].encode('hex')) + raise Exception("blob %s has incorrect padding: %i bytes left" % + (self.blob.blob_hash, bytes_left)) data_to_decrypt, self.buff = self.buff, b'' last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize() write_func(remove_padding(last_chunk)) From cfe73a862714e576fe074b00f3ad62b899df1800 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 11:55:41 -0400 Subject: [PATCH 15/54] fix hanging streamprogressmanager --- lbrynet/core/client/StreamProgressManager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbrynet/core/client/StreamProgressManager.py b/lbrynet/core/client/StreamProgressManager.py index 29aea9d1a..d28112ac6 100644 --- a/lbrynet/core/client/StreamProgressManager.py +++ b/lbrynet/core/client/StreamProgressManager.py @@ -154,10 +154,11 @@ class FullStreamProgressManager(StreamProgressManager): d.addCallback(lambda _: check_if_finished()) def log_error(err): - log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage()) + log.warning("Error occurred in the output loop. 
Error: %s", err) if self.outputting_d is not None and not self.outputting_d.called: self.outputting_d.callback(True) self.outputting_d = None + self.stop() d.addErrback(log_error) else: From e3cc3992b0a87bc619bdee761e170a0ca0160e09 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 11:56:47 -0400 Subject: [PATCH 16/54] remove unnecessary functions --- lbrynet/cryptstream/CryptStreamCreator.py | 6 ------ lbrynet/file_manager/EncryptedFileCreator.py | 4 ---- 2 files changed, 10 deletions(-) diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index 782eda0f1..9a3225f9a 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -77,12 +77,6 @@ class CryptStreamCreator(StreamCreator): self.finished_deferreds.append(d) def _write(self, data): - def close_blob(blob): - d = blob.close() - d.addCallback(self._blob_finished) - d.addErrback(self._error) - self.finished_deferreds.append(d) - while len(data) > 0: if self.current_blob is None: self.next_blob_creator = self.blob_manager.get_blob_creator() diff --git a/lbrynet/file_manager/EncryptedFileCreator.py b/lbrynet/file_manager/EncryptedFileCreator.py index 99d15e5e4..67b5702b7 100644 --- a/lbrynet/file_manager/EncryptedFileCreator.py +++ b/lbrynet/file_manager/EncryptedFileCreator.py @@ -40,10 +40,6 @@ class EncryptedFileStreamCreator(CryptStreamCreator): self.blob_infos) return d - def setup(self): - d = CryptStreamCreator.setup(self) - return d - def _get_blobs_hashsum(self): blobs_hashsum = get_lbry_hash_obj() for blob_info in sorted(self.blob_infos, key=lambda b_i: b_i.blob_num): From 100c18d9b63a73cb0002f3c27e33f12dd2e01e92 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 12:00:55 -0400 Subject: [PATCH 17/54] send CryptBlobInfo (not a CryptStreamBlobMaker) to DiskBlobManager.creator_finished --- lbrynet/cryptstream/CryptStreamCreator.py | 4 ++-- lbrynet/file_manager/EncryptedFileCreator.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index 9a3225f9a..446c29951 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -89,8 +89,8 @@ class CryptStreamCreator(StreamCreator): should_announce = self.blob_count == 0 d = self.current_blob.close() d.addCallback(self._blob_finished) - d.addCallback(lambda _: self.blob_manager.creator_finished( - self.next_blob_creator, should_announce)) + d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info, + should_announce)) self.finished_deferreds.append(d) self.current_blob = None diff --git a/lbrynet/file_manager/EncryptedFileCreator.py b/lbrynet/file_manager/EncryptedFileCreator.py index 67b5702b7..bf6d3bea7 100644 --- a/lbrynet/file_manager/EncryptedFileCreator.py +++ b/lbrynet/file_manager/EncryptedFileCreator.py @@ -29,8 +29,9 @@ class EncryptedFileStreamCreator(CryptStreamCreator): self.blob_infos = [] def _blob_finished(self, blob_info): - log.debug("length: %s", str(blob_info.length)) + log.debug("length: %s", blob_info.length) self.blob_infos.append(blob_info) + return blob_info def _save_stream_info(self): stream_info_manager = self.lbry_file_manager.stream_info_manager From 070978248e42faad4c75800d14ebd1d27fdc9abe Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 12:01:31 -0400 Subject: [PATCH 18/54] return new lbry file status from _change_file_status --- 
lbrynet/file_manager/EncryptedFileManager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 212efea48..9d38548dd 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -303,8 +303,10 @@ class EncryptedFileManager(object): @rerun_if_locked def _change_file_status(self, rowid, new_status): - return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", + d = self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", (new_status, rowid)) + d.addCallback(lambda _: new_status) + return d @rerun_if_locked def _get_lbry_file_status(self, rowid): From b98cd24e10aaedd6c70c6b07cb1c062c16aa88b1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 15:03:01 -0400 Subject: [PATCH 19/54] don't reflect empty last blob --- lbrynet/reflector/client/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 26882d186..d7a3ab96a 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -112,7 +112,7 @@ class EncryptedFileReflectorClient(Protocol): def get_validated_blobs(self, blobs_in_stream): def get_blobs(blobs): for (blob, _, _, blob_len) in blobs: - if blob: + if blob and blob_len: yield self.blob_manager.get_blob(blob, blob_len) dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True) From 90bce0b37523c85f5fb5c14ee41e75f180d25130 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 15:25:41 -0400 Subject: [PATCH 20/54] fix setting _final_blob_num in EncryptedFileMetadataHandler --- .../client/EncryptedFileMetadataHandler.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py b/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py index bd09dfdfc..116ac7080 100644 --- a/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py +++ b/lbrynet/lbry_file/client/EncryptedFileMetadataHandler.py @@ -1,5 +1,6 @@ import logging from zope.interface import implements +from twisted.internet import defer from lbrynet.cryptstream.CryptBlob import CryptBlobInfo from lbrynet.interfaces import IMetadataHandler @@ -18,10 +19,11 @@ class EncryptedFileMetadataHandler(object): ######### IMetadataHandler ######### + @defer.inlineCallbacks def get_initial_blobs(self): - d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash) - d.addCallback(self._format_initial_blobs_for_download_manager) - return d + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash) + formatted_infos = self._format_initial_blobs_for_download_manager(blob_infos) + defer.returnValue(formatted_infos) def final_blob_num(self): return self._final_blob_num @@ -30,10 +32,12 @@ class EncryptedFileMetadataHandler(object): def _format_initial_blobs_for_download_manager(self, blob_infos): infos = [] - for blob_hash, blob_num, iv, length in blob_infos: - if blob_hash is not None: + for i, (blob_hash, blob_num, iv, length) in enumerate(blob_infos): + if blob_hash is not None and length: infos.append(CryptBlobInfo(blob_hash, blob_num, length, iv)) else: + if i != len(blob_infos) - 1: + raise Exception("Invalid stream terminator") log.debug("Setting _final_blob_num to %s", str(blob_num - 1)) self._final_blob_num = blob_num - 1 return infos From 
487f2490ab520c136e558cea30fec847471cd2b6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 7 Sep 2017 15:38:47 -0400 Subject: [PATCH 21/54] simplify CryptStreamCreator --- lbrynet/core/StreamCreator.py | 81 ----------------------- lbrynet/cryptstream/CryptStreamCreator.py | 70 ++++++++++++++++++-- 2 files changed, 63 insertions(+), 88 deletions(-) delete mode 100644 lbrynet/core/StreamCreator.py diff --git a/lbrynet/core/StreamCreator.py b/lbrynet/core/StreamCreator.py deleted file mode 100644 index 5093651be..000000000 --- a/lbrynet/core/StreamCreator.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -from twisted.internet import interfaces, defer -from zope.interface import implements - - -log = logging.getLogger(__name__) - - -class StreamCreator(object): - """Classes which derive from this class create a 'stream', which can be any - collection of associated blobs and associated metadata. These classes - use the IConsumer interface to get data from an IProducer and transform - the data into a 'stream'""" - - implements(interfaces.IConsumer) - - def __init__(self, name): - """ - @param name: the name of the stream - """ - self.name = name - self.stopped = True - self.producer = None - self.streaming = None - self.blob_count = -1 - self.current_blob = None - self.finished_deferreds = [] - - def _blob_finished(self, blob_info): - raise NotImplementedError() - - def registerProducer(self, producer, streaming): - - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - self.stopped = False - if streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - self.stopped = True - self.producer = None - - def stop(self): - """Stop creating the stream. Create the terminating zero-length blob.""" - log.debug("stop has been called for StreamCreator") - self.stopped = True - if self.current_blob is not None: - current_blob = self.current_blob - d = current_blob.close() - d.addCallback(self._blob_finished) - d.addErrback(self._error) - self.finished_deferreds.append(d) - self.current_blob = None - self._finalize() - dl = defer.DeferredList(self.finished_deferreds) - dl.addCallback(lambda _: self._finished()) - dl.addErrback(self._error) - return dl - - def _error(self, error): - log.error(error) - - def _finalize(self): - pass - - def _finished(self): - pass - - # TODO: move the stream creation process to its own thread and - # remove the reactor from this process. - def write(self, data): - from twisted.internet import reactor - self._write(data) - if self.stopped is False and self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def _write(self, data): - pass diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index 446c29951..9c94ad476 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -3,25 +3,27 @@ Utility for creating Crypt Streams, which are encrypted blobs and associated met """ import logging - +from twisted.internet import interfaces, defer +from zope.interface import implements from Crypto import Random from Crypto.Cipher import AES - -from twisted.internet import defer -from lbrynet.core.StreamCreator import StreamCreator from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker log = logging.getLogger(__name__) -class CryptStreamCreator(StreamCreator): - """Create a new stream with blobs encrypted by a symmetric cipher. 
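This patch folds the deleted StreamCreator base class into CryptStreamCreator itself, so it is worth restating the consumer/producer handshake both versions implement: with a non-streaming (pull) producer, the consumer must nudge the producer with resumeProducing() once per reactor turn, which is why registerProducer() and write() both schedule it via callLater(0). A minimal sketch of that contract, assuming stock Twisted; PullingConsumer is an illustrative name, and appending to a list stands in for the encrypt-and-blob step:

    from twisted.internet import reactor

    class PullingConsumer(object):
        def __init__(self):
            self.chunks = []

        def registerProducer(self, producer, streaming):
            self.producer, self.streaming = producer, streaming
            if not streaming:
                # pull mode: the consumer drives the producer
                reactor.callLater(0, producer.resumeProducing)

        def write(self, data):
            self.chunks.append(data)
            if not self.streaming:
                # ask for the next chunk on the next reactor turn
                reactor.callLater(0, self.producer.resumeProducing)

        def unregisterProducer(self):
            self.producer = None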
+class CryptStreamCreator(object): + """ + Create a new stream with blobs encrypted by a symmetric cipher. Each blob is encrypted with the same key, but each blob has its own initialization vector which is associated with the blob when the blob is associated with the stream. """ + + implements(interfaces.IConsumer) + def __init__(self, blob_manager, name=None, key=None, iv_generator=None): """@param blob_manager: Object that stores and provides access to blobs. @type blob_manager: BlobManager @@ -39,14 +41,59 @@ class CryptStreamCreator(StreamCreator): @return: None """ - StreamCreator.__init__(self, name) self.blob_manager = blob_manager + self.name = name self.key = key if iv_generator is None: self.iv_generator = self.random_iv_generator() else: self.iv_generator = iv_generator + self.stopped = True + self.producer = None + self.streaming = None + self.blob_count = -1 + self.current_blob = None + self.finished_deferreds = [] + + def registerProducer(self, producer, streaming): + from twisted.internet import reactor + + self.producer = producer + self.streaming = streaming + self.stopped = False + if streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + self.stopped = True + self.producer = None + + def stop(self): + """Stop creating the stream. Create the terminating zero-length blob.""" + log.debug("stop has been called for StreamCreator") + self.stopped = True + if self.current_blob is not None: + current_blob = self.current_blob + d = current_blob.close() + d.addCallback(self._blob_finished) + d.addErrback(self._error) + self.finished_deferreds.append(d) + self.current_blob = None + self._finalize() + dl = defer.DeferredList(self.finished_deferreds) + dl.addCallback(lambda _: self._finished()) + dl.addErrback(self._error) + return dl + + # TODO: move the stream creation process to its own thread and + # remove the reactor from this process. 
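# Annotation, not part of the commit: the iv_generator argument accepted by
# __init__ above may be any generator yielding 16-character initialization
# vectors. Besides the random_iv_generator defined below, patch 01's
# scripts/encrypt_blob.py builds a deterministic one from a starting hex
# value:
#
#     def iv_generator(iv):
#         iv = int(iv, 16)
#         while 1:
#             iv += 1
#             yield ("%016d" % iv)[-16:]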
+ def write(self, data): + from twisted.internet import reactor + self._write(data) + if self.stopped is False and self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + @staticmethod def random_iv_generator(): while 1: @@ -96,3 +143,12 @@ class CryptStreamCreator(StreamCreator): def _get_blob_maker(self, iv, blob_creator): return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator) + + def _error(self, error): + log.error(error) + + def _finished(self): + raise NotImplementedError() + + def _blob_finished(self, blob_info): + raise NotImplementedError() From 637d1f265b8ec43c25a6c989507f03fc4a51c0f4 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 12 Sep 2017 11:42:17 -0400 Subject: [PATCH 22/54] add unit tests for HashBlob.py classes --- tests/unit/core/test_HashBlob.py | 109 +++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 tests/unit/core/test_HashBlob.py diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py new file mode 100644 index 000000000..d4a7a4d57 --- /dev/null +++ b/tests/unit/core/test_HashBlob.py @@ -0,0 +1,109 @@ +from lbrynet.core.HashBlob import HashBlob, BlobFile +from lbrynet.core.Error import DownloadCanceledError, InvalidDataError + + +from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lbry_hash +from twisted.trial import unittest +from twisted.internet import defer +import os +import time + + + +class BlobFileTest(unittest.TestCase): + def setUp(self): + self.db_dir, self.blob_dir = mk_db_and_blob_dir() + self.fake_content_len = 64 + self.fake_content = bytearray('0'*self.fake_content_len) + self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca5602b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105' + + + def tearDown(self): + rm_db_and_blob_dir(self.db_dir, self.blob_dir) + + + @defer.inlineCallbacks + def test_good_write_and_read(self): + # test a write that should succeed + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertFalse(blob_file.verified) + + finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) + write_func(self.fake_content) + out = yield finished_d + self.assertTrue(isinstance(out,HashBlob)) + self.assertTrue(out.verified) + self.assertEqual(self.fake_content_len, out.get_length()) + + # read from the instance used to write to, and verify content + f = blob_file.open_for_reading() + c = f.read() + self.assertEqual(c, self.fake_content) + self.assertFalse(out.is_downloading()) + + # read from newly declared instance, and verify content + del blob_file + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + self.assertTrue(blob_file.verified) + f = blob_file.open_for_reading() + c = f.read() + self.assertEqual(c, self.fake_content) + + @defer.inlineCallbacks + def test_delete(self): + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) + write_func(self.fake_content) + out = yield finished_d + out = yield blob_file.delete() + + blob_file = BlobFile(self.blob_dir, self.fake_content_hash) + self.assertFalse(blob_file.verified) + + @defer.inlineCallbacks + def test_too_much_write(self): + # writing too much data should result in failure + expected_length= 16 + content = bytearray('0'*32) + blob_hash = random_lbry_hash() + blob_file = BlobFile(self.blob_dir, blob_hash, expected_length) + finished_d, write_func, cancel_func 
= blob_file.open_for_writing(peer=1) + write_func(content) + out = yield self.assertFailure(finished_d, InvalidDataError) + + @defer.inlineCallbacks + def test_bad_hash(self): + # test a write that should fail because its content's hash + # does not equal the blob_hash + length= 64 + content = bytearray('0'*length) + blob_hash = random_lbry_hash() + blob_file = BlobFile(self.blob_dir, blob_hash, length) + finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) + write_func(content) + yield self.assertFailure(finished_d, InvalidDataError) + + + @defer.inlineCallbacks + def test_multiple_writers(self): + # start first writer and write half way, and then start second writer and write everything + blob_hash = self.fake_content_hash + blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len) + finished_d_1, write_func_1, cancel_func_1 = blob_file.open_for_writing(peer=1) + write_func_1(self.fake_content[:self.fake_content_len/2]) + + finished_d_2, write_func_2, cancel_func_2 = blob_file.open_for_writing(peer=2) + write_func_2(self.fake_content) + out_2 = yield finished_d_2 + out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError) + + self.assertTrue(isinstance(out_2,HashBlob)) + self.assertTrue(out_2.verified) + self.assertEqual(self.fake_content_len, out_2.get_length()) + + f = blob_file.open_for_reading() + c = f.read() + self.assertEqual(self.fake_content_len, len(c)) + self.assertEqual(bytearray(c), self.fake_content) + + From b655cd4fa6cb21158cd1dd3e9d918aeb33ed2f39 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 12 Sep 2017 12:01:06 -0400 Subject: [PATCH 23/54] add better comments for classes in HashBlob.py --- lbrynet/core/HashBlob.py | 44 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index ac13c6d61..df3891aeb 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -92,7 +92,12 @@ class HashBlob(object): @property def verified(self): - # protect verified from being modified by other classes + """ + Protect verified from being modified by other classes. + verified is True if a write to a blob has completed succesfully, + or a blob has been read to have the same length as specified + in init + """ return self._verified def set_length(self, length): @@ -203,8 +208,11 @@ class HashBlob(object): class BlobFile(HashBlob): - """A HashBlob which will be saved to the hard disk of the downloader""" - + """ + This class is used to create blobs on the local filesystem + when we already know the blob hash before hand (i.e., when downloading blobs) + Also can be used for reading from blobs on the local filesystem + """ def __init__(self, blob_dir, blob_hash, length=None): HashBlob.__init__(self, blob_hash, length) self.blob_dir = blob_dir @@ -222,6 +230,17 @@ class BlobFile(HashBlob): self._verified = True def open_for_writing(self, peer): + """ + open a blob file to be written by peer, supports concurrent + writers, as long as they are from differnt peers. 
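A usage note for the tuple documented just below, matching how patch 22's unit tests above drive it; blob, peer and data are stand-in names, and the yield assumes an inlineCallbacks context:

    finished_d, write_func, cancel_func = blob.open_for_writing(peer=peer)
    write_func(data)                     # may be called repeatedly
    blob_obj = yield finished_d          # fires once len_so_far reaches the expected length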
+ + returns tuple of (finished_deferred, writer.writer, writer.cancel) + + finished_deferred - deferred that is fired when write is finished and returns + a instance of itself as HashBlob + writer.write - function used to write to file, argument is data to be written + writer.cancel - function used to cancel the write, takes no argument + """ if not peer in self.writers: log.debug("Opening %s to be written by %s", str(self), str(peer)) finished_deferred = defer.Deferred() @@ -232,6 +251,13 @@ class BlobFile(HashBlob): return None, None, None def open_for_reading(self): + """ + open blob for reading + + returns a file handle that can be read() from. + once finished with the file handle, user must call close_read_handle() + otherwise blob cannot be deleted. + """ if self._verified is True: file_handle = None try: @@ -244,6 +270,12 @@ class BlobFile(HashBlob): return None def delete(self): + """ + delete blob file from file system, prevent deletion + if a blob is being read from or written to + + returns a deferred that firesback when delete is completed + """ if not self.writers and not self.readers: self._verified = False self.moved_verified_blob = False @@ -289,8 +321,12 @@ class BlobFile(HashBlob): else: raise DownloadCanceledError() - class BlobFileCreator(object): + """ + This class is used to create blobs on the local filesystem + when we do not know the blob hash beforehand (i.e, when creating + a new stream) + """ def __init__(self, blob_dir): self.blob_dir = blob_dir self.buffer = BytesIO() From f2deee720132995422a2a7b933b044daedd619d4 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 12 Sep 2017 12:03:49 -0400 Subject: [PATCH 24/54] fireback finished_deferred after deleting from self.writers so that BlobFile state is accurate after finished_deferred is called --- lbrynet/core/HashBlob.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index df3891aeb..f305896a4 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -15,7 +15,6 @@ from lbrynet.core.utils import is_valid_blobhash log = logging.getLogger(__name__) - class HashBlobReader(object): implements(interfaces.IConsumer) @@ -142,8 +141,8 @@ class HashBlob(object): self._verified = True for p, (w, finished_deferred) in self.writers.items(): if w == writer: - finished_deferred.callback(self) del self.writers[p] + finished_deferred.callback(self) return True log.warning( "Somehow, the writer that was accepted as being valid was already removed: %s", @@ -153,8 +152,8 @@ class HashBlob(object): def errback_finished_deferred(err): for p, (w, finished_deferred) in self.writers.items(): if w == writer: - finished_deferred.errback(err) del self.writers[p] + finished_deferred.errback(err) def cancel_other_downloads(): for p, (w, finished_deferred) in self.writers.items(): From ab513d076be23a6d73084406915c64ae201edcad Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 12 Sep 2017 14:10:28 -0400 Subject: [PATCH 25/54] fix multiple writers writing to the same buffer --- lbrynet/core/HashBlob.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index f305896a4..16a01e173 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -43,8 +43,8 @@ class HashBlobReader(object): class HashBlobWriter(object): - def __init__(self, write_handle, length_getter, finished_cb): - self.write_handle = write_handle + def __init__(self, length_getter, finished_cb): + 
self.write_handle = BytesIO() self.length_getter = length_getter self.finished_cb = finished_cb self._hashsum = get_lbry_hash_obj() @@ -218,7 +218,6 @@ class BlobFile(HashBlob): self.file_path = os.path.join(blob_dir, self.blob_hash) self.setting_verified_blob_lock = threading.Lock() self.moved_verified_blob = False - self.buffer = BytesIO() if os.path.isfile(self.file_path): self.set_length(os.path.getsize(self.file_path)) # This assumes that the hash of the blob has already been @@ -243,7 +242,7 @@ class BlobFile(HashBlob): if not peer in self.writers: log.debug("Opening %s to be written by %s", str(self), str(peer)) finished_deferred = defer.Deferred() - writer = HashBlobWriter(self.buffer, self.get_length, self.writer_finished) + writer = HashBlobWriter(self.get_length, self.writer_finished) self.writers[peer] = (writer, finished_deferred) return finished_deferred, writer.write, writer.cancel log.warning("Tried to download the same file twice simultaneously from the same peer") @@ -311,9 +310,9 @@ class BlobFile(HashBlob): def _save_verified_blob(self, writer): with self.setting_verified_blob_lock: if self.moved_verified_blob is False: - self.buffer.seek(0) + writer.write_handle.seek(0) out_path = os.path.join(self.blob_dir, self.blob_hash) - producer = FileBodyProducer(self.buffer) + producer = FileBodyProducer(writer.write_handle) yield producer.startProducing(open(out_path, 'wb')) self.moved_verified_blob = True defer.returnValue(True) From 0f95712a0f18b6b6997eee9005ea7862c7c9dff8 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 12 Sep 2017 23:14:19 -0400 Subject: [PATCH 26/54] rename HashBlobWriter.cancel() as close() to be more file like --- lbrynet/core/HashBlob.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 16a01e173..60f8cb37d 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -71,9 +71,10 @@ class HashBlobWriter(object): if self.len_so_far == self.length_getter(): self.finished_cb(self) - def cancel(self, reason=None): + def close(self, reason=None): if reason is None: reason = Failure(DownloadCanceledError()) + self.finished_cb(self, reason) @@ -157,7 +158,7 @@ class HashBlob(object): def cancel_other_downloads(): for p, (w, finished_deferred) in self.writers.items(): - w.cancel() + w.close() if err is None: if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash: @@ -232,19 +233,19 @@ class BlobFile(HashBlob): open a blob file to be written by peer, supports concurrent writers, as long as they are from differnt peers. 
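A note on the rename this patch performs: close() keeps cancel()'s semantics (when called before the write completes it errbacks the finished deferred with DownloadCanceledError) but lets a HashBlobWriter be handled like any other file-like object. A sketch of the resulting error path, using only names that exist at this point in the series:

    finished_d, write_func, close_func = blob.open_for_writing(peer)
    try:
        write_func(chunk)
    except Exception:
        close_func()    # errbacks finished_d with DownloadCanceledError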
- returns tuple of (finished_deferred, writer.writer, writer.cancel) + returns tuple of (finished_deferred, writer.writer, writer.close) finished_deferred - deferred that is fired when write is finished and returns a instance of itself as HashBlob writer.write - function used to write to file, argument is data to be written - writer.cancel - function used to cancel the write, takes no argument + writer.close - function used to cancel the write, takes no argument """ if not peer in self.writers: log.debug("Opening %s to be written by %s", str(self), str(peer)) finished_deferred = defer.Deferred() writer = HashBlobWriter(self.get_length, self.writer_finished) self.writers[peer] = (writer, finished_deferred) - return finished_deferred, writer.write, writer.cancel + return finished_deferred, writer.write, writer.close log.warning("Tried to download the same file twice simultaneously from the same peer") return None, None, None From 94ff4e82bc3e795567379a99f0a13be118760336 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 12 Sep 2017 23:33:26 -0400 Subject: [PATCH 27/54] remove HashBob._close_writer(), move the closing of write handler to HashBlobWriter --- lbrynet/core/HashBlob.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 60f8cb37d..053362118 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -74,6 +74,9 @@ class HashBlobWriter(object): def close(self, reason=None): if reason is None: reason = Failure(DownloadCanceledError()) + if self.write_handle is not None: + self.write_handle.close() + self.write_handle = None self.finished_cb(self, reason) @@ -137,7 +140,6 @@ class HashBlob(object): return d def writer_finished(self, writer, err=None): - def fire_finished_deferred(): self._verified = True for p, (w, finished_deferred) in self.writers.items(): @@ -179,7 +181,6 @@ class HashBlob(object): errback_finished_deferred(err) d = defer.succeed(True) - d.addBoth(lambda _: self._close_writer(writer)) return d def open_for_writing(self, peer): @@ -194,9 +195,6 @@ class HashBlob(object): def close_read_handle(self, file_handle): raise NotImplementedError() - def _close_writer(self, writer): - raise NotImplementedError() - def _save_verified_blob(self, writer): raise NotImplementedError() @@ -301,12 +299,6 @@ class BlobFile(HashBlob): file_handle.close() self.readers -= 1 - def _close_writer(self, writer): - if writer.write_handle is not None: - log.debug("Closing %s", str(self)) - writer.write_handle.close() - writer.write_handle = None - @defer.inlineCallbacks def _save_verified_blob(self, writer): with self.setting_verified_blob_lock: From 196aa24b8b64917f077051acbaf087abf872d5c7 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 13 Sep 2017 12:58:11 -0400 Subject: [PATCH 28/54] add HashBlobWriter.close_handle() function to just close the write handle --- lbrynet/core/HashBlob.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 053362118..452e4ce7a 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -71,13 +71,17 @@ class HashBlobWriter(object): if self.len_so_far == self.length_getter(): self.finished_cb(self) - def close(self, reason=None): - if reason is None: - reason = Failure(DownloadCanceledError()) + def close_handle(self): if self.write_handle is not None: self.write_handle.close() self.write_handle = None + def close(self, reason=None): + # we've already 
closed, so do nothing + if self.write_handle is None: + return + if reason is None: + reason = Failure(DownloadCanceledError()) self.finished_cb(self, reason) @@ -180,7 +184,7 @@ class HashBlob(object): else: errback_finished_deferred(err) d = defer.succeed(True) - + d.addBoth(lambda _: writer.close_handle()) return d def open_for_writing(self, peer): From 468a16af584aa7e393cd0bd63a028c92fe6bda07 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 13 Sep 2017 13:04:05 -0400 Subject: [PATCH 29/54] move check for write_handle to the beginning and raise exception when writing to a closed file handle --- lbrynet/core/HashBlob.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 452e4ce7a..49258ba89 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -55,6 +55,10 @@ class HashBlobWriter(object): return self._hashsum.hexdigest() def write(self, data): + if self.write_handle is None: + log.info("writer has already been closed") + raise ValueError('I/O operation on closed file') + self._hashsum.update(data) self.len_so_far += len(data) if self.len_so_far > self.length_getter(): @@ -64,9 +68,6 @@ class HashBlobWriter(object): " %s to %s" % (self.len_so_far, self.length_getter())))) else: - if self.write_handle is None: - log.debug("Tried to write to a write_handle that was None.") - return self.write_handle.write(data) if self.len_so_far == self.length_getter(): self.finished_cb(self) From 39c4db3471c892884b5b621c3c190e5c6286b39a Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 13 Sep 2017 14:01:20 -0400 Subject: [PATCH 30/54] make sure that we don't call HashBlobWriter.finished_cb multiple times when calling close() --- lbrynet/core/HashBlob.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 49258ba89..686242a01 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -47,6 +47,7 @@ class HashBlobWriter(object): self.write_handle = BytesIO() self.length_getter = length_getter self.finished_cb = finished_cb + self.finished_cb_d = None self._hashsum = get_lbry_hash_obj() self.len_so_far = 0 @@ -62,7 +63,7 @@ class HashBlobWriter(object): self._hashsum.update(data) self.len_so_far += len(data) if self.len_so_far > self.length_getter(): - self.finished_cb( + self.finished_cb_d = self.finished_cb( self, Failure(InvalidDataError("Length so far is greater than the expected length." 
" %s to %s" % (self.len_so_far, @@ -70,7 +71,7 @@ class HashBlobWriter(object): else: self.write_handle.write(data) if self.len_so_far == self.length_getter(): - self.finished_cb(self) + self.finished_cb_d = self.finished_cb(self) def close_handle(self): if self.write_handle is not None: @@ -78,12 +79,13 @@ class HashBlobWriter(object): self.write_handle = None def close(self, reason=None): - # we've already closed, so do nothing - if self.write_handle is None: + # if we've already called finished_cb because we either finished writing + # or closed already, do nothing + if self.finished_cb_d is not None: return if reason is None: reason = Failure(DownloadCanceledError()) - self.finished_cb(self, reason) + self.finished_cb_d = self.finished_cb(self, reason) class HashBlob(object): From e92321a9c12545fd11ad6a44081c27118579522e Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 12 Sep 2017 23:52:42 -0400 Subject: [PATCH 31/54] have BlobFile.open_for_writing() return the writer instead of write and close functions --- lbrynet/core/HashBlob.py | 9 ++++----- lbrynet/core/client/BlobRequester.py | 4 ++-- lbrynet/reflector/server/server.py | 18 +++++++++--------- tests/unit/core/test_BlobManager.py | 7 +++---- tests/unit/core/test_HashBlob.py | 24 ++++++++++++------------ 5 files changed, 30 insertions(+), 32 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index 686242a01..c87f1bab9 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -238,21 +238,20 @@ class BlobFile(HashBlob): open a blob file to be written by peer, supports concurrent writers, as long as they are from differnt peers. - returns tuple of (finished_deferred, writer.writer, writer.close) + returns tuple of (writer, finished_deferred) + writer - a file like object with a write() function, close() when finished finished_deferred - deferred that is fired when write is finished and returns a instance of itself as HashBlob - writer.write - function used to write to file, argument is data to be written - writer.close - function used to cancel the write, takes no argument """ if not peer in self.writers: log.debug("Opening %s to be written by %s", str(self), str(peer)) finished_deferred = defer.Deferred() writer = HashBlobWriter(self.get_length, self.writer_finished) self.writers[peer] = (writer, finished_deferred) - return finished_deferred, writer.write, writer.close + return (writer, finished_deferred) log.warning("Tried to download the same file twice simultaneously from the same peer") - return None, None, None + return None, None def open_for_reading(self): """ diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 95ffaa327..a887b24c6 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -466,9 +466,9 @@ class DownloadRequest(RequestHelper): if blob.is_validated(): log.debug('Skipping blob %s as its already validated', blob) continue - d, write_func, cancel_func = blob.open_for_writing(self.peer) + writer, d = blob.open_for_writing(self.peer) if d is not None: - return BlobDownloadDetails(blob, d, write_func, cancel_func, self.peer) + return BlobDownloadDetails(blob, d, writer.write, writer.close, self.peer) log.debug('Skipping blob %s as there was an issue opening it for writing', blob) return None diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 8467e5321..1c41c9eea 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ 
-35,11 +35,11 @@ class ReflectorServer(Protocol): self.peer_version = None self.receiving_blob = False self.incoming_blob = None - self.blob_write = None self.blob_finished_d = None - self.cancel_write = None self.request_buff = "" + self.blob_writer = None + def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): log.info("Reflector upload from %s finished" % self.peer.host) @@ -82,14 +82,14 @@ class ReflectorServer(Protocol): """ blob = self.incoming_blob - self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) + self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer) self.blob_finished_d.addCallback(self._on_completed_blob, response_key) self.blob_finished_d.addErrback(self._on_failed_blob, response_key) def close_blob(self): + self.blob_writer.close() + self.blob_writer = None self.blob_finished_d = None - self.blob_write = None - self.cancel_write = None self.incoming_blob = None self.receiving_blob = False @@ -99,7 +99,7 @@ class ReflectorServer(Protocol): def dataReceived(self, data): if self.receiving_blob: - self.blob_write(data) + self.blob_writer.write(data) else: log.debug('Not yet recieving blob, data needs further processing') self.request_buff += data @@ -110,7 +110,7 @@ class ReflectorServer(Protocol): d.addErrback(self.handle_error) if self.receiving_blob and extra_data: log.debug('Writing extra data to blob') - self.blob_write(extra_data) + self.blob_writer.write(extra_data) def _get_valid_response(self, response_msg): extra_data = None @@ -221,7 +221,7 @@ class ReflectorServer(Protocol): sd_blob_hash = request_dict[SD_BLOB_HASH] sd_blob_size = request_dict[SD_BLOB_SIZE] - if self.blob_write is None: + if self.blob_writer is None: d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size) d.addCallback(self.get_descriptor_response) d.addCallback(self.send_response) @@ -293,7 +293,7 @@ class ReflectorServer(Protocol): blob_hash = request_dict[BLOB_HASH] blob_size = request_dict[BLOB_SIZE] - if self.blob_write is None: + if self.blob_writer is None: log.debug('Received info for blob: %s', blob_hash[:16]) d = self.blob_manager.get_blob(blob_hash, blob_size) d.addCallback(self.get_blob_response) diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py index 1b7271dc2..f6b4a1f04 100644 --- a/tests/unit/core/test_BlobManager.py +++ b/tests/unit/core/test_BlobManager.py @@ -47,8 +47,8 @@ class BlobManagerTest(unittest.TestCase): yield self.bm.setup() blob = yield self.bm.get_blob(blob_hash,len(data)) - finished_d, write, cancel =yield blob.open_for_writing(self.peer) - yield write(data) + writer, finished_d = yield blob.open_for_writing(self.peer) + yield writer.write(data) yield self.bm.blob_completed(blob) yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data)) @@ -105,7 +105,7 @@ class BlobManagerTest(unittest.TestCase): # open the last blob blob = yield self.bm.get_blob(blob_hashes[-1]) - finished_d, write, cancel = yield blob.open_for_writing(self.peer) + writer, finished_d = yield blob.open_for_writing(self.peer) # delete the last blob and check if it still exists out = yield self.bm.delete_blobs([blob_hash]) @@ -114,4 +114,3 @@ class BlobManagerTest(unittest.TestCase): self.assertTrue(blob_hashes[-1] in blobs) self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hashes[-1]))) - blob._close_writer(blob.writers[self.peer][0]) diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index d4a7a4d57..d8843f390 100644 --- 
a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -28,8 +28,8 @@ class BlobFileTest(unittest.TestCase): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) self.assertFalse(blob_file.verified) - finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) - write_func(self.fake_content) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content) out = yield finished_d self.assertTrue(isinstance(out,HashBlob)) self.assertTrue(out.verified) @@ -52,8 +52,8 @@ class BlobFileTest(unittest.TestCase): @defer.inlineCallbacks def test_delete(self): blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) - finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) - write_func(self.fake_content) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content) out = yield finished_d out = yield blob_file.delete() @@ -67,8 +67,8 @@ class BlobFileTest(unittest.TestCase): content = bytearray('0'*32) blob_hash = random_lbry_hash() blob_file = BlobFile(self.blob_dir, blob_hash, expected_length) - finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) - write_func(content) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(content) out = yield self.assertFailure(finished_d, InvalidDataError) @defer.inlineCallbacks @@ -79,8 +79,8 @@ class BlobFileTest(unittest.TestCase): content = bytearray('0'*length) blob_hash = random_lbry_hash() blob_file = BlobFile(self.blob_dir, blob_hash, length) - finished_d, write_func, cancel_func = blob_file.open_for_writing(peer=1) - write_func(content) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(content) yield self.assertFailure(finished_d, InvalidDataError) @@ -89,11 +89,11 @@ class BlobFileTest(unittest.TestCase): # start first writer and write half way, and then start second writer and write everything blob_hash = self.fake_content_hash blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len) - finished_d_1, write_func_1, cancel_func_1 = blob_file.open_for_writing(peer=1) - write_func_1(self.fake_content[:self.fake_content_len/2]) + writer_1, finished_d_1 = blob_file.open_for_writing(peer=1) + writer_1.write(self.fake_content[:self.fake_content_len/2]) - finished_d_2, write_func_2, cancel_func_2 = blob_file.open_for_writing(peer=2) - write_func_2(self.fake_content) + writer_2, finished_d_2 = blob_file.open_for_writing(peer=2) + writer_2.write(self.fake_content) out_2 = yield finished_d_2 out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError) From f816cc16377b22f0abca73317eeee822eebb700c Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 13 Sep 2017 14:01:53 -0400 Subject: [PATCH 32/54] add some unit tests for BlobFile.close() --- tests/unit/core/test_HashBlob.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index d8843f390..3bea14c4e 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -30,6 +30,7 @@ class BlobFileTest(unittest.TestCase): writer, finished_d = blob_file.open_for_writing(peer=1) writer.write(self.fake_content) + writer.close() out = yield finished_d self.assertTrue(isinstance(out,HashBlob)) self.assertTrue(out.verified) @@ -83,6 +84,26 @@ class BlobFileTest(unittest.TestCase): writer.write(content) yield self.assertFailure(finished_d, InvalidDataError) + 
@defer.inlineCallbacks + def test_close_on_incomplete_write(self): + # write all but 1 byte of data, + blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + writer, finished_d = blob_file.open_for_writing(peer=1) + writer.write(self.fake_content[:self.fake_content_len-1]) + writer.close() + yield self.assertFailure(finished_d, DownloadCanceledError) + + # writes after close will throw a ValueError exception + with self.assertRaises(ValueError): + writer.write(self.fake_content) + + # another call to close will do nothing + writer.close() + + # file should not exist, since we did not finish write + blob_file_2 = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len) + out = blob_file_2.open_for_reading() + self.assertEqual(None, out) @defer.inlineCallbacks def test_multiple_writers(self): From 7d6e62eb776810ca766893911f5c5264b16152d8 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 13 Sep 2017 15:27:43 -0400 Subject: [PATCH 33/54] consolidate HashBlob and BlobFile --- lbrynet/core/HashBlob.py | 217 ++++++++++++++----------------- tests/unit/core/test_HashBlob.py | 9 +- 2 files changed, 103 insertions(+), 123 deletions(-) diff --git a/lbrynet/core/HashBlob.py b/lbrynet/core/HashBlob.py index c87f1bab9..8aa822db2 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/core/HashBlob.py @@ -8,13 +8,14 @@ from twisted.web.client import FileBodyProducer from twisted.python.failure import Failure from zope.interface import implements from lbrynet import conf -from lbrynet.core.Error import DownloadCanceledError, InvalidDataError +from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.utils import is_valid_blobhash log = logging.getLogger(__name__) + class HashBlobReader(object): implements(interfaces.IConsumer) @@ -22,7 +23,6 @@ class HashBlobReader(object): self.write_func = write_func def registerProducer(self, producer, streaming): - from twisted.internet import reactor self.producer = producer @@ -34,7 +34,6 @@ class HashBlobReader(object): pass def write(self, data): - from twisted.internet import reactor self.write_func(data) @@ -58,6 +57,7 @@ class HashBlobWriter(object): def write(self, data): if self.write_handle is None: log.info("writer has already been closed") + # can this be changed to IOError? 
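# [editor's sketch, not part of this patch] the answer turns out to be yes:
# PATCH 44 later in this series converts this ValueError into an IOError. A
# minimal illustration of the guard, assuming a finished callback that closes
# the handle and returns a deferred, as HashBlob.writer_finished() does
# elsewhere in this patch:
#
#     from twisted.internet import defer
#     from lbrynet.core.HashBlob import HashBlobWriter
#
#     def on_finished(writer, reason=None):
#         writer.close_handle()          # releases the BytesIO handle
#         return defer.succeed(True)
#
#     w = HashBlobWriter(lambda: 16, on_finished)
#     w.write('0' * 16)   # reaches the expected length, on_finished fires
#     w.close()           # no-op: finished_cb_d is already set
#     w.write('x')        # write_handle is gone, the guard below raises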
raise ValueError('I/O operation on closed file') self._hashsum.update(data) @@ -88,17 +88,109 @@ class HashBlobWriter(object): self.finished_cb_d = self.finished_cb(self, reason) -class HashBlob(object): - """A chunk of data available on the network which is specified by a hashsum""" +class BlobFile(object): + """ + A chunk of data available on the network which is specified by a hashsum - def __init__(self, blob_hash, length=None): - assert is_valid_blobhash(blob_hash) + This class is used to create blobs on the local filesystem + when we already know the blob hash before hand (i.e., when downloading blobs) + Also can be used for reading from blobs on the local filesystem + """ + + def __str__(self): + return self.blob_hash[:16] + + def __repr__(self): + return '<{}({})>'.format(self.__class__.__name__, str(self)) + + def __init__(self, blob_dir, blob_hash, length=None): + if not is_valid_blobhash(blob_hash): + raise InvalidBlobHashError(blob_hash) self.blob_hash = blob_hash self.length = length self.writers = {} # {Peer: writer, finished_deferred} self.finished_deferred = None self._verified = False self.readers = 0 + self.blob_dir = blob_dir + self.file_path = os.path.join(blob_dir, self.blob_hash) + self.setting_verified_blob_lock = threading.Lock() + self.moved_verified_blob = False + if os.path.isfile(self.file_path): + self.set_length(os.path.getsize(self.file_path)) + # This assumes that the hash of the blob has already been + # checked as part of the blob creation process. It might + # be worth having a function that checks the actual hash; + # its probably too expensive to have that check be part of + # this call. + self._verified = True + + def open_for_writing(self, peer): + """ + open a blob file to be written by peer, supports concurrent + writers, as long as they are from differnt peers. + + returns tuple of (writer, finished_deferred) + + writer - a file like object with a write() function, close() when finished + finished_deferred - deferred that is fired when write is finished and returns + a instance of itself as HashBlob + """ + if not peer in self.writers: + log.debug("Opening %s to be written by %s", str(self), str(peer)) + finished_deferred = defer.Deferred() + writer = HashBlobWriter(self.get_length, self.writer_finished) + self.writers[peer] = (writer, finished_deferred) + return (writer, finished_deferred) + log.warning("Tried to download the same file twice simultaneously from the same peer") + return None, None + + def open_for_reading(self): + """ + open blob for reading + + returns a file handle that can be read() from. + once finished with the file handle, user must call close_read_handle() + otherwise blob cannot be deleted. 
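        Example (editor's sketch, not part of the original docstring),
        assuming a verified blob already on disk:

            blob = BlobFile(blob_dir, blob_hash)
            handle = blob.open_for_reading()
            if handle is not None:
                data = handle.read()
                blob.close_read_handle(handle)  # required before delete()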
+ """ + if self._verified is True: + file_handle = None + try: + file_handle = open(self.file_path, 'rb') + self.readers += 1 + return file_handle + except IOError: + log.exception('Failed to open %s', self.file_path) + self.close_read_handle(file_handle) + return None + + def delete(self): + """ + delete blob file from file system, prevent deletion + if a blob is being read from or written to + + returns a deferred that firesback when delete is completed + """ + if not self.writers and not self.readers: + self._verified = False + self.moved_verified_blob = False + + def delete_from_file_system(): + if os.path.isfile(self.file_path): + os.remove(self.file_path) + + d = threads.deferToThread(delete_from_file_system) + + def log_error(err): + log.warning("An error occurred deleting %s: %s", + str(self.file_path), err.getErrorMessage()) + return err + + d.addErrback(log_error) + return d + else: + return defer.fail(Failure( + ValueError("File is currently being read or written and cannot be deleted"))) @property def verified(self): @@ -190,116 +282,6 @@ class HashBlob(object): d.addBoth(lambda _: writer.close_handle()) return d - def open_for_writing(self, peer): - raise NotImplementedError() - - def open_for_reading(self): - raise NotImplementedError() - - def delete(self): - raise NotImplementedError() - - def close_read_handle(self, file_handle): - raise NotImplementedError() - - def _save_verified_blob(self, writer): - raise NotImplementedError() - - def __str__(self): - return self.blob_hash[:16] - - def __repr__(self): - return '<{}({})>'.format(self.__class__.__name__, str(self)) - - -class BlobFile(HashBlob): - """ - This class is used to create blobs on the local filesystem - when we already know the blob hash before hand (i.e., when downloading blobs) - Also can be used for reading from blobs on the local filesystem - """ - def __init__(self, blob_dir, blob_hash, length=None): - HashBlob.__init__(self, blob_hash, length) - self.blob_dir = blob_dir - self.file_path = os.path.join(blob_dir, self.blob_hash) - self.setting_verified_blob_lock = threading.Lock() - self.moved_verified_blob = False - if os.path.isfile(self.file_path): - self.set_length(os.path.getsize(self.file_path)) - # This assumes that the hash of the blob has already been - # checked as part of the blob creation process. It might - # be worth having a function that checks the actual hash; - # its probably too expensive to have that check be part of - # this call. - self._verified = True - - def open_for_writing(self, peer): - """ - open a blob file to be written by peer, supports concurrent - writers, as long as they are from differnt peers. - - returns tuple of (writer, finished_deferred) - - writer - a file like object with a write() function, close() when finished - finished_deferred - deferred that is fired when write is finished and returns - a instance of itself as HashBlob - """ - if not peer in self.writers: - log.debug("Opening %s to be written by %s", str(self), str(peer)) - finished_deferred = defer.Deferred() - writer = HashBlobWriter(self.get_length, self.writer_finished) - self.writers[peer] = (writer, finished_deferred) - return (writer, finished_deferred) - log.warning("Tried to download the same file twice simultaneously from the same peer") - return None, None - - def open_for_reading(self): - """ - open blob for reading - - returns a file handle that can be read() from. - once finished with the file handle, user must call close_read_handle() - otherwise blob cannot be deleted. 
- """ - if self._verified is True: - file_handle = None - try: - file_handle = open(self.file_path, 'rb') - self.readers += 1 - return file_handle - except IOError: - log.exception('Failed to open %s', self.file_path) - self.close_read_handle(file_handle) - return None - - def delete(self): - """ - delete blob file from file system, prevent deletion - if a blob is being read from or written to - - returns a deferred that firesback when delete is completed - """ - if not self.writers and not self.readers: - self._verified = False - self.moved_verified_blob = False - - def delete_from_file_system(): - if os.path.isfile(self.file_path): - os.remove(self.file_path) - - d = threads.deferToThread(delete_from_file_system) - - def log_error(err): - log.warning("An error occurred deleting %s: %s", - str(self.file_path), err.getErrorMessage()) - return err - - d.addErrback(log_error) - return d - else: - return defer.fail(Failure( - ValueError("File is currently being read or written and cannot be deleted"))) - def close_read_handle(self, file_handle): if file_handle is not None: file_handle.close() @@ -318,6 +300,7 @@ class BlobFile(HashBlob): else: raise DownloadCanceledError() + class BlobFileCreator(object): """ This class is used to create blobs on the local filesystem diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index 3bea14c4e..076e201bb 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -1,4 +1,4 @@ -from lbrynet.core.HashBlob import HashBlob, BlobFile +from lbrynet.core.HashBlob import BlobFile from lbrynet.core.Error import DownloadCanceledError, InvalidDataError @@ -9,7 +9,6 @@ import os import time - class BlobFileTest(unittest.TestCase): def setUp(self): self.db_dir, self.blob_dir = mk_db_and_blob_dir() @@ -17,11 +16,9 @@ class BlobFileTest(unittest.TestCase): self.fake_content = bytearray('0'*self.fake_content_len) self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca5602b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105' - def tearDown(self): rm_db_and_blob_dir(self.db_dir, self.blob_dir) - @defer.inlineCallbacks def test_good_write_and_read(self): # test a write that should succeed @@ -32,7 +29,7 @@ class BlobFileTest(unittest.TestCase): writer.write(self.fake_content) writer.close() out = yield finished_d - self.assertTrue(isinstance(out,HashBlob)) + self.assertTrue(isinstance(out, BlobFile)) self.assertTrue(out.verified) self.assertEqual(self.fake_content_len, out.get_length()) @@ -118,7 +115,7 @@ class BlobFileTest(unittest.TestCase): out_2 = yield finished_d_2 out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError) - self.assertTrue(isinstance(out_2,HashBlob)) + self.assertTrue(isinstance(out_2, BlobFile)) self.assertTrue(out_2.verified) self.assertEqual(self.fake_content_len, out_2.get_length()) From 14636a5d3822d3a98716cd30d1c51af687dd7474 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 13 Sep 2017 15:46:39 -0400 Subject: [PATCH 34/54] split up HashBlob.py into lbrynet.blob --- lbrynet/blob/__init__.py | 4 + .../{core/HashBlob.py => blob/blob_file.py} | 114 +----------------- lbrynet/blob/creator.py | 43 +++++++ lbrynet/blob/reader.py | 30 +++++ lbrynet/blob/writer.py | 54 +++++++++ lbrynet/core/BlobManager.py | 3 +- scripts/decrypt_blob.py | 4 +- scripts/download_blob_from_peer.py | 4 +- tests/unit/core/test_HashBlob.py | 2 +- 9 files changed, 141 insertions(+), 117 deletions(-) create mode 100644 lbrynet/blob/__init__.py rename lbrynet/{core/HashBlob.py => 
blob/blob_file.py} (69%) create mode 100644 lbrynet/blob/creator.py create mode 100644 lbrynet/blob/reader.py create mode 100644 lbrynet/blob/writer.py diff --git a/lbrynet/blob/__init__.py b/lbrynet/blob/__init__.py new file mode 100644 index 000000000..e605ea317 --- /dev/null +++ b/lbrynet/blob/__init__.py @@ -0,0 +1,4 @@ +from blob_file import BlobFile +from creator import BlobFileCreator +from writer import HashBlobWriter +from reader import HashBlobReader diff --git a/lbrynet/core/HashBlob.py b/lbrynet/blob/blob_file.py similarity index 69% rename from lbrynet/core/HashBlob.py rename to lbrynet/blob/blob_file.py index 8aa822db2..9eaad184f 100644 --- a/lbrynet/core/HashBlob.py +++ b/lbrynet/blob/blob_file.py @@ -1,93 +1,20 @@ -from io import BytesIO import logging import os import threading -from twisted.internet import interfaces, defer, threads +from twisted.internet import defer, threads from twisted.protocols.basic import FileSender from twisted.web.client import FileBodyProducer from twisted.python.failure import Failure -from zope.interface import implements from lbrynet import conf from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError -from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.utils import is_valid_blobhash +from lbrynet.blob.writer import HashBlobWriter +from lbrynet.blob.reader import HashBlobReader log = logging.getLogger(__name__) -class HashBlobReader(object): - implements(interfaces.IConsumer) - - def __init__(self, write_func): - self.write_func = write_func - - def registerProducer(self, producer, streaming): - from twisted.internet import reactor - - self.producer = producer - self.streaming = streaming - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - pass - - def write(self, data): - from twisted.internet import reactor - - self.write_func(data) - if self.streaming is False: - reactor.callLater(0, self.producer.resumeProducing) - - -class HashBlobWriter(object): - def __init__(self, length_getter, finished_cb): - self.write_handle = BytesIO() - self.length_getter = length_getter - self.finished_cb = finished_cb - self.finished_cb_d = None - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - - @property - def blob_hash(self): - return self._hashsum.hexdigest() - - def write(self, data): - if self.write_handle is None: - log.info("writer has already been closed") - # can this be changed to IOError? - raise ValueError('I/O operation on closed file') - - self._hashsum.update(data) - self.len_so_far += len(data) - if self.len_so_far > self.length_getter(): - self.finished_cb_d = self.finished_cb( - self, - Failure(InvalidDataError("Length so far is greater than the expected length." 
- " %s to %s" % (self.len_so_far, - self.length_getter())))) - else: - self.write_handle.write(data) - if self.len_so_far == self.length_getter(): - self.finished_cb_d = self.finished_cb(self) - - def close_handle(self): - if self.write_handle is not None: - self.write_handle.close() - self.write_handle = None - - def close(self, reason=None): - # if we've already called finished_cb because we either finished writing - # or closed already, do nothing - if self.finished_cb_d is not None: - return - if reason is None: - reason = Failure(DownloadCanceledError()) - self.finished_cb_d = self.finished_cb(self, reason) - - class BlobFile(object): """ A chunk of data available on the network which is specified by a hashsum @@ -299,38 +226,3 @@ class BlobFile(object): defer.returnValue(True) else: raise DownloadCanceledError() - - -class BlobFileCreator(object): - """ - This class is used to create blobs on the local filesystem - when we do not know the blob hash beforehand (i.e, when creating - a new stream) - """ - def __init__(self, blob_dir): - self.blob_dir = blob_dir - self.buffer = BytesIO() - self._is_open = True - self._hashsum = get_lbry_hash_obj() - self.len_so_far = 0 - self.blob_hash = None - self.length = None - - @defer.inlineCallbacks - def close(self): - self.length = self.len_so_far - self.blob_hash = self._hashsum.hexdigest() - if self.blob_hash and self._is_open: - self.buffer.seek(0) - out_path = os.path.join(self.blob_dir, self.blob_hash) - producer = FileBodyProducer(self.buffer) - yield producer.startProducing(open(out_path, 'wb')) - self._is_open = False - defer.returnValue(self.blob_hash) - - def write(self, data): - if not self._is_open: - raise IOError - self._hashsum.update(data) - self.len_so_far += len(data) - self.buffer.write(data) diff --git a/lbrynet/blob/creator.py b/lbrynet/blob/creator.py new file mode 100644 index 000000000..417d08d85 --- /dev/null +++ b/lbrynet/blob/creator.py @@ -0,0 +1,43 @@ +import os +import logging +from io import BytesIO +from twisted.internet import defer +from twisted.web.client import FileBodyProducer +from lbrynet.core.cryptoutils import get_lbry_hash_obj + +log = logging.getLogger(__name__) + + +class BlobFileCreator(object): + """ + This class is used to create blobs on the local filesystem + when we do not know the blob hash beforehand (i.e, when creating + a new stream) + """ + def __init__(self, blob_dir): + self.blob_dir = blob_dir + self.buffer = BytesIO() + self._is_open = True + self._hashsum = get_lbry_hash_obj() + self.len_so_far = 0 + self.blob_hash = None + self.length = None + + @defer.inlineCallbacks + def close(self): + self.length = self.len_so_far + self.blob_hash = self._hashsum.hexdigest() + if self.blob_hash and self._is_open: + self.buffer.seek(0) + out_path = os.path.join(self.blob_dir, self.blob_hash) + producer = FileBodyProducer(self.buffer) + yield producer.startProducing(open(out_path, 'wb')) + self._is_open = False + defer.returnValue(self.blob_hash) + + def write(self, data): + if not self._is_open: + raise IOError + self._hashsum.update(data) + self.len_so_far += len(data) + self.buffer.write(data) diff --git a/lbrynet/blob/reader.py b/lbrynet/blob/reader.py new file mode 100644 index 000000000..c85cc38f3 --- /dev/null +++ b/lbrynet/blob/reader.py @@ -0,0 +1,30 @@ +import logging +from twisted.internet import interfaces +from zope.interface import implements + +log = logging.getLogger(__name__) + + +class HashBlobReader(object): + implements(interfaces.IConsumer) + + def __init__(self, write_func): + 
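# [editor's sketch, not part of this patch] HashBlobReader adapts a plain
# write function into a Twisted IConsumer, so a producer can stream blob
# contents into any sink; this mirrors the read_blob path shown in
# blob_file.py later in this series:
#
#     from twisted.protocols.basic import FileSender
#
#     reader = HashBlobReader(out_file.write)   # out_file: any open file
#     d = FileSender().beginFileTransfer(blob_handle, reader)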
self.write_func = write_func + + def registerProducer(self, producer, streaming): + from twisted.internet import reactor + + self.producer = producer + self.streaming = streaming + if self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + pass + + def write(self, data): + from twisted.internet import reactor + + self.write_func(data) + if self.streaming is False: + reactor.callLater(0, self.producer.resumeProducing) diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py new file mode 100644 index 000000000..7e6f4d13f --- /dev/null +++ b/lbrynet/blob/writer.py @@ -0,0 +1,54 @@ +import logging +from io import BytesIO +from twisted.python.failure import Failure +from lbrynet.core.Error import DownloadCanceledError, InvalidDataError +from lbrynet.core.cryptoutils import get_lbry_hash_obj + +log = logging.getLogger(__name__) + + +class HashBlobWriter(object): + def __init__(self, length_getter, finished_cb): + self.write_handle = BytesIO() + self.length_getter = length_getter + self.finished_cb = finished_cb + self.finished_cb_d = None + self._hashsum = get_lbry_hash_obj() + self.len_so_far = 0 + + @property + def blob_hash(self): + return self._hashsum.hexdigest() + + def write(self, data): + if self.write_handle is None: + log.info("writer has already been closed") + # can this be changed to IOError? + raise ValueError('I/O operation on closed file') + + self._hashsum.update(data) + self.len_so_far += len(data) + if self.len_so_far > self.length_getter(): + self.finished_cb_d = self.finished_cb( + self, + Failure(InvalidDataError("Length so far is greater than the expected length." + " %s to %s" % (self.len_so_far, + self.length_getter())))) + else: + self.write_handle.write(data) + if self.len_so_far == self.length_getter(): + self.finished_cb_d = self.finished_cb(self) + + def close_handle(self): + if self.write_handle is not None: + self.write_handle.close() + self.write_handle = None + + def close(self, reason=None): + # if we've already called finished_cb because we either finished writing + # or closed already, do nothing + if self.finished_cb_d is not None: + return + if reason is None: + reason = Failure(DownloadCanceledError()) + self.finished_cb_d = self.finished_cb(self, reason) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index cb7d738f9..48ee53e24 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -6,7 +6,8 @@ import sqlite3 from twisted.internet import threads, defer, reactor from twisted.enterprise import adbapi from lbrynet import conf -from lbrynet.core.HashBlob import BlobFile, BlobFileCreator +from lbrynet.blob.blob_file import BlobFile +from lbrynet.blob.creator import BlobFileCreator from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.sqlite_helpers import rerun_if_locked diff --git a/scripts/decrypt_blob.py b/scripts/decrypt_blob.py index bc905bf2e..4f5c8b8e9 100644 --- a/scripts/decrypt_blob.py +++ b/scripts/decrypt_blob.py @@ -10,7 +10,7 @@ from twisted.internet import reactor from lbrynet import conf from lbrynet.cryptstream import CryptBlob -from lbrynet.core import HashBlob +from lbrynet.blob import BlobFile from lbrynet.core import log_support @@ -46,7 +46,7 @@ def decrypt_blob(blob_file, key, iv, output): filename = os.path.abspath(blob_file) length = os.path.getsize(filename) directory, blob_hash = os.path.split(filename) - blob = HashBlob.BlobFile(directory, blob_hash, True, length) + blob = BlobFile(directory, 
blob_hash, length) decryptor = CryptBlob.StreamBlobDecryptor( blob, binascii.unhexlify(key), binascii.unhexlify(iv), length) with open(output, 'w') as f: diff --git a/scripts/download_blob_from_peer.py b/scripts/download_blob_from_peer.py index 02d9a1693..43a510328 100644 --- a/scripts/download_blob_from_peer.py +++ b/scripts/download_blob_from_peer.py @@ -13,7 +13,7 @@ from lbrynet import conf from lbrynet.core import log_support from lbrynet.core import BlobManager from lbrynet.core import HashAnnouncer -from lbrynet.core import HashBlob +from lbrynet.blob import BlobFile from lbrynet.core import RateLimiter from lbrynet.core import Peer from lbrynet.core import Wallet @@ -38,7 +38,7 @@ def main(args=None): announcer = HashAnnouncer.DummyHashAnnouncer() blob_manager = MyBlobManager(announcer) - blob = HashBlob.BlobFile(args.directory, args.blob_hash) + blob = BlobFile(args.directory, args.blob_hash) download_manager = SingleBlobDownloadManager(blob) peer = Peer.Peer(*conf.server_port(args.peer)) payment_rate_manager = DumbPaymentRateManager() diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index 076e201bb..179313720 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -1,4 +1,4 @@ -from lbrynet.core.HashBlob import BlobFile +from lbrynet.blob import BlobFile from lbrynet.core.Error import DownloadCanceledError, InvalidDataError From 6f71a5003cd469a3b953320a38cfb645978f8731 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 13 Sep 2017 16:42:17 -0400 Subject: [PATCH 35/54] remove unused EncryptedFileOpener --- lbrynet/daemon/Daemon.py | 14 +-- .../client/EncryptedFileDownloader.py | 87 ------------------- 2 files changed, 2 insertions(+), 99 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 8eb218fd3..a91c008f0 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -29,7 +29,6 @@ from lbrynet.reflector import reupload from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.core.log_support import configure_loggly_handler from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory -from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType @@ -578,17 +577,8 @@ class Daemon(AuthJSONRPCServer): self.session.wallet, self.download_directory ) - self.sd_identifier.add_stream_downloader_factory( - EncryptedFileStreamType, file_saver_factory) - file_opener_factory = EncryptedFileOpenerFactory( - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.stream_info_manager, - self.session.wallet - ) - self.sd_identifier.add_stream_downloader_factory( - EncryptedFileStreamType, file_opener_factory) + self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, + file_saver_factory) return defer.succeed(None) def _download_blob(self, blob_hash, rate_manager=None, timeout=None): diff --git a/lbrynet/lbry_file/client/EncryptedFileDownloader.py b/lbrynet/lbry_file/client/EncryptedFileDownloader.py index e18c6f2cf..1ba7a9533 100644 --- a/lbrynet/lbry_file/client/EncryptedFileDownloader.py +++ b/lbrynet/lbry_file/client/EncryptedFileDownloader.py @@ -282,90 +282,3 @@ class 
EncryptedFileSaverFactory(EncryptedFileDownloaderFactory): @staticmethod def get_description(): return "Save" - - -class EncryptedFileOpener(EncryptedFileDownloader): - def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager, - payment_rate_manager, wallet): - EncryptedFileDownloader.__init__(self, stream_hash, - peer_finder, rate_limiter, - blob_manager, stream_info_manager, - payment_rate_manager, wallet, - ) - self.process = None - self.process_log = None - - def stop(self, err=None): - d = EncryptedFileDownloader.stop(self, err=err) - d.addCallback(lambda _: self._delete_from_info_manager()) - return d - - def _get_progress_manager(self, download_manager): - return FullStreamProgressManager(self._finished_downloading, - self.blob_manager, - download_manager) - - def _setup_output(self): - def start_process(): - if os.name == "nt": - paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe', - r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe'] - for p in paths: - if os.path.exists(p): - vlc_path = p - break - else: - raise ValueError("You must install VLC media player to stream files") - else: - vlc_path = 'vlc' - self.process_log = open("vlc.out", 'a') - try: - self.process = subprocess.Popen([vlc_path, '-'], stdin=subprocess.PIPE, - stdout=self.process_log, stderr=self.process_log) - except OSError: - raise ValueError("VLC media player could not be opened") - - d = threads.deferToThread(start_process) - return d - - def _close_output(self): - if self.process is not None: - self.process.stdin.close() - self.process = None - return defer.succeed(True) - - def _get_write_func(self): - def write_func(data): - if self.stopped is False and self.process is not None: - try: - self.process.stdin.write(data) - except IOError: - reactor.callLater(0, self.stop) - return write_func - - def _delete_from_info_manager(self): - return self.stream_info_manager.delete_stream(self.stream_hash) - - -class EncryptedFileOpenerFactory(EncryptedFileDownloaderFactory): - def can_download(self, sd_validator): - if which('vlc'): - return True - elif os.name == "nt": - paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe', - r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe'] - for p in paths: - if os.path.exists(p): - return True - return False - - def _make_downloader(self, stream_hash, payment_rate_manager, stream_info): - return EncryptedFileOpener(stream_hash, self.peer_finder, - self.rate_limiter, self.blob_manager, - self.stream_info_manager, - payment_rate_manager, self.wallet, - ) - - @staticmethod - def get_description(): - return "Stream" From e046af57fab19369189a281c3c2ff71d9b1651a9 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Thu, 14 Sep 2017 12:46:30 -0400 Subject: [PATCH 36/54] fix empty tail blobs in stream from being saved --- lbrynet/blob/creator.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lbrynet/blob/creator.py b/lbrynet/blob/creator.py index 417d08d85..963986d5c 100644 --- a/lbrynet/blob/creator.py +++ b/lbrynet/blob/creator.py @@ -27,13 +27,21 @@ class BlobFileCreator(object): def close(self): self.length = self.len_so_far self.blob_hash = self._hashsum.hexdigest() - if self.blob_hash and self._is_open: + if self.blob_hash and self._is_open and self.length > 0: + # do not save 0 length files (empty tail blob in streams) + # or if its been closed already self.buffer.seek(0) out_path = os.path.join(self.blob_dir, self.blob_hash) producer = FileBodyProducer(self.buffer) yield producer.startProducing(open(out_path, 'wb')) 
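# [editor's sketch, not part of this patch] how the zero-length case arises:
# a stream is terminated by an empty tail blob, whose creator is closed
# without any intervening write(). Typical usage, blob_dir assumed:
#
#     from lbrynet.blob.creator import BlobFileCreator
#
#     creator = BlobFileCreator(blob_dir)
#     creator.write('some data')   # hashed and buffered in memory
#     d = creator.close()          # writes blob_dir/<hash> via FileBodyProducer
#     # d fires with the blob hash, or None for a 0-length tail blob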
self._is_open = False - defer.returnValue(self.blob_hash) + if self.length > 0: + defer.returnValue(self.blob_hash) + else: + # 0 length files (empty tail blob in streams ) + # must return None as their blob_hash for + # it to be saved properly by EncryptedFileMetadataManagers + defer.returnValue(None) def write(self, data): if not self._is_open: From af3ab968159b226e2b36cc991f371222aa8c5601 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Thu, 14 Sep 2017 14:25:55 -0400 Subject: [PATCH 37/54] deleting unused BlobFile.finished_deferred --- lbrynet/blob/blob_file.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 9eaad184f..032dcbb64 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -36,7 +36,6 @@ class BlobFile(object): self.blob_hash = blob_hash self.length = length self.writers = {} # {Peer: writer, finished_deferred} - self.finished_deferred = None self._verified = False self.readers = 0 self.blob_dir = blob_dir From 3efefe8efdecc7ca5100d36f27c26f19c87baa04 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Thu, 14 Sep 2017 15:08:07 -0400 Subject: [PATCH 38/54] use cryptography version 2.0.3 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 192b6a0a8..d45fb6976 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ Twisted==16.6.0 -cryptography==1.9 +cryptography==2.0.3 appdirs==1.4.3 argparse==1.2.1 docopt==0.6.2 From 8d2cc4a4c02c6e5a35ea42903ccc1b019daaf319 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Thu, 14 Sep 2017 15:23:45 -0400 Subject: [PATCH 39/54] lint, remove ununsed imports --- lbrynet/lbry_file/client/EncryptedFileDownloader.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lbrynet/lbry_file/client/EncryptedFileDownloader.py b/lbrynet/lbry_file/client/EncryptedFileDownloader.py index 1ba7a9533..735c8027e 100644 --- a/lbrynet/lbry_file/client/EncryptedFileDownloader.py +++ b/lbrynet/lbry_file/client/EncryptedFileDownloader.py @@ -1,4 +1,3 @@ -import subprocess import binascii from zope.interface import implements @@ -10,8 +9,7 @@ from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler import os -from twisted.internet import defer, threads, reactor -from twisted.python.procutils import which +from twisted.internet import defer, threads import logging import traceback From ad061b5ea3e14b8ce12a9d3e30502b9b6fe930fd Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 09:56:01 -0400 Subject: [PATCH 40/54] use fixed BlobFile type --- lbrynet/core/BlobManager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 48ee53e24..5834c8c31 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -30,7 +30,6 @@ class DiskBlobManager(DHTHashSupplier): self.blob_dir = blob_dir self.db_file = os.path.join(db_dir, "blobs.db") self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False) - self.blob_type = BlobFile self.blob_creator_type = BlobFileCreator # TODO: consider using an LRU for blobs as there could potentially # be thousands of blobs loaded up, many stale @@ -62,7 +61,7 @@ class DiskBlobManager(DHTHashSupplier): def _make_new_blob(self, blob_hash, length=None): log.debug('Making a new blob for 
%s', blob_hash) - blob = self.blob_type(self.blob_dir, blob_hash, length) + blob = BlobFile(self.blob_dir, blob_hash, length) self.blobs[blob_hash] = blob return defer.succeed(blob) From b9b5e755db8e5d2ab437b9f5354fb05b06fbc36f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 09:56:15 -0400 Subject: [PATCH 41/54] raise rather than assert --- lbrynet/core/BlobManager.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 5834c8c31..59bb51a2f 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -89,10 +89,13 @@ class DiskBlobManager(DHTHashSupplier): def creator_finished(self, blob_creator, should_announce): log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) - assert blob_creator.blob_hash is not None - assert blob_creator.blob_hash not in self.blobs - assert blob_creator.length is not None - new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length) + if blob_creator.blob_hash is None: + raise Exception("Blob hash is None") + if blob_creator.blob_hash in self.blobs: + raise Exception("Creator finished for blob that is already marked as completed") + if blob_creator.length is None: + raise Exception("Blob has a length of 0") + new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length) self.blobs[blob_creator.blob_hash] = new_blob next_announce_time = self.get_next_announce_time() d = self.blob_completed(new_blob, next_announce_time, should_announce) @@ -256,5 +259,3 @@ class DiskBlobManager(DHTHashSupplier): "insert into upload values (null, ?, ?, ?, ?) ", (blob_hash, str(host), float(rate), ts)) return d - - From 96357ab83341cd39b2fd075a3f6801f03ccb1758 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 09:56:22 -0400 Subject: [PATCH 42/54] exchange rate error --- lbrynet/daemon/ExchangeRateManager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbrynet/daemon/ExchangeRateManager.py b/lbrynet/daemon/ExchangeRateManager.py index 998d06d0d..2cc187e2d 100644 --- a/lbrynet/daemon/ExchangeRateManager.py +++ b/lbrynet/daemon/ExchangeRateManager.py @@ -65,8 +65,8 @@ class MarketFeed(object): def _log_error(self, err): log.warning( - "There was a problem updating %s exchange rate information from %s", - self.market, self.name) + "There was a problem updating %s exchange rate information from %s\n%s", + self.market, self.name, err) def _update_price(self): d = threads.deferToThread(self._make_request) From 421141b958113d2aca03b1e21443747400e4b9f7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 10:48:54 -0400 Subject: [PATCH 43/54] raise instead of assert --- lbrynet/core/BlobManager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 59bb51a2f..e3b41d3cf 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -51,7 +51,8 @@ class DiskBlobManager(DHTHashSupplier): """Return a blob identified by blob_hash, which may be a new blob or a blob that is already on the hard disk """ - assert length is None or isinstance(length, int) + if length is not None and not isinstance(length, int): + raise Exception("invalid length type: %s (%s)", length, str(type(length))) if blob_hash in self.blobs: return defer.succeed(self.blobs[blob_hash]) return self._make_new_blob(blob_hash, length) From 8419e1e1d5ef7ef851600859c3c5617eae6f0e9c Mon Sep 17 00:00:00 2001 
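[Editor's note on the two assert-to-raise commits above: explicit raises
survive python -O, which strips asserts, and carry descriptive messages into
deferred errbacks. One caveat worth flagging: unlike log.warning(),
Exception("...%s...", args) does not interpolate its extra arguments, so the
message should be %-formatted before raising. An illustrative helper, not
code from this series:]

def validate_blob_length(length):
    # same check as DiskBlobManager.get_blob(), with the message
    # interpolated eagerly, since Exception() ignores printf-style args
    if length is not None and not isinstance(length, int):
        raise Exception("invalid length type: %s (%s)"
                        % (length, type(length).__name__))
    return length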
From: Jack Robison Date: Fri, 15 Sep 2017 13:46:38 -0400 Subject: [PATCH 44/54] change some ValueErrors to IOErrors --- lbrynet/blob/blob_file.py | 2 +- lbrynet/blob/writer.py | 5 ++--- tests/unit/core/test_HashBlob.py | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index 032dcbb64..c7a1c45b1 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -161,7 +161,7 @@ class BlobFile(object): d = file_sender.beginFileTransfer(file_handle, reader) d.addCallback(close_self) else: - d = defer.fail(ValueError("Could not read the blob")) + d = defer.fail(IOError("Could not read the blob")) return d def writer_finished(self, writer, err=None): diff --git a/lbrynet/blob/writer.py b/lbrynet/blob/writer.py index 7e6f4d13f..a95430386 100644 --- a/lbrynet/blob/writer.py +++ b/lbrynet/blob/writer.py @@ -22,9 +22,8 @@ class HashBlobWriter(object): def write(self, data): if self.write_handle is None: - log.info("writer has already been closed") - # can this be changed to IOError? - raise ValueError('I/O operation on closed file') + log.exception("writer has already been closed") + raise IOError('I/O operation on closed file') self._hashsum.update(data) self.len_so_far += len(data) diff --git a/tests/unit/core/test_HashBlob.py b/tests/unit/core/test_HashBlob.py index 179313720..d1c282478 100644 --- a/tests/unit/core/test_HashBlob.py +++ b/tests/unit/core/test_HashBlob.py @@ -90,8 +90,8 @@ class BlobFileTest(unittest.TestCase): writer.close() yield self.assertFailure(finished_d, DownloadCanceledError) - # writes after close will throw a ValueError exception - with self.assertRaises(ValueError): + # writes after close will throw a IOError exception + with self.assertRaises(IOError): writer.write(self.fake_content) # another call to close will do nothing From adf89a9d1ae4a99756c4f84f905e78a1d0dd11f4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 13:49:07 -0400 Subject: [PATCH 45/54] logging --- lbrynet/core/client/BlobRequester.py | 2 +- lbrynet/daemon/ExchangeRateManager.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index a887b24c6..f9d61aa3f 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -469,7 +469,7 @@ class DownloadRequest(RequestHelper): writer, d = blob.open_for_writing(self.peer) if d is not None: return BlobDownloadDetails(blob, d, writer.write, writer.close, self.peer) - log.debug('Skipping blob %s as there was an issue opening it for writing', blob) + log.warning('Skipping blob %s as there was an issue opening it for writing', blob) return None def _make_request(self, blob_details): diff --git a/lbrynet/daemon/ExchangeRateManager.py b/lbrynet/daemon/ExchangeRateManager.py index 2cc187e2d..df7968f55 100644 --- a/lbrynet/daemon/ExchangeRateManager.py +++ b/lbrynet/daemon/ExchangeRateManager.py @@ -64,8 +64,7 @@ class MarketFeed(object): self.rate = ExchangeRate(self.market, price, int(time.time())) def _log_error(self, err): - log.warning( - "There was a problem updating %s exchange rate information from %s\n%s", + log.warning("There was a problem updating %s exchange rate information from %s\n%s", self.market, self.name, err) def _update_price(self): @@ -141,7 +140,10 @@ class LBRYioBTCFeed(MarketFeed): ) def _handle_response(self, response): - json_response = json.loads(response) + try: + json_response = json.loads(response) + except 
ValueError: + raise InvalidExchangeRateResponse("invalid rate response : %s" % response) if 'data' not in json_response: raise InvalidExchangeRateResponse(self.name, 'result not found') return defer.succeed(1.0 / json_response['data']['btc_usd']) From 19ff0941f52fc58e72edaa5d78a448a4ed9ad9ae Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 13:50:38 -0400 Subject: [PATCH 46/54] fix ClientProtocol. _handle_response_error --- lbrynet/core/client/ClientProtocol.py | 38 ++++++++++++--------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index a9dca8307..b0498bb00 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -100,9 +100,7 @@ class ClientProtocol(Protocol, TimeoutMixin): if self._blob_download_request is None: d = self.add_request(blob_request) self._blob_download_request = blob_request - blob_request.finished_deferred.addCallbacks(self._downloading_finished, - self._downloading_failed) - blob_request.finished_deferred.addErrback(self._handle_response_error) + blob_request.finished_deferred.addCallbacks(self._downloading_finished, self._handle_response_error) return d else: raise ValueError("There is already a blob download request active") @@ -110,12 +108,14 @@ class ClientProtocol(Protocol, TimeoutMixin): def cancel_requests(self): self.connection_closing = True ds = [] - err = failure.Failure(RequestCanceledError()) + err = RequestCanceledError() for key, d in self._response_deferreds.items(): del self._response_deferreds[key] + log.info("cancel download response") d.errback(err) ds.append(d) if self._blob_download_request is not None: + log.info("cancel download request") self._blob_download_request.cancel(err) ds.append(self._blob_download_request.finished_deferred) self._blob_download_request = None @@ -176,14 +176,20 @@ class ClientProtocol(Protocol, TimeoutMixin): def _handle_response_error(self, err): # If an error gets to this point, log it and kill the connection. - expected_errors = (MisbehavingPeerError, ConnectionClosedBeforeResponseError, - DownloadCanceledError, RequestCanceledError) - if not err.check(expected_errors): + if err.check(DownloadCanceledError, RequestCanceledError): + # TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't + # TODO: always be this way. it's done this way now because the client has no other way + # TODO: of telling the server it wants the download to stop. It would be great if the + # TODO: protocol had such a mechanism. + log.info("Closing the connection to %s because the download of blob %s was canceled", + self.peer, self._blob_download_request.blob) + return + elif not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError): + log.warning("The connection to %s is closing due to: %s", err) + return err + else: log.error("The connection to %s is closing due to an unexpected error: %s", - self.peer, err.getErrorMessage()) - if not err.check(RequestCanceledError): - # The connection manager is closing the connection, so - # there's no need to do it here. + self.peer, err) return err def _handle_response(self, response): @@ -236,16 +242,6 @@ class ClientProtocol(Protocol, TimeoutMixin): self._downloading_blob = False return arg - def _downloading_failed(self, err): - if err.check(DownloadCanceledError): - # TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't - # TODO: always be this way. 
it's done this way now because the client has no other way - # TODO: of telling the server it wants the download to stop. It would be great if the - # TODO: protocol had such a mechanism. - log.debug("Closing the connection to %s because the download of blob %s was canceled", - self.peer, self._blob_download_request.blob) - return err - ######### IRateLimited ######### def throttle_upload(self): From 85f25a8d99f41029424fbd25a58164dd7b585e74 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 13:57:06 -0400 Subject: [PATCH 47/54] remove debug logging --- lbrynet/core/client/ClientProtocol.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index b0498bb00..86b27277b 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -111,11 +111,9 @@ class ClientProtocol(Protocol, TimeoutMixin): err = RequestCanceledError() for key, d in self._response_deferreds.items(): del self._response_deferreds[key] - log.info("cancel download response") d.errback(err) ds.append(d) if self._blob_download_request is not None: - log.info("cancel download request") self._blob_download_request.cancel(err) ds.append(self._blob_download_request.finished_deferred) self._blob_download_request = None From ea49cddf52ac6a085fffef4413e35a6d7ccb98a9 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 15 Sep 2017 14:17:34 -0400 Subject: [PATCH 48/54] catch IOError when writing --- lbrynet/core/client/ClientProtocol.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index 86b27277b..9d77a841a 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -51,7 +51,14 @@ class ClientProtocol(Protocol, TimeoutMixin): self.setTimeout(None) self._rate_limiter.report_dl_bytes(len(data)) if self._downloading_blob is True: - self._blob_download_request.write(data) + try: + self._blob_download_request.write(data) + except IOError as e: + #TODO: we need to fix this so that we do not even + #attempt to download the same blob from different peers + msg = "Failed to write, blob is likely closed by another peer finishing download" + log.warn(msg) + self.transport.loseConnection() else: self._response_buff += data if len(self._response_buff) > conf.settings['MAX_RESPONSE_INFO_SIZE']: From e50ade85be351b056b8e148c2d8289372f5f4b01 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 15 Sep 2017 15:02:15 -0400 Subject: [PATCH 49/54] catch IOError properly --- lbrynet/core/client/ClientProtocol.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index 9d77a841a..dcb07439a 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -50,15 +50,20 @@ class ClientProtocol(Protocol, TimeoutMixin): log.debug("Data receieved from %s", self.peer) self.setTimeout(None) self._rate_limiter.report_dl_bytes(len(data)) + + def in_case_IOError(): + #TODO: writes can raise IOError if another peer finished + #writing and closes the other peeer's writers. 
Fix this + #by preventing peers from downloading the same blobs + msg = "Failed to write, blob is likely closed by another peer finishing download" + log.warn(msg) + self.transport.loseConnection() + if self._downloading_blob is True: try: self._blob_download_request.write(data) except IOError as e: - #TODO: we need to fix this so that we do not even - #attempt to download the same blob from different peers - msg = "Failed to write, blob is likely closed by another peer finishing download" - log.warn(msg) - self.transport.loseConnection() + in_case_IOError() else: self._response_buff += data if len(self._response_buff) > conf.settings['MAX_RESPONSE_INFO_SIZE']: @@ -70,7 +75,12 @@ class ClientProtocol(Protocol, TimeoutMixin): self._response_buff = '' self._handle_response(response) if self._downloading_blob is True and len(extra_data) != 0: - self._blob_download_request.write(extra_data) + try: + self._blob_download_request.write(extra_data) + except IOError as e: + in_case_IOError() + + def timeoutConnection(self): log.info("Connection timed out to %s", self.peer) From b6e9aa420c9287e74ce186bb57d7b1ea25f9268f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 15:09:56 -0400 Subject: [PATCH 50/54] fix cancelled blob request? --- lbrynet/core/client/BlobRequester.py | 3 ++- lbrynet/core/client/ClientProtocol.py | 14 +++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index f9d61aa3f..5e5aecf76 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -4,6 +4,7 @@ from decimal import Decimal from twisted.internet import defer from twisted.python.failure import Failure +from twisted.internet.error import ConnectionAborted from zope.interface import implements from lbrynet.core.Error import ConnectionClosedBeforeResponseError @@ -225,7 +226,7 @@ class RequestHelper(object): self.requestor._update_local_score(self.peer, score) def _request_failed(self, reason, request_type): - if reason.check(RequestCanceledError): + if reason.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted): return if reason.check(NoResponseError): self.requestor._incompatible_peers.append(self.peer) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index dcb07439a..6df7080f0 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -134,6 +134,7 @@ class ClientProtocol(Protocol, TimeoutMixin): self._blob_download_request.cancel(err) ds.append(self._blob_download_request.finished_deferred) self._blob_download_request = None + self._downloading_blob = False return defer.DeferredList(ds) ######### Internal request handling ######### @@ -191,21 +192,24 @@ class ClientProtocol(Protocol, TimeoutMixin): def _handle_response_error(self, err): # If an error gets to this point, log it and kill the connection. - if err.check(DownloadCanceledError, RequestCanceledError): + if err.check(DownloadCanceledError, RequestCanceledError, error.ConnectionAborted): # TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't # TODO: always be this way. it's done this way now because the client has no other way # TODO: of telling the server it wants the download to stop. It would be great if the # TODO: protocol had such a mechanism. 
log.info("Closing the connection to %s because the download of blob %s was canceled", self.peer, self._blob_download_request.blob) - return + result = None elif not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError): log.warning("The connection to %s is closing due to: %s", err) - return err + result = err else: log.error("The connection to %s is closing due to an unexpected error: %s", self.peer, err) - return err + result = err + + self.transport.loseConnection() + return result def _handle_response(self, response): ds = [] @@ -246,7 +250,7 @@ class ClientProtocol(Protocol, TimeoutMixin): log.debug("Asking for another request from %s", self.peer) self._ask_for_request() else: - log.debug("Not asking for another request from %s", self.peer) + log.warning("Not asking for another request from %s", self.peer) self.transport.loseConnection() dl.addCallback(get_next_request) From ab3c98703486875c5297ed5a7ab70762fe6b2051 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 15 Sep 2017 15:10:31 -0400 Subject: [PATCH 51/54] fix lints --- lbrynet/core/client/ClientProtocol.py | 5 +++-- lbrynet/daemon/ExchangeRateManager.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index 6df7080f0..b6cf89f81 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -117,7 +117,8 @@ class ClientProtocol(Protocol, TimeoutMixin): if self._blob_download_request is None: d = self.add_request(blob_request) self._blob_download_request = blob_request - blob_request.finished_deferred.addCallbacks(self._downloading_finished, self._handle_response_error) + blob_request.finished_deferred.addCallbacks(self._downloading_finished, + self._handle_response_error) return d else: raise ValueError("There is already a blob download request active") @@ -201,7 +202,7 @@ class ClientProtocol(Protocol, TimeoutMixin): self.peer, self._blob_download_request.blob) result = None elif not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError): - log.warning("The connection to %s is closing due to: %s", err) + log.warning("The connection to %s is closing due to: %s", self.peer, err) result = err else: log.error("The connection to %s is closing due to an unexpected error: %s", diff --git a/lbrynet/daemon/ExchangeRateManager.py b/lbrynet/daemon/ExchangeRateManager.py index df7968f55..805df2db1 100644 --- a/lbrynet/daemon/ExchangeRateManager.py +++ b/lbrynet/daemon/ExchangeRateManager.py @@ -143,7 +143,7 @@ class LBRYioBTCFeed(MarketFeed): try: json_response = json.loads(response) except ValueError: - raise InvalidExchangeRateResponse("invalid rate response : %s" % response) + raise InvalidExchangeRateResponse(self.name, "invalid rate response : %s" % response) if 'data' not in json_response: raise InvalidExchangeRateResponse(self.name, 'result not found') return defer.succeed(1.0 / json_response['data']['btc_usd']) From ffbcd822638c99dfa8662042cc7b65c5fb87de12 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Sep 2017 18:44:51 -0400 Subject: [PATCH 52/54] fix redundant blob request to peer --- lbrynet/core/client/BlobRequester.py | 3 ++- lbrynet/core/client/ClientProtocol.py | 20 ++------------------ 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index 5e5aecf76..aa15c5961 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -226,7 
+226,8 @@ class RequestHelper(object): self.requestor._update_local_score(self.peer, score) def _request_failed(self, reason, request_type): - if reason.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted): + if reason.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted, + ConnectionClosedBeforeResponseError): return if reason.check(NoResponseError): self.requestor._incompatible_peers.append(self.peer) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index b6cf89f81..b8861a6ab 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -51,19 +51,8 @@ class ClientProtocol(Protocol, TimeoutMixin): self.setTimeout(None) self._rate_limiter.report_dl_bytes(len(data)) - def in_case_IOError(): - #TODO: writes can raise IOError if another peer finished - #writing and closes the other peeer's writers. Fix this - #by preventing peers from downloading the same blobs - msg = "Failed to write, blob is likely closed by another peer finishing download" - log.warn(msg) - self.transport.loseConnection() - if self._downloading_blob is True: - try: - self._blob_download_request.write(data) - except IOError as e: - in_case_IOError() + self._blob_download_request.write(data) else: self._response_buff += data if len(self._response_buff) > conf.settings['MAX_RESPONSE_INFO_SIZE']: @@ -75,12 +64,7 @@ class ClientProtocol(Protocol, TimeoutMixin): self._response_buff = '' self._handle_response(response) if self._downloading_blob is True and len(extra_data) != 0: - try: - self._blob_download_request.write(extra_data) - except IOError as e: - in_case_IOError() - - + self._blob_download_request.write(extra_data) def timeoutConnection(self): log.info("Connection timed out to %s", self.peer) From dbb658adecc84771ef745911535a593427699e87 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 18 Sep 2017 12:09:04 -0400 Subject: [PATCH 53/54] update changelog --- CHANGELOG.md | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 948c71531..0e645f9b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,8 @@ at anytime. * ### Fixed - * - * + * Fixed handling cancelled blob and availability requests + * Fixed redundant blob requests to a peer ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. @@ -25,12 +25,19 @@ at anytime. 
* ### Added - * - * + * Added WAL pragma to sqlite3 + * Added unit tests for `BlobFile` + * Updated exchange rate tests for the lbry.io api + * Use `hashlib` for sha384 instead of `pycrypto` + * Use `cryptography` instead of `pycrypto` for blob encryption and decryption + * Use `cryptography` for PKCS7 instead of doing it manually + * Use `BytesIO` buffers instead of temp files when processing blobs + * Refactored and pruned blob related classes into `lbrynet.blobs` + * Changed several `assert`s to raise more useful errors ### Removed - * - * + * Removed `TempBlobFile` + * Removed unused `EncryptedFileOpener` ## [0.16.1] - 2017-09-20 From 6cbe86d0575bf4e8c203d3abf36faf0c7cb9a97e Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 20 Sep 2017 14:02:34 -0400 Subject: [PATCH 54/54] rename is_validated() to get_is_verified() to distinguish from verified property --- lbrynet/blob/blob_file.py | 4 ++-- lbrynet/core/client/BlobRequester.py | 2 +- lbrynet/core/client/StandaloneBlobDownloader.py | 4 ++-- lbrynet/core/client/StreamProgressManager.py | 6 +++--- lbrynet/core/server/BlobRequestHandler.py | 2 +- lbrynet/daemon/Daemon.py | 9 +++++---- lbrynet/reflector/client/blob.py | 2 +- lbrynet/reflector/client/client.py | 4 ++-- lbrynet/reflector/server/server.py | 6 +++--- tests/functional/test_reflector.py | 6 +++--- tests/unit/core/server/test_BlobRequestHandler.py | 6 +++--- 11 files changed, 26 insertions(+), 25 deletions(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index c7a1c45b1..f1a2010ee 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -141,8 +141,8 @@ class BlobFile(object): def get_length(self): return self.length - def is_validated(self): - return bool(self._verified) + def get_is_verified(self): + return self.verified def is_downloading(self): if self.writers: diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index aa15c5961..1ce4f7205 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -465,7 +465,7 @@ class DownloadRequest(RequestHelper): def find_blob(self, to_download): """Return the first blob in `to_download` that is successfully opened for write.""" for blob in to_download: - if blob.is_validated(): + if blob.get_is_verified(): log.debug('Skipping blob %s as its already validated', blob) continue writer, d = blob.open_for_writing(self.peer) diff --git a/lbrynet/core/client/StandaloneBlobDownloader.py b/lbrynet/core/client/StandaloneBlobDownloader.py index 97dc4727f..f7a108c65 100644 --- a/lbrynet/core/client/StandaloneBlobDownloader.py +++ b/lbrynet/core/client/StandaloneBlobDownloader.py @@ -64,14 +64,14 @@ class SingleProgressManager(object): def stream_position(self): blobs = self.download_manager.blobs - if blobs and blobs[0].is_validated(): + if blobs and blobs[0].get_is_verified(): return 1 return 0 def needed_blobs(self): blobs = self.download_manager.blobs assert len(blobs) == 1 - return [b for b in blobs.itervalues() if not b.is_validated()] + return [b for b in blobs.itervalues() if not b.get_is_verified()] class DummyBlobHandler(object): diff --git a/lbrynet/core/client/StreamProgressManager.py b/lbrynet/core/client/StreamProgressManager.py index d28112ac6..bc16fe560 100644 --- a/lbrynet/core/client/StreamProgressManager.py +++ b/lbrynet/core/client/StreamProgressManager.py @@ -93,7 +93,7 @@ class FullStreamProgressManager(StreamProgressManager): return ( i not in blobs or ( - not blobs[i].is_validated() and + not 
blobs[i].get_is_verified() and i not in self.provided_blob_nums ) ) @@ -112,7 +112,7 @@ class FullStreamProgressManager(StreamProgressManager): blobs = self.download_manager.blobs return [ b for n, b in blobs.iteritems() - if not b.is_validated() and not n in self.provided_blob_nums + if not b.get_is_verified() and not n in self.provided_blob_nums ] ######### internal ######### @@ -145,7 +145,7 @@ class FullStreamProgressManager(StreamProgressManager): current_blob_num = self.last_blob_outputted + 1 - if current_blob_num in blobs and blobs[current_blob_num].is_validated(): + if current_blob_num in blobs and blobs[current_blob_num].get_is_verified(): log.debug("Outputting blob %s", str(self.last_blob_outputted + 1)) self.provided_blob_nums.append(self.last_blob_outputted + 1) d = self.download_manager.handle_blob(self.last_blob_outputted + 1) diff --git a/lbrynet/core/server/BlobRequestHandler.py b/lbrynet/core/server/BlobRequestHandler.py index b95b3ca84..308d0c822 100644 --- a/lbrynet/core/server/BlobRequestHandler.py +++ b/lbrynet/core/server/BlobRequestHandler.py @@ -143,7 +143,7 @@ class BlobRequestHandler(object): def open_blob_for_reading(self, blob, response): response_fields = {} d = defer.succeed(None) - if blob.is_validated(): + if blob.get_is_verified(): read_handle = blob.open_for_reading() if read_handle is not None: self.currently_uploading = blob diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index a91c008f0..5a45b78dd 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -2457,10 +2457,11 @@ class Daemon(AuthJSONRPCServer): blob_hashes = [blob_hash] elif stream_hash: blobs = yield self.get_blobs_for_stream_hash(stream_hash) - blob_hashes = [blob.blob_hash for blob in blobs if blob.is_validated()] + blob_hashes = [blob.blob_hash for blob in blobs if blob.get_is_verified()] elif sd_hash: blobs = yield self.get_blobs_for_sd_hash(sd_hash) - blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if blob.is_validated()] + blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if + blob.get_is_verified()] else: raise Exception('single argument must be specified') yield self.session.blob_manager._immediate_announce(blob_hashes) @@ -2563,9 +2564,9 @@ class Daemon(AuthJSONRPCServer): blobs = self.session.blob_manager.blobs.itervalues() if needed: - blobs = [blob for blob in blobs if not blob.is_validated()] + blobs = [blob for blob in blobs if not blob.get_is_verified()] if finished: - blobs = [blob for blob in blobs if blob.is_validated()] + blobs = [blob for blob in blobs if blob.get_is_verified()] blob_hashes = [blob.blob_hash for blob in blobs] page_size = page_size or len(blob_hashes) diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py index 854dc6489..1f1c540a2 100644 --- a/lbrynet/reflector/client/blob.py +++ b/lbrynet/reflector/client/blob.py @@ -132,7 +132,7 @@ class BlobReflectorClient(Protocol): return self.set_not_uploading() def open_blob_for_reading(self, blob): - if blob.is_validated(): + if blob.get_is_verified(): read_handle = blob.open_for_reading() if read_handle is not None: log.debug('Getting ready to send %s', blob.blob_hash) diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index d7a3ab96a..ebf605b02 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -116,7 +116,7 @@ class EncryptedFileReflectorClient(Protocol): yield self.blob_manager.get_blob(blob, blob_len) dl = 
defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True) - dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()]) + dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()]) return dl def set_blobs_to_send(self, blobs_to_send): @@ -253,7 +253,7 @@ class EncryptedFileReflectorClient(Protocol): return self.set_not_uploading() def open_blob_for_reading(self, blob): - if blob.is_validated(): + if blob.get_is_verified(): read_handle = blob.open_for_reading() if read_handle is not None: log.debug('Getting ready to send %s', blob.blob_hash) diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 1c41c9eea..7ca4b3cde 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -231,7 +231,7 @@ class ReflectorServer(Protocol): return d def get_descriptor_response(self, sd_blob): - if sd_blob.is_validated(): + if sd_blob.get_is_verified(): d = defer.succeed({SEND_SD_BLOB: False}) d.addCallback(self.request_needed_blobs, sd_blob) else: @@ -267,7 +267,7 @@ class ReflectorServer(Protocol): if 'blob_hash' in blob and 'length' in blob: blob_hash, blob_len = blob['blob_hash'], blob['length'] d = self.blob_manager.get_blob(blob_hash, blob_len) - d.addCallback(lambda blob: blob_hash if not blob.is_validated() else None) + d.addCallback(lambda blob: blob_hash if not blob.get_is_verified() else None) yield d def handle_blob_request(self, request_dict): @@ -305,7 +305,7 @@ class ReflectorServer(Protocol): return d def get_blob_response(self, blob): - if blob.is_validated(): + if blob.get_is_verified(): return defer.succeed({SEND_BLOB: False}) else: self.incoming_blob = blob diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 9eb638fa0..10e598b9f 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -182,7 +182,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) return @@ -213,7 +213,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) d = send_to_server([x[0] for x in self.expected_blobs]) @@ -244,7 +244,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) + self.assertTrue(blob.get_is_verified()) self.assertEqual(blob_size, blob.length) d = send_to_server([x[0] for x in self.expected_blobs]) diff --git a/tests/unit/core/server/test_BlobRequestHandler.py b/tests/unit/core/server/test_BlobRequestHandler.py index af2197d0b..32103e374 100644 --- a/tests/unit/core/server/test_BlobRequestHandler.py +++ b/tests/unit/core/server/test_BlobRequestHandler.py @@ -53,7 +53,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def test_blob_unavailable_when_blob_not_validated(self): blob = mock.Mock() - blob.is_validated.return_value = False + blob.get_is_verified.return_value = False self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { 'blob_data_payment_rate': 1.0, @@ -68,7 +68,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def 
test_blob_unavailable_when_blob_cannot_be_opened(self): blob = mock.Mock() - blob.is_validated.return_value = True + blob.get_is_verified.return_value = True blob.open_for_reading.return_value = None self.blob_manager.get_blob.return_value = defer.succeed(blob) query = { @@ -84,7 +84,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase): def test_blob_details_are_set_when_all_conditions_are_met(self): blob = mock.Mock() - blob.is_validated.return_value = True + blob.get_is_verified.return_value = True blob.open_for_reading.return_value = True blob.blob_hash = 'DEADBEEF' blob.length = 42
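

Two patterns recur throughout the series above and are worth seeing in isolation, outside the diff context.

First, the PATCH 54 rename: get_is_verified() is a plain accessor over the blob's verified state, named so call sites cannot be confused with the `verified` property it wraps. A minimal sketch of the pattern, assuming a simplified stand-in class (SimpleBlob is illustrative, not part of the LBRY codebase):

class SimpleBlob(object):
    """Illustrative stand-in for lbrynet.blob.blob_file.BlobFile."""
    def __init__(self, blob_hash, length=None):
        self.blob_hash = blob_hash
        self.length = length
        self._verified = False

    @property
    def verified(self):
        # Backing state; in the real BlobFile this reflects hash verification.
        return self._verified

    def get_is_verified(self):
        # Explicit accessor, distinguished from the property by name.
        return self.verified

blobs = [SimpleBlob('aa', 10), SimpleBlob('bb', 20)]
blobs[1]._verified = True

# Call sites filter on the accessor, as the patched Daemon blob listing does:
needed = [b.blob_hash for b in blobs if not b.get_is_verified()]
finished = [b.blob_hash for b in blobs if b.get_is_verified()]
assert needed == ['aa'] and finished == ['bb']

Second, the error classification PATCH 50 settles on in _handle_response_error: expected cancellations are swallowed (returning None from an errback resumes the callback chain), unexpected failures propagate, and the transport is closed either way. A sketch under the assumption that Twisted is installed; FakeTransport and the trimmed exception list are stand-ins for the real protocol machinery:

from twisted.python import failure

class DownloadCanceledError(Exception):
    pass

class FakeTransport(object):
    # Records loseConnection() calls instead of touching a real socket.
    def __init__(self):
        self.lost = False

    def loseConnection(self):
        self.lost = True

def handle_response_error(err, transport):
    # Cancellation is expected: swallow it so the errback chain converts
    # back into a callback. Anything else propagates to the caller.
    result = None if err.check(DownloadCanceledError) else err
    # The connection closes in both cases, mirroring the patched code.
    transport.loseConnection()
    return result

transport = FakeTransport()
assert handle_response_error(failure.Failure(DownloadCanceledError()), transport) is None
assert transport.lost
assert handle_response_error(failure.Failure(ValueError('boom')), transport) is not None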