refactor EncryptedFileDownloader and EncryptedFileManager

-remove stream info manager (DBEncryptedMetadataManager)
-split `add_lbry_file` into separate `add_published_file` and `add_downloaded_file` functions
-set the download path upon adding file to the db, use the source file path for publishes
-remove the lbry file manager-wide download directory, set for each file individually
-add claim `metadata`, `claim_name`, `claim_id`, `outpoint`, `txid`, `nout`, `channel_claim_id`, and `channel_name` attributes to EncryptedFileDownloader
This commit is contained in:
Jack Robison 2018-02-12 14:03:39 -05:00
parent f8c33b6acb
commit 68542f3ae1
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
7 changed files with 228 additions and 695 deletions

View file

@ -19,6 +19,16 @@ class CryptBlobInfo(BlobInfo):
BlobInfo.__init__(self, blob_hash, blob_num, length)
self.iv = iv
def get_dict(self):
info = {
"blob_num": self.blob_num,
"length": self.length,
"iv": self.iv
}
if self.blob_hash:
info['blob_hash'] = self.blob_hash
return info
class StreamBlobDecryptor(object):
def __init__(self, blob, key, iv, length):

View file

@ -5,13 +5,13 @@ Utilities for turning plain files into LBRY Files.
import binascii
import logging
import os
from lbrynet.core.StreamDescriptor import PlainStreamDescriptorWriter
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
from lbrynet import conf
from lbrynet.lbry_file.StreamDescriptor import get_sd_info
from lbrynet.core.cryptoutils import get_lbry_hash_obj
from twisted.internet import defer
from twisted.protocols.basic import FileSender
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType
from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
log = logging.getLogger(__name__)
@ -20,58 +20,32 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
"""
A CryptStreamCreator which adds itself and its additional metadata to an EncryptedFileManager
"""
def __init__(self, blob_manager, lbry_file_manager, name=None,
key=None, iv_generator=None, suggested_file_name=None):
CryptStreamCreator.__init__(self, blob_manager, name, key, iv_generator)
def __init__(self, blob_manager, lbry_file_manager, stream_name=None,
key=None, iv_generator=None):
CryptStreamCreator.__init__(self, blob_manager, stream_name, key, iv_generator)
self.lbry_file_manager = lbry_file_manager
self.suggested_file_name = suggested_file_name or name
self.stream_hash = None
self.blob_infos = []
self.sd_info = None
def _blob_finished(self, blob_info):
log.debug("length: %s", blob_info.length)
self.blob_infos.append(blob_info)
self.blob_infos.append(blob_info.get_dict())
return blob_info
def _save_stream_info(self):
stream_info_manager = self.lbry_file_manager.stream_info_manager
d = stream_info_manager.save_stream(self.stream_hash, hexlify(self.name),
hexlify(self.key),
hexlify(self.suggested_file_name),
self.blob_infos)
return d
def _get_blobs_hashsum(self):
blobs_hashsum = get_lbry_hash_obj()
for blob_info in sorted(self.blob_infos, key=lambda b_i: b_i.blob_num):
length = blob_info.length
if length != 0:
blob_hash = blob_info.blob_hash
else:
blob_hash = None
blob_num = blob_info.blob_num
iv = blob_info.iv
blob_hashsum = get_lbry_hash_obj()
if length != 0:
blob_hashsum.update(blob_hash)
blob_hashsum.update(str(blob_num))
blob_hashsum.update(iv)
blob_hashsum.update(str(length))
blobs_hashsum.update(blob_hashsum.digest())
return blobs_hashsum.digest()
def _make_stream_hash(self):
hashsum = get_lbry_hash_obj()
hashsum.update(hexlify(self.name))
hashsum.update(hexlify(self.key))
hashsum.update(hexlify(self.suggested_file_name))
hashsum.update(self._get_blobs_hashsum())
self.stream_hash = hashsum.hexdigest()
def _finished(self):
self._make_stream_hash()
d = self._save_stream_info()
return d
# calculate the stream hash
self.stream_hash = get_stream_hash(
hexlify(self.name), hexlify(self.key), hexlify(self.name),
self.blob_infos
)
# generate the sd info
self.sd_info = format_sd_info(
EncryptedFileStreamType, hexlify(self.name), hexlify(self.key),
hexlify(self.name), self.stream_hash, self.blob_infos
)
return defer.succeed(self.stream_hash)
# TODO: this should be run its own thread. Encrypting a large file can
@ -80,8 +54,8 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
# great when sending over the network, but this is all local so
# we can simply read the file from the disk without needing to
# involve reactor.
def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=None,
iv_generator=None, suggested_file_name=None):
@defer.inlineCallbacks
def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=None, iv_generator=None):
"""Turn a plain file into an LBRY File.
An LBRY File is a collection of encrypted blobs of data and the metadata that binds them
@ -104,10 +78,6 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=Non
@param file_handle: The file-like object to read
@type file_handle: any file-like object which can be read by twisted.protocols.basic.FileSender
@param secret_pass_phrase: A string that will be used to generate the public key. If None, a
random string will be used.
@type secret_pass_phrase: string
@param key: the raw AES key which will be used to encrypt the blobs. If None, a random key will
be generated.
@type key: string
@ -116,53 +86,44 @@ def create_lbry_file(session, lbry_file_manager, file_name, file_handle, key=Non
vectors for the blobs. Will be called once for each blob.
@type iv_generator: a generator function which yields strings
@param suggested_file_name: what the file should be called when the LBRY File is saved to disk.
@type suggested_file_name: string
@return: a Deferred which fires with the stream_hash of the LBRY File
@rtype: Deferred which fires with hex-encoded string
"""
def stop_file(creator):
log.debug("the file sender has triggered its deferred. stopping the stream writer")
return creator.stop()
def make_stream_desc_file(stream_hash):
log.debug("creating the stream descriptor file")
descriptor_file_path = os.path.join(
session.db_dir, file_name + conf.settings['CRYPTSD_FILE_EXTENSION'])
descriptor_writer = PlainStreamDescriptorWriter(descriptor_file_path)
d = get_sd_info(lbry_file_manager.stream_info_manager, stream_hash, True)
d.addCallback(descriptor_writer.create_descriptor)
return d
base_file_name = os.path.basename(file_name)
file_directory = os.path.dirname(file_handle.name)
lbry_file_creator = EncryptedFileStreamCreator(
session.blob_manager,
lbry_file_manager,
base_file_name, key,
iv_generator,
suggested_file_name)
session.blob_manager, lbry_file_manager, base_file_name, key, iv_generator
)
def start_stream():
yield lbry_file_creator.setup()
# TODO: Using FileSender isn't necessary, we can just read
# straight from the disk. The stream creation process
# should be in its own thread anyway so we don't need to
# worry about interacting with the twisted reactor
file_sender = FileSender()
d = file_sender.beginFileTransfer(file_handle, lbry_file_creator)
d.addCallback(lambda _: stop_file(lbry_file_creator))
d.addCallback(lambda _: make_stream_desc_file(lbry_file_creator.stream_hash))
d.addCallback(lambda _: lbry_file_creator.stream_hash)
return d
yield file_sender.beginFileTransfer(file_handle, lbry_file_creator)
d = lbry_file_creator.setup()
d.addCallback(lambda _: start_stream())
return d
log.debug("the file sender has triggered its deferred. stopping the stream writer")
yield lbry_file_creator.stop()
log.debug("making the sd blob")
sd_info = lbry_file_creator.sd_info
descriptor_writer = BlobStreamDescriptorWriter(session.blob_manager)
sd_hash = yield descriptor_writer.create_descriptor(sd_info)
log.debug("saving the stream")
yield session.storage.store_stream(
sd_info['stream_hash'], sd_hash, sd_info['stream_name'], sd_info['key'],
sd_info['suggested_file_name'], sd_info['blobs']
)
log.debug("adding to the file manager")
lbry_file = yield lbry_file_manager.add_published_file(
sd_info['stream_hash'], sd_hash, binascii.hexlify(file_directory), session.payment_rate_manager,
session.payment_rate_manager.min_blob_data_payment_rate
)
defer.returnValue(lbry_file)
def hexlify(str_or_unicode):

View file

@ -2,18 +2,18 @@
Download LBRY Files from LBRYnet and save them to disk.
"""
import logging
import binascii
from zope.interface import implements
from twisted.internet import defer
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.utils import short_hash
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaver
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileDownloader
from lbrynet.file_manager.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbry_file.StreamDescriptor import save_sd_info
from lbrynet.core.StreamDescriptor import save_sd_info
log = logging.getLogger(__name__)
@ -35,19 +35,41 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
STATUS_STOPPED = "stopped"
STATUS_FINISHED = "finished"
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager,
stream_info_manager, lbry_file_manager, payment_rate_manager, wallet,
download_directory, sd_hash=None, key=None, stream_name=None,
suggested_file_name=None):
EncryptedFileSaver.__init__(self, stream_hash, peer_finder,
rate_limiter, blob_manager,
stream_info_manager,
payment_rate_manager, wallet,
download_directory, key, stream_name, suggested_file_name)
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, storage, lbry_file_manager,
payment_rate_manager, wallet, download_directory, file_name, stream_name, sd_hash, key,
suggested_file_name):
EncryptedFileSaver.__init__(
self, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager, wallet,
download_directory, key, stream_name, file_name
)
self.sd_hash = sd_hash
self.rowid = rowid
self.suggested_file_name = binascii.unhexlify(suggested_file_name)
self.lbry_file_manager = lbry_file_manager
self._saving_status = False
self.claim_id = None
self.outpoint = None
self.claim_name = None
self.txid = None
self.nout = None
self.channel_claim_id = None
self.channel_name = None
self.metadata = None
@defer.inlineCallbacks
def get_claim_info(self, include_supports=True):
claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
if claim_info:
self.claim_id = claim_info['claim_id']
self.txid = claim_info['txid']
self.nout = claim_info['nout']
self.channel_claim_id = claim_info['channel_claim_id']
self.outpoint = "%s:%i" % (self.txid, self.nout)
self.claim_name = claim_info['name']
self.channel_name = claim_info['channel_name']
self.metadata = claim_info['value']['stream']['metadata']
defer.returnValue(claim_info)
@property
def saving_status(self):
@ -77,8 +99,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
@defer.inlineCallbacks
def status(self):
blobs = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b[0] for b in blobs if b[0] is not None]
blobs = yield self.storage.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
completed_blobs = yield self.blob_manager.completed_blobs(blob_hashes)
num_blobs_completed = len(completed_blobs)
num_blobs_known = len(blob_hashes)
@ -89,8 +111,9 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
status = "stopped"
else:
status = "running"
defer.returnValue(EncryptedFileStatusReport(self.file_name, num_blobs_completed,
num_blobs_known, status))
defer.returnValue(EncryptedFileStatusReport(
self.file_name, num_blobs_completed, num_blobs_known, status
))
@defer.inlineCallbacks
def _start(self):
@ -137,19 +160,16 @@ class ManagedEncryptedFileDownloaderFactory(object):
return True
@defer.inlineCallbacks
def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None):
assert len(options) == 1
data_rate = options[0]
stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager,
metadata.validator.raw_info)
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash,
metadata.source_blob_hash)
lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash,
def make_downloader(self, metadata, data_rate, payment_rate_manager, download_directory, file_name=None):
stream_hash = yield save_sd_info(self.lbry_file_manager.session.blob_manager,
metadata.source_blob_hash,
payment_rate_manager,
data_rate,
download_directory)
metadata.validator.raw_info)
if file_name:
file_name = binascii.hexlify(file_name)
lbry_file = yield self.lbry_file_manager.add_downloaded_file(
stream_hash, metadata.source_blob_hash, binascii.hexlify(download_directory), payment_rate_manager,
data_rate, file_name=file_name
)
defer.returnValue(lbry_file)
@staticmethod

View file

@ -1,9 +1,8 @@
"""
Keep track of which LBRY Files are downloading and store their LBRY File specific metadata
"""
import logging
import os
import logging
from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure
@ -12,7 +11,7 @@ from lbrynet.reflector.reupload import reflect_stream
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType, get_sd_info
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
@ -30,38 +29,33 @@ class EncryptedFileManager(object):
# when reflecting files, reflect up to this many files at a time
CONCURRENT_REFLECTS = 5
def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None):
def __init__(self, session, sd_identifier):
self.auto_re_reflect = conf.settings['reflect_uploads']
self.auto_re_reflect_interval = conf.settings['auto_re_reflect_interval']
self.session = session
self.stream_info_manager = stream_info_manager
self.storage = session.storage
# TODO: why is sd_identifier part of the file manager?
self.sd_identifier = sd_identifier
assert sd_identifier
self.lbry_files = []
if download_directory:
self.download_directory = download_directory
else:
self.download_directory = os.getcwd()
self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)
log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory))
@defer.inlineCallbacks
def setup(self):
yield self.stream_info_manager.setup()
yield self._add_to_sd_identifier()
yield self._start_lbry_files()
log.info("Started file manager")
def get_lbry_file_status(self, lbry_file):
return self._get_lbry_file_status(lbry_file.rowid)
return self.session.storage.get_lbry_file_status(lbry_file.rowid)
def set_lbry_file_data_payment_rate(self, lbry_file, new_rate):
return self._set_lbry_file_payment_rate(lbry_file.rowid, new_rate)
return self.session.storage(lbry_file.rowid, new_rate)
def change_lbry_file_status(self, lbry_file, status):
log.debug("Changing status of %s to %s", lbry_file.stream_hash, status)
return self._change_file_status(lbry_file.rowid, status)
return self.session.storage.change_file_status(lbry_file.rowid, status)
def get_lbry_file_status_reports(self):
ds = []
@ -77,58 +71,54 @@ class EncryptedFileManager(object):
dl.addCallback(filter_failures)
return dl
def save_sd_blob_hash_to_stream(self, stream_hash, sd_hash):
return self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_hash)
def _add_to_sd_identifier(self):
downloader_factory = ManagedEncryptedFileDownloaderFactory(self)
self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, downloader_factory)
def _get_lbry_file(self, rowid, stream_hash, payment_rate_manager, sd_hash, key,
stream_name, suggested_file_name, download_directory=None):
download_directory = download_directory or self.download_directory
payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager
stream_name, file_name, download_directory, suggested_file_name):
return ManagedEncryptedFileDownloader(
rowid,
stream_hash,
self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.stream_info_manager,
self.session.storage,
self,
payment_rate_manager,
self.session.wallet,
download_directory,
file_name,
stream_name=stream_name,
sd_hash=sd_hash,
key=key,
stream_name=stream_name,
suggested_file_name=suggested_file_name
)
@defer.inlineCallbacks
def _start_lbry_files(self):
files_and_options = yield self._get_all_lbry_files()
stream_infos = yield self.stream_info_manager._get_all_stream_infos()
files = yield self.session.storage.get_all_lbry_files()
b_prm = self.session.base_payment_rate_manager
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
log.info("Trying to start %i files", len(stream_infos))
for i, (rowid, stream_hash, blob_data_rate, status) in enumerate(files_and_options):
if len(files_and_options) > 500 and i % 500 == 0:
log.info("Started %i/%i files", i, len(stream_infos))
if stream_hash in stream_infos:
lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager,
stream_infos[stream_hash]['sd_hash'],
stream_infos[stream_hash]['key'],
stream_infos[stream_hash]['stream_name'],
stream_infos[stream_hash]['suggested_file_name'])
log.info("initialized file %s", lbry_file.stream_name)
log.info("Trying to start %i files", len(files))
for i, file_info in enumerate(files):
if len(files) > 500 and i % 500 == 0:
log.info("Started %i/%i files", i, len(files))
lbry_file = self._get_lbry_file(
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
file_info['suggested_file_name']
)
yield lbry_file.get_claim_info()
try:
# restore will raise an Exception if status is unknown
lbry_file.restore(status)
lbry_file.restore(file_info['status'])
self.lbry_files.append(lbry_file)
except Exception:
log.warning("Failed to start %i", rowid)
log.warning("Failed to start %i", file_info['rowid'])
continue
log.info("Started %i lbry files", len(self.lbry_files))
if self.auto_re_reflect is True:
@ -157,17 +147,46 @@ class EncryptedFileManager(object):
yield self._stop_lbry_file(lbry_file)
@defer.inlineCallbacks
def add_lbry_file(self, stream_hash, sd_hash, payment_rate_manager=None, blob_data_rate=None,
download_directory=None, status=None):
rowid = yield self._save_lbry_file(stream_hash, blob_data_rate)
stream_metadata = yield get_sd_info(self.stream_info_manager,
stream_hash, False)
def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate):
status = ManagedEncryptedFileDownloader.STATUS_FINISHED
stream_metadata = yield get_sd_info(self.session.storage, stream_hash, include_blobs=False)
key = stream_metadata['key']
stream_name = stream_metadata['stream_name']
suggested_file_name = stream_metadata['suggested_file_name']
lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager, sd_hash, key,
stream_name, suggested_file_name, download_directory)
lbry_file.restore(status or ManagedEncryptedFileDownloader.STATUS_STOPPED)
file_name = stream_metadata['suggested_file_name']
rowid = yield self.storage.save_published_file(
stream_hash, file_name, download_directory, blob_data_rate, status
)
lbry_file = self._get_lbry_file(
rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
stream_metadata['suggested_file_name']
)
lbry_file.restore(status)
self.lbry_files.append(lbry_file)
defer.returnValue(lbry_file)
@defer.inlineCallbacks
def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None,
blob_data_rate=None, status=None, file_name=None):
status = status or ManagedEncryptedFileDownloader.STATUS_STOPPED
payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager
blob_data_rate = blob_data_rate or payment_rate_manager.min_blob_data_payment_rate
stream_metadata = yield get_sd_info(self.session.storage, stream_hash, include_blobs=False)
key = stream_metadata['key']
stream_name = stream_metadata['stream_name']
file_name = file_name or stream_metadata['suggested_file_name']
# when we save the file we'll atomic touch the nearest file to the suggested file name
# that doesn't yet exist in the download directory
rowid = yield self.storage.save_downloaded_file(
stream_hash, os.path.basename(file_name.decode('hex')).encode('hex'), download_directory, blob_data_rate
)
file_name = yield self.session.storage.get_filename_for_rowid(rowid)
lbry_file = self._get_lbry_file(
rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
stream_metadata['suggested_file_name']
)
lbry_file.get_claim_info(include_supports=False)
lbry_file.restore(status)
self.lbry_files.append(lbry_file)
defer.returnValue(lbry_file)
@ -191,22 +210,8 @@ class EncryptedFileManager(object):
self.lbry_files.remove(lbry_file)
yield self._delete_lbry_file_options(lbry_file.rowid)
yield lbry_file.delete_data()
# TODO: delete this
# get count for stream hash returns the count of the lbry files with the stream hash
# in the lbry_file_options table, which will soon be removed.
stream_count = yield self.get_count_for_stream_hash(lbry_file.stream_hash)
if stream_count == 0:
yield self.stream_info_manager.delete_stream(lbry_file.stream_hash)
else:
msg = ("Can't delete stream info for %s, count is %i\n"
"The call that resulted in this warning will\n"
"be removed in the database refactor")
log.warning(msg, lbry_file.stream_hash, stream_count)
yield self.session.storage.delete_stream(lbry_file.stream_hash)
if delete_file and os.path.isfile(full_path):
os.remove(full_path)
@ -234,30 +239,3 @@ class EncryptedFileManager(object):
yield defer.DeferredList(list(self._stop_lbry_files()))
log.info("Stopped encrypted file manager")
defer.returnValue(True)
def get_count_for_stream_hash(self, stream_hash):
return self._get_count_for_stream_hash(stream_hash)
def _get_count_for_stream_hash(self, stream_hash):
return self.stream_info_manager._get_count_for_stream_hash(stream_hash)
def _delete_lbry_file_options(self, rowid):
return self.stream_info_manager._delete_lbry_file_options(rowid)
def _save_lbry_file(self, stream_hash, data_payment_rate):
return self.stream_info_manager._save_lbry_file(stream_hash, data_payment_rate)
def _get_all_lbry_files(self):
return self.stream_info_manager._get_all_lbry_files()
def _get_rowid_for_stream_hash(self, stream_hash):
return self.stream_info_manager._get_rowid_for_stream_hash(stream_hash)
def _change_file_status(self, rowid, status):
return self.stream_info_manager._change_file_status(rowid, status)
def _set_lbry_file_payment_rate(self, rowid, new_rate):
return self.stream_info_manager._set_lbry_file_payment_rate(rowid, new_rate)
def _get_lbry_file_status(self, rowid):
return self.stream_info_manager._get_lbry_file_status(rowid)

View file

@ -1,378 +0,0 @@
import os
import logging
import sqlite3
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.enterprise import adbapi
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash
from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
log = logging.getLogger(__name__)
class DBEncryptedFileMetadataManager(object):
"""Store and provide access to LBRY file metadata using sqlite"""
def __init__(self, db_dir, file_name=None):
self.db_dir = db_dir
self._db_file_name = file_name or "lbryfile_info.db"
self.db_conn = adbapi.ConnectionPool("sqlite3", os.path.join(self.db_dir,
self._db_file_name),
check_same_thread=False)
def setup(self):
return self._open_db()
def stop(self):
self.db_conn.close()
return defer.succeed(True)
def get_all_streams(self):
return self._get_all_streams()
def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs):
d = self._store_stream(stream_hash, file_name, key, suggested_file_name)
d.addCallback(lambda _: self.add_blobs_to_stream(stream_hash, blobs))
return d
def get_stream_info(self, stream_hash):
return self._get_stream_info(stream_hash)
def check_if_stream_exists(self, stream_hash):
return self._check_if_stream_exists(stream_hash)
def delete_stream(self, stream_hash):
return self._delete_stream(stream_hash)
def add_blobs_to_stream(self, stream_hash, blobs):
return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
def get_blobs_for_stream(self, stream_hash, start_blob=None,
end_blob=None, count=None, reverse=False):
log.debug("Getting blobs for stream %s. Count is %s", stream_hash, count)
def get_positions_of_start_and_end():
if start_blob is not None:
d1 = self._get_blob_num_by_hash(stream_hash, start_blob)
else:
d1 = defer.succeed(None)
if end_blob is not None:
d2 = self._get_blob_num_by_hash(stream_hash, end_blob)
else:
d2 = defer.succeed(None)
dl = defer.DeferredList([d1, d2])
def get_positions(results):
start_num = None
end_num = None
if results[0][0] is True:
start_num = results[0][1]
if results[1][0] is True:
end_num = results[1][1]
return start_num, end_num
dl.addCallback(get_positions)
return dl
def get_blob_infos(nums):
start_num, end_num = nums
return self._get_further_blob_infos(stream_hash, start_num, end_num,
count, reverse)
d = get_positions_of_start_and_end()
d.addCallback(get_blob_infos)
return d
def get_stream_of_blob(self, blob_hash):
return self._get_stream_of_blobhash(blob_hash)
def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
def get_sd_blob_hashes_for_stream(self, stream_hash):
return self._get_sd_blob_hashes_for_stream(stream_hash)
def get_stream_hash_for_sd_hash(self, sd_hash):
return self._get_stream_hash_for_sd_blob_hash(sd_hash)
@staticmethod
def _create_tables(transaction):
transaction.execute("create table if not exists lbry_files (" +
" stream_hash text primary key, " +
" key text, " +
" stream_name text, " +
" suggested_file_name text" +
")")
transaction.execute("create table if not exists lbry_file_blobs (" +
" blob_hash text, " +
" stream_hash text, " +
" position integer, " +
" iv text, " +
" length integer, " +
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_descriptors (" +
" sd_blob_hash TEXT PRIMARY KEY, " +
" stream_hash TEXT, " +
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_options (" +
" blob_data_rate real, " +
" status text," +
" stream_hash text,"
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_metadata (" +
" lbry_file integer primary key, " +
" txid text, " +
" n integer, " +
" foreign key(lbry_file) references lbry_files(rowid)"
")")
def _open_db(self):
# check_same_thread=False is solely to quiet a spurious error that appears to be due
# to a bug in twisted, where the connection is closed by a different thread than the
# one that opened it. The individual connections in the pool are not used in multiple
# threads.
return self.db_conn.runInteraction(self._create_tables)
@rerun_if_locked
@defer.inlineCallbacks
def get_file_outpoint(self, rowid):
result = yield self.db_conn.runQuery("select txid, n from lbry_file_metadata "
"where lbry_file=?", (rowid, ))
response = None
if result:
txid, nout = result[0]
if txid is not None and nout is not None:
response = "%s:%i" % (txid, nout)
defer.returnValue(response)
@rerun_if_locked
@defer.inlineCallbacks
def save_outpoint_to_file(self, rowid, txid, nout):
existing_outpoint = yield self.get_file_outpoint(rowid)
if not existing_outpoint:
yield self.db_conn.runOperation("insert into lbry_file_metadata values "
"(?, ?, ?)", (rowid, txid, nout))
@rerun_if_locked
def _delete_stream(self, stream_hash):
d = self.db_conn.runQuery(
"select rowid, stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback(
lambda result: result[0] if result else Failure(NoSuchStreamHash(stream_hash)))
def do_delete(transaction, row_id, s_h):
transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_metadata where lbry_file = ?", (row_id,))
d.addCallback(lambda (row_id, s_h): self.db_conn.runInteraction(do_delete, row_id, s_h))
return d
@rerun_if_locked
def _store_stream(self, stream_hash, name, key, suggested_file_name):
d = self.db_conn.runQuery("insert into lbry_files values (?, ?, ?, ?)",
(stream_hash, key, name, suggested_file_name))
def check_duplicate(err):
if err.check(sqlite3.IntegrityError):
raise DuplicateStreamHashError(stream_hash)
return err
d.addErrback(check_duplicate)
return d
@rerun_if_locked
def _get_all_streams(self):
d = self.db_conn.runQuery("select stream_hash from lbry_files")
d.addCallback(lambda results: [r[0] for r in results])
return d
@rerun_if_locked
def _get_stream_info(self, stream_hash):
def get_result(res):
if res:
return res[0]
else:
raise NoSuchStreamHash(stream_hash)
d = self.db_conn.runQuery(
"select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?",
(stream_hash,))
d.addCallback(get_result)
return d
@rerun_if_locked
@defer.inlineCallbacks
def _get_all_stream_infos(self):
file_results = yield self.db_conn.runQuery("select rowid, * from lbry_files")
descriptor_results = yield self.db_conn.runQuery("select stream_hash, sd_blob_hash "
"from lbry_file_descriptors")
response = {}
for (stream_hash, sd_hash) in descriptor_results:
if stream_hash in response:
log.warning("Duplicate stream %s (sd: %s)", stream_hash, sd_hash[:16])
continue
response[stream_hash] = {
'sd_hash': sd_hash
}
for (rowid, stream_hash, key, stream_name, suggested_file_name) in file_results:
if stream_hash not in response:
log.warning("Missing sd hash for %s", stream_hash)
continue
response[stream_hash]['rowid'] = rowid
response[stream_hash]['key'] = key
response[stream_hash]['stream_name'] = stream_name
response[stream_hash]['suggested_file_name'] = suggested_file_name
defer.returnValue(response)
@rerun_if_locked
def _check_if_stream_exists(self, stream_hash):
d = self.db_conn.runQuery(
"select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback(lambda r: True if len(r) else False)
return d
@rerun_if_locked
def _get_blob_num_by_hash(self, stream_hash, blob_hash):
d = self.db_conn.runQuery(
"select position from lbry_file_blobs where stream_hash = ? and blob_hash = ?",
(stream_hash, blob_hash))
d.addCallback(lambda r: r[0][0] if len(r) else None)
return d
@rerun_if_locked
def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
params = []
q_string = "select * from ("
q_string += " select blob_hash, position, iv, length from lbry_file_blobs "
q_string += " where stream_hash = ? "
params.append(stream_hash)
if start_num is not None:
q_string += " and position > ? "
params.append(start_num)
if end_num is not None:
q_string += " and position < ? "
params.append(end_num)
q_string += " order by position "
if reverse is True:
q_string += " DESC "
if count is not None:
q_string += " limit ? "
params.append(count)
q_string += ") order by position"
# Order by position is done twice so that it always returns them from lowest position to
# greatest, but the limit by clause can select the 'count' greatest or 'count' least
return self.db_conn.runQuery(q_string, tuple(params))
@rerun_if_locked
def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False):
def add_blobs(transaction):
for blob_info in blob_infos:
try:
transaction.execute("insert into lbry_file_blobs values (?, ?, ?, ?, ?)",
(blob_info.blob_hash, stream_hash, blob_info.blob_num,
blob_info.iv, blob_info.length))
except sqlite3.IntegrityError:
if ignore_duplicate_error is False:
raise
return self.db_conn.runInteraction(add_blobs)
@rerun_if_locked
def _get_stream_of_blobhash(self, blob_hash):
d = self.db_conn.runQuery("select stream_hash from lbry_file_blobs where blob_hash = ?",
(blob_hash,))
d.addCallback(lambda r: r[0][0] if len(r) else None)
return d
@rerun_if_locked
def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
d = self.db_conn.runOperation("insert or ignore into lbry_file_descriptors values (?, ?)",
(sd_blob_hash, stream_hash))
d.addCallback(lambda _: log.info("Saved sd blob hash %s to stream hash %s",
str(sd_blob_hash), str(stream_hash)))
return d
@rerun_if_locked
def _get_sd_blob_hashes_for_stream(self, stream_hash):
log.debug("Looking up sd blob hashes for stream hash %s", str(stream_hash))
d = self.db_conn.runQuery(
"select sd_blob_hash from lbry_file_descriptors where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda results: [r[0] for r in results])
return d
@rerun_if_locked
def _get_stream_hash_for_sd_blob_hash(self, sd_blob_hash):
def _handle_result(result):
if not result:
raise NoSuchSDHash(sd_blob_hash)
return result[0][0]
log.debug("Looking up sd blob hashes for sd blob hash %s", str(sd_blob_hash))
d = self.db_conn.runQuery(
"select stream_hash from lbry_file_descriptors where sd_blob_hash = ?",
(sd_blob_hash,))
d.addCallback(_handle_result)
return d
# used by lbry file manager
@rerun_if_locked
def _save_lbry_file(self, stream_hash, data_payment_rate):
def do_save(db_transaction):
row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash)
db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row)
return db_transaction.lastrowid
return self.db_conn.runInteraction(do_save)
@rerun_if_locked
def _delete_lbry_file_options(self, rowid):
return self.db_conn.runQuery("delete from lbry_file_options where rowid = ?",
(rowid,))
@rerun_if_locked
def _set_lbry_file_payment_rate(self, rowid, new_rate):
return self.db_conn.runQuery(
"update lbry_file_options set blob_data_rate = ? where rowid = ?",
(new_rate, rowid))
@rerun_if_locked
def _get_all_lbry_files(self):
d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate, status "
"from lbry_file_options")
return d
@rerun_if_locked
def _change_file_status(self, rowid, new_status):
d = self.db_conn.runQuery("update lbry_file_options set status = ? where rowid = ?",
(new_status, rowid))
d.addCallback(lambda _: new_status)
return d
@rerun_if_locked
def _get_lbry_file_status(self, rowid):
d = self.db_conn.runQuery("select status from lbry_file_options where rowid = ?",
(rowid,))
d.addCallback(lambda r: (r[0][0] if len(r) else None))
return d
@rerun_if_locked
def _get_count_for_stream_hash(self, stream_hash):
d = self.db_conn.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda r: (r[0][0] if r else 0))
return d
@rerun_if_locked
def _get_rowid_for_stream_hash(self, stream_hash):
d = self.db_conn.runQuery("select rowid from lbry_file_options where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda r: (r[0][0] if len(r) else None))
return d

View file

@ -2,10 +2,9 @@ import binascii
from zope.interface import implements
from lbrynet.lbry_file.StreamDescriptor import save_sd_info
from lbrynet.core.StreamDescriptor import save_sd_info
from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
from lbrynet.core.StreamDescriptor import StreamMetadata
from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler
import os
@ -21,39 +20,21 @@ class EncryptedFileDownloader(CryptStreamDownloader):
"""Classes which inherit from this class download LBRY files"""
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
stream_info_manager, payment_rate_manager, wallet, key, stream_name,
suggested_file_name=None):
storage, payment_rate_manager, wallet, key, stream_name, file_name):
CryptStreamDownloader.__init__(self, peer_finder, rate_limiter, blob_manager,
payment_rate_manager, wallet, key, stream_name)
self.stream_hash = stream_hash
self.stream_info_manager = stream_info_manager
self.suggested_file_name = binascii.unhexlify(suggested_file_name)
self.storage = storage
self.file_name = binascii.unhexlify(os.path.basename(file_name))
self._calculated_total_bytes = None
@defer.inlineCallbacks
def delete_data(self):
d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
def get_blob_hashes(blob_infos):
return [b[0] for b in blob_infos if b[0] is not None]
d1.addCallback(get_blob_hashes)
d2 = self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
def combine_blob_hashes(results):
blob_hashes = []
for success, result in results:
if success is True:
blob_hashes.extend(result)
return blob_hashes
def delete_blobs(blob_hashes):
self.blob_manager.delete_blobs(blob_hashes)
return True
dl = defer.DeferredList([d1, d2], fireOnOneErrback=True)
dl.addCallback(combine_blob_hashes)
dl.addCallback(delete_blobs)
return dl
crypt_infos = yield self.storage.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b.blob_hash for b in crypt_infos if b.blob_hash]
sd_hash = yield self.storage.get_sd_blob_hash_for_stream(self.stream_hash)
blob_hashes.append(sd_hash)
yield self.blob_manager.delete_blobs(blob_hashes)
def stop(self, err=None):
d = self._close_output()
@ -76,10 +57,10 @@ class EncryptedFileDownloader(CryptStreamDownloader):
pass
def get_total_bytes(self):
d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
d = self.storage.get_blobs_for_stream(self.stream_hash)
def calculate_size(blobs):
return sum([b[3] for b in blobs])
return sum([b.length for b in blobs])
d.addCallback(calculate_size)
return d
@ -106,18 +87,17 @@ class EncryptedFileDownloader(CryptStreamDownloader):
def _get_metadata_handler(self, download_manager):
return EncryptedFileMetadataHandler(self.stream_hash,
self.stream_info_manager, download_manager)
self.storage, download_manager)
class EncryptedFileDownloaderFactory(object):
implements(IStreamDownloaderFactory)
def __init__(self, peer_finder, rate_limiter, blob_manager, stream_info_manager,
wallet):
def __init__(self, peer_finder, rate_limiter, blob_manager, storage, wallet):
self.peer_finder = peer_finder
self.rate_limiter = rate_limiter
self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager
self.storage = storage
self.wallet = wallet
def can_download(self, sd_validator):
@ -129,22 +109,14 @@ class EncryptedFileDownloaderFactory(object):
payment_rate_manager.min_blob_data_payment_rate = data_rate
def save_source_if_blob(stream_hash):
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
d = self.stream_info_manager.save_sd_blob_hash_to_stream(
stream_hash, metadata.source_blob_hash)
else:
d = defer.succeed(True)
d.addCallback(lambda _: stream_hash)
return d
return defer.succeed(metadata.source_blob_hash)
def create_downloader(stream_hash):
downloader = self._make_downloader(stream_hash, payment_rate_manager,
metadata.validator.raw_info)
d = downloader.set_stream_info()
d.addCallback(lambda _: downloader)
return d
return defer.succeed(downloader)
d = save_sd_info(self.stream_info_manager, metadata.validator.raw_info)
d = save_sd_info(self.blob_manager, metadata.source_blob_hash, metadata.validator.raw_info)
d.addCallback(save_source_if_blob)
d.addCallback(create_downloader)
return d
@ -154,26 +126,20 @@ class EncryptedFileDownloaderFactory(object):
class EncryptedFileSaver(EncryptedFileDownloader):
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
payment_rate_manager, wallet, download_directory, key, stream_name,
suggested_file_name):
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager, wallet,
download_directory, key, stream_name, file_name):
EncryptedFileDownloader.__init__(self, stream_hash, peer_finder, rate_limiter,
blob_manager, stream_info_manager, payment_rate_manager,
wallet, key, stream_name, suggested_file_name)
self.download_directory = download_directory
self.file_name = os.path.basename(self.suggested_file_name)
self.file_written_to = None
blob_manager, storage, payment_rate_manager,
wallet, key, stream_name, file_name)
self.download_directory = binascii.unhexlify(download_directory)
self.file_written_to = os.path.join(self.download_directory, binascii.unhexlify(file_name))
self.file_handle = None
def __str__(self):
if self.file_written_to is not None:
return str(self.file_written_to)
else:
return str(self.file_name)
def stop(self, err=None):
d = EncryptedFileDownloader.stop(self, err=err)
d.addCallback(lambda _: self._delete_from_info_manager())
return d
def _get_progress_manager(self, download_manager):
@ -184,34 +150,16 @@ class EncryptedFileSaver(EncryptedFileDownloader):
def _setup_output(self):
def open_file():
if self.file_handle is None:
file_name = self.file_name
if not file_name:
file_name = "_"
if os.path.exists(os.path.join(self.download_directory, file_name)):
ext_num = 1
def _get_file_name(ext):
if len(file_name.split(".")):
fn = ''.join(file_name.split(".")[:-1])
file_ext = ''.join(file_name.split(".")[-1])
return fn + "-" + str(ext) + "." + file_ext
else:
return file_name + "_" + str(ext)
while os.path.exists(os.path.join(self.download_directory,
_get_file_name(ext_num))):
ext_num += 1
file_name = _get_file_name(ext_num)
file_written_to = os.path.join(self.download_directory, self.file_name)
try:
self.file_handle = open(os.path.join(self.download_directory, file_name), 'wb')
self.file_written_to = os.path.join(self.download_directory, file_name)
self.file_handle = open(file_written_to, 'wb')
self.file_written_to = file_written_to
except IOError:
log.error(traceback.format_exc())
raise ValueError(
"Failed to open %s. Make sure you have permission to save files to that"
" location." %
os.path.join(self.download_directory, file_name))
" location." % file_written_to
)
return threads.deferToThread(open_file)
def _close_output(self):
@ -232,26 +180,20 @@ class EncryptedFileSaver(EncryptedFileDownloader):
self.file_handle.write(data)
return write_func
def _delete_from_info_manager(self):
return self.stream_info_manager.delete_stream(self.stream_hash)
class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory):
def __init__(self, peer_finder, rate_limiter, blob_manager, stream_info_manager,
wallet, download_directory):
EncryptedFileDownloaderFactory.__init__(self, peer_finder, rate_limiter, blob_manager,
stream_info_manager, wallet)
self.download_directory = download_directory
def __init__(self, peer_finder, rate_limiter, blob_manager, storage, wallet, download_directory):
EncryptedFileDownloaderFactory.__init__(self, peer_finder, rate_limiter, blob_manager, storage, wallet)
self.download_directory = binascii.hexlify(download_directory)
def _make_downloader(self, stream_hash, payment_rate_manager, stream_info):
stream_name = stream_info.raw_info['stream_name']
key = stream_info.raw_info['key']
suggested_file_name = stream_info.raw_info['suggested_file_name']
return EncryptedFileSaver(stream_hash, self.peer_finder, self.rate_limiter,
self.blob_manager, self.stream_info_manager,
payment_rate_manager, self.wallet, self.download_directory,
key=key, stream_name=stream_name,
suggested_file_name=suggested_file_name)
return EncryptedFileSaver(
stream_hash, self.peer_finder, self.rate_limiter, self.blob_manager, self.storage, payment_rate_manager,
self.wallet, self.download_directory, key=key, stream_name=stream_name, file_name=suggested_file_name
)
@staticmethod
def get_description():

View file

@ -1,7 +1,6 @@
import logging
from zope.interface import implements
from twisted.internet import defer
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
from lbrynet.interfaces import IMetadataHandler
@ -11,9 +10,9 @@ log = logging.getLogger(__name__)
class EncryptedFileMetadataHandler(object):
implements(IMetadataHandler)
def __init__(self, stream_hash, stream_info_manager, download_manager):
def __init__(self, stream_hash, storage, download_manager):
self.stream_hash = stream_hash
self.stream_info_manager = stream_info_manager
self.storage = storage
self.download_manager = download_manager
self._final_blob_num = None
@ -21,7 +20,7 @@ class EncryptedFileMetadataHandler(object):
@defer.inlineCallbacks
def get_initial_blobs(self):
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
blob_infos = yield self.storage.get_blobs_for_stream(self.stream_hash)
formatted_infos = self._format_initial_blobs_for_download_manager(blob_infos)
defer.returnValue(formatted_infos)
@ -32,12 +31,13 @@ class EncryptedFileMetadataHandler(object):
def _format_initial_blobs_for_download_manager(self, blob_infos):
infos = []
for i, (blob_hash, blob_num, iv, length) in enumerate(blob_infos):
if blob_hash is not None and length:
infos.append(CryptBlobInfo(blob_hash, blob_num, length, iv))
for i, crypt_blob in enumerate(blob_infos):
if crypt_blob.blob_hash is not None and crypt_blob.length:
infos.append(crypt_blob)
else:
if i != len(blob_infos) - 1:
raise Exception("Invalid stream terminator")
log.debug("Setting _final_blob_num to %s", str(blob_num - 1))
self._final_blob_num = blob_num - 1
raise Exception("Invalid stream terminator: %i of %i" %
(i, len(blob_infos) - 1))
log.debug("Setting _final_blob_num to %s", str(crypt_blob.blob_num - 1))
self._final_blob_num = crypt_blob.blob_num - 1
return infos