Merge branch 'blob-improvements'
This commit is contained in:
commit
24d1632faa
35 changed files with 737 additions and 772 deletions
19
CHANGELOG.md
19
CHANGELOG.md
|
@ -13,8 +13,8 @@ at anytime.
|
|||
*
|
||||
|
||||
### Fixed
|
||||
*
|
||||
*
|
||||
* Fixed handling cancelled blob and availability requests
|
||||
* Fixed redundant blob requests to a peer
|
||||
|
||||
### Deprecated
|
||||
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.
|
||||
|
@ -25,12 +25,19 @@ at anytime.
|
|||
*
|
||||
|
||||
### Added
|
||||
*
|
||||
*
|
||||
* Added WAL pragma to sqlite3
|
||||
* Added unit tests for `BlobFile`
|
||||
* Updated exchange rate tests for the lbry.io api
|
||||
* Use `hashlib` for sha384 instead of `pycrypto`
|
||||
* Use `cryptography` instead of `pycrypto` for blob encryption and decryption
|
||||
* Use `cryptography` for PKCS7 instead of doing it manually
|
||||
* Use `BytesIO` buffers instead of temp files when processing blobs
|
||||
* Refactored and pruned blob related classes into `lbrynet.blobs`
|
||||
* Changed several `assert`s to raise more useful errors
|
||||
|
||||
### Removed
|
||||
*
|
||||
*
|
||||
* Removed `TempBlobFile`
|
||||
* Removed unused `EncryptedFileOpener`
|
||||
|
||||
|
||||
## [0.16.1] - 2017-09-20
|
||||
|
|
4
lbrynet/blob/__init__.py
Normal file
4
lbrynet/blob/__init__.py
Normal file
|
@ -0,0 +1,4 @@
|
|||
from blob_file import BlobFile
|
||||
from creator import BlobFileCreator
|
||||
from writer import HashBlobWriter
|
||||
from reader import HashBlobReader
|
227
lbrynet/blob/blob_file.py
Normal file
227
lbrynet/blob/blob_file.py
Normal file
|
@ -0,0 +1,227 @@
|
|||
import logging
|
||||
import os
|
||||
import threading
|
||||
from twisted.internet import defer, threads
|
||||
from twisted.protocols.basic import FileSender
|
||||
from twisted.web.client import FileBodyProducer
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet import conf
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
|
||||
from lbrynet.core.utils import is_valid_blobhash
|
||||
from lbrynet.blob.writer import HashBlobWriter
|
||||
from lbrynet.blob.reader import HashBlobReader
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BlobFile(object):
|
||||
"""
|
||||
A chunk of data available on the network which is specified by a hashsum
|
||||
|
||||
This class is used to create blobs on the local filesystem
|
||||
when we already know the blob hash before hand (i.e., when downloading blobs)
|
||||
Also can be used for reading from blobs on the local filesystem
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
return self.blob_hash[:16]
|
||||
|
||||
def __repr__(self):
|
||||
return '<{}({})>'.format(self.__class__.__name__, str(self))
|
||||
|
||||
def __init__(self, blob_dir, blob_hash, length=None):
|
||||
if not is_valid_blobhash(blob_hash):
|
||||
raise InvalidBlobHashError(blob_hash)
|
||||
self.blob_hash = blob_hash
|
||||
self.length = length
|
||||
self.writers = {} # {Peer: writer, finished_deferred}
|
||||
self._verified = False
|
||||
self.readers = 0
|
||||
self.blob_dir = blob_dir
|
||||
self.file_path = os.path.join(blob_dir, self.blob_hash)
|
||||
self.setting_verified_blob_lock = threading.Lock()
|
||||
self.moved_verified_blob = False
|
||||
if os.path.isfile(self.file_path):
|
||||
self.set_length(os.path.getsize(self.file_path))
|
||||
# This assumes that the hash of the blob has already been
|
||||
# checked as part of the blob creation process. It might
|
||||
# be worth having a function that checks the actual hash;
|
||||
# its probably too expensive to have that check be part of
|
||||
# this call.
|
||||
self._verified = True
|
||||
|
||||
def open_for_writing(self, peer):
|
||||
"""
|
||||
open a blob file to be written by peer, supports concurrent
|
||||
writers, as long as they are from differnt peers.
|
||||
|
||||
returns tuple of (writer, finished_deferred)
|
||||
|
||||
writer - a file like object with a write() function, close() when finished
|
||||
finished_deferred - deferred that is fired when write is finished and returns
|
||||
a instance of itself as HashBlob
|
||||
"""
|
||||
if not peer in self.writers:
|
||||
log.debug("Opening %s to be written by %s", str(self), str(peer))
|
||||
finished_deferred = defer.Deferred()
|
||||
writer = HashBlobWriter(self.get_length, self.writer_finished)
|
||||
self.writers[peer] = (writer, finished_deferred)
|
||||
return (writer, finished_deferred)
|
||||
log.warning("Tried to download the same file twice simultaneously from the same peer")
|
||||
return None, None
|
||||
|
||||
def open_for_reading(self):
|
||||
"""
|
||||
open blob for reading
|
||||
|
||||
returns a file handle that can be read() from.
|
||||
once finished with the file handle, user must call close_read_handle()
|
||||
otherwise blob cannot be deleted.
|
||||
"""
|
||||
if self._verified is True:
|
||||
file_handle = None
|
||||
try:
|
||||
file_handle = open(self.file_path, 'rb')
|
||||
self.readers += 1
|
||||
return file_handle
|
||||
except IOError:
|
||||
log.exception('Failed to open %s', self.file_path)
|
||||
self.close_read_handle(file_handle)
|
||||
return None
|
||||
|
||||
def delete(self):
|
||||
"""
|
||||
delete blob file from file system, prevent deletion
|
||||
if a blob is being read from or written to
|
||||
|
||||
returns a deferred that firesback when delete is completed
|
||||
"""
|
||||
if not self.writers and not self.readers:
|
||||
self._verified = False
|
||||
self.moved_verified_blob = False
|
||||
|
||||
def delete_from_file_system():
|
||||
if os.path.isfile(self.file_path):
|
||||
os.remove(self.file_path)
|
||||
|
||||
d = threads.deferToThread(delete_from_file_system)
|
||||
|
||||
def log_error(err):
|
||||
log.warning("An error occurred deleting %s: %s",
|
||||
str(self.file_path), err.getErrorMessage())
|
||||
return err
|
||||
|
||||
d.addErrback(log_error)
|
||||
return d
|
||||
else:
|
||||
return defer.fail(Failure(
|
||||
ValueError("File is currently being read or written and cannot be deleted")))
|
||||
|
||||
@property
|
||||
def verified(self):
|
||||
"""
|
||||
Protect verified from being modified by other classes.
|
||||
verified is True if a write to a blob has completed succesfully,
|
||||
or a blob has been read to have the same length as specified
|
||||
in init
|
||||
"""
|
||||
return self._verified
|
||||
|
||||
def set_length(self, length):
|
||||
if self.length is not None and length == self.length:
|
||||
return True
|
||||
if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']:
|
||||
self.length = length
|
||||
return True
|
||||
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
|
||||
self.length, length)
|
||||
return False
|
||||
|
||||
def get_length(self):
|
||||
return self.length
|
||||
|
||||
def get_is_verified(self):
|
||||
return self.verified
|
||||
|
||||
def is_downloading(self):
|
||||
if self.writers:
|
||||
return True
|
||||
return False
|
||||
|
||||
def read(self, write_func):
|
||||
def close_self(*args):
|
||||
self.close_read_handle(file_handle)
|
||||
return args[0]
|
||||
|
||||
file_sender = FileSender()
|
||||
reader = HashBlobReader(write_func)
|
||||
file_handle = self.open_for_reading()
|
||||
if file_handle is not None:
|
||||
d = file_sender.beginFileTransfer(file_handle, reader)
|
||||
d.addCallback(close_self)
|
||||
else:
|
||||
d = defer.fail(IOError("Could not read the blob"))
|
||||
return d
|
||||
|
||||
def writer_finished(self, writer, err=None):
|
||||
def fire_finished_deferred():
|
||||
self._verified = True
|
||||
for p, (w, finished_deferred) in self.writers.items():
|
||||
if w == writer:
|
||||
del self.writers[p]
|
||||
finished_deferred.callback(self)
|
||||
return True
|
||||
log.warning(
|
||||
"Somehow, the writer that was accepted as being valid was already removed: %s",
|
||||
writer)
|
||||
return False
|
||||
|
||||
def errback_finished_deferred(err):
|
||||
for p, (w, finished_deferred) in self.writers.items():
|
||||
if w == writer:
|
||||
del self.writers[p]
|
||||
finished_deferred.errback(err)
|
||||
|
||||
def cancel_other_downloads():
|
||||
for p, (w, finished_deferred) in self.writers.items():
|
||||
w.close()
|
||||
|
||||
if err is None:
|
||||
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
|
||||
if self._verified is False:
|
||||
d = self._save_verified_blob(writer)
|
||||
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
|
||||
d.addCallback(lambda _: cancel_other_downloads())
|
||||
else:
|
||||
errback_finished_deferred(Failure(DownloadCanceledError()))
|
||||
d = defer.succeed(True)
|
||||
else:
|
||||
err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
|
||||
err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
|
||||
writer.blob_hash)
|
||||
errback_finished_deferred(Failure(InvalidDataError(err_string)))
|
||||
d = defer.succeed(True)
|
||||
else:
|
||||
errback_finished_deferred(err)
|
||||
d = defer.succeed(True)
|
||||
d.addBoth(lambda _: writer.close_handle())
|
||||
return d
|
||||
|
||||
def close_read_handle(self, file_handle):
|
||||
if file_handle is not None:
|
||||
file_handle.close()
|
||||
self.readers -= 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _save_verified_blob(self, writer):
|
||||
with self.setting_verified_blob_lock:
|
||||
if self.moved_verified_blob is False:
|
||||
writer.write_handle.seek(0)
|
||||
out_path = os.path.join(self.blob_dir, self.blob_hash)
|
||||
producer = FileBodyProducer(writer.write_handle)
|
||||
yield producer.startProducing(open(out_path, 'wb'))
|
||||
self.moved_verified_blob = True
|
||||
defer.returnValue(True)
|
||||
else:
|
||||
raise DownloadCanceledError()
|
51
lbrynet/blob/creator.py
Normal file
51
lbrynet/blob/creator.py
Normal file
|
@ -0,0 +1,51 @@
|
|||
import os
|
||||
import logging
|
||||
from io import BytesIO
|
||||
from twisted.internet import defer
|
||||
from twisted.web.client import FileBodyProducer
|
||||
from lbrynet.core.cryptoutils import get_lbry_hash_obj
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BlobFileCreator(object):
|
||||
"""
|
||||
This class is used to create blobs on the local filesystem
|
||||
when we do not know the blob hash beforehand (i.e, when creating
|
||||
a new stream)
|
||||
"""
|
||||
def __init__(self, blob_dir):
|
||||
self.blob_dir = blob_dir
|
||||
self.buffer = BytesIO()
|
||||
self._is_open = True
|
||||
self._hashsum = get_lbry_hash_obj()
|
||||
self.len_so_far = 0
|
||||
self.blob_hash = None
|
||||
self.length = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def close(self):
|
||||
self.length = self.len_so_far
|
||||
self.blob_hash = self._hashsum.hexdigest()
|
||||
if self.blob_hash and self._is_open and self.length > 0:
|
||||
# do not save 0 length files (empty tail blob in streams)
|
||||
# or if its been closed already
|
||||
self.buffer.seek(0)
|
||||
out_path = os.path.join(self.blob_dir, self.blob_hash)
|
||||
producer = FileBodyProducer(self.buffer)
|
||||
yield producer.startProducing(open(out_path, 'wb'))
|
||||
self._is_open = False
|
||||
if self.length > 0:
|
||||
defer.returnValue(self.blob_hash)
|
||||
else:
|
||||
# 0 length files (empty tail blob in streams )
|
||||
# must return None as their blob_hash for
|
||||
# it to be saved properly by EncryptedFileMetadataManagers
|
||||
defer.returnValue(None)
|
||||
|
||||
def write(self, data):
|
||||
if not self._is_open:
|
||||
raise IOError
|
||||
self._hashsum.update(data)
|
||||
self.len_so_far += len(data)
|
||||
self.buffer.write(data)
|
30
lbrynet/blob/reader.py
Normal file
30
lbrynet/blob/reader.py
Normal file
|
@ -0,0 +1,30 @@
|
|||
import logging
|
||||
from twisted.internet import interfaces
|
||||
from zope.interface import implements
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HashBlobReader(object):
|
||||
implements(interfaces.IConsumer)
|
||||
|
||||
def __init__(self, write_func):
|
||||
self.write_func = write_func
|
||||
|
||||
def registerProducer(self, producer, streaming):
|
||||
from twisted.internet import reactor
|
||||
|
||||
self.producer = producer
|
||||
self.streaming = streaming
|
||||
if self.streaming is False:
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def unregisterProducer(self):
|
||||
pass
|
||||
|
||||
def write(self, data):
|
||||
from twisted.internet import reactor
|
||||
|
||||
self.write_func(data)
|
||||
if self.streaming is False:
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
53
lbrynet/blob/writer.py
Normal file
53
lbrynet/blob/writer.py
Normal file
|
@ -0,0 +1,53 @@
|
|||
import logging
|
||||
from io import BytesIO
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
|
||||
from lbrynet.core.cryptoutils import get_lbry_hash_obj
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HashBlobWriter(object):
|
||||
def __init__(self, length_getter, finished_cb):
|
||||
self.write_handle = BytesIO()
|
||||
self.length_getter = length_getter
|
||||
self.finished_cb = finished_cb
|
||||
self.finished_cb_d = None
|
||||
self._hashsum = get_lbry_hash_obj()
|
||||
self.len_so_far = 0
|
||||
|
||||
@property
|
||||
def blob_hash(self):
|
||||
return self._hashsum.hexdigest()
|
||||
|
||||
def write(self, data):
|
||||
if self.write_handle is None:
|
||||
log.exception("writer has already been closed")
|
||||
raise IOError('I/O operation on closed file')
|
||||
|
||||
self._hashsum.update(data)
|
||||
self.len_so_far += len(data)
|
||||
if self.len_so_far > self.length_getter():
|
||||
self.finished_cb_d = self.finished_cb(
|
||||
self,
|
||||
Failure(InvalidDataError("Length so far is greater than the expected length."
|
||||
" %s to %s" % (self.len_so_far,
|
||||
self.length_getter()))))
|
||||
else:
|
||||
self.write_handle.write(data)
|
||||
if self.len_so_far == self.length_getter():
|
||||
self.finished_cb_d = self.finished_cb(self)
|
||||
|
||||
def close_handle(self):
|
||||
if self.write_handle is not None:
|
||||
self.write_handle.close()
|
||||
self.write_handle = None
|
||||
|
||||
def close(self, reason=None):
|
||||
# if we've already called finished_cb because we either finished writing
|
||||
# or closed already, do nothing
|
||||
if self.finished_cb_d is not None:
|
||||
return
|
||||
if reason is None:
|
||||
reason = Failure(DownloadCanceledError())
|
||||
self.finished_cb_d = self.finished_cb(self, reason)
|
|
@ -6,7 +6,8 @@ import sqlite3
|
|||
from twisted.internet import threads, defer, reactor
|
||||
from twisted.enterprise import adbapi
|
||||
from lbrynet import conf
|
||||
from lbrynet.core.HashBlob import BlobFile, BlobFileCreator
|
||||
from lbrynet.blob.blob_file import BlobFile
|
||||
from lbrynet.blob.creator import BlobFileCreator
|
||||
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
|
||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||
|
||||
|
@ -29,7 +30,6 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
self.blob_dir = blob_dir
|
||||
self.db_file = os.path.join(db_dir, "blobs.db")
|
||||
self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False)
|
||||
self.blob_type = BlobFile
|
||||
self.blob_creator_type = BlobFileCreator
|
||||
# TODO: consider using an LRU for blobs as there could potentially
|
||||
# be thousands of blobs loaded up, many stale
|
||||
|
@ -51,7 +51,8 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
"""Return a blob identified by blob_hash, which may be a new blob or a
|
||||
blob that is already on the hard disk
|
||||
"""
|
||||
assert length is None or isinstance(length, int)
|
||||
if length is not None and not isinstance(length, int):
|
||||
raise Exception("invalid length type: %s (%s)", length, str(type(length)))
|
||||
if blob_hash in self.blobs:
|
||||
return defer.succeed(self.blobs[blob_hash])
|
||||
return self._make_new_blob(blob_hash, length)
|
||||
|
@ -61,7 +62,7 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
|
||||
def _make_new_blob(self, blob_hash, length=None):
|
||||
log.debug('Making a new blob for %s', blob_hash)
|
||||
blob = self.blob_type(self.blob_dir, blob_hash, length)
|
||||
blob = BlobFile(self.blob_dir, blob_hash, length)
|
||||
self.blobs[blob_hash] = blob
|
||||
return defer.succeed(blob)
|
||||
|
||||
|
@ -89,10 +90,13 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
|
||||
def creator_finished(self, blob_creator, should_announce):
|
||||
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
|
||||
assert blob_creator.blob_hash is not None
|
||||
assert blob_creator.blob_hash not in self.blobs
|
||||
assert blob_creator.length is not None
|
||||
new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
|
||||
if blob_creator.blob_hash is None:
|
||||
raise Exception("Blob hash is None")
|
||||
if blob_creator.blob_hash in self.blobs:
|
||||
raise Exception("Creator finished for blob that is already marked as completed")
|
||||
if blob_creator.length is None:
|
||||
raise Exception("Blob has a length of 0")
|
||||
new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
|
||||
self.blobs[blob_creator.blob_hash] = new_blob
|
||||
next_announce_time = self.get_next_announce_time()
|
||||
d = self.blob_completed(new_blob, next_announce_time, should_announce)
|
||||
|
@ -137,6 +141,7 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
# threads.
|
||||
|
||||
def create_tables(transaction):
|
||||
transaction.execute('PRAGMA journal_mode=WAL')
|
||||
transaction.execute("create table if not exists blobs (" +
|
||||
" blob_hash text primary key, " +
|
||||
" blob_length integer, " +
|
||||
|
@ -255,5 +260,3 @@ class DiskBlobManager(DHTHashSupplier):
|
|||
"insert into upload values (null, ?, ?, ?, ?) ",
|
||||
(blob_hash, str(host), float(rate), ts))
|
||||
return d
|
||||
|
||||
|
||||
|
|
|
@ -1,413 +0,0 @@
|
|||
from StringIO import StringIO
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
import threading
|
||||
import shutil
|
||||
from twisted.internet import interfaces, defer, threads
|
||||
from twisted.protocols.basic import FileSender
|
||||
from twisted.python.failure import Failure
|
||||
from zope.interface import implements
|
||||
from lbrynet import conf
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
|
||||
from lbrynet.core.cryptoutils import get_lbry_hash_obj
|
||||
from lbrynet.core.utils import is_valid_blobhash
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HashBlobReader(object):
|
||||
implements(interfaces.IConsumer)
|
||||
|
||||
def __init__(self, write_func):
|
||||
self.write_func = write_func
|
||||
|
||||
def registerProducer(self, producer, streaming):
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
self.producer = producer
|
||||
self.streaming = streaming
|
||||
if self.streaming is False:
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def unregisterProducer(self):
|
||||
pass
|
||||
|
||||
def write(self, data):
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
self.write_func(data)
|
||||
if self.streaming is False:
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
|
||||
class HashBlobWriter(object):
|
||||
def __init__(self, write_handle, length_getter, finished_cb):
|
||||
self.write_handle = write_handle
|
||||
self.length_getter = length_getter
|
||||
self.finished_cb = finished_cb
|
||||
self._hashsum = get_lbry_hash_obj()
|
||||
self.len_so_far = 0
|
||||
|
||||
@property
|
||||
def blob_hash(self):
|
||||
return self._hashsum.hexdigest()
|
||||
|
||||
def write(self, data):
|
||||
self._hashsum.update(data)
|
||||
self.len_so_far += len(data)
|
||||
if self.len_so_far > self.length_getter():
|
||||
self.finished_cb(
|
||||
self,
|
||||
Failure(InvalidDataError("Length so far is greater than the expected length."
|
||||
" %s to %s" % (self.len_so_far,
|
||||
self.length_getter()))))
|
||||
else:
|
||||
if self.write_handle is None:
|
||||
log.debug("Tried to write to a write_handle that was None.")
|
||||
return
|
||||
self.write_handle.write(data)
|
||||
if self.len_so_far == self.length_getter():
|
||||
self.finished_cb(self)
|
||||
|
||||
def cancel(self, reason=None):
|
||||
if reason is None:
|
||||
reason = Failure(DownloadCanceledError())
|
||||
self.finished_cb(self, reason)
|
||||
|
||||
|
||||
class HashBlob(object):
|
||||
"""A chunk of data available on the network which is specified by a hashsum"""
|
||||
|
||||
def __init__(self, blob_hash, length=None):
|
||||
assert is_valid_blobhash(blob_hash)
|
||||
self.blob_hash = blob_hash
|
||||
self.length = length
|
||||
self.writers = {} # {Peer: writer, finished_deferred}
|
||||
self.finished_deferred = None
|
||||
self._verified = False
|
||||
self.readers = 0
|
||||
|
||||
@property
|
||||
def verified(self):
|
||||
# protect verified from being modified by other classes
|
||||
return self._verified
|
||||
|
||||
def set_length(self, length):
|
||||
if self.length is not None and length == self.length:
|
||||
return True
|
||||
if self.length is None and 0 <= length <= conf.settings['BLOB_SIZE']:
|
||||
self.length = length
|
||||
return True
|
||||
log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
|
||||
self.length, length)
|
||||
return False
|
||||
|
||||
def get_length(self):
|
||||
return self.length
|
||||
|
||||
def is_validated(self):
|
||||
return bool(self._verified)
|
||||
|
||||
def is_downloading(self):
|
||||
if self.writers:
|
||||
return True
|
||||
return False
|
||||
|
||||
def read(self, write_func):
|
||||
|
||||
def close_self(*args):
|
||||
self.close_read_handle(file_handle)
|
||||
return args[0]
|
||||
|
||||
file_sender = FileSender()
|
||||
reader = HashBlobReader(write_func)
|
||||
file_handle = self.open_for_reading()
|
||||
if file_handle is not None:
|
||||
d = file_sender.beginFileTransfer(file_handle, reader)
|
||||
d.addCallback(close_self)
|
||||
else:
|
||||
d = defer.fail(ValueError("Could not read the blob"))
|
||||
return d
|
||||
|
||||
def writer_finished(self, writer, err=None):
|
||||
|
||||
def fire_finished_deferred():
|
||||
self._verified = True
|
||||
for p, (w, finished_deferred) in self.writers.items():
|
||||
if w == writer:
|
||||
finished_deferred.callback(self)
|
||||
del self.writers[p]
|
||||
return True
|
||||
log.warning(
|
||||
"Somehow, the writer that was accepted as being valid was already removed: %s",
|
||||
writer)
|
||||
return False
|
||||
|
||||
def errback_finished_deferred(err):
|
||||
for p, (w, finished_deferred) in self.writers.items():
|
||||
if w == writer:
|
||||
finished_deferred.errback(err)
|
||||
del self.writers[p]
|
||||
|
||||
def cancel_other_downloads():
|
||||
for p, (w, finished_deferred) in self.writers.items():
|
||||
w.cancel()
|
||||
|
||||
if err is None:
|
||||
if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
|
||||
if self._verified is False:
|
||||
d = self._save_verified_blob(writer)
|
||||
d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
|
||||
d.addCallback(lambda _: cancel_other_downloads())
|
||||
else:
|
||||
errback_finished_deferred(Failure(DownloadCanceledError()))
|
||||
d = defer.succeed(True)
|
||||
else:
|
||||
err_string = "length vs expected: {0}, {1}, hash vs expected: {2}, {3}"
|
||||
err_string = err_string.format(self.length, writer.len_so_far, self.blob_hash,
|
||||
writer.blob_hash)
|
||||
errback_finished_deferred(Failure(InvalidDataError(err_string)))
|
||||
d = defer.succeed(True)
|
||||
else:
|
||||
errback_finished_deferred(err)
|
||||
d = defer.succeed(True)
|
||||
|
||||
d.addBoth(lambda _: self._close_writer(writer))
|
||||
return d
|
||||
|
||||
def open_for_writing(self, peer):
|
||||
pass
|
||||
|
||||
def open_for_reading(self):
|
||||
pass
|
||||
|
||||
def delete(self):
|
||||
pass
|
||||
|
||||
def close_read_handle(self, file_handle):
|
||||
pass
|
||||
|
||||
def _close_writer(self, writer):
|
||||
pass
|
||||
|
||||
def _save_verified_blob(self, writer):
|
||||
pass
|
||||
|
||||
def __str__(self):
|
||||
return self.blob_hash[:16]
|
||||
|
||||
def __repr__(self):
|
||||
return '<{}({})>'.format(self.__class__.__name__, str(self))
|
||||
|
||||
|
||||
class BlobFile(HashBlob):
|
||||
"""A HashBlob which will be saved to the hard disk of the downloader"""
|
||||
|
||||
def __init__(self, blob_dir, *args):
|
||||
HashBlob.__init__(self, *args)
|
||||
self.blob_dir = blob_dir
|
||||
self.file_path = os.path.join(blob_dir, self.blob_hash)
|
||||
self.setting_verified_blob_lock = threading.Lock()
|
||||
self.moved_verified_blob = False
|
||||
if os.path.isfile(self.file_path):
|
||||
self.set_length(os.path.getsize(self.file_path))
|
||||
# This assumes that the hash of the blob has already been
|
||||
# checked as part of the blob creation process. It might
|
||||
# be worth having a function that checks the actual hash;
|
||||
# its probably too expensive to have that check be part of
|
||||
# this call.
|
||||
self._verified = True
|
||||
|
||||
def open_for_writing(self, peer):
|
||||
if not peer in self.writers:
|
||||
log.debug("Opening %s to be written by %s", str(self), str(peer))
|
||||
write_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
|
||||
finished_deferred = defer.Deferred()
|
||||
writer = HashBlobWriter(write_file, self.get_length, self.writer_finished)
|
||||
|
||||
self.writers[peer] = (writer, finished_deferred)
|
||||
return finished_deferred, writer.write, writer.cancel
|
||||
log.warning("Tried to download the same file twice simultaneously from the same peer")
|
||||
return None, None, None
|
||||
|
||||
def open_for_reading(self):
|
||||
if self._verified is True:
|
||||
file_handle = None
|
||||
try:
|
||||
file_handle = open(self.file_path, 'rb')
|
||||
self.readers += 1
|
||||
return file_handle
|
||||
except IOError:
|
||||
log.exception('Failed to open %s', self.file_path)
|
||||
self.close_read_handle(file_handle)
|
||||
return None
|
||||
|
||||
def delete(self):
|
||||
if not self.writers and not self.readers:
|
||||
self._verified = False
|
||||
self.moved_verified_blob = False
|
||||
|
||||
def delete_from_file_system():
|
||||
if os.path.isfile(self.file_path):
|
||||
os.remove(self.file_path)
|
||||
|
||||
d = threads.deferToThread(delete_from_file_system)
|
||||
|
||||
def log_error(err):
|
||||
log.warning("An error occurred deleting %s: %s",
|
||||
str(self.file_path), err.getErrorMessage())
|
||||
return err
|
||||
|
||||
d.addErrback(log_error)
|
||||
return d
|
||||
else:
|
||||
return defer.fail(Failure(
|
||||
ValueError("File is currently being read or written and cannot be deleted")))
|
||||
|
||||
def close_read_handle(self, file_handle):
|
||||
if file_handle is not None:
|
||||
file_handle.close()
|
||||
self.readers -= 1
|
||||
|
||||
def _close_writer(self, writer):
|
||||
if writer.write_handle is not None:
|
||||
log.debug("Closing %s", str(self))
|
||||
name = writer.write_handle.name
|
||||
writer.write_handle.close()
|
||||
threads.deferToThread(os.remove, name)
|
||||
writer.write_handle = None
|
||||
|
||||
def _save_verified_blob(self, writer):
|
||||
|
||||
def move_file():
|
||||
with self.setting_verified_blob_lock:
|
||||
if self.moved_verified_blob is False:
|
||||
temp_file_name = writer.write_handle.name
|
||||
writer.write_handle.close()
|
||||
shutil.move(temp_file_name, self.file_path)
|
||||
writer.write_handle = None
|
||||
self.moved_verified_blob = True
|
||||
return True
|
||||
else:
|
||||
raise DownloadCanceledError()
|
||||
|
||||
return threads.deferToThread(move_file)
|
||||
|
||||
|
||||
class TempBlob(HashBlob):
|
||||
"""A HashBlob which will only exist in memory"""
|
||||
def __init__(self, *args):
|
||||
HashBlob.__init__(self, *args)
|
||||
self.data_buffer = ""
|
||||
|
||||
def open_for_writing(self, peer):
|
||||
if not peer in self.writers:
|
||||
temp_buffer = StringIO()
|
||||
finished_deferred = defer.Deferred()
|
||||
writer = HashBlobWriter(temp_buffer, self.get_length, self.writer_finished)
|
||||
|
||||
self.writers[peer] = (writer, finished_deferred)
|
||||
return finished_deferred, writer.write, writer.cancel
|
||||
return None, None, None
|
||||
|
||||
def open_for_reading(self):
|
||||
if self._verified is True:
|
||||
return StringIO(self.data_buffer)
|
||||
return None
|
||||
|
||||
def delete(self):
|
||||
if not self.writers and not self.readers:
|
||||
self._verified = False
|
||||
self.data_buffer = ''
|
||||
return defer.succeed(True)
|
||||
else:
|
||||
return defer.fail(Failure(
|
||||
ValueError("Blob is currently being read or written and cannot be deleted")))
|
||||
|
||||
def close_read_handle(self, file_handle):
|
||||
file_handle.close()
|
||||
|
||||
def _close_writer(self, writer):
|
||||
if writer.write_handle is not None:
|
||||
writer.write_handle.close()
|
||||
writer.write_handle = None
|
||||
|
||||
def _save_verified_blob(self, writer):
|
||||
if not self.data_buffer:
|
||||
self.data_buffer = writer.write_handle.getvalue()
|
||||
writer.write_handle.close()
|
||||
writer.write_handle = None
|
||||
return defer.succeed(True)
|
||||
else:
|
||||
return defer.fail(Failure(DownloadCanceledError()))
|
||||
|
||||
|
||||
class HashBlobCreator(object):
|
||||
def __init__(self):
|
||||
self._hashsum = get_lbry_hash_obj()
|
||||
self.len_so_far = 0
|
||||
self.blob_hash = None
|
||||
self.length = None
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
self.length = self.len_so_far
|
||||
if self.length == 0:
|
||||
self.blob_hash = None
|
||||
else:
|
||||
self.blob_hash = self._hashsum.hexdigest()
|
||||
d = self._close()
|
||||
if self.blob_hash is not None:
|
||||
d.addCallback(lambda _: self.blob_hash)
|
||||
else:
|
||||
d.addCallback(lambda _: None)
|
||||
return d
|
||||
|
||||
def write(self, data):
|
||||
self._hashsum.update(data)
|
||||
self.len_so_far += len(data)
|
||||
self._write(data)
|
||||
|
||||
def _close(self):
|
||||
pass
|
||||
|
||||
def _write(self, data):
|
||||
pass
|
||||
|
||||
|
||||
class BlobFileCreator(HashBlobCreator):
|
||||
def __init__(self, blob_dir):
|
||||
HashBlobCreator.__init__(self)
|
||||
self.blob_dir = blob_dir
|
||||
self.out_file = tempfile.NamedTemporaryFile(delete=False, dir=self.blob_dir)
|
||||
|
||||
def _close(self):
|
||||
temp_file_name = self.out_file.name
|
||||
self.out_file.close()
|
||||
if self.blob_hash is not None:
|
||||
shutil.move(temp_file_name, os.path.join(self.blob_dir, self.blob_hash))
|
||||
else:
|
||||
os.remove(temp_file_name)
|
||||
return defer.succeed(True)
|
||||
|
||||
def _write(self, data):
|
||||
self.out_file.write(data)
|
||||
|
||||
|
||||
class TempBlobCreator(HashBlobCreator):
|
||||
def __init__(self):
|
||||
HashBlobCreator.__init__(self)
|
||||
# TODO: use StringIO
|
||||
self.data_buffer = ''
|
||||
|
||||
def _close(self):
|
||||
return defer.succeed(True)
|
||||
|
||||
def _write(self, data):
|
||||
self.data_buffer += data
|
|
@ -1,76 +0,0 @@
|
|||
import logging
|
||||
from twisted.internet import interfaces, defer
|
||||
from zope.interface import implements
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StreamCreator(object):
|
||||
"""Classes which derive from this class create a 'stream', which can be any
|
||||
collection of associated blobs and associated metadata. These classes
|
||||
use the IConsumer interface to get data from an IProducer and transform
|
||||
the data into a 'stream'"""
|
||||
|
||||
implements(interfaces.IConsumer)
|
||||
|
||||
def __init__(self, name):
|
||||
"""
|
||||
@param name: the name of the stream
|
||||
"""
|
||||
self.name = name
|
||||
self.stopped = True
|
||||
self.producer = None
|
||||
self.streaming = None
|
||||
self.blob_count = -1
|
||||
self.current_blob = None
|
||||
self.finished_deferreds = []
|
||||
|
||||
def _blob_finished(self, blob_info):
|
||||
pass
|
||||
|
||||
def registerProducer(self, producer, streaming):
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
self.producer = producer
|
||||
self.streaming = streaming
|
||||
self.stopped = False
|
||||
if streaming is False:
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def unregisterProducer(self):
|
||||
self.stopped = True
|
||||
self.producer = None
|
||||
|
||||
def stop(self):
|
||||
"""Stop creating the stream. Create the terminating zero-length blob."""
|
||||
log.debug("stop has been called for StreamCreator")
|
||||
self.stopped = True
|
||||
if self.current_blob is not None:
|
||||
current_blob = self.current_blob
|
||||
d = current_blob.close()
|
||||
d.addCallback(self._blob_finished)
|
||||
self.finished_deferreds.append(d)
|
||||
self.current_blob = None
|
||||
self._finalize()
|
||||
dl = defer.DeferredList(self.finished_deferreds)
|
||||
dl.addCallback(lambda _: self._finished())
|
||||
return dl
|
||||
|
||||
def _finalize(self):
|
||||
pass
|
||||
|
||||
def _finished(self):
|
||||
pass
|
||||
|
||||
# TODO: move the stream creation process to its own thread and
|
||||
# remove the reactor from this process.
|
||||
def write(self, data):
|
||||
from twisted.internet import reactor
|
||||
self._write(data)
|
||||
if self.stopped is False and self.streaming is False:
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def _write(self, data):
|
||||
pass
|
|
@ -4,6 +4,7 @@ from decimal import Decimal
|
|||
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.internet.error import ConnectionAborted
|
||||
from zope.interface import implements
|
||||
|
||||
from lbrynet.core.Error import ConnectionClosedBeforeResponseError
|
||||
|
@ -225,7 +226,8 @@ class RequestHelper(object):
|
|||
self.requestor._update_local_score(self.peer, score)
|
||||
|
||||
def _request_failed(self, reason, request_type):
|
||||
if reason.check(RequestCanceledError):
|
||||
if reason.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted,
|
||||
ConnectionClosedBeforeResponseError):
|
||||
return
|
||||
if reason.check(NoResponseError):
|
||||
self.requestor._incompatible_peers.append(self.peer)
|
||||
|
@ -463,13 +465,13 @@ class DownloadRequest(RequestHelper):
|
|||
def find_blob(self, to_download):
|
||||
"""Return the first blob in `to_download` that is successfully opened for write."""
|
||||
for blob in to_download:
|
||||
if blob.is_validated():
|
||||
if blob.get_is_verified():
|
||||
log.debug('Skipping blob %s as its already validated', blob)
|
||||
continue
|
||||
d, write_func, cancel_func = blob.open_for_writing(self.peer)
|
||||
writer, d = blob.open_for_writing(self.peer)
|
||||
if d is not None:
|
||||
return BlobDownloadDetails(blob, d, write_func, cancel_func, self.peer)
|
||||
log.debug('Skipping blob %s as there was an issue opening it for writing', blob)
|
||||
return BlobDownloadDetails(blob, d, writer.write, writer.close, self.peer)
|
||||
log.warning('Skipping blob %s as there was an issue opening it for writing', blob)
|
||||
return None
|
||||
|
||||
def _make_request(self, blob_details):
|
||||
|
|
|
@ -50,6 +50,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
|
|||
log.debug("Data receieved from %s", self.peer)
|
||||
self.setTimeout(None)
|
||||
self._rate_limiter.report_dl_bytes(len(data))
|
||||
|
||||
if self._downloading_blob is True:
|
||||
self._blob_download_request.write(data)
|
||||
else:
|
||||
|
@ -101,8 +102,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
|
|||
d = self.add_request(blob_request)
|
||||
self._blob_download_request = blob_request
|
||||
blob_request.finished_deferred.addCallbacks(self._downloading_finished,
|
||||
self._downloading_failed)
|
||||
blob_request.finished_deferred.addErrback(self._handle_response_error)
|
||||
self._handle_response_error)
|
||||
return d
|
||||
else:
|
||||
raise ValueError("There is already a blob download request active")
|
||||
|
@ -110,7 +110,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
|
|||
def cancel_requests(self):
|
||||
self.connection_closing = True
|
||||
ds = []
|
||||
err = failure.Failure(RequestCanceledError())
|
||||
err = RequestCanceledError()
|
||||
for key, d in self._response_deferreds.items():
|
||||
del self._response_deferreds[key]
|
||||
d.errback(err)
|
||||
|
@ -119,6 +119,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
|
|||
self._blob_download_request.cancel(err)
|
||||
ds.append(self._blob_download_request.finished_deferred)
|
||||
self._blob_download_request = None
|
||||
self._downloading_blob = False
|
||||
return defer.DeferredList(ds)
|
||||
|
||||
######### Internal request handling #########
|
||||
|
@ -176,15 +177,24 @@ class ClientProtocol(Protocol, TimeoutMixin):
|
|||
|
||||
def _handle_response_error(self, err):
|
||||
# If an error gets to this point, log it and kill the connection.
|
||||
expected_errors = (MisbehavingPeerError, ConnectionClosedBeforeResponseError,
|
||||
DownloadCanceledError, RequestCanceledError)
|
||||
if not err.check(expected_errors):
|
||||
if err.check(DownloadCanceledError, RequestCanceledError, error.ConnectionAborted):
|
||||
# TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't
|
||||
# TODO: always be this way. it's done this way now because the client has no other way
|
||||
# TODO: of telling the server it wants the download to stop. It would be great if the
|
||||
# TODO: protocol had such a mechanism.
|
||||
log.info("Closing the connection to %s because the download of blob %s was canceled",
|
||||
self.peer, self._blob_download_request.blob)
|
||||
result = None
|
||||
elif not err.check(MisbehavingPeerError, ConnectionClosedBeforeResponseError):
|
||||
log.warning("The connection to %s is closing due to: %s", self.peer, err)
|
||||
result = err
|
||||
else:
|
||||
log.error("The connection to %s is closing due to an unexpected error: %s",
|
||||
self.peer, err.getErrorMessage())
|
||||
if not err.check(RequestCanceledError):
|
||||
# The connection manager is closing the connection, so
|
||||
# there's no need to do it here.
|
||||
return err
|
||||
self.peer, err)
|
||||
result = err
|
||||
|
||||
self.transport.loseConnection()
|
||||
return result
|
||||
|
||||
def _handle_response(self, response):
|
||||
ds = []
|
||||
|
@ -225,7 +235,7 @@ class ClientProtocol(Protocol, TimeoutMixin):
|
|||
log.debug("Asking for another request from %s", self.peer)
|
||||
self._ask_for_request()
|
||||
else:
|
||||
log.debug("Not asking for another request from %s", self.peer)
|
||||
log.warning("Not asking for another request from %s", self.peer)
|
||||
self.transport.loseConnection()
|
||||
|
||||
dl.addCallback(get_next_request)
|
||||
|
@ -236,16 +246,6 @@ class ClientProtocol(Protocol, TimeoutMixin):
|
|||
self._downloading_blob = False
|
||||
return arg
|
||||
|
||||
def _downloading_failed(self, err):
|
||||
if err.check(DownloadCanceledError):
|
||||
# TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't
|
||||
# TODO: always be this way. it's done this way now because the client has no other way
|
||||
# TODO: of telling the server it wants the download to stop. It would be great if the
|
||||
# TODO: protocol had such a mechanism.
|
||||
log.debug("Closing the connection to %s because the download of blob %s was canceled",
|
||||
self.peer, self._blob_download_request.blob)
|
||||
return err
|
||||
|
||||
######### IRateLimited #########
|
||||
|
||||
def throttle_upload(self):
|
||||
|
|
|
@ -64,14 +64,14 @@ class SingleProgressManager(object):
|
|||
|
||||
def stream_position(self):
|
||||
blobs = self.download_manager.blobs
|
||||
if blobs and blobs[0].is_validated():
|
||||
if blobs and blobs[0].get_is_verified():
|
||||
return 1
|
||||
return 0
|
||||
|
||||
def needed_blobs(self):
|
||||
blobs = self.download_manager.blobs
|
||||
assert len(blobs) == 1
|
||||
return [b for b in blobs.itervalues() if not b.is_validated()]
|
||||
return [b for b in blobs.itervalues() if not b.get_is_verified()]
|
||||
|
||||
|
||||
class DummyBlobHandler(object):
|
||||
|
|
|
@ -93,7 +93,7 @@ class FullStreamProgressManager(StreamProgressManager):
|
|||
return (
|
||||
i not in blobs or
|
||||
(
|
||||
not blobs[i].is_validated() and
|
||||
not blobs[i].get_is_verified() and
|
||||
i not in self.provided_blob_nums
|
||||
)
|
||||
)
|
||||
|
@ -112,7 +112,7 @@ class FullStreamProgressManager(StreamProgressManager):
|
|||
blobs = self.download_manager.blobs
|
||||
return [
|
||||
b for n, b in blobs.iteritems()
|
||||
if not b.is_validated() and not n in self.provided_blob_nums
|
||||
if not b.get_is_verified() and not n in self.provided_blob_nums
|
||||
]
|
||||
|
||||
######### internal #########
|
||||
|
@ -145,7 +145,7 @@ class FullStreamProgressManager(StreamProgressManager):
|
|||
|
||||
current_blob_num = self.last_blob_outputted + 1
|
||||
|
||||
if current_blob_num in blobs and blobs[current_blob_num].is_validated():
|
||||
if current_blob_num in blobs and blobs[current_blob_num].get_is_verified():
|
||||
log.debug("Outputting blob %s", str(self.last_blob_outputted + 1))
|
||||
self.provided_blob_nums.append(self.last_blob_outputted + 1)
|
||||
d = self.download_manager.handle_blob(self.last_blob_outputted + 1)
|
||||
|
@ -154,10 +154,11 @@ class FullStreamProgressManager(StreamProgressManager):
|
|||
d.addCallback(lambda _: check_if_finished())
|
||||
|
||||
def log_error(err):
|
||||
log.warning("Error occurred in the output loop. Error: %s", err.getErrorMessage())
|
||||
log.warning("Error occurred in the output loop. Error: %s", err)
|
||||
if self.outputting_d is not None and not self.outputting_d.called:
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
||||
self.stop()
|
||||
|
||||
d.addErrback(log_error)
|
||||
else:
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
from Crypto.Hash import SHA384
|
||||
import seccure
|
||||
import hashlib
|
||||
|
||||
|
||||
def get_lbry_hash_obj():
|
||||
return SHA384.new()
|
||||
return hashlib.sha384()
|
||||
|
||||
|
||||
def get_pub_key(pass_phrase):
|
||||
|
|
|
@ -143,7 +143,7 @@ class BlobRequestHandler(object):
|
|||
def open_blob_for_reading(self, blob, response):
|
||||
response_fields = {}
|
||||
d = defer.succeed(None)
|
||||
if blob.is_validated():
|
||||
if blob.get_is_verified():
|
||||
read_handle = blob.open_for_reading()
|
||||
if read_handle is not None:
|
||||
self.currently_uploading = blob
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
import binascii
|
||||
import logging
|
||||
from Crypto.Cipher import AES
|
||||
from twisted.internet import defer
|
||||
from cryptography.hazmat.primitives.ciphers import Cipher, modes
|
||||
from cryptography.hazmat.primitives.ciphers.algorithms import AES
|
||||
from cryptography.hazmat.primitives.padding import PKCS7
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from lbrynet import conf
|
||||
from lbrynet.core.BlobInfo import BlobInfo
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
backend = default_backend()
|
||||
|
||||
|
||||
class CryptBlobInfo(BlobInfo):
|
||||
|
@ -31,7 +36,9 @@ class StreamBlobDecryptor(object):
|
|||
self.length = length
|
||||
self.buff = b''
|
||||
self.len_read = 0
|
||||
self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
|
||||
cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend)
|
||||
self.unpadder = PKCS7(AES.block_size).unpadder()
|
||||
self.cipher = cipher.decryptor()
|
||||
|
||||
def decrypt(self, write_func):
|
||||
"""
|
||||
|
@ -42,22 +49,23 @@ class StreamBlobDecryptor(object):
|
|||
"""
|
||||
|
||||
def remove_padding(data):
|
||||
pad_len = ord(data[-1])
|
||||
data, padding = data[:-1 * pad_len], data[-1 * pad_len:]
|
||||
for c in padding:
|
||||
assert ord(c) == pad_len
|
||||
return data
|
||||
return self.unpadder.update(data) + self.unpadder.finalize()
|
||||
|
||||
def write_bytes():
|
||||
if self.len_read < self.length:
|
||||
num_bytes_to_decrypt = greatest_multiple(len(self.buff), self.cipher.block_size)
|
||||
num_bytes_to_decrypt = greatest_multiple(len(self.buff), (AES.block_size / 8))
|
||||
data_to_decrypt, self.buff = split(self.buff, num_bytes_to_decrypt)
|
||||
write_func(self.cipher.decrypt(data_to_decrypt))
|
||||
write_func(self.cipher.update(data_to_decrypt))
|
||||
|
||||
def finish_decrypt():
|
||||
assert len(self.buff) % self.cipher.block_size == 0
|
||||
bytes_left = len(self.buff) % (AES.block_size / 8)
|
||||
if bytes_left != 0:
|
||||
log.warning(self.buff[-1 * (AES.block_size / 8):].encode('hex'))
|
||||
raise Exception("blob %s has incorrect padding: %i bytes left" %
|
||||
(self.blob.blob_hash, bytes_left))
|
||||
data_to_decrypt, self.buff = self.buff, b''
|
||||
write_func(remove_padding(self.cipher.decrypt(data_to_decrypt)))
|
||||
last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize()
|
||||
write_func(remove_padding(last_chunk))
|
||||
|
||||
def decrypt_bytes(data):
|
||||
self.buff += data
|
||||
|
@ -84,8 +92,9 @@ class CryptStreamBlobMaker(object):
|
|||
self.iv = iv
|
||||
self.blob_num = blob_num
|
||||
self.blob = blob
|
||||
self.cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
|
||||
self.buff = b''
|
||||
cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend)
|
||||
self.padder = PKCS7(AES.block_size).padder()
|
||||
self.cipher = cipher.encryptor()
|
||||
self.length = 0
|
||||
|
||||
def write(self, data):
|
||||
|
@ -104,39 +113,26 @@ class CryptStreamBlobMaker(object):
|
|||
done = True
|
||||
else:
|
||||
num_bytes_to_write = len(data)
|
||||
self.length += num_bytes_to_write
|
||||
data_to_write = data[:num_bytes_to_write]
|
||||
self.buff += data_to_write
|
||||
self._write_buffer()
|
||||
self.length += len(data_to_write)
|
||||
padded_data = self.padder.update(data_to_write)
|
||||
encrypted_data = self.cipher.update(padded_data)
|
||||
self.blob.write(encrypted_data)
|
||||
return done, num_bytes_to_write
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def close(self):
|
||||
log.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length))
|
||||
if self.length != 0:
|
||||
self._close_buffer()
|
||||
d = self.blob.close()
|
||||
d.addCallback(self._return_info)
|
||||
self.length += (AES.block_size / 8) - (self.length % (AES.block_size / 8))
|
||||
padded_data = self.padder.finalize()
|
||||
encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize()
|
||||
self.blob.write(encrypted_data)
|
||||
|
||||
blob_hash = yield self.blob.close()
|
||||
log.debug("called the finished_callback from CryptStreamBlobMaker.close")
|
||||
return d
|
||||
|
||||
def _write_buffer(self):
|
||||
num_bytes_to_encrypt = (len(self.buff) // AES.block_size) * AES.block_size
|
||||
data_to_encrypt, self.buff = split(self.buff, num_bytes_to_encrypt)
|
||||
encrypted_data = self.cipher.encrypt(data_to_encrypt)
|
||||
self.blob.write(encrypted_data)
|
||||
|
||||
def _close_buffer(self):
|
||||
data_to_encrypt, self.buff = self.buff, b''
|
||||
assert len(data_to_encrypt) < AES.block_size
|
||||
pad_len = AES.block_size - len(data_to_encrypt)
|
||||
padded_data = data_to_encrypt + chr(pad_len) * pad_len
|
||||
self.length += pad_len
|
||||
assert len(padded_data) == AES.block_size
|
||||
encrypted_data = self.cipher.encrypt(padded_data)
|
||||
self.blob.write(encrypted_data)
|
||||
|
||||
def _return_info(self, blob_hash):
|
||||
return CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv))
|
||||
blob = CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv))
|
||||
defer.returnValue(blob)
|
||||
|
||||
|
||||
def greatest_multiple(a, b):
|
||||
|
|
|
@ -3,25 +3,27 @@ Utility for creating Crypt Streams, which are encrypted blobs and associated met
|
|||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from twisted.internet import interfaces, defer
|
||||
from zope.interface import implements
|
||||
from Crypto import Random
|
||||
from Crypto.Cipher import AES
|
||||
|
||||
from twisted.internet import defer
|
||||
from lbrynet.core.StreamCreator import StreamCreator
|
||||
from lbrynet.cryptstream.CryptBlob import CryptStreamBlobMaker
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CryptStreamCreator(StreamCreator):
|
||||
"""Create a new stream with blobs encrypted by a symmetric cipher.
|
||||
class CryptStreamCreator(object):
|
||||
"""
|
||||
Create a new stream with blobs encrypted by a symmetric cipher.
|
||||
|
||||
Each blob is encrypted with the same key, but each blob has its
|
||||
own initialization vector which is associated with the blob when
|
||||
the blob is associated with the stream.
|
||||
"""
|
||||
|
||||
implements(interfaces.IConsumer)
|
||||
|
||||
def __init__(self, blob_manager, name=None, key=None, iv_generator=None):
|
||||
"""@param blob_manager: Object that stores and provides access to blobs.
|
||||
@type blob_manager: BlobManager
|
||||
|
@ -39,14 +41,59 @@ class CryptStreamCreator(StreamCreator):
|
|||
|
||||
@return: None
|
||||
"""
|
||||
StreamCreator.__init__(self, name)
|
||||
self.blob_manager = blob_manager
|
||||
self.name = name
|
||||
self.key = key
|
||||
if iv_generator is None:
|
||||
self.iv_generator = self.random_iv_generator()
|
||||
else:
|
||||
self.iv_generator = iv_generator
|
||||
|
||||
self.stopped = True
|
||||
self.producer = None
|
||||
self.streaming = None
|
||||
self.blob_count = -1
|
||||
self.current_blob = None
|
||||
self.finished_deferreds = []
|
||||
|
||||
def registerProducer(self, producer, streaming):
|
||||
from twisted.internet import reactor
|
||||
|
||||
self.producer = producer
|
||||
self.streaming = streaming
|
||||
self.stopped = False
|
||||
if streaming is False:
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
def unregisterProducer(self):
|
||||
self.stopped = True
|
||||
self.producer = None
|
||||
|
||||
def stop(self):
|
||||
"""Stop creating the stream. Create the terminating zero-length blob."""
|
||||
log.debug("stop has been called for StreamCreator")
|
||||
self.stopped = True
|
||||
if self.current_blob is not None:
|
||||
current_blob = self.current_blob
|
||||
d = current_blob.close()
|
||||
d.addCallback(self._blob_finished)
|
||||
d.addErrback(self._error)
|
||||
self.finished_deferreds.append(d)
|
||||
self.current_blob = None
|
||||
self._finalize()
|
||||
dl = defer.DeferredList(self.finished_deferreds)
|
||||
dl.addCallback(lambda _: self._finished())
|
||||
dl.addErrback(self._error)
|
||||
return dl
|
||||
|
||||
# TODO: move the stream creation process to its own thread and
|
||||
# remove the reactor from this process.
|
||||
def write(self, data):
|
||||
from twisted.internet import reactor
|
||||
self._write(data)
|
||||
if self.stopped is False and self.streaming is False:
|
||||
reactor.callLater(0, self.producer.resumeProducing)
|
||||
|
||||
@staticmethod
|
||||
def random_iv_generator():
|
||||
while 1:
|
||||
|
@ -77,11 +124,6 @@ class CryptStreamCreator(StreamCreator):
|
|||
self.finished_deferreds.append(d)
|
||||
|
||||
def _write(self, data):
|
||||
def close_blob(blob):
|
||||
d = blob.close()
|
||||
d.addCallback(self._blob_finished)
|
||||
self.finished_deferreds.append(d)
|
||||
|
||||
while len(data) > 0:
|
||||
if self.current_blob is None:
|
||||
self.next_blob_creator = self.blob_manager.get_blob_creator()
|
||||
|
@ -94,10 +136,19 @@ class CryptStreamCreator(StreamCreator):
|
|||
should_announce = self.blob_count == 0
|
||||
d = self.current_blob.close()
|
||||
d.addCallback(self._blob_finished)
|
||||
d.addCallback(lambda _: self.blob_manager.creator_finished(
|
||||
self.next_blob_creator, should_announce))
|
||||
d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info,
|
||||
should_announce))
|
||||
self.finished_deferreds.append(d)
|
||||
self.current_blob = None
|
||||
|
||||
def _get_blob_maker(self, iv, blob_creator):
|
||||
return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)
|
||||
|
||||
def _error(self, error):
|
||||
log.error(error)
|
||||
|
||||
def _finished(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _blob_finished(self, blob_info):
|
||||
raise NotImplementedError()
|
||||
|
|
|
@ -29,7 +29,6 @@ from lbrynet.reflector import reupload
|
|||
from lbrynet.reflector import ServerFactory as reflector_server_factory
|
||||
from lbrynet.core.log_support import configure_loggly_handler
|
||||
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory
|
||||
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
|
||||
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
|
||||
from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager
|
||||
from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType
|
||||
|
@ -578,17 +577,8 @@ class Daemon(AuthJSONRPCServer):
|
|||
self.session.wallet,
|
||||
self.download_directory
|
||||
)
|
||||
self.sd_identifier.add_stream_downloader_factory(
|
||||
EncryptedFileStreamType, file_saver_factory)
|
||||
file_opener_factory = EncryptedFileOpenerFactory(
|
||||
self.session.peer_finder,
|
||||
self.session.rate_limiter,
|
||||
self.session.blob_manager,
|
||||
self.stream_info_manager,
|
||||
self.session.wallet
|
||||
)
|
||||
self.sd_identifier.add_stream_downloader_factory(
|
||||
EncryptedFileStreamType, file_opener_factory)
|
||||
self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType,
|
||||
file_saver_factory)
|
||||
return defer.succeed(None)
|
||||
|
||||
def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
|
||||
|
@ -2467,10 +2457,11 @@ class Daemon(AuthJSONRPCServer):
|
|||
blob_hashes = [blob_hash]
|
||||
elif stream_hash:
|
||||
blobs = yield self.get_blobs_for_stream_hash(stream_hash)
|
||||
blob_hashes = [blob.blob_hash for blob in blobs if blob.is_validated()]
|
||||
blob_hashes = [blob.blob_hash for blob in blobs if blob.get_is_verified()]
|
||||
elif sd_hash:
|
||||
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
|
||||
blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if blob.is_validated()]
|
||||
blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if
|
||||
blob.get_is_verified()]
|
||||
else:
|
||||
raise Exception('single argument must be specified')
|
||||
yield self.session.blob_manager._immediate_announce(blob_hashes)
|
||||
|
@ -2573,9 +2564,9 @@ class Daemon(AuthJSONRPCServer):
|
|||
blobs = self.session.blob_manager.blobs.itervalues()
|
||||
|
||||
if needed:
|
||||
blobs = [blob for blob in blobs if not blob.is_validated()]
|
||||
blobs = [blob for blob in blobs if not blob.get_is_verified()]
|
||||
if finished:
|
||||
blobs = [blob for blob in blobs if blob.is_validated()]
|
||||
blobs = [blob for blob in blobs if blob.get_is_verified()]
|
||||
|
||||
blob_hashes = [blob.blob_hash for blob in blobs]
|
||||
page_size = page_size or len(blob_hashes)
|
||||
|
|
|
@ -64,9 +64,8 @@ class MarketFeed(object):
|
|||
self.rate = ExchangeRate(self.market, price, int(time.time()))
|
||||
|
||||
def _log_error(self, err):
|
||||
log.warning(
|
||||
"There was a problem updating %s exchange rate information from %s",
|
||||
self.market, self.name)
|
||||
log.warning("There was a problem updating %s exchange rate information from %s\n%s",
|
||||
self.market, self.name, err)
|
||||
|
||||
def _update_price(self):
|
||||
d = threads.deferToThread(self._make_request)
|
||||
|
@ -141,7 +140,10 @@ class LBRYioBTCFeed(MarketFeed):
|
|||
)
|
||||
|
||||
def _handle_response(self, response):
|
||||
try:
|
||||
json_response = json.loads(response)
|
||||
except ValueError:
|
||||
raise InvalidExchangeRateResponse(self.name, "invalid rate response : %s" % response)
|
||||
if 'data' not in json_response:
|
||||
raise InvalidExchangeRateResponse(self.name, 'result not found')
|
||||
return defer.succeed(1.0 / json_response['data']['btc_usd'])
|
||||
|
|
|
@ -29,8 +29,9 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
|
|||
self.blob_infos = []
|
||||
|
||||
def _blob_finished(self, blob_info):
|
||||
log.debug("length: %s", str(blob_info.length))
|
||||
log.debug("length: %s", blob_info.length)
|
||||
self.blob_infos.append(blob_info)
|
||||
return blob_info
|
||||
|
||||
def _save_stream_info(self):
|
||||
stream_info_manager = self.lbry_file_manager.stream_info_manager
|
||||
|
@ -40,10 +41,6 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
|
|||
self.blob_infos)
|
||||
return d
|
||||
|
||||
def setup(self):
|
||||
d = CryptStreamCreator.setup(self)
|
||||
return d
|
||||
|
||||
def _get_blobs_hashsum(self):
|
||||
blobs_hashsum = get_lbry_hash_obj()
|
||||
for blob_info in sorted(self.blob_infos, key=lambda b_i: b_i.blob_num):
|
||||
|
|
|
@ -303,8 +303,10 @@ class EncryptedFileManager(object):
|
|||
|
||||
@rerun_if_locked
|
||||
def _change_file_status(self, rowid, new_status):
|
||||
return self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?",
|
||||
d = self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?",
|
||||
(new_status, rowid))
|
||||
d.addCallback(lambda _: new_status)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_lbry_file_status(self, rowid):
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import subprocess
|
||||
import binascii
|
||||
|
||||
from zope.interface import implements
|
||||
|
@ -10,8 +9,7 @@ from lbrynet.core.StreamDescriptor import StreamMetadata
|
|||
from lbrynet.interfaces import IStreamDownloaderFactory
|
||||
from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler
|
||||
import os
|
||||
from twisted.internet import defer, threads, reactor
|
||||
from twisted.python.procutils import which
|
||||
from twisted.internet import defer, threads
|
||||
import logging
|
||||
import traceback
|
||||
|
||||
|
@ -282,90 +280,3 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory):
|
|||
@staticmethod
|
||||
def get_description():
|
||||
return "Save"
|
||||
|
||||
|
||||
class EncryptedFileOpener(EncryptedFileDownloader):
|
||||
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet):
|
||||
EncryptedFileDownloader.__init__(self, stream_hash,
|
||||
peer_finder, rate_limiter,
|
||||
blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet,
|
||||
)
|
||||
self.process = None
|
||||
self.process_log = None
|
||||
|
||||
def stop(self, err=None):
|
||||
d = EncryptedFileDownloader.stop(self, err=err)
|
||||
d.addCallback(lambda _: self._delete_from_info_manager())
|
||||
return d
|
||||
|
||||
def _get_progress_manager(self, download_manager):
|
||||
return FullStreamProgressManager(self._finished_downloading,
|
||||
self.blob_manager,
|
||||
download_manager)
|
||||
|
||||
def _setup_output(self):
|
||||
def start_process():
|
||||
if os.name == "nt":
|
||||
paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe',
|
||||
r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe']
|
||||
for p in paths:
|
||||
if os.path.exists(p):
|
||||
vlc_path = p
|
||||
break
|
||||
else:
|
||||
raise ValueError("You must install VLC media player to stream files")
|
||||
else:
|
||||
vlc_path = 'vlc'
|
||||
self.process_log = open("vlc.out", 'a')
|
||||
try:
|
||||
self.process = subprocess.Popen([vlc_path, '-'], stdin=subprocess.PIPE,
|
||||
stdout=self.process_log, stderr=self.process_log)
|
||||
except OSError:
|
||||
raise ValueError("VLC media player could not be opened")
|
||||
|
||||
d = threads.deferToThread(start_process)
|
||||
return d
|
||||
|
||||
def _close_output(self):
|
||||
if self.process is not None:
|
||||
self.process.stdin.close()
|
||||
self.process = None
|
||||
return defer.succeed(True)
|
||||
|
||||
def _get_write_func(self):
|
||||
def write_func(data):
|
||||
if self.stopped is False and self.process is not None:
|
||||
try:
|
||||
self.process.stdin.write(data)
|
||||
except IOError:
|
||||
reactor.callLater(0, self.stop)
|
||||
return write_func
|
||||
|
||||
def _delete_from_info_manager(self):
|
||||
return self.stream_info_manager.delete_stream(self.stream_hash)
|
||||
|
||||
|
||||
class EncryptedFileOpenerFactory(EncryptedFileDownloaderFactory):
|
||||
def can_download(self, sd_validator):
|
||||
if which('vlc'):
|
||||
return True
|
||||
elif os.name == "nt":
|
||||
paths = [r'C:\Program Files\VideoLAN\VLC\vlc.exe',
|
||||
r'C:\Program Files (x86)\VideoLAN\VLC\vlc.exe']
|
||||
for p in paths:
|
||||
if os.path.exists(p):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _make_downloader(self, stream_hash, payment_rate_manager, stream_info):
|
||||
return EncryptedFileOpener(stream_hash, self.peer_finder,
|
||||
self.rate_limiter, self.blob_manager,
|
||||
self.stream_info_manager,
|
||||
payment_rate_manager, self.wallet,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_description():
|
||||
return "Stream"
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import logging
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
|
||||
from lbrynet.interfaces import IMetadataHandler
|
||||
|
||||
|
@ -18,10 +19,11 @@ class EncryptedFileMetadataHandler(object):
|
|||
|
||||
######### IMetadataHandler #########
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_initial_blobs(self):
|
||||
d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
|
||||
d.addCallback(self._format_initial_blobs_for_download_manager)
|
||||
return d
|
||||
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
|
||||
formatted_infos = self._format_initial_blobs_for_download_manager(blob_infos)
|
||||
defer.returnValue(formatted_infos)
|
||||
|
||||
def final_blob_num(self):
|
||||
return self._final_blob_num
|
||||
|
@ -30,10 +32,12 @@ class EncryptedFileMetadataHandler(object):
|
|||
|
||||
def _format_initial_blobs_for_download_manager(self, blob_infos):
|
||||
infos = []
|
||||
for blob_hash, blob_num, iv, length in blob_infos:
|
||||
if blob_hash is not None:
|
||||
for i, (blob_hash, blob_num, iv, length) in enumerate(blob_infos):
|
||||
if blob_hash is not None and length:
|
||||
infos.append(CryptBlobInfo(blob_hash, blob_num, length, iv))
|
||||
else:
|
||||
if i != len(blob_infos) - 1:
|
||||
raise Exception("Invalid stream terminator")
|
||||
log.debug("Setting _final_blob_num to %s", str(blob_num - 1))
|
||||
self._final_blob_num = blob_num - 1
|
||||
return infos
|
||||
|
|
|
@ -132,7 +132,7 @@ class BlobReflectorClient(Protocol):
|
|||
return self.set_not_uploading()
|
||||
|
||||
def open_blob_for_reading(self, blob):
|
||||
if blob.is_validated():
|
||||
if blob.get_is_verified():
|
||||
read_handle = blob.open_for_reading()
|
||||
if read_handle is not None:
|
||||
log.debug('Getting ready to send %s', blob.blob_hash)
|
||||
|
|
|
@ -112,11 +112,11 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
def get_validated_blobs(self, blobs_in_stream):
|
||||
def get_blobs(blobs):
|
||||
for (blob, _, _, blob_len) in blobs:
|
||||
if blob:
|
||||
if blob and blob_len:
|
||||
yield self.blob_manager.get_blob(blob, blob_len)
|
||||
|
||||
dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True)
|
||||
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()])
|
||||
dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.get_is_verified()])
|
||||
return dl
|
||||
|
||||
def set_blobs_to_send(self, blobs_to_send):
|
||||
|
@ -253,7 +253,7 @@ class EncryptedFileReflectorClient(Protocol):
|
|||
return self.set_not_uploading()
|
||||
|
||||
def open_blob_for_reading(self, blob):
|
||||
if blob.is_validated():
|
||||
if blob.get_is_verified():
|
||||
read_handle = blob.open_for_reading()
|
||||
if read_handle is not None:
|
||||
log.debug('Getting ready to send %s', blob.blob_hash)
|
||||
|
|
|
@ -35,11 +35,11 @@ class ReflectorServer(Protocol):
|
|||
self.peer_version = None
|
||||
self.receiving_blob = False
|
||||
self.incoming_blob = None
|
||||
self.blob_write = None
|
||||
self.blob_finished_d = None
|
||||
self.cancel_write = None
|
||||
self.request_buff = ""
|
||||
|
||||
self.blob_writer = None
|
||||
|
||||
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
|
||||
log.info("Reflector upload from %s finished" % self.peer.host)
|
||||
|
||||
|
@ -82,14 +82,14 @@ class ReflectorServer(Protocol):
|
|||
"""
|
||||
|
||||
blob = self.incoming_blob
|
||||
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
|
||||
self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer)
|
||||
self.blob_finished_d.addCallback(self._on_completed_blob, response_key)
|
||||
self.blob_finished_d.addErrback(self._on_failed_blob, response_key)
|
||||
|
||||
def close_blob(self):
|
||||
self.blob_writer.close()
|
||||
self.blob_writer = None
|
||||
self.blob_finished_d = None
|
||||
self.blob_write = None
|
||||
self.cancel_write = None
|
||||
self.incoming_blob = None
|
||||
self.receiving_blob = False
|
||||
|
||||
|
@ -99,7 +99,7 @@ class ReflectorServer(Protocol):
|
|||
|
||||
def dataReceived(self, data):
|
||||
if self.receiving_blob:
|
||||
self.blob_write(data)
|
||||
self.blob_writer.write(data)
|
||||
else:
|
||||
log.debug('Not yet recieving blob, data needs further processing')
|
||||
self.request_buff += data
|
||||
|
@ -110,7 +110,7 @@ class ReflectorServer(Protocol):
|
|||
d.addErrback(self.handle_error)
|
||||
if self.receiving_blob and extra_data:
|
||||
log.debug('Writing extra data to blob')
|
||||
self.blob_write(extra_data)
|
||||
self.blob_writer.write(extra_data)
|
||||
|
||||
def _get_valid_response(self, response_msg):
|
||||
extra_data = None
|
||||
|
@ -221,7 +221,7 @@ class ReflectorServer(Protocol):
|
|||
sd_blob_hash = request_dict[SD_BLOB_HASH]
|
||||
sd_blob_size = request_dict[SD_BLOB_SIZE]
|
||||
|
||||
if self.blob_write is None:
|
||||
if self.blob_writer is None:
|
||||
d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size)
|
||||
d.addCallback(self.get_descriptor_response)
|
||||
d.addCallback(self.send_response)
|
||||
|
@ -231,7 +231,7 @@ class ReflectorServer(Protocol):
|
|||
return d
|
||||
|
||||
def get_descriptor_response(self, sd_blob):
|
||||
if sd_blob.is_validated():
|
||||
if sd_blob.get_is_verified():
|
||||
d = defer.succeed({SEND_SD_BLOB: False})
|
||||
d.addCallback(self.request_needed_blobs, sd_blob)
|
||||
else:
|
||||
|
@ -267,7 +267,7 @@ class ReflectorServer(Protocol):
|
|||
if 'blob_hash' in blob and 'length' in blob:
|
||||
blob_hash, blob_len = blob['blob_hash'], blob['length']
|
||||
d = self.blob_manager.get_blob(blob_hash, blob_len)
|
||||
d.addCallback(lambda blob: blob_hash if not blob.is_validated() else None)
|
||||
d.addCallback(lambda blob: blob_hash if not blob.get_is_verified() else None)
|
||||
yield d
|
||||
|
||||
def handle_blob_request(self, request_dict):
|
||||
|
@ -293,7 +293,7 @@ class ReflectorServer(Protocol):
|
|||
blob_hash = request_dict[BLOB_HASH]
|
||||
blob_size = request_dict[BLOB_SIZE]
|
||||
|
||||
if self.blob_write is None:
|
||||
if self.blob_writer is None:
|
||||
log.debug('Received info for blob: %s', blob_hash[:16])
|
||||
d = self.blob_manager.get_blob(blob_hash, blob_size)
|
||||
d.addCallback(self.get_blob_response)
|
||||
|
@ -305,7 +305,7 @@ class ReflectorServer(Protocol):
|
|||
return d
|
||||
|
||||
def get_blob_response(self, blob):
|
||||
if blob.is_validated():
|
||||
if blob.get_is_verified():
|
||||
return defer.succeed({SEND_BLOB: False})
|
||||
else:
|
||||
self.incoming_blob = blob
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
Twisted==16.6.0
|
||||
cryptography==2.0.3
|
||||
appdirs==1.4.3
|
||||
argparse==1.2.1
|
||||
docopt==0.6.2
|
||||
|
|
|
@ -10,7 +10,7 @@ from twisted.internet import reactor
|
|||
|
||||
from lbrynet import conf
|
||||
from lbrynet.cryptstream import CryptBlob
|
||||
from lbrynet.core import HashBlob
|
||||
from lbrynet.blob import BlobFile
|
||||
from lbrynet.core import log_support
|
||||
|
||||
|
||||
|
@ -46,7 +46,7 @@ def decrypt_blob(blob_file, key, iv, output):
|
|||
filename = os.path.abspath(blob_file)
|
||||
length = os.path.getsize(filename)
|
||||
directory, blob_hash = os.path.split(filename)
|
||||
blob = HashBlob.BlobFile(directory, blob_hash, True, length)
|
||||
blob = BlobFile(directory, blob_hash, length)
|
||||
decryptor = CryptBlob.StreamBlobDecryptor(
|
||||
blob, binascii.unhexlify(key), binascii.unhexlify(iv), length)
|
||||
with open(output, 'w') as f:
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import reactor
|
||||
|
@ -13,7 +13,7 @@ from lbrynet import conf
|
|||
from lbrynet.core import log_support
|
||||
from lbrynet.core import BlobManager
|
||||
from lbrynet.core import HashAnnouncer
|
||||
from lbrynet.core import HashBlob
|
||||
from lbrynet.blob import BlobFile
|
||||
from lbrynet.core import RateLimiter
|
||||
from lbrynet.core import Peer
|
||||
from lbrynet.core import Wallet
|
||||
|
@ -31,13 +31,14 @@ def main(args=None):
|
|||
parser.add_argument('--timeout', type=int, default=30)
|
||||
parser.add_argument('peer')
|
||||
parser.add_argument('blob_hash')
|
||||
parser.add_argument('directory', type=str, default=os.getcwd())
|
||||
args = parser.parse_args(args)
|
||||
|
||||
log_support.configure_console(level='DEBUG')
|
||||
|
||||
announcer = HashAnnouncer.DummyHashAnnouncer()
|
||||
blob_manager = MyBlobManager(announcer)
|
||||
blob = HashBlob.TempBlob(args.blob_hash, False)
|
||||
blob = BlobFile(args.directory, args.blob_hash)
|
||||
download_manager = SingleBlobDownloadManager(blob)
|
||||
peer = Peer.Peer(*conf.server_port(args.peer))
|
||||
payment_rate_manager = DumbPaymentRateManager()
|
||||
|
|
|
@ -1,17 +1,18 @@
|
|||
"""Encrypt a single file using the given key and iv"""
|
||||
import argparse
|
||||
import binascii
|
||||
import logging
|
||||
import StringIO
|
||||
import sys
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet import reactor
|
||||
from twisted.protocols import basic
|
||||
from twisted.web.client import FileBodyProducer
|
||||
|
||||
from lbrynet import conf
|
||||
from lbrynet.cryptstream import CryptBlob
|
||||
from lbrynet.core import log_support
|
||||
from lbrynet.core import cryptoutils
|
||||
from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
|
||||
from lbrynet.core.BlobManager import DiskBlobManager
|
||||
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
|
||||
|
||||
|
||||
log = logging.getLogger('decrypt_blob')
|
||||
|
@ -26,7 +27,7 @@ def main():
|
|||
args = parser.parse_args()
|
||||
log_support.configure_console(level='DEBUG')
|
||||
|
||||
d = run(args)
|
||||
run(args)
|
||||
reactor.run()
|
||||
|
||||
|
||||
|
@ -40,29 +41,23 @@ def run(args):
|
|||
reactor.callLater(0, reactor.stop)
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def encrypt_blob(filename, key, iv):
|
||||
blob = Blob()
|
||||
blob_maker = CryptBlob.CryptStreamBlobMaker(
|
||||
binascii.unhexlify(key), binascii.unhexlify(iv), 0, blob)
|
||||
with open(filename) as fin:
|
||||
blob_maker.write(fin.read())
|
||||
blob_maker.close()
|
||||
dummy_announcer = DummyHashAnnouncer()
|
||||
manager = DiskBlobManager(dummy_announcer, '.', '.')
|
||||
yield manager.setup()
|
||||
creator = CryptStreamCreator(manager, filename, key, iv_generator(iv))
|
||||
with open(filename, 'r') as infile:
|
||||
producer = FileBodyProducer(infile, readSize=2**22)
|
||||
yield producer.startProducing(creator)
|
||||
yield creator.stop()
|
||||
|
||||
|
||||
class Blob(object):
|
||||
def __init__(self):
|
||||
self.data = StringIO.StringIO()
|
||||
|
||||
def write(self, data):
|
||||
self.data.write(data)
|
||||
|
||||
def close(self):
|
||||
hashsum = cryptoutils.get_lbry_hash_obj()
|
||||
buffer = self.data.getvalue()
|
||||
hashsum.update(buffer)
|
||||
with open(hashsum.hexdigest(), 'w') as fout:
|
||||
fout.write(buffer)
|
||||
return defer.succeed(True)
|
||||
def iv_generator(iv):
|
||||
iv = int(iv, 16)
|
||||
while 1:
|
||||
iv += 1
|
||||
yield ("%016d" % iv)[-16:]
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -182,7 +182,7 @@ class TestReflector(unittest.TestCase):
|
|||
return factory.finished_deferred
|
||||
|
||||
def verify_blob_completed(blob, blob_size):
|
||||
self.assertTrue(blob.is_validated())
|
||||
self.assertTrue(blob.get_is_verified())
|
||||
self.assertEqual(blob_size, blob.length)
|
||||
return
|
||||
|
||||
|
@ -213,7 +213,7 @@ class TestReflector(unittest.TestCase):
|
|||
return factory.finished_deferred
|
||||
|
||||
def verify_blob_completed(blob, blob_size):
|
||||
self.assertTrue(blob.is_validated())
|
||||
self.assertTrue(blob.get_is_verified())
|
||||
self.assertEqual(blob_size, blob.length)
|
||||
|
||||
d = send_to_server([x[0] for x in self.expected_blobs])
|
||||
|
@ -244,7 +244,7 @@ class TestReflector(unittest.TestCase):
|
|||
return factory.finished_deferred
|
||||
|
||||
def verify_blob_completed(blob, blob_size):
|
||||
self.assertTrue(blob.is_validated())
|
||||
self.assertTrue(blob.get_is_verified())
|
||||
self.assertEqual(blob_size, blob.length)
|
||||
|
||||
d = send_to_server([x[0] for x in self.expected_blobs])
|
||||
|
|
|
@ -53,7 +53,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
|
|||
|
||||
def test_blob_unavailable_when_blob_not_validated(self):
|
||||
blob = mock.Mock()
|
||||
blob.is_validated.return_value = False
|
||||
blob.get_is_verified.return_value = False
|
||||
self.blob_manager.get_blob.return_value = defer.succeed(blob)
|
||||
query = {
|
||||
'blob_data_payment_rate': 1.0,
|
||||
|
@ -68,7 +68,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
|
|||
|
||||
def test_blob_unavailable_when_blob_cannot_be_opened(self):
|
||||
blob = mock.Mock()
|
||||
blob.is_validated.return_value = True
|
||||
blob.get_is_verified.return_value = True
|
||||
blob.open_for_reading.return_value = None
|
||||
self.blob_manager.get_blob.return_value = defer.succeed(blob)
|
||||
query = {
|
||||
|
@ -84,7 +84,7 @@ class TestBlobRequestHandlerQueries(unittest.TestCase):
|
|||
|
||||
def test_blob_details_are_set_when_all_conditions_are_met(self):
|
||||
blob = mock.Mock()
|
||||
blob.is_validated.return_value = True
|
||||
blob.get_is_verified.return_value = True
|
||||
blob.open_for_reading.return_value = True
|
||||
blob.blob_hash = 'DEADBEEF'
|
||||
blob.length = 42
|
||||
|
|
|
@ -47,8 +47,8 @@ class BlobManagerTest(unittest.TestCase):
|
|||
yield self.bm.setup()
|
||||
blob = yield self.bm.get_blob(blob_hash,len(data))
|
||||
|
||||
finished_d, write, cancel =yield blob.open_for_writing(self.peer)
|
||||
yield write(data)
|
||||
writer, finished_d = yield blob.open_for_writing(self.peer)
|
||||
yield writer.write(data)
|
||||
yield self.bm.blob_completed(blob)
|
||||
yield self.bm.add_blob_to_upload_history(blob_hash,'test',len(data))
|
||||
|
||||
|
@ -105,7 +105,7 @@ class BlobManagerTest(unittest.TestCase):
|
|||
|
||||
# open the last blob
|
||||
blob = yield self.bm.get_blob(blob_hashes[-1])
|
||||
finished_d, write, cancel = yield blob.open_for_writing(self.peer)
|
||||
writer, finished_d = yield blob.open_for_writing(self.peer)
|
||||
|
||||
# delete the last blob and check if it still exists
|
||||
out = yield self.bm.delete_blobs([blob_hash])
|
||||
|
@ -114,4 +114,3 @@ class BlobManagerTest(unittest.TestCase):
|
|||
self.assertTrue(blob_hashes[-1] in blobs)
|
||||
self.assertTrue(os.path.isfile(os.path.join(self.blob_dir,blob_hashes[-1])))
|
||||
|
||||
blob._close_writer(blob.writers[self.peer][0])
|
||||
|
|
127
tests/unit/core/test_HashBlob.py
Normal file
127
tests/unit/core/test_HashBlob.py
Normal file
|
@ -0,0 +1,127 @@
|
|||
from lbrynet.blob import BlobFile
|
||||
from lbrynet.core.Error import DownloadCanceledError, InvalidDataError
|
||||
|
||||
|
||||
from tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir, random_lbry_hash
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
import os
|
||||
import time
|
||||
|
||||
|
||||
class BlobFileTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
|
||||
self.fake_content_len = 64
|
||||
self.fake_content = bytearray('0'*self.fake_content_len)
|
||||
self.fake_content_hash = '53871b26a08e90cb62142f2a39f0b80de41792322b0ca5602b6eb7b5cf067c49498a7492bb9364bbf90f40c1c5412105'
|
||||
|
||||
def tearDown(self):
|
||||
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_good_write_and_read(self):
|
||||
# test a write that should succeed
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
self.assertFalse(blob_file.verified)
|
||||
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(self.fake_content)
|
||||
writer.close()
|
||||
out = yield finished_d
|
||||
self.assertTrue(isinstance(out, BlobFile))
|
||||
self.assertTrue(out.verified)
|
||||
self.assertEqual(self.fake_content_len, out.get_length())
|
||||
|
||||
# read from the instance used to write to, and verify content
|
||||
f = blob_file.open_for_reading()
|
||||
c = f.read()
|
||||
self.assertEqual(c, self.fake_content)
|
||||
self.assertFalse(out.is_downloading())
|
||||
|
||||
# read from newly declared instance, and verify content
|
||||
del blob_file
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
self.assertTrue(blob_file.verified)
|
||||
f = blob_file.open_for_reading()
|
||||
c = f.read()
|
||||
self.assertEqual(c, self.fake_content)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_delete(self):
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(self.fake_content)
|
||||
out = yield finished_d
|
||||
out = yield blob_file.delete()
|
||||
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash)
|
||||
self.assertFalse(blob_file.verified)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_too_much_write(self):
|
||||
# writing too much data should result in failure
|
||||
expected_length= 16
|
||||
content = bytearray('0'*32)
|
||||
blob_hash = random_lbry_hash()
|
||||
blob_file = BlobFile(self.blob_dir, blob_hash, expected_length)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(content)
|
||||
out = yield self.assertFailure(finished_d, InvalidDataError)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_bad_hash(self):
|
||||
# test a write that should fail because its content's hash
|
||||
# does not equal the blob_hash
|
||||
length= 64
|
||||
content = bytearray('0'*length)
|
||||
blob_hash = random_lbry_hash()
|
||||
blob_file = BlobFile(self.blob_dir, blob_hash, length)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(content)
|
||||
yield self.assertFailure(finished_d, InvalidDataError)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_close_on_incomplete_write(self):
|
||||
# write all but 1 byte of data,
|
||||
blob_file = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
writer, finished_d = blob_file.open_for_writing(peer=1)
|
||||
writer.write(self.fake_content[:self.fake_content_len-1])
|
||||
writer.close()
|
||||
yield self.assertFailure(finished_d, DownloadCanceledError)
|
||||
|
||||
# writes after close will throw a IOError exception
|
||||
with self.assertRaises(IOError):
|
||||
writer.write(self.fake_content)
|
||||
|
||||
# another call to close will do nothing
|
||||
writer.close()
|
||||
|
||||
# file should not exist, since we did not finish write
|
||||
blob_file_2 = BlobFile(self.blob_dir, self.fake_content_hash, self.fake_content_len)
|
||||
out = blob_file_2.open_for_reading()
|
||||
self.assertEqual(None, out)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_multiple_writers(self):
|
||||
# start first writer and write half way, and then start second writer and write everything
|
||||
blob_hash = self.fake_content_hash
|
||||
blob_file = BlobFile(self.blob_dir, blob_hash, self.fake_content_len)
|
||||
writer_1, finished_d_1 = blob_file.open_for_writing(peer=1)
|
||||
writer_1.write(self.fake_content[:self.fake_content_len/2])
|
||||
|
||||
writer_2, finished_d_2 = blob_file.open_for_writing(peer=2)
|
||||
writer_2.write(self.fake_content)
|
||||
out_2 = yield finished_d_2
|
||||
out_1 = yield self.assertFailure(finished_d_1, DownloadCanceledError)
|
||||
|
||||
self.assertTrue(isinstance(out_2, BlobFile))
|
||||
self.assertTrue(out_2.verified)
|
||||
self.assertEqual(self.fake_content_len, out_2.get_length())
|
||||
|
||||
f = blob_file.open_for_reading()
|
||||
c = f.read()
|
||||
self.assertEqual(self.fake_content_len, len(c))
|
||||
self.assertEqual(bytearray(c), self.fake_content)
|
||||
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from lbrynet.cryptstream import CryptBlob
|
||||
from lbrynet.core.HashBlob import TempBlobCreator
|
||||
from lbrynet import conf
|
||||
|
||||
from tests.mocks import mock_conf_settings
|
||||
|
|
Loading…
Reference in a new issue