async lbrynet.blob
parent c713fac2d9, commit a5524d490c
17 changed files with 333 additions and 1330 deletions
@@ -1,166 +0,0 @@
import binascii
import logging
from io import BytesIO
from twisted.internet import defer
from twisted.web.client import FileBodyProducer
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.backends import default_backend
from lbrynet.p2p.BlobInfo import BlobInfo
from lbrynet.blob.blob_file import MAX_BLOB_SIZE

log = logging.getLogger(__name__)
backend = default_backend()


class CryptBlobInfo(BlobInfo):
    def __init__(self, blob_hash, blob_num, length, iv):
        super().__init__(blob_hash, blob_num, length)
        self.iv = iv

    def get_dict(self):
        info = {
            "blob_num": self.blob_num,
            "length": self.length,
            "iv": self.iv.decode()
        }
        if self.blob_hash:
            info['blob_hash'] = self.blob_hash
        return info


class StreamBlobDecryptor:
    def __init__(self, blob, key, iv, length):
        """
        This class decrypts a blob.

        blob - object which implements a read() function
        key = encryption key
        iv = initialization vector
        length = length in bytes of the blob
        """
        self.blob = blob
        self.key = key
        self.iv = iv
        self.length = length
        self.buff = b''
        self.len_read = 0
        cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend)
        self.unpadder = PKCS7(AES.block_size).unpadder()
        self.cipher = cipher.decryptor()

    def decrypt(self, write_func):
        """
        Decrypt the blob and write its content using write_func.

        write_func - function that takes the decrypted bytes as an
            argument and writes them somewhere

        Returns:
            deferred that fires after the blob has been decrypted and its content written
        """

        def remove_padding(data):
            return self.unpadder.update(data) + self.unpadder.finalize()

        def write_bytes():
            if self.len_read < self.length:
                num_bytes_to_decrypt = greatest_multiple(len(self.buff), (AES.block_size // 8))
                data_to_decrypt, self.buff = split(self.buff, num_bytes_to_decrypt)
                write_func(self.cipher.update(data_to_decrypt))

        def finish_decrypt():
            bytes_left = len(self.buff) % (AES.block_size // 8)
            if bytes_left != 0:
                log.warning(binascii.hexlify(self.buff[-1 * (AES.block_size // 8):]).decode())
                raise Exception("blob %s has incorrect padding: %i bytes left" %
                                (self.blob.blob_hash, bytes_left))
            data_to_decrypt, self.buff = self.buff, b''
            last_chunk = self.cipher.update(data_to_decrypt) + self.cipher.finalize()
            write_func(remove_padding(last_chunk))

        read_handle = self.blob.open_for_reading()

        @defer.inlineCallbacks
        def decrypt_bytes():
            producer = FileBodyProducer(read_handle)
            buff = BytesIO()
            yield producer.startProducing(buff)
            self.buff = buff.getvalue()
            self.len_read += len(self.buff)
            write_bytes()
            finish_decrypt()

        d = decrypt_bytes()
        return d


class CryptStreamBlobMaker:
    def __init__(self, key, iv, blob_num, blob):
        """
        This class encrypts data and writes it to a new blob.

        key = encryption key
        iv = initialization vector
        blob_num = blob number (has no effect on encryption)
        blob = object which implements write() and close(); close() must
            return a deferred (will generally be of HashBlobCreator type)
        """
        self.key = key
        self.iv = iv
        self.blob_num = blob_num
        self.blob = blob
        cipher = Cipher(AES(self.key), modes.CBC(self.iv), backend=backend)
        self.padder = PKCS7(AES.block_size).padder()
        self.cipher = cipher.encryptor()
        self.length = 0

    def write(self, data):
        """
        Encrypt and write the given bytes.

        Returns:
            tuple (done, num_bytes_to_write) where done is True if the
            maximum number of bytes has been written; num_bytes_to_write
            is the number of bytes from data that will be written in this call
        """
        max_bytes_to_write = MAX_BLOB_SIZE - self.length - 1
        done = False
        if max_bytes_to_write <= len(data):
            num_bytes_to_write = max_bytes_to_write
            done = True
        else:
            num_bytes_to_write = len(data)
        data_to_write = data[:num_bytes_to_write]
        self.length += len(data_to_write)
        padded_data = self.padder.update(data_to_write)
        encrypted_data = self.cipher.update(padded_data)
        self.blob.write(encrypted_data)
        return done, num_bytes_to_write

    @defer.inlineCallbacks
    def close(self):
        log.debug("closing blob %s with plaintext len %s", str(self.blob_num), str(self.length))
        if self.length != 0:
            self.length += (AES.block_size // 8) - (self.length % (AES.block_size // 8))
            padded_data = self.padder.finalize()
            encrypted_data = self.cipher.update(padded_data) + self.cipher.finalize()
            self.blob.write(encrypted_data)

        blob_hash = yield self.blob.close()
        log.debug("called the finished_callback from CryptStreamBlobMaker.close")
        blob = CryptBlobInfo(blob_hash, self.blob_num, self.length, binascii.hexlify(self.iv))
        defer.returnValue(blob)


def greatest_multiple(a, b):
    """Return the largest value `c` that is a multiple of `b` and is <= `a`."""
    return (a // b) * b


def split(buff, cutoff):
    return buff[:cutoff], buff[cutoff:]
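For reference, the incremental padder/cipher pipeline that CryptStreamBlobMaker and StreamBlobDecryptor implement above reduces to a single-shot AES-CBC/PKCS7 round trip. A minimal sketch using the same cryptography primitives (the helper names below are illustrative, not part of the codebase):

import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7

backend = default_backend()

def encrypt_chunk(key: bytes, iv: bytes, plaintext: bytes) -> bytes:
    # pad with PKCS7 and encrypt with AES-CBC, mirroring CryptStreamBlobMaker
    padder = PKCS7(AES.block_size).padder()
    encryptor = Cipher(AES(key), modes.CBC(iv), backend=backend).encryptor()
    return encryptor.update(padder.update(plaintext) + padder.finalize()) + encryptor.finalize()

def decrypt_chunk(key: bytes, iv: bytes, ciphertext: bytes) -> bytes:
    # decrypt with AES-CBC and strip PKCS7 padding, mirroring StreamBlobDecryptor
    unpadder = PKCS7(AES.block_size).unpadder()
    decryptor = Cipher(AES(key), modes.CBC(iv), backend=backend).decryptor()
    return unpadder.update(decryptor.update(ciphertext) + decryptor.finalize()) + unpadder.finalize()

if __name__ == "__main__":
    key = os.urandom(AES.block_size // 8)   # 16-byte AES key, as generated in CryptStreamCreator.setup()
    iv = os.urandom(AES.block_size // 8)    # per-blob initialization vector
    data = b"example plaintext"
    assert decrypt_chunk(key, iv, encrypt_chunk(key, iv, data)) == data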
@@ -1,149 +0,0 @@
"""
Utility for creating Crypt Streams, which are encrypted blobs and associated metadata.
"""
import os
import logging

from cryptography.hazmat.primitives.ciphers.algorithms import AES
from twisted.internet import defer
from lbrynet.blob.CryptBlob import CryptStreamBlobMaker


log = logging.getLogger(__name__)


class CryptStreamCreator:
    """
    Create a new stream with blobs encrypted by a symmetric cipher.

    Each blob is encrypted with the same key, but each blob has its
    own initialization vector which is associated with the blob when
    the blob is associated with the stream.
    """

    # implements(interfaces.IConsumer)

    def __init__(self, blob_manager, name=None, key=None, iv_generator=None):
        """@param blob_manager: Object that stores and provides access to blobs.
        @type blob_manager: BlobManager

        @param name: the name of the stream, which will be presented to the user
        @type name: string

        @param key: the raw AES key which will be used to encrypt the
            blobs. If None, a random key will be generated.
        @type key: string

        @param iv_generator: a generator which yields initialization
            vectors for the blobs. Will be called once for each blob.
        @type iv_generator: a generator function which yields strings

        @return: None
        """
        self.blob_manager = blob_manager
        self.name = name
        self.key = key
        if iv_generator is None:
            self.iv_generator = self.random_iv_generator()
        else:
            self.iv_generator = iv_generator

        self.stopped = True
        self.producer = None
        self.streaming = None
        self.blob_count = -1
        self.current_blob = None
        self.finished_deferreds = []

    def registerProducer(self, producer, streaming):
        from twisted.internet import reactor

        self.producer = producer
        self.streaming = streaming
        self.stopped = False
        if streaming is False:
            reactor.callLater(0, self.producer.resumeProducing)

    def unregisterProducer(self):
        self.stopped = True
        self.producer = None

    def _close_current_blob(self):
        # close the blob that was being written to
        # and save it to the blob manager
        should_announce = self.blob_count == 0
        d = self.current_blob.close()
        d.addCallback(self._blob_finished)
        d.addCallback(lambda blob_info: self.blob_manager.creator_finished(blob_info,
                                                                           should_announce))
        self.finished_deferreds.append(d)
        self.current_blob = None

    def stop(self):
        """Stop creating the stream. Create the terminating zero-length blob."""
        log.debug("stop has been called for StreamCreator")
        self.stopped = True
        if self.current_blob is not None:
            self._close_current_blob()
        d = self._finalize()
        d.addCallback(lambda _: self._finished())
        return d

    # TODO: move the stream creation process to its own thread and
    #       remove the reactor from this process.
    def write(self, data):
        from twisted.internet import reactor
        self._write(data)
        if self.stopped is False and self.streaming is False:
            reactor.callLater(0, self.producer.resumeProducing)

    @staticmethod
    def random_iv_generator():
        while 1:
            yield os.urandom(AES.block_size // 8)

    def setup(self):
        """Create the symmetric key if it wasn't provided"""
        if self.key is None:
            self.key = os.urandom(AES.block_size // 8)

        return defer.succeed(True)

    @defer.inlineCallbacks
    def _finalize(self):
        """
        Finalize a stream by adding an empty blob at the end; this
        indicates that the stream has ended. The empty blob is not
        saved to the blob manager.
        """
        yield defer.DeferredList(self.finished_deferreds)
        self.blob_count += 1
        iv = next(self.iv_generator)
        final_blob = self._get_blob_maker(iv, self.blob_manager.get_blob_creator())
        stream_terminator = yield final_blob.close()
        terminator_info = yield self._blob_finished(stream_terminator)
        defer.returnValue(terminator_info)

    def _write(self, data):
        while len(data) > 0:
            if self.current_blob is None:
                self.next_blob_creator = self.blob_manager.get_blob_creator()
                self.blob_count += 1
                iv = next(self.iv_generator)
                self.current_blob = self._get_blob_maker(iv, self.next_blob_creator)
            done, num_bytes_written = self.current_blob.write(data)
            data = data[num_bytes_written:]
            if done is True:
                self._close_current_blob()

    def _get_blob_maker(self, iv, blob_creator):
        return CryptStreamBlobMaker(self.key, iv, self.blob_count, blob_creator)

    def _finished(self):
        raise NotImplementedError()

    def _blob_finished(self, blob_info):
        raise NotImplementedError()
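CryptStreamCreator accepts any generator that yields 16-byte IVs, one per blob. A minimal sketch of a deterministic generator for reproducible tests (the counter-based IVs are illustrative only; production code should keep using the os.urandom default):

from cryptography.hazmat.primitives.ciphers.algorithms import AES

def counter_iv_generator(start: int = 0):
    # yield predictable 16-byte IVs (start, start+1, ...) instead of random ones
    counter = start
    while True:
        yield counter.to_bytes(AES.block_size // 8, "big")
        counter += 1

ivs = counter_iv_generator()
assert len(next(ivs)) == AES.block_size // 8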
@@ -1,132 +0,0 @@
"""
Utilities for turning plain files into LBRY Files.
"""

import os
import logging
from binascii import hexlify

from twisted.internet import defer
from twisted.protocols.basic import FileSender

from lbrynet.extras.compat import f2d
from lbrynet.p2p.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType
from lbrynet.p2p.StreamDescriptor import format_sd_info, get_stream_hash, validate_descriptor
from lbrynet.blob.CryptStreamCreator import CryptStreamCreator

log = logging.getLogger(__name__)


class EncryptedFileStreamCreator(CryptStreamCreator):
    """
    A CryptStreamCreator which adds itself and its additional metadata to an EncryptedFileManager
    """

    def __init__(self, blob_manager, lbry_file_manager, stream_name=None,
                 key=None, iv_generator=None):
        super().__init__(blob_manager, stream_name, key, iv_generator)
        self.lbry_file_manager = lbry_file_manager
        self.stream_hash = None
        self.blob_infos = []
        self.sd_info = None

    def _blob_finished(self, blob_info):
        log.debug("length: %s", blob_info.length)
        self.blob_infos.append(blob_info.get_dict())
        return blob_info

    def _finished(self):
        # calculate the stream hash
        self.stream_hash = get_stream_hash(
            hexlify(self.name.encode()).decode(), hexlify(self.key).decode(), hexlify(self.name.encode()).decode(),
            self.blob_infos
        )

        # generate the sd info
        self.sd_info = format_sd_info(
            EncryptedFileStreamType, hexlify(self.name.encode()).decode(), hexlify(self.key).decode(),
            hexlify(self.name.encode()).decode(), self.stream_hash, self.blob_infos
        )

        # sanity check
        validate_descriptor(self.sd_info)
        return defer.succeed(self.stream_hash)


# TODO: this should be run in its own thread. Encrypting a large file can
#       be very cpu intensive and there is no need to run that on the
#       main reactor thread. The FileSender mechanism that is used is
#       great when sending over the network, but this is all local so
#       we can simply read the file from the disk without needing to
#       involve the reactor.
@defer.inlineCallbacks
def create_lbry_file(blob_manager, storage, payment_rate_manager, lbry_file_manager, file_name, file_handle,
                     key=None, iv_generator=None):
    """Turn a plain file into an LBRY File.

    An LBRY File is a collection of encrypted blobs of data and the metadata that binds them
    together which, when decrypted and put back together according to the metadata, results
    in the original file.

    The stream parameters that aren't specified are generated, the file is read and broken
    into chunks and encrypted, and then a stream descriptor file with the stream parameters
    and other metadata is written to disk.

    @param lbry_file_manager: The EncryptedFileManager object this LBRY File will be added to.
    @type lbry_file_manager: EncryptedFileManager

    @param file_name: The path to the plain file.
    @type file_name: string

    @param file_handle: The file-like object to read.
    @type file_handle: any file-like object which can be read by twisted.protocols.basic.FileSender

    @param key: the raw AES key which will be used to encrypt the blobs. If None, a random key
        will be generated.
    @type key: string

    @param iv_generator: a generator which yields initialization
        vectors for the blobs. Will be called once for each blob.
    @type iv_generator: a generator function which yields strings

    @return: a Deferred which fires with the stream_hash of the LBRY File
    @rtype: Deferred which fires with a hex-encoded string
    """
    base_file_name = os.path.basename(file_name)
    file_directory = os.path.dirname(file_handle.name)

    lbry_file_creator = EncryptedFileStreamCreator(
        blob_manager, lbry_file_manager, base_file_name, key, iv_generator
    )

    yield lbry_file_creator.setup()
    # TODO: Using FileSender isn't necessary, we can just read
    #       straight from the disk. The stream creation process
    #       should be in its own thread anyway so we don't need to
    #       worry about interacting with the twisted reactor
    file_sender = FileSender()
    yield file_sender.beginFileTransfer(file_handle, lbry_file_creator)

    log.debug("the file sender has triggered its deferred. stopping the stream writer")
    yield lbry_file_creator.stop()

    log.debug("making the sd blob")
    sd_info = lbry_file_creator.sd_info
    descriptor_writer = BlobStreamDescriptorWriter(blob_manager)
    sd_hash = yield descriptor_writer.create_descriptor(sd_info)

    log.debug("saving the stream")
    yield f2d(storage.store_stream(
        sd_info['stream_hash'], sd_hash, sd_info['stream_name'], sd_info['key'],
        sd_info['suggested_file_name'], sd_info['blobs']
    ))
    log.debug("adding to the file manager")
    lbry_file = yield f2d(lbry_file_manager.add_published_file(
        sd_info['stream_hash'], sd_hash, hexlify(file_directory.encode()), payment_rate_manager,
        payment_rate_manager.min_blob_data_payment_rate
    ))
    defer.returnValue(lbry_file)
@@ -1,189 +0,0 @@
"""
Download LBRY Files from LBRYnet and save them to disk.
"""
import logging
from binascii import hexlify, unhexlify

from twisted.internet import defer
from lbrynet.conf import Config
from lbrynet.extras.compat import f2d
from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.p2p.HTTPBlobDownloader import HTTPBlobDownloader
from lbrynet.utils import short_hash
from lbrynet.blob.client.EncryptedFileDownloader import EncryptedFileSaver
from lbrynet.blob.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.p2p.StreamDescriptor import save_sd_info

log = logging.getLogger(__name__)


def log_status(sd_hash, status):
    if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
        status_string = "running"
    elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
        status_string = "stopped"
    elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED:
        status_string = "finished"
    else:
        status_string = "unknown"
    log.debug("stream %s is %s", short_hash(sd_hash), status_string)


class ManagedEncryptedFileDownloader(EncryptedFileSaver):
    STATUS_RUNNING = "running"
    STATUS_STOPPED = "stopped"
    STATUS_FINISHED = "finished"

    def __init__(self, conf: Config, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, storage,
                 lbry_file_manager, payment_rate_manager, wallet, download_directory, file_name, stream_name,
                 sd_hash, key, suggested_file_name, download_mirrors=None):
        super().__init__(
            conf, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager,
            wallet, download_directory, key, stream_name, file_name
        )
        self.sd_hash = sd_hash
        self.rowid = rowid
        self.suggested_file_name = unhexlify(suggested_file_name).decode()
        self.lbry_file_manager = lbry_file_manager
        self._saving_status = False
        self.claim_id = None
        self.outpoint = None
        self.claim_name = None
        self.txid = None
        self.nout = None
        self.channel_claim_id = None
        self.channel_name = None
        self.metadata = None
        self.mirror = None
        if download_mirrors or conf.download_mirrors:
            self.mirror = HTTPBlobDownloader(
                self.blob_manager, servers=download_mirrors or conf.download_mirrors
            )

    def set_claim_info(self, claim_info):
        self.claim_id = claim_info['claim_id']
        self.txid = claim_info['txid']
        self.nout = claim_info['nout']
        self.channel_claim_id = claim_info['channel_claim_id']
        self.outpoint = "%s:%i" % (self.txid, self.nout)
        self.claim_name = claim_info['name']
        self.channel_name = claim_info['channel_name']
        self.metadata = claim_info['value']['stream']['metadata']

    async def get_claim_info(self, include_supports=True):
        claim_info = await self.storage.get_content_claim(self.stream_hash, include_supports)
        if claim_info:
            self.set_claim_info(claim_info)
        return claim_info

    @property
    def saving_status(self):
        return self._saving_status

    def restore(self, status):
        if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
            # start returns self.finished_deferred
            # which fires when we've finished downloading the file
            # and we don't want to wait for the entire download
            self.start()
        elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
            pass
        elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED:
            self.completed = True
        else:
            raise Exception(f"Unknown status for stream {self.stream_hash}: {status}")

    @defer.inlineCallbacks
    def stop(self, err=None, change_status=True):
        log.debug('Stopping download for stream %s', short_hash(self.stream_hash))
        if self.mirror:
            self.mirror.stop()
        # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here.
        yield super().stop(err)
        if change_status is True:
            status = yield self._save_status()
            defer.returnValue(status)

    async def status(self):
        blobs = await self.storage.get_blobs_for_stream(self.stream_hash)
        blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
        completed_blobs = self.blob_manager.completed_blobs(blob_hashes)
        num_blobs_completed = len(completed_blobs)
        num_blobs_known = len(blob_hashes)

        if self.completed:
            status = "completed"
        elif self.stopped:
            status = "stopped"
        else:
            status = "running"

        return EncryptedFileStatusReport(
            self.file_name, num_blobs_completed, num_blobs_known, status
        )

    @defer.inlineCallbacks
    def _start(self):
        yield EncryptedFileSaver._start(self)
        status = yield self._save_status()
        log_status(self.sd_hash, status)
        if self.mirror:
            self.mirror.download_stream(self.stream_hash, self.sd_hash)
        defer.returnValue(status)

    def _get_finished_deferred_callback_value(self):
        if self.completed is True:
            return "Download successful"
        else:
            return "Download stopped"

    @defer.inlineCallbacks
    def _save_status(self):
        self._saving_status = True
        if self.completed is True:
            status = ManagedEncryptedFileDownloader.STATUS_FINISHED
        elif self.stopped is True:
            status = ManagedEncryptedFileDownloader.STATUS_STOPPED
        else:
            status = ManagedEncryptedFileDownloader.STATUS_RUNNING
        status = yield self.lbry_file_manager.change_lbry_file_status(self, status)
        self._saving_status = False
        return status

    def save_status(self):
        return self._save_status()

    def _get_progress_manager(self, download_manager):
        return FullStreamProgressManager(self._finished_downloading,
                                         self.blob_manager, download_manager)


class ManagedEncryptedFileDownloaderFactory:
    # implements(IStreamDownloaderFactory)

    def __init__(self, lbry_file_manager, blob_manager):
        self.lbry_file_manager = lbry_file_manager
        self.blob_manager = blob_manager

    def can_download(self, sd_validator):
        # TODO: add an sd_validator for non-live streams, and use it
        return True

    @defer.inlineCallbacks
    def make_downloader(self, metadata, data_rate, payment_rate_manager, download_directory, file_name=None,
                        download_mirrors=None):
        stream_hash = yield save_sd_info(self.blob_manager,
                                         metadata.source_blob_hash,
                                         metadata.validator.raw_info)
        if file_name:
            file_name = hexlify(file_name.encode())
        hex_download_directory = hexlify(download_directory.encode())
        lbry_file = yield f2d(self.lbry_file_manager.add_downloaded_file(
            stream_hash, metadata.source_blob_hash, hex_download_directory, payment_rate_manager,
            data_rate, file_name=file_name, download_mirrors=download_mirrors
        ))
        defer.returnValue(lbry_file)

    @staticmethod
    def get_description():
        return "Save the file to disk"
@@ -1,254 +0,0 @@
"""
Keep track of which LBRY Files are downloading and store their LBRY File specific metadata.
"""
import os
import logging
import random
from binascii import hexlify, unhexlify

from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure
from lbrynet.conf import Config
from lbrynet.extras.compat import f2d
from lbrynet.extras.reflector.reupload import reflect_file
from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.p2p.StreamDescriptor import EncryptedFileStreamType, get_sd_info
from lbrynet.blob.client.CryptStreamDownloader import AlreadyStoppedError
from lbrynet.blob.client.CryptStreamDownloader import CurrentlyStoppingError
from lbrynet.utils import safe_start_looping_call, safe_stop_looping_call

log = logging.getLogger(__name__)


class EncryptedFileManager:
    """
    Keeps track of currently opened LBRY Files, their options, and
    their LBRY File specific metadata.
    """
    # when reflecting files, reflect up to this many files at a time
    CONCURRENT_REFLECTS = 5

    def __init__(self, conf: Config, peer_finder, rate_limiter, blob_manager, wallet,
                 payment_rate_manager, storage, sd_identifier):
        self.conf = conf
        self.auto_re_reflect = conf.reflect_uploads and conf.auto_re_reflect_interval > 0
        self.auto_re_reflect_interval = conf.auto_re_reflect_interval
        self.peer_finder = peer_finder
        self.rate_limiter = rate_limiter
        self.blob_manager = blob_manager
        self.wallet = wallet
        self.payment_rate_manager = payment_rate_manager
        self.storage = storage
        # TODO: why is sd_identifier part of the file manager?
        self.sd_identifier = sd_identifier
        self.lbry_files = []
        self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)

    def setup(self):
        self._add_to_sd_identifier()
        return self._start_lbry_files()

    def get_lbry_file_status(self, lbry_file):
        return self.storage.get_lbry_file_status(lbry_file.rowid)

    def set_lbry_file_data_payment_rate(self, lbry_file, new_rate):
        return self.storage(lbry_file.rowid, new_rate)

    def change_lbry_file_status(self, lbry_file, status):
        log.debug("Changing status of %s to %s", lbry_file.stream_hash, status)
        return f2d(self.storage.change_file_status(lbry_file.rowid, status))

    def get_lbry_file_status_reports(self):
        ds = []

        for lbry_file in self.lbry_files:
            ds.append(lbry_file.status())

        dl = defer.DeferredList(ds)

        def filter_failures(status_reports):
            return [status_report for success, status_report in status_reports if success is True]

        dl.addCallback(filter_failures)
        return dl

    def _add_to_sd_identifier(self):
        downloader_factory = ManagedEncryptedFileDownloaderFactory(self, self.blob_manager)
        self.sd_identifier.add_stream_downloader_factory(
            EncryptedFileStreamType, downloader_factory)

    def _get_lbry_file(self, rowid, stream_hash, payment_rate_manager, sd_hash, key,
                       stream_name, file_name, download_directory, suggested_file_name, download_mirrors=None):
        return ManagedEncryptedFileDownloader(
            self.conf,
            rowid,
            stream_hash,
            self.peer_finder,
            self.rate_limiter,
            self.blob_manager,
            self.storage,
            self,
            payment_rate_manager,
            self.wallet,
            download_directory,
            file_name,
            stream_name=stream_name,
            sd_hash=sd_hash,
            key=key,
            suggested_file_name=suggested_file_name,
            download_mirrors=download_mirrors
        )

    def _start_lbry_file(self, file_info, payment_rate_manager, claim_info, download_mirrors=None):
        lbry_file = self._get_lbry_file(
            file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
            file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
            file_info['suggested_file_name'], download_mirrors
        )
        if claim_info:
            lbry_file.set_claim_info(claim_info)
        try:
            # restore will raise an Exception if status is unknown
            lbry_file.restore(file_info['status'])
            self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
            self.lbry_files.append(lbry_file)
            if len(self.lbry_files) % 500 == 0:
                log.info("Started %i files", len(self.lbry_files))
        except Exception:
            log.warning("Failed to start %i", file_info.get('rowid'))

    async def _start_lbry_files(self):
        files = await self.storage.get_all_lbry_files()
        claim_infos = await self.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
        prm = self.payment_rate_manager

        log.info("Starting %i files", len(files))
        for file_info in files:
            claim_info = claim_infos.get(file_info['stream_hash'])
            self._start_lbry_file(file_info, prm, claim_info)

        log.info("Started %i lbry files", len(self.lbry_files))
        if self.auto_re_reflect is True:
            safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval / 10)

    @defer.inlineCallbacks
    def _stop_lbry_file(self, lbry_file):
        def wait_for_finished(lbry_file, count=2):
            if count or lbry_file.saving_status is not False:
                return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file,
                                       count=count - 1)
        try:
            yield lbry_file.stop(change_status=False)
            self.lbry_files.remove(lbry_file)
        except CurrentlyStoppingError:
            yield wait_for_finished(lbry_file)
        except AlreadyStoppedError:
            pass
        finally:
            defer.returnValue(None)

    @defer.inlineCallbacks
    def _stop_lbry_files(self):
        log.info("Stopping %i lbry files", len(self.lbry_files))
        yield defer.DeferredList([self._stop_lbry_file(lbry_file) for lbry_file in list(self.lbry_files)])

    async def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate):
        status = ManagedEncryptedFileDownloader.STATUS_FINISHED
        stream_metadata = await get_sd_info(self.storage, stream_hash, include_blobs=False)
        key = stream_metadata['key']
        stream_name = stream_metadata['stream_name']
        file_name = stream_metadata['suggested_file_name']
        rowid = await self.storage.save_published_file(
            stream_hash, file_name, download_directory, blob_data_rate, status
        )
        lbry_file = self._get_lbry_file(
            rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
            stream_metadata['suggested_file_name'], download_mirrors=None
        )
        lbry_file.restore(status)
        await lbry_file.get_claim_info()
        self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
        self.lbry_files.append(lbry_file)
        return lbry_file

    async def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None,
                                  blob_data_rate=None, status=None, file_name=None, download_mirrors=None):
        status = status or ManagedEncryptedFileDownloader.STATUS_STOPPED
        payment_rate_manager = payment_rate_manager or self.payment_rate_manager
        blob_data_rate = blob_data_rate or payment_rate_manager.min_blob_data_payment_rate
        stream_metadata = await get_sd_info(self.storage, stream_hash, include_blobs=False)
        key = stream_metadata['key']
        stream_name = stream_metadata['stream_name']
        file_name = file_name or stream_metadata['suggested_file_name']

        # when we save the file we'll atomically touch the nearest file to the suggested file name
        # that doesn't yet exist in the download directory
        rowid = await self.storage.save_downloaded_file(
            stream_hash, hexlify(os.path.basename(unhexlify(file_name))), download_directory, blob_data_rate
        )
        file_name = (await self.storage.get_filename_for_rowid(rowid)).decode()
        lbry_file = self._get_lbry_file(
            rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
            stream_metadata['suggested_file_name'], download_mirrors
        )
        lbry_file.restore(status)
        await lbry_file.get_claim_info(include_supports=False)
        self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
        self.lbry_files.append(lbry_file)
        return lbry_file

    @defer.inlineCallbacks
    def delete_lbry_file(self, lbry_file, delete_file=False):
        if lbry_file not in self.lbry_files:
            raise ValueError("Could not find that LBRY file")

        def wait_for_finished(count=2):
            if count <= 0 or lbry_file.saving_status is False:
                return True
            else:
                return task.deferLater(reactor, 1, wait_for_finished, count=count - 1)

        full_path = os.path.join(lbry_file.download_directory, lbry_file.file_name)

        try:
            yield lbry_file.stop()
        except (AlreadyStoppedError, CurrentlyStoppingError):
            yield wait_for_finished()

        self.lbry_files.remove(lbry_file)

        if lbry_file.stream_hash in self.storage.content_claim_callbacks:
            del self.storage.content_claim_callbacks[lbry_file.stream_hash]

        yield lbry_file.delete_data()
        yield f2d(self.storage.delete_stream(lbry_file.stream_hash))

        if delete_file and os.path.isfile(full_path):
            os.remove(full_path)

        defer.returnValue(True)

    def toggle_lbry_file_running(self, lbry_file):
        """Toggle whether a stream reader is currently running"""
        for l in self.lbry_files:
            if l == lbry_file:
                return l.toggle_running()
        return defer.fail(Failure(ValueError("Could not find that LBRY file")))

    @defer.inlineCallbacks
    def reflect_lbry_files(self):
        sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
        ds = []
        sd_hashes_to_reflect = yield f2d(self.storage.get_streams_to_re_reflect())
        for lbry_file in self.lbry_files:
            if lbry_file.sd_hash in sd_hashes_to_reflect:
                ds.append(sem.run(reflect_file, lbry_file, random.choice(self.conf.reflector_servers)))
        yield defer.DeferredList(ds)

    @defer.inlineCallbacks
    def stop(self):
        safe_stop_looping_call(self.lbry_file_reflector)
        yield self._stop_lbry_files()
        log.info("Stopped encrypted file manager")
        defer.returnValue(True)
@@ -1,6 +0,0 @@
class EncryptedFileStatusReport:
    def __init__(self, name, num_completed, num_known, running_status):
        self.name = name
        self.num_completed = num_completed
        self.num_known = num_known
        self.running_status = running_status
@@ -0,0 +1,6 @@
from lbrynet.cryptoutils import get_lbry_hash_obj

MAX_BLOB_SIZE = 2 * 2 ** 20

# digest_size is in bytes, and blob hashes are hex encoded
blobhash_length = get_lbry_hash_obj().digest_size * 2
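A quick sanity check on these constants: a full blob is 2 MiB, and if get_lbry_hash_obj() is sha384 (an assumption about lbrynet.cryptoutils, not stated in this diff) then a hex-encoded blob hash is 96 characters long. A minimal sketch:

import hashlib

MAX_BLOB_SIZE = 2 * 2 ** 20                      # 2097152 bytes = 2 MiB
blobhash_length = hashlib.sha384().digest_size * 2  # assumes the LBRY hash is sha384
assert MAX_BLOB_SIZE == 2097152
assert blobhash_length == 96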
@@ -1,26 +1,27 @@
import os
import asyncio
import binascii
import logging
from twisted.internet import defer
from twisted.web.client import FileBodyProducer
from twisted.python.failure import Failure
from lbrynet.cryptoutils import get_lbry_hash_obj
from lbrynet.p2p.Error import DownloadCanceledError, InvalidDataError, InvalidBlobHashError
import typing
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7

from lbrynet.cryptoutils import backend, get_lbry_hash_obj
from lbrynet.error import DownloadCancelledError, InvalidBlobHashError, InvalidDataError

from lbrynet.blob import MAX_BLOB_SIZE, blobhash_length
from lbrynet.blob.blob_info import BlobInfo
from lbrynet.blob.writer import HashBlobWriter
from lbrynet.blob.reader import HashBlobReader

log = logging.getLogger(__name__)

MAX_BLOB_SIZE = 2 * 2 ** 20

# digest_size is in bytes, and blob hashes are hex encoded
blobhash_length = get_lbry_hash_obj().digest_size * 2


def is_valid_hashcharacter(char):
def is_valid_hashcharacter(char: str) -> bool:
    return char in "0123456789abcdef"


def is_valid_blobhash(blobhash):
def is_valid_blobhash(blobhash: str) -> bool:
    """Checks whether the blobhash is the correct length and contains only
    valid characters (0-9, a-f)

@@ -31,6 +32,16 @@ def is_valid_blobhash(blobhash):
    return len(blobhash) == blobhash_length and all(is_valid_hashcharacter(l) for l in blobhash)


def encrypt_blob_bytes(key: bytes, iv: bytes, unencrypted: bytes) -> typing.Tuple[bytes, str]:
    cipher = Cipher(AES(key), modes.CBC(iv), backend=backend)
    padder = PKCS7(AES.block_size).padder()
    encryptor = cipher.encryptor()
    encrypted = encryptor.update(padder.update(unencrypted) + padder.finalize()) + encryptor.finalize()
    digest = get_lbry_hash_obj()
    digest.update(encrypted)
    return encrypted, digest.hexdigest()


class BlobFile:
    """
    A chunk of data available on the network which is specified by a hashsum

@@ -40,178 +51,137 @@ class BlobFile:
    Also can be used for reading from blobs on the local filesystem
    """

    def __str__(self):
        return self.blob_hash[:16]

    def __repr__(self):
        return '<{}({})>'.format(self.__class__.__name__, str(self))

    def __init__(self, blob_dir, blob_hash, length=None):
    def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, blob_hash: str,
                 length: typing.Optional[int] = None,
                 blob_completed_callback: typing.Optional[typing.Callable[['BlobFile'], typing.Awaitable]] = None):
        if not is_valid_blobhash(blob_hash):
            raise InvalidBlobHashError(blob_hash)
        self.loop = loop
        self.blob_hash = blob_hash
        self.length = length
        self.writers = {}  # {Peer: writer, finished_deferred}
        self._verified = False
        self.readers = 0
        self.blob_dir = blob_dir
        self.file_path = os.path.join(blob_dir, self.blob_hash)
        self.blob_write_lock = defer.DeferredLock()
        self.writers: typing.List[HashBlobWriter] = []

        self.verified: asyncio.Event = asyncio.Event(loop=self.loop)
        self.finished_writing = asyncio.Event(loop=loop)
        self.blob_write_lock = asyncio.Lock(loop=loop)
        if os.path.isfile(os.path.join(blob_dir, blob_hash)):
            length = length or int(os.stat(os.path.join(blob_dir, blob_hash)).st_size)
            self.length = length
            self.verified.set()
            self.finished_writing.set()
        self.saved_verified_blob = False
        if os.path.isfile(self.file_path):
            self.set_length(os.path.getsize(self.file_path))
            # This assumes that the hash of the blob has already been
            # checked as part of the blob creation process. It might
            # be worth having a function that checks the actual hash;
            # its probably too expensive to have that check be part of
            # this call.
            self._verified = True
        self.blob_completed_callback = blob_completed_callback

    def open_for_writing(self, peer):
        """
        open a blob file to be written by peer, supports concurrent
        writers, as long as they are from different peers.

        returns tuple of (writer, finished_deferred)

        writer - a file like object with a write() function, close() when finished
        finished_deferred - deferred that is fired when write is finished and returns
            a instance of itself as HashBlob
        """
        if peer not in self.writers:
            log.debug("Opening %s to be written by %s", str(self), str(peer))
            finished_deferred = defer.Deferred()
            writer = HashBlobWriter(self.get_length, self.writer_finished)
            self.writers[peer] = (writer, finished_deferred)
            return writer, finished_deferred
        log.warning("Tried to download the same file twice simultaneously from the same peer")
        return None, None

    def open_for_reading(self):
        """
        open blob for reading

        returns a file like object that can be read() from, and closed() when
        finished
        """
        if self._verified is True:
            f = open(self.file_path, 'rb')
            reader = HashBlobReader(f, self.reader_finished)
            self.readers += 1
            return reader
        return None

    def delete(self):
        """
        delete blob file from file system, prevent deletion
        if a blob is being read from or written to

        returns a deferred that firesback when delete is completed
        """
        if not self.writers and not self.readers:
            self._verified = False
            self.saved_verified_blob = False
    def writer_finished(self, writer: HashBlobWriter):
        def callback(finished: asyncio.Future):
            try:
                if os.path.isfile(self.file_path):
                    os.remove(self.file_path)
            except Exception as e:
                log.exception("An error occurred deleting %s:", str(self.file_path), exc_info=e)
            else:
                raise ValueError("File is currently being read or written and cannot be deleted")
                error = finished.result()
            except Exception as err:
                error = err
            if writer in self.writers:  # remove this download attempt
                self.writers.remove(writer)
            if not error:  # the blob downloaded, cancel all the other download attempts and set the result
                while self.writers:
                    other = self.writers.pop()
                    other.finished.cancel()
                t = self.loop.create_task(self.save_verified_blob(writer))
                t.add_done_callback(lambda *_: self.finished_writing.set())
                return
            if isinstance(error, (InvalidBlobHashError, InvalidDataError)):
                log.error("writer error downloading %s: %s", self.blob_hash[:8], str(error))
            elif not isinstance(error, (DownloadCancelledError, asyncio.CancelledError, asyncio.TimeoutError)):
                log.exception("something else")
                raise error
        return callback

    @property
    def verified(self):
    async def save_verified_blob(self, writer):
        def _save_verified():
            # log.debug(f"write blob file {self.blob_hash[:8]} from {writer.peer.address}")
            if not self.saved_verified_blob and not os.path.isfile(self.file_path):
                if self.get_length() == len(writer.verified_bytes):
                    with open(self.file_path, 'wb') as write_handle:
                        write_handle.write(writer.verified_bytes)
                    self.saved_verified_blob = True
                else:
                    raise Exception("length mismatch")

        if self.verified.is_set():
            return
        async with self.blob_write_lock:
            await self.loop.run_in_executor(None, _save_verified)
            if self.blob_completed_callback:
                await self.blob_completed_callback(self)
            self.verified.set()

    def open_for_writing(self) -> HashBlobWriter:
        if os.path.exists(self.file_path):
            raise OSError(f"File already exists '{self.file_path}'")
        fut = asyncio.Future(loop=self.loop)
        writer = HashBlobWriter(self.blob_hash, self.get_length, fut)
        self.writers.append(writer)
        fut.add_done_callback(self.writer_finished(writer))
        return writer

    async def sendfile(self, writer: asyncio.StreamWriter) -> int:
        """
        Protect verified from being modified by other classes.
        verified is True if a write to a blob has completed successfully,
        or a blob has been read to have the same length as specified
        in init
        Read and send the file to the writer and return the number of bytes sent
        """
        return self._verified

        with open(self.file_path, 'rb') as handle:
            return await self.loop.sendfile(writer.transport, handle)

    async def close(self):
        while self.writers:
            self.writers.pop().finished.cancel()

    async def delete(self):
        await self.close()
        async with self.blob_write_lock:
            self.saved_verified_blob = False
            if os.path.isfile(self.file_path):
                os.remove(self.file_path)

    def decrypt(self, key: bytes, iv: bytes) -> bytes:
        """
        Decrypt a BlobFile to plaintext bytes
        """

        with open(self.file_path, "rb") as f:
            buff = f.read()
        if len(buff) != self.length:
            raise ValueError("unexpected length")
        cipher = Cipher(AES(key), modes.CBC(iv), backend=backend)
        unpadder = PKCS7(AES.block_size).unpadder()
        decryptor = cipher.decryptor()
        return unpadder.update(decryptor.update(buff) + decryptor.finalize()) + unpadder.finalize()

    @classmethod
    async def create_from_unencrypted(cls, loop: asyncio.BaseEventLoop, blob_dir: str, key: bytes,
                                      iv: bytes, unencrypted: bytes, blob_num: int) -> BlobInfo:
        """
        Create an encrypted BlobFile from plaintext bytes
        """

        blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
        length = len(blob_bytes)
        blob = cls(loop, blob_dir, blob_hash, length)
        writer = blob.open_for_writing()
        writer.write(blob_bytes)
        await blob.verified.wait()
        return BlobInfo(blob_num, length, binascii.hexlify(iv).decode(), blob_hash)

    def set_length(self, length):
        if self.length is not None and length == self.length:
            return True
            return
        if self.length is None and 0 <= length <= MAX_BLOB_SIZE:
            self.length = length
            return True
        log.warning("Got an invalid length. Previous length: %s, Invalid length: %s",
                    self.length, length)
        return False
            return
        log.warning("Got an invalid length. Previous length: %s, Invalid length: %s", self.length, length)

    def get_length(self):
        return self.length

    def get_is_verified(self):
        return self.verified

    def is_downloading(self):
        if self.writers:
            return True
        return False

    def reader_finished(self, reader):
        self.readers -= 1
        return defer.succeed(True)

    def writer_finished(self, writer, err=None):
        def fire_finished_deferred():
            self._verified = True
            for p, (w, finished_deferred) in list(self.writers.items()):
                if w == writer:
                    del self.writers[p]
                    finished_deferred.callback(self)
                    return True
            log.warning(
                "Somehow, the writer that was accepted as being valid was already removed: %s",
                writer)
            return False

        def errback_finished_deferred(err):
            for p, (w, finished_deferred) in list(self.writers.items()):
                if w == writer:
                    del self.writers[p]
                    finished_deferred.errback(err)

        def cancel_other_downloads():
            for p, (w, finished_deferred) in self.writers.items():
                w.close()

        if err is None:
            if writer.len_so_far == self.length and writer.blob_hash == self.blob_hash:
                if self._verified is False:
                    d = self.save_verified_blob(writer)
                    d.addCallbacks(lambda _: fire_finished_deferred(), errback_finished_deferred)
                    d.addCallback(lambda _: cancel_other_downloads())
                else:
                    d = defer.succeed(None)
                    fire_finished_deferred()
            else:
                if writer.len_so_far != self.length:
                    err_string = "blob length is %i vs expected %i" % (writer.len_so_far, self.length)
                else:
                    err_string = f"blob hash is {writer.blob_hash} vs expected {self.blob_hash}"
                errback_finished_deferred(Failure(InvalidDataError(err_string)))
                d = defer.succeed(None)
        else:
            errback_finished_deferred(err)
            d = defer.succeed(None)
        d.addBoth(lambda _: writer.close_handle())
        return d

    def save_verified_blob(self, writer):
        # we cannot have multiple _save_verified_blob interrupting
        # each other, can happen since startProducing is a deferred
        return self.blob_write_lock.run(self._save_verified_blob, writer)

    @defer.inlineCallbacks
    def _save_verified_blob(self, writer):
        if self.saved_verified_blob is False:
            writer.write_handle.seek(0)
            out_path = os.path.join(self.blob_dir, self.blob_hash)
            producer = FileBodyProducer(writer.write_handle)
            yield producer.startProducing(open(out_path, 'wb'))
            self.saved_verified_blob = True
            defer.returnValue(True)
        else:
            raise DownloadCanceledError()
        return self.verified.is_set()
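For orientation, the new asyncio BlobFile API from this hunk composes into a simple encrypt/store/decrypt round trip. A minimal sketch, assuming a writable blob directory and the default event loop (not a verbatim test from the commit):

import asyncio
import os
import tempfile

from lbrynet.blob.blob_file import BlobFile  # the new asyncio blob added in this commit

async def round_trip():
    loop = asyncio.get_event_loop()
    blob_dir = tempfile.mkdtemp()
    key, iv = os.urandom(16), os.urandom(16)
    plaintext = b"hello lbry"

    # encrypt the plaintext into a verified blob on disk and get its BlobInfo
    blob_info = await BlobFile.create_from_unencrypted(loop, blob_dir, key, iv, plaintext, blob_num=0)

    # re-open the stored blob by hash and decrypt it back to the original bytes
    blob = BlobFile(loop, blob_dir, blob_info.blob_hash, blob_info.length)
    assert blob.decrypt(key, iv) == plaintext

asyncio.get_event_loop().run_until_complete(round_trip())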
lbrynet/blob/blob_info.py (new file, 19 additions)
@@ -0,0 +1,19 @@
import typing


class BlobInfo:
    def __init__(self, blob_num: int, length: int, iv: str, blob_hash: typing.Optional[str] = None):
        self.blob_hash = blob_hash
        self.blob_num = blob_num
        self.length = length
        self.iv = iv

    def as_dict(self) -> typing.Dict:
        d = {
            'length': self.length,
            'blob_num': self.blob_num,
            'iv': self.iv,
        }
        if self.blob_hash:  # non-terminator blobs have a blob hash
            d['blob_hash'] = self.blob_hash
        return d
lbrynet/blob/blob_manager.py (new file, 89 additions)
@@ -0,0 +1,89 @@
import typing
import asyncio
import logging
from sqlite3 import IntegrityError
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_file import BlobFile
from lbrynet.stream.descriptor import StreamDescriptor

if typing.TYPE_CHECKING:
    from lbrynet.dht.protocol.data_store import DictDataStore

log = logging.getLogger(__name__)


class BlobFileManager:
    def __init__(self, loop: asyncio.BaseEventLoop, blob_dir: str, storage: SQLiteStorage,
                 node_data_store: typing.Optional['DictDataStore'] = None):
        """
        This class stores blobs on the hard disk

        blob_dir - directory where blobs are stored
        storage - SQLiteStorage object
        """
        self.loop = loop
        self.blob_dir = blob_dir
        self.storage = storage
        self._node_data_store = node_data_store
        self.completed_blob_hashes: typing.Set[str] = set() if not self._node_data_store\
            else self._node_data_store.completed_blobs
        self.blobs: typing.Dict[str, BlobFile] = {}

    async def setup(self) -> bool:
        raw_blob_hashes = await self.get_all_verified_blobs()
        self.completed_blob_hashes.update(raw_blob_hashes)
        return True

    def get_blob(self, blob_hash, length: typing.Optional[int] = None):
        if blob_hash in self.blobs:
            if length and self.blobs[blob_hash].length is None:
                self.blobs[blob_hash].set_length(length)
        else:
            self.blobs[blob_hash] = BlobFile(self.loop, self.blob_dir, blob_hash, length, self.blob_completed)
        return self.blobs[blob_hash]

    def get_stream_descriptor(self, sd_hash):
        return StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_dir, self.get_blob(sd_hash))

    async def blob_completed(self, blob: BlobFile):
        if blob.blob_hash is None:
            raise Exception("Blob hash is None")
        if not blob.length:
            raise Exception("Blob has a length of 0")
        if blob.blob_hash not in self.completed_blob_hashes:
            self.completed_blob_hashes.add(blob.blob_hash)
            await self.storage.add_completed_blob(blob.blob_hash)

    def check_completed_blobs(self, blob_hashes: typing.List[str]) -> typing.List[str]:
        """Return the hashes from blob_hashes that correspond to valid (verified) blobs"""
        blobs = [self.get_blob(b) for b in blob_hashes]
        return [blob.blob_hash for blob in blobs if blob.get_is_verified()]

    async def set_should_announce(self, blob_hash: str, should_announce: bool):
        now = self.loop.time()
        return await self.storage.set_should_announce(blob_hash, now, should_announce)

    async def get_all_verified_blobs(self) -> typing.List[str]:
        blob_hashes = await self.storage.get_all_blob_hashes()
        return self.check_completed_blobs(blob_hashes)

    async def delete_blobs(self, blob_hashes: typing.List[str]):
        bh_to_delete_from_db = []
        for blob_hash in blob_hashes:
            if not blob_hash:
                continue
            try:
                blob = self.get_blob(blob_hash)
                await blob.delete()
                bh_to_delete_from_db.append(blob_hash)
            except Exception as e:
                log.warning("Failed to delete blob file. Reason: %s", e)
            if blob_hash in self.completed_blob_hashes:
                self.completed_blob_hashes.remove(blob_hash)
            if blob_hash in self.blobs:
                del self.blobs[blob_hash]
        try:
            await self.storage.delete_blobs_from_db(bh_to_delete_from_db)
        except IntegrityError as err:
            if str(err) != "FOREIGN KEY constraint failed":
                raise err
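A minimal sketch of how the new BlobFileManager is driven, assuming an already-initialised SQLiteStorage instance and blob directory are supplied by the caller (only the calls added in this commit are used):

import asyncio
from lbrynet.blob.blob_manager import BlobFileManager

async def list_verified(loop: asyncio.AbstractEventLoop, storage, blob_dir: str):
    # wire up the manager, load already-verified blob hashes, and report which are still valid on disk
    manager = BlobFileManager(loop, blob_dir, storage)
    await manager.setup()  # populates completed_blob_hashes from the database
    return manager.check_completed_blobs(list(manager.completed_blob_hashes))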
@@ -1,51 +0,0 @@
import os
import logging
from io import BytesIO
from twisted.internet import defer
from twisted.web.client import FileBodyProducer
from lbrynet.cryptoutils import get_lbry_hash_obj

log = logging.getLogger(__name__)


class BlobFileCreator:
    """
    This class is used to create blobs on the local filesystem
    when we do not know the blob hash beforehand (i.e., when creating
    a new stream)
    """
    def __init__(self, blob_dir):
        self.blob_dir = blob_dir
        self.buffer = BytesIO()
        self._is_open = True
        self._hashsum = get_lbry_hash_obj()
        self.len_so_far = 0
        self.blob_hash = None
        self.length = None

    @defer.inlineCallbacks
    def close(self):
        self.length = self.len_so_far
        self.blob_hash = self._hashsum.hexdigest()
        if self.blob_hash and self._is_open and self.length > 0:
            # do not save 0 length files (empty tail blob in streams)
            # or if it has been closed already
            self.buffer.seek(0)
            out_path = os.path.join(self.blob_dir, self.blob_hash)
            producer = FileBodyProducer(self.buffer)
            yield producer.startProducing(open(out_path, 'wb'))
        self._is_open = False
        if self.length > 0:
            defer.returnValue(self.blob_hash)
        else:
            # 0 length files (empty tail blob in streams)
            # must return None as their blob_hash for
            # it to be saved properly by EncryptedFileMetadataManagers
            defer.returnValue(None)

    def write(self, data):
        if not self._is_open:
            raise IOError
        self._hashsum.update(data)
        self.len_so_far += len(data)
        self.buffer.write(data)
@@ -1,30 +0,0 @@
import logging

log = logging.getLogger(__name__)


class HashBlobReader:
    """
    This is a file-like reader class that supports
    read(size) and close()
    """
    def __init__(self, read_handle, finished_cb):
        self.finished_cb = finished_cb
        self.finished_cb_d = None
        self.read_handle = read_handle

    def __del__(self):
        if self.finished_cb_d is None:
            log.warning("Garbage collection was called, but reader for %s was not closed yet",
                        self.read_handle.name)
        self.close()

    def read(self, size=-1):
        return self.read_handle.read(size)

    def close(self):
        # if we've already closed and called finished_cb, do nothing
        if self.finished_cb_d is not None:
            return
        self.read_handle.close()
        self.finished_cb_d = self.finished_cb(self)
@ -1,58 +1,71 @@
import typing
import logging
import asyncio
from io import BytesIO
from twisted.python.failure import Failure
from lbrynet.p2p.Error import DownloadCanceledError, InvalidDataError
from lbrynet.error import InvalidBlobHashError, InvalidDataError
from lbrynet.cryptoutils import get_lbry_hash_obj

log = logging.getLogger(__name__)


class HashBlobWriter:
    def __init__(self, length_getter, finished_cb):
        self.write_handle = BytesIO()
        self.length_getter = length_getter
        self.finished_cb = finished_cb
        self.finished_cb_d = None
    def __init__(self, expected_blob_hash: str, get_length: typing.Callable[[], int],
                 finished: asyncio.Future):
        self.expected_blob_hash = expected_blob_hash
        self.get_length = get_length
        self.buffer = BytesIO()
        self.finished = finished
        self.finished.add_done_callback(lambda *_: self.close_handle())
        self._hashsum = get_lbry_hash_obj()
        self.len_so_far = 0
        self.verified_bytes = b''

    def __del__(self):
        if self.finished_cb_d is None:
        if self.buffer is not None:
            log.warning("Garbage collection was called, but writer was not closed yet")
            self.close()
            self.close_handle()

    @property
    def blob_hash(self):
    def calculate_blob_hash(self) -> str:
        return self._hashsum.hexdigest()

    def write(self, data):
        if self.write_handle is None:
    def closed(self):
        return self.buffer is None or self.buffer.closed

    def write(self, data: bytes):
        expected_length = self.get_length()
        if not expected_length:
            raise IOError("unknown blob length")
        if self.buffer is None:
            log.warning("writer has already been closed")
            if not self.finished.done():
                self.finished.cancel()
                return
            raise IOError('I/O operation on closed file')

        self._hashsum.update(data)
        self.len_so_far += len(data)
        if self.len_so_far > self.length_getter():
            self.finished_cb_d = self.finished_cb(
                self,
                Failure(InvalidDataError("Length so far is greater than the expected length."
                                         " %s to %s" % (self.len_so_far,
                                                        self.length_getter()))))
        else:
            self.write_handle.write(data)
            if self.len_so_far == self.length_getter():
                self.finished_cb_d = self.finished_cb(self)
        if self.len_so_far > expected_length:
            self.close_handle()
            self.finished.set_result(InvalidDataError(
                f'Length so far is greater than the expected length. {self.len_so_far} to {expected_length}'
            ))
            return
        self.buffer.write(data)
        if self.len_so_far == expected_length:
            blob_hash = self.calculate_blob_hash()
            if blob_hash != self.expected_blob_hash:
                self.close_handle()
                self.finished.set_result(InvalidBlobHashError(
                    f"blob hash is {blob_hash} vs expected {self.expected_blob_hash}"
                ))
                return
            self.buffer.seek(0)
            self.verified_bytes = self.buffer.read()
            self.close_handle()
            if self.finished and not (self.finished.done() or self.finished.cancelled()):
                self.finished.set_result(None)

    def close_handle(self):
        if self.write_handle is not None:
            self.write_handle.close()
            self.write_handle = None

    def close(self, reason=None):
        # if we've already called finished_cb because we either finished writing
        # or closed already, do nothing
        if self.finished_cb_d is not None:
            return
        if reason is None:
            reason = Failure(DownloadCanceledError())
        self.finished_cb_d = self.finished_cb(self, reason)
        if self.buffer is not None:
            self.buffer.close()
            self.buffer = None
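The new writer replaces the finished_cb deferred chain with a plain asyncio.Future supplied by the caller: on success the future resolves to None and verified_bytes holds the blob data; on failure the future's result is set to the InvalidDataError or InvalidBlobHashError instance rather than being raised. The following is a sketch only, not part of the commit: the module path in the import is assumed, and the hash and data values are placeholders for illustration.

import asyncio

from lbrynet.blob.writer import HashBlobWriter  # module path assumed, not shown in this hunk

# Placeholder values; a real caller knows the blob's true sha384 hash and length ahead of time.
expected_blob_hash = "<hex sha384 digest of the blob bytes>"
blob_bytes = b'\x00' * 32


async def receive_blob() -> bytes:
    loop = asyncio.get_event_loop()
    finished = loop.create_future()
    writer = HashBlobWriter(expected_blob_hash, lambda: len(blob_bytes), finished)

    # A downloader would feed network chunks here; one write is enough for the sketch.
    writer.write(blob_bytes)

    # The future's result is None on success, or the error instance on failure.
    error = await finished
    if error is not None:
        raise error
    return writer.verified_bytes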
@ -1,18 +0,0 @@
class BlobInfo:
    """
    This structure is used to represent the metadata of a blob.

    @ivar blob_hash: The sha384 hashsum of the blob's data.
    @type blob_hash: string, hex-encoded

    @ivar blob_num: For streams, the position of the blob in the stream.
    @type blob_num: integer

    @ivar length: The length of the blob in bytes.
    @type length: integer
    """

    def __init__(self, blob_hash, blob_num, length):
        self.blob_hash = blob_hash
        self.blob_num = blob_num
        self.length = length
@ -1,134 +0,0 @@
import asyncio
import logging
import os
from binascii import unhexlify
from sqlite3 import IntegrityError
from twisted.internet import defer
from lbrynet.extras.compat import f2d
from lbrynet.blob.blob_file import BlobFile
from lbrynet.blob.creator import BlobFileCreator


log = logging.getLogger(__name__)


class DiskBlobManager:
    def __init__(self, blob_dir, storage, node_datastore=None):
        """
        This class stores blobs on the hard disk

        blob_dir - directory where blobs are stored
        storage - SQLiteStorage object
        """
        self.storage = storage
        self.blob_dir = blob_dir
        self._node_datastore = node_datastore
        self.blob_creator_type = BlobFileCreator
        # TODO: consider using an LRU for blobs as there could potentially
        # be thousands of blobs loaded up, many stale
        self.blobs = {}
        self.blob_hashes_to_delete = {}  # {blob_hash: being_deleted (True/False)}

    async def setup(self):
        if self._node_datastore is not None:
            raw_blob_hashes = await self.storage.get_all_finished_blobs()
            self._node_datastore.completed_blobs.update(raw_blob_hashes)

    async def stop(self):
        pass

    def get_blob(self, blob_hash, length=None):
        """Return a blob identified by blob_hash, which may be a new blob or a
        blob that is already on the hard disk
        """
        if length is not None and not isinstance(length, int):
            raise Exception("invalid length type: {} ({})".format(length, str(type(length))))
        if blob_hash in self.blobs:
            return self.blobs[blob_hash]
        return self._make_new_blob(blob_hash, length)

    def get_blob_creator(self):
        return self.blob_creator_type(self.blob_dir)

    def _make_new_blob(self, blob_hash, length=None):
        log.debug('Making a new blob for %s', blob_hash)
        blob = BlobFile(self.blob_dir, blob_hash, length)
        self.blobs[blob_hash] = blob
        return blob

    @defer.inlineCallbacks
    def blob_completed(self, blob, should_announce=False, next_announce_time=None):
        yield f2d(self.storage.add_completed_blob(
            blob.blob_hash, blob.length, next_announce_time, should_announce
        ))
        if self._node_datastore is not None:
            self._node_datastore.completed_blobs.add(unhexlify(blob.blob_hash))

    def completed_blobs(self, blobhashes_to_check):
        return self._completed_blobs(blobhashes_to_check)

    def count_should_announce_blobs(self):
        return f2d(self.storage.count_should_announce_blobs())

    def set_should_announce(self, blob_hash, should_announce):
        return f2d(self.storage.set_should_announce(
            blob_hash, asyncio.get_event_loop().time(), should_announce
        ))

    def get_should_announce(self, blob_hash):
        return f2d(self.storage.should_announce(blob_hash))

    def creator_finished(self, blob_creator, should_announce):
        log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
        if blob_creator.blob_hash is None:
            raise Exception("Blob hash is None")
        if blob_creator.blob_hash in self.blobs:
            raise Exception("Creator finished for blob that is already marked as completed")
        if blob_creator.length is None:
            raise Exception("Blob has a length of 0")
        new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
        self.blobs[blob_creator.blob_hash] = new_blob
        return self.blob_completed(new_blob, should_announce)

    def get_all_verified_blobs(self):
        d = f2d(self._get_all_verified_blob_hashes())
        d.addCallback(self.completed_blobs)
        return d

    @defer.inlineCallbacks
    def delete_blobs(self, blob_hashes):
        bh_to_delete_from_db = []
        for blob_hash in blob_hashes:
            if not blob_hash:
                continue
            if self._node_datastore is not None:
                try:
                    self._node_datastore.completed_blobs.remove(unhexlify(blob_hash))
                except KeyError:
                    pass
            try:
                blob = self.get_blob(blob_hash)
                blob.delete()
                bh_to_delete_from_db.append(blob_hash)
                del self.blobs[blob_hash]
            except Exception as e:
                log.warning("Failed to delete blob file. Reason: %s", e)
        try:
            yield f2d(self.storage.delete_blobs_from_db(bh_to_delete_from_db))
        except IntegrityError as err:
            if str(err) != "FOREIGN KEY constraint failed":
                raise err

    def _completed_blobs(self, blobhashes_to_check):
        """Returns of the blobhashes_to_check, which are valid"""
        blobs = [self.get_blob(b) for b in blobhashes_to_check]
        blob_hashes = [b.blob_hash for b in blobs if b.verified]
        return blob_hashes

    async def _get_all_verified_blob_hashes(self):
        blobs = await self.storage.get_all_blob_hashes()
        verified_blobs = []
        for blob_hash in blobs:
            file_path = os.path.join(self.blob_dir, blob_hash)
            if os.path.isfile(file_path):
                verified_blobs.append(blob_hash)
        return verified_blobs
0 tests/unit/blob/__init__.py Normal file
35 tests/unit/blob/test_blob_file.py Normal file
@ -0,0 +1,35 @@
import asyncio
import tempfile
import shutil
import os
from torba.testcase import AsyncioTestCase
from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.blob.blob_manager import BlobFileManager


class TestBlobfile(AsyncioTestCase):
    async def test_create_blob(self):
        blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
        blob_bytes = b'1' * ((2 * 2 ** 20) - 1)

        loop = asyncio.get_event_loop()
        tmp_dir = tempfile.mkdtemp()
        self.addCleanup(lambda: shutil.rmtree(tmp_dir))

        storage = SQLiteStorage(os.path.join(tmp_dir, "lbrynet.sqlite"))
        blob_manager = BlobFileManager(loop, tmp_dir, storage)

        await storage.open()
        await blob_manager.setup()

        # add the blob on the server
        blob = blob_manager.get_blob(blob_hash, len(blob_bytes))
        self.assertEqual(blob.get_is_verified(), False)
        self.assertNotIn(blob_hash, blob_manager.completed_blob_hashes)

        writer = blob.open_for_writing()
        writer.write(blob_bytes)
        await blob.finished_writing.wait()
        self.assertTrue(os.path.isfile(blob.file_path), True)
        self.assertEqual(blob.get_is_verified(), True)
        self.assertIn(blob_hash, blob_manager.completed_blob_hashes)
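The test doubles as a usage example for the new API: get_blob() returns a blob whose open_for_writing() hands back the asyncio-based writer shown earlier, finished_writing appears to be an asyncio.Event awaited with wait(), and verification state is reflected in the manager's completed_blob_hashes set rather than being threaded through deferred callbacks as in the removed Twisted classes.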