Merge pull request #477 from lbryio/inlinecallbacks-refactor

Refactor publish, file manager, and file downloader
This commit is contained in:
Jack Robison 2017-02-13 17:42:54 -05:00 committed by GitHub
commit aa8375673b
13 changed files with 555 additions and 677 deletions

View file

@ -16,7 +16,16 @@ at anytime.
## [0.8.3rc0] - 2017-02-10
### Changed
* add uri to stream reflector to de-obfuscate reflector logs
* Convert EncryptedFileDownloader to inlineCallbacks
* Convert EncryptedFileManager to use inlineCallbacks
* Convert Daemon._delete_lbry_file to inlineCallbacks
* Add uri to stream reflector to de-obfuscate reflector logs
* Simplify lbrynet.lbrynet_daemon.Publisher
* Reflect streams in file manager looping call rather than in each file
* Convert GetStream to inclineCallbacks
* Change callback condition in GetStream to the first data blob completing
* Add local and remote heights to blockchain status
### Fixed
* Fix recursion depth error upon failed blob
* Call stopProducing in reflector client file_sender when uploading is done
@ -24,6 +33,10 @@ at anytime.
* Fixed file_delete not deleting data from stream_info_manager [#470](https://github.com/lbryio/lbry/issues/470)
* Fixed upload of bug reports to Slack ([#472](https://github.com/lbryio/lbry/issues/472))
* Fixed claim updates [#473](https://github.com/lbryio/lbry/issues/473)
* Handle ConnectionLost error in reflector client
* Fix updating a claim where the stream doesn't change
* Fix claim_abandon
## [0.8.1] - 2017-02-01
### Changed

View file

@ -482,7 +482,7 @@ class Wallet(object):
return defer.succeed(None)
if 'error' in result:
log.warning("Got an error looking up a name: %s", result['error'])
log.warning("Got an error looking up lbry://%s: %s", name, result['error'])
return Failure(UnknownNameError(name))
_check_result_fields(result)
try:
@ -657,18 +657,20 @@ class Wallet(object):
yield self._save_name_metadata(name, claim_outpoint, _metadata['sources']['lbry_sd_hash'])
defer.returnValue(claim)
@defer.inlineCallbacks
def abandon_claim(self, txid, nout):
def _parse_abandon_claim_out(claim_out):
if not claim_out['success']:
msg = 'Abandon of {}:{} failed: {}'.format(txid, nout, claim_out['resason'])
msg = 'Abandon of {}:{} failed: {}'.format(txid, nout, claim_out['reason'])
raise Exception(msg)
claim_out = self._process_claim_out(claim_out)
log.info("Abandoned claim tx %s (n: %i) --> %s", txid, nout, claim_out)
return defer.succeed(claim_out)
claim_outpoint = ClaimOutpoint(txid, nout)
d = self._abandon_claim(claim_outpoint)
d.addCallback(lambda claim_out: _parse_abandon_claim_out(claim_out))
return d
claim_out = yield self._abandon_claim(claim_outpoint)
result = yield _parse_abandon_claim_out(claim_out)
defer.returnValue(result)
def support_claim(self, name, claim_id, amount):
def _parse_support_claim_out(claim_out):

View file

@ -1,6 +1,8 @@
import os
import sys
import subprocess
from contextlib import contextmanager
def start(path):
"""
@ -33,3 +35,18 @@ def reveal(path):
subprocess.Popen(['xdg-open', os.path.dirname(path)])
elif sys.platform == 'win32':
subprocess.Popen(['explorer', '/select', path])
@contextmanager
def get_read_handle(path):
"""
Get os independent read handle for a file
"""
if os.name == "nt":
file_mode = 'rb'
else:
file_mode = 'r'
read_handle = open(path, file_mode)
yield read_handle
read_handle.close()

View file

@ -80,19 +80,13 @@ def get_sd_info(stream_info_manager, stream_hash, include_blobs):
return d
@defer.inlineCallbacks
def publish_sd_blob(stream_info_manager, blob_manager, stream_hash):
descriptor_writer = BlobStreamDescriptorWriter(blob_manager)
d = get_sd_info(stream_info_manager, stream_hash, True)
d.addCallback(descriptor_writer.create_descriptor)
def add_sd_blob_to_stream(sd_blob_hash):
d = stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
d.addCallback(lambda _: sd_blob_hash)
return d
d.addCallback(add_sd_blob_to_stream)
return d
sd_info = yield get_sd_info(stream_info_manager, stream_hash, True)
sd_blob_hash = yield descriptor_writer.create_descriptor(sd_info)
yield stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
defer.returnValue(sd_blob_hash)
def create_plain_sd(stream_info_manager, stream_hash, file_name, overwrite_existing=False):

View file

@ -1,7 +1,6 @@
"""
Download LBRY Files from LBRYnet and save them to disk.
"""
import random
import logging
from zope.interface import implements
@ -14,8 +13,6 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileDownloa
from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport
from lbrynet.interfaces import IStreamDownloaderFactory
from lbrynet.lbryfile.StreamDescriptor import save_sd_info
from lbrynet.reflector import reupload
from lbrynet import conf
log = logging.getLogger(__name__)
@ -48,57 +45,28 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
def saving_status(self):
return self._saving_status
@defer.inlineCallbacks
def restore(self):
d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash)
def _save_stream_info(sd_hash):
if sd_hash:
self.sd_hash = sd_hash[0]
d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
d.addCallback(lambda r: _save_claim(r[0], r[1], r[2]))
return d
else:
return None
def _save_claim_id(claim_id):
self.claim_id = claim_id
return defer.succeed(None)
def _notify_bad_claim(name, txid, nout):
err_msg = "Error loading name claim for lbry file: \
lbry://%s, tx %s output %i does not contain a valid claim, deleting it"
log.error(err_msg, name, txid, nout)
return self.lbry_file_manager.delete_lbry_file(self)
def _save_claim(name, txid, nout):
self.uri = name
self.txid = txid
self.nout = nout
d = self.wallet.get_claimid(name, txid, nout)
d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid, nout))
return d
d.addCallback(_save_stream_info)
d.addCallback(lambda _: self._reupload())
d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self))
def restore_status(status):
if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
return self.start()
elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
return defer.succeed(False)
elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED:
self.completed = True
return defer.succeed(True)
d.addCallback(restore_status)
return d
def _reupload(self):
if not conf.settings['reflector_reupload']:
return
reflector_server = random.choice(conf.settings['reflector_servers'])
return reupload.check_and_restore_availability(self, reflector_server)
sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
if sd_hash:
self.sd_hash = sd_hash[0]
else:
raise Exception("No sd hash for stream hash %s", self.stream_hash)
claim_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
if claim_metadata is None:
raise Exception("A claim doesn't exist for sd %s" % self.sd_hash)
self.uri, self.txid, self.nout = claim_metadata
self.claim_id = yield self.wallet.get_claimid(self.uri, self.txid, self.nout)
status = yield self.lbry_file_manager.get_lbry_file_status(self)
if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
yield self.start()
elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
defer.returnValue(False)
elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED:
self.completed = True
defer.returnValue(True)
else:
raise Exception("Unknown status for stream %s: %s", self.stream_hash, status)
@defer.inlineCallbacks
def stop(self, err=None, change_status=True):
@ -107,34 +75,24 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
yield EncryptedFileDownloader.stop(self, err=err)
if change_status is True:
status = yield self._save_status()
defer.returnValue(status)
@defer.inlineCallbacks
def status(self):
def find_completed_blobhashes(blobs):
blobhashes = [b[0] for b in blobs if b[0] is not None]
blobs = yield self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
blob_hashes = [b[0] for b in blobs if b[0] is not None]
completed_blobs = yield self.blob_manager.completed_blobs(blob_hashes)
num_blobs_completed = len(completed_blobs)
num_blobs_known = len(blob_hashes)
def get_num_completed(completed_blobs):
return len(completed_blobs), len(blobhashes)
inner_d = self.blob_manager.completed_blobs(blobhashes)
inner_d.addCallback(get_num_completed)
return inner_d
def make_full_status(progress):
num_completed = progress[0]
num_known = progress[1]
if self.completed is True:
s = "completed"
elif self.stopped is True:
s = "stopped"
else:
s = "running"
status = EncryptedFileStatusReport(self.file_name, num_completed, num_known, s)
return status
d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
d.addCallback(find_completed_blobhashes)
d.addCallback(make_full_status)
return d
if self.completed:
status = "completed"
elif self.stopped:
status = "stopped"
else:
status = "running"
defer.returnValue(EncryptedFileStatusReport(self.file_name, num_blobs_completed,
num_blobs_known, status))
@defer.inlineCallbacks
def _start(self):
@ -166,8 +124,9 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
status = ManagedEncryptedFileDownloader.STATUS_STOPPED
else:
status = ManagedEncryptedFileDownloader.STATUS_RUNNING
yield self.lbry_file_manager.change_lbry_file_status(self, status)
status = yield self.lbry_file_manager.change_lbry_file_status(self, status)
self._saving_status = False
defer.returnValue(status)
def _get_progress_manager(self, download_manager):
return FullStreamProgressManager(self._finished_downloading,
@ -181,35 +140,23 @@ class ManagedEncryptedFileDownloaderFactory(object):
self.lbry_file_manager = lbry_file_manager
def can_download(self, sd_validator):
# TODO: add a sd_validator for non live streams, use it
return True
def make_downloader(self, metadata, options, payment_rate_manager,
download_directory=None, file_name=None):
@defer.inlineCallbacks
def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None,
file_name=None):
data_rate = options[0]
upload_allowed = options[1]
def save_source_if_blob(stream_hash):
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
# TODO: should never have to dig this deep into a another classes
# members. lbry_file_manager should have a
# save_sd_blob_hash_to_stream method
d = self.lbry_file_manager.stream_info_manager.save_sd_blob_hash_to_stream(
stream_hash, metadata.source_blob_hash)
else:
d = defer.succeed(True)
d.addCallback(lambda _: stream_hash)
return d
d = save_sd_info(self.lbry_file_manager.stream_info_manager, metadata.validator.raw_info)
d.addCallback(save_source_if_blob)
d.addCallback(lambda stream_hash: self.lbry_file_manager.add_lbry_file(
stream_hash,
payment_rate_manager,
data_rate,
upload_allowed,
download_directory=download_directory,
file_name=file_name))
return d
stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager,
metadata.validator.raw_info)
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash,
metadata.source_blob_hash)
lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, payment_rate_manager,
data_rate, upload_allowed,
download_directory, file_name)
defer.returnValue(lbry_file)
@staticmethod
def get_description():

View file

@ -9,6 +9,7 @@ from twisted.enterprise import adbapi
from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure
from lbrynet.reflector.reupload import reflect_stream
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
@ -21,6 +22,16 @@ from lbrynet.core.sqlite_helpers import rerun_if_locked
log = logging.getLogger(__name__)
def safe_start_looping_call(looping_call, seconds=3600):
if not looping_call.running:
looping_call.start(seconds)
def safe_stop_looping_call(looping_call):
if looping_call.running:
looping_call.stop()
class EncryptedFileManager(object):
"""Keeps track of currently opened LBRY Files, their options, and
their LBRY File specific metadata.
@ -32,11 +43,13 @@ class EncryptedFileManager(object):
self.stream_info_manager = stream_info_manager
self.sd_identifier = sd_identifier
self.lbry_files = []
self.lbry_files_setup_deferred = None
self.sql_db = None
if download_directory:
self.download_directory = download_directory
else:
self.download_directory = os.getcwd()
self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)
log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory))
@defer.inlineCallbacks
@ -44,6 +57,7 @@ class EncryptedFileManager(object):
yield self._open_db()
yield self._add_to_sd_identifier()
yield self._start_lbry_files()
safe_start_looping_call(self.lbry_file_reflector)
def get_lbry_file_status(self, lbry_file):
return self._get_lbry_file_status(lbry_file.rowid)
@ -69,6 +83,9 @@ class EncryptedFileManager(object):
dl.addCallback(filter_failures)
return dl
def save_sd_blob_hash_to_stream(self, stream_hash, sd_hash):
return self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_hash)
def _add_to_sd_identifier(self):
downloader_factory = ManagedEncryptedFileDownloaderFactory(self)
self.sd_identifier.add_stream_downloader_factory(
@ -96,65 +113,78 @@ class EncryptedFileManager(object):
log.debug("Checking %s streams", len(stream_hashes))
yield defer.DeferredList(list(_iter_streams(stream_hashes)))
@defer.inlineCallbacks
def _restore_lbry_file(self, lbry_file):
try:
yield lbry_file.restore()
except Exception as err:
log.error("Failed to start stream: %s, error: %s", lbry_file.stream_hash, err)
self.lbry_files.remove(lbry_file)
# TODO: delete stream without claim instead of just removing from manager?
@defer.inlineCallbacks
def _start_lbry_files(self):
def set_options_and_restore(rowid, stream_hash, options):
b_prm = self.session.base_payment_rate_manager
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options)
d.addCallback(lambda downloader: downloader.restore())
return d
def log_error(err, rowid, stream_hash, options):
log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage())
log.debug(rowid)
log.debug(stream_hash)
log.debug(options)
def start_lbry_files(lbry_files_and_options):
for rowid, stream_hash, options in lbry_files_and_options:
d = set_options_and_restore(rowid, stream_hash, options)
d.addErrback(lambda err: log_error(err, rowid, stream_hash, options))
log.info("Started %i lbry files", len(self.lbry_files))
return True
b_prm = self.session.base_payment_rate_manager
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
yield self._check_stream_info_manager()
files_and_options = yield self._get_all_lbry_files()
yield start_lbry_files(files_and_options)
lbry_files_and_options = yield self._get_all_lbry_files()
dl = []
for rowid, stream_hash, options in lbry_files_and_options:
lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options)
dl.append(self._restore_lbry_file(lbry_file))
log.debug("Started %s", lbry_file)
self.lbry_files_setup_deferred = defer.DeferredList(dl)
log.info("Started %i lbry files", len(self.lbry_files))
defer.returnValue(True)
@defer.inlineCallbacks
def start_lbry_file(self, rowid, stream_hash,
payment_rate_manager, blob_data_rate=None, upload_allowed=True,
download_directory=None, file_name=None):
if not download_directory:
download_directory = self.download_directory
payment_rate_manager.min_blob_data_payment_rate = blob_data_rate
lbry_file_downloader = ManagedEncryptedFileDownloader(rowid, stream_hash,
self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.stream_info_manager, self,
payment_rate_manager, self.session.wallet,
download_directory,
upload_allowed,
file_name=file_name)
self.lbry_files.append(lbry_file_downloader)
d = lbry_file_downloader.set_stream_info()
d.addCallback(lambda _: lbry_file_downloader)
return d
lbry_file = ManagedEncryptedFileDownloader(rowid, stream_hash, self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.stream_info_manager,
self, payment_rate_manager, self.session.wallet,
download_directory, upload_allowed,
file_name=file_name)
yield lbry_file.set_stream_info()
self.lbry_files.append(lbry_file)
defer.returnValue(lbry_file)
def add_lbry_file(self, stream_hash, payment_rate_manager,
blob_data_rate=None,
upload_allowed=True,
download_directory=None,
file_name=None):
d = self._save_lbry_file(stream_hash, blob_data_rate)
d.addCallback(
lambda rowid: self.start_lbry_file(
rowid, stream_hash, payment_rate_manager,
blob_data_rate, upload_allowed, download_directory, file_name))
return d
@defer.inlineCallbacks
def _stop_lbry_file(self, lbry_file):
def wait_for_finished(lbry_file, count=2):
if count or lbry_file.saving_status is not False:
return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, count=count - 1)
try:
yield lbry_file.stop(change_status=False)
self.lbry_files.remove(lbry_file)
except CurrentlyStoppingError:
yield wait_for_finished(lbry_file)
except AlreadyStoppedError:
pass
finally:
defer.returnValue(None)
def _stop_lbry_files(self):
log.info("Stopping %i lbry files", len(self.lbry_files))
lbry_files = self.lbry_files
for lbry_file in lbry_files:
yield self._stop_lbry_file(lbry_file)
@defer.inlineCallbacks
def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None,
upload_allowed=True, download_directory=None, file_name=None):
rowid = yield self._save_lbry_file(stream_hash, blob_data_rate)
lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate, upload_allowed, download_directory,
file_name)
defer.returnValue(lbry_file)
def delete_lbry_file(self, lbry_file):
for l in self.lbry_files:
@ -192,31 +222,22 @@ class EncryptedFileManager(object):
else:
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
def stop(self):
log.info('Stopping %s', self)
ds = []
def wait_for_finished(lbry_file, count=2):
if count <= 0 or lbry_file.saving_status is False:
return True
else:
return task.deferLater(reactor, 1, wait_for_finished, lbry_file, count=count - 1)
def ignore_stopped(err, lbry_file):
err.trap(AlreadyStoppedError, CurrentlyStoppingError)
return wait_for_finished(lbry_file)
def _reflect_lbry_files(self):
for lbry_file in self.lbry_files:
d = lbry_file.stop(change_status=False)
d.addErrback(ignore_stopped, lbry_file)
ds.append(d)
dl = defer.DeferredList(ds)
yield reflect_stream(lbry_file)
def close_db():
self.db = None
@defer.inlineCallbacks
def reflect_lbry_files(self):
yield defer.DeferredList(list(self._reflect_lbry_files()))
dl.addCallback(lambda _: close_db())
return dl
@defer.inlineCallbacks
def stop(self):
safe_stop_looping_call(self.lbry_file_reflector)
yield defer.DeferredList(list(self._stop_lbry_files()))
yield self.sql_db.close()
self.sql_db = None
log.info("Stopped %s", self)
defer.returnValue(True)
def get_count_for_stream_hash(self, stream_hash):
return self._get_count_for_stream_hash(stream_hash)

View file

@ -2,7 +2,6 @@ import binascii
import logging.handlers
import mimetypes
import os
import random
import re
import base58
import requests
@ -16,15 +15,16 @@ from twisted.web import server
from twisted.internet import defer, threads, error, reactor, task
from twisted.internet.task import LoopingCall
from twisted.python.failure import Failure
from jsonschema import ValidationError
# TODO: importing this when internet is disabled raises a socket.gaierror
from lbryum.version import LBRYUM_VERSION as lbryum_version
from lbrynet import __version__ as lbrynet_version
from lbrynet import conf, reflector, analytics
from lbrynet import conf, analytics
from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET
from lbrynet.reflector import reupload
from lbrynet.reflector import ServerFactory as reflector_server_factory
from lbrynet.metadata.Fee import FeeValidator
from lbrynet.metadata.Metadata import Metadata, verify_name_characters
from lbrynet.metadata.Metadata import verify_name_characters
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory
from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
@ -369,7 +369,7 @@ class Daemon(AuthJSONRPCServer):
self.connection_status_code = CONNECTION_STATUS_NETWORK
# claim_out is dictionary containing 'txid' and 'nout'
def _add_to_pending_claims(self, name, claim_out):
def _add_to_pending_claims(self, claim_out, name):
txid = claim_out['txid']
nout = claim_out['nout']
log.info("Adding lbry://%s to pending claims, txid %s nout %d" % (name, txid, nout))
@ -397,7 +397,7 @@ class Daemon(AuthJSONRPCServer):
log.warning("Re-add %s to pending claims", name)
txid, nout = self.pending_claims.pop(name)
claim_out = {'txid': txid, 'nout': nout}
self._add_to_pending_claims(name, claim_out)
self._add_to_pending_claims(claim_out, name)
def _process_lbry_file(name, lbry_file):
# lbry_file is an instance of ManagedEncryptedFileDownloader or None
@ -438,13 +438,13 @@ class Daemon(AuthJSONRPCServer):
if self.run_reflector_server:
log.info("Starting reflector server")
if self.reflector_port is not None:
reflector_factory = reflector.ServerFactory(
reflector_factory = reflector_server_factory(
self.session.peer_manager,
self.session.blob_manager
)
try:
self.reflector_server_port = reactor.listenTCP(
self.reflector_port, reflector_factory)
self.reflector_server_port = reactor.listenTCP(self.reflector_port,
reflector_factory)
log.info('Started reflector on port %s', self.reflector_port)
except error.CannotListenError as e:
log.exception("Couldn't bind reflector to port %d", self.reflector_port)
@ -763,8 +763,12 @@ class Daemon(AuthJSONRPCServer):
"""
timeout = timeout if timeout is not None else conf.settings['download_timeout']
helper = _DownloadNameHelper(
self, name, timeout, download_directory, file_name, wait_for_write)
try:
helper = _DownloadNameHelper(self, name, timeout, download_directory, file_name,
wait_for_write)
except Exception as err:
log.exception(err)
raise err
if not stream_info:
self.waiting_on[name] = True
@ -774,6 +778,28 @@ class Daemon(AuthJSONRPCServer):
sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file)
defer.returnValue((sd_hash, file_path))
@defer.inlineCallbacks
def _publish_stream(self, name, bid, metadata, file_path=None, fee=None):
publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet)
verify_name_characters(name)
if bid <= 0.0:
raise Exception("Invalid bid")
if fee:
metadata = yield publisher.add_fee_to_metadata(metadata, fee)
if not file_path:
claim_out = yield publisher.update_stream(name, bid, metadata)
else:
claim_out = yield publisher.publish_stream(name, file_path, bid, metadata)
d = reupload.reflect_stream(publisher.lbry_file)
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
log.exception)
log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
claim_out['nout'])
yield self._add_to_pending_claims(claim_out, name)
self.looping_call_manager.start(Checker.PENDING_CLAIM, 30)
defer.returnValue(claim_out)
def add_stream(self, name, timeout, download_directory, file_name, stream_info):
"""Makes, adds and starts a stream"""
self.streams[name] = GetStream(self.sd_identifier,
@ -786,8 +812,7 @@ class Daemon(AuthJSONRPCServer):
timeout=timeout,
download_directory=download_directory,
file_name=file_name)
d = self.streams[name].start(stream_info, name)
return d
return self.streams[name].start(stream_info, name)
def _get_long_count_timestamp(self):
dt = utils.utcnow() - utils.datetime_obj(year=2012, month=12, day=21)
@ -811,33 +836,24 @@ class Daemon(AuthJSONRPCServer):
helper = _ResolveNameHelper(self, name, force_refresh)
return helper.get_deferred()
@defer.inlineCallbacks
def _delete_lbry_file(self, lbry_file, delete_file=True):
d = self.lbry_file_manager.delete_lbry_file(lbry_file)
stream_hash = lbry_file.stream_hash
filename = os.path.join(self.download_directory, lbry_file.file_name)
def finish_deletion(lbry_file):
d = lbry_file.delete_data()
d.addCallback(lambda _: _delete_stream_data(lbry_file))
return d
def _delete_stream_data(lbry_file):
s_h = lbry_file.stream_hash
d = self.lbry_file_manager.get_count_for_stream_hash(s_h)
# TODO: could possibly be a timing issue here
d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True)
if delete_file:
def remove_if_file():
filename = os.path.join(self.download_directory, lbry_file.file_name)
if os.path.isfile(filename):
os.remove(filename)
else:
return defer.succeed(None)
d.addCallback(lambda _: remove_if_file)
return d
d.addCallback(lambda _: finish_deletion(lbry_file))
d.addCallback(lambda _: log.info("Delete lbry file"))
return d
yield self.lbry_file_manager.delete_lbry_file(lbry_file)
yield lbry_file.delete_data()
stream_count = yield self.lbry_file_manager.get_count_for_stream_hash(stream_hash)
if stream_count == 0:
yield self.stream_info_manager.delete_stream(stream_hash)
else:
log.warning("Can't delete stream info for %s, count is %i", stream_hash, stream_count)
if delete_file:
if os.path.isfile(filename):
os.remove(filename)
log.info("Deleted file %s", filename)
log.info("Deleted stream %s", stream_hash)
defer.returnValue(True)
def _get_or_download_sd_blob(self, blob, sd_hash):
if blob:
@ -981,29 +997,6 @@ class Daemon(AuthJSONRPCServer):
])
return d
def _reflect(self, lbry_file):
if not lbry_file:
return defer.fail(Exception("no lbry file given to reflect"))
if lbry_file.stream_hash is None:
return defer.fail(Exception("no stream hash"))
factory = reflector.ClientFactory(
self.session.blob_manager,
self.lbry_file_manager.stream_info_manager,
lbry_file.stream_hash,
lbry_file.uri
)
return run_reflector_factory(factory)
def _reflect_blobs(self, blob_hashes):
if not blob_hashes:
return defer.fail(Exception("no lbry file given to reflect"))
log.info("Reflecting %i blobs" % len(blob_hashes))
factory = reflector.BlobClientFactory(
self.session.blob_manager,
blob_hashes
)
return run_reflector_factory(factory)
############################################################################
# #
# JSON-RPC API methods start here #
@ -1026,7 +1019,7 @@ class Daemon(AuthJSONRPCServer):
'lbry_id': base58.b58encode(self.lbryid)[:SHORT_ID_LEN],
'installation_id': conf.settings.get_installation_id()[:SHORT_ID_LEN],
'is_running': self.announced_startup,
'is_first_run': self.session.wallet.is_first_run if has_wallet else None,
'is_first_run': self.session.wallet.is_first_run if has_wallet else None,
'startup_status': {
'code': self.startup_status[0],
'message': self.startup_status[1],
@ -1056,6 +1049,8 @@ class Daemon(AuthJSONRPCServer):
local_height = self.session.wallet.network.get_local_height()
remote_height = self.session.wallet.network.get_server_height()
response['blocks_behind'] = remote_height - local_height
response['local_height'] = local_height
response['remote_height'] = remote_height
best_hash = yield self.session.wallet.get_best_blockhash()
response['blockchain_status'] = {'best_blockhash': best_hash}
defer.returnValue(response)
@ -1502,8 +1497,7 @@ class Daemon(AuthJSONRPCServer):
max_tries = 3
while tries <= max_tries:
try:
log.info(
'Making try %s / %s to start download of %s', tries, max_tries, name)
log.info('Making try %s / %s to start download of %s', tries, max_tries, name)
new_sd_hash, file_path = yield self._download_name(
name=name,
timeout=timeout,
@ -1514,10 +1508,10 @@ class Daemon(AuthJSONRPCServer):
)
break
except Exception as e:
log.exception('Failed to get %s', name)
log.warning('Failed to get %s', name)
if tries == max_tries:
self.analytics_manager.send_download_errored(download_id, name, stream_info)
response = yield self._render_response(str(e))
response = yield self._render_response(e.message)
defer.returnValue(response)
tries += 1
# TODO: should stream_hash key be changed to sd_hash?
@ -1527,7 +1521,7 @@ class Daemon(AuthJSONRPCServer):
}
stream = self.streams.get(name)
if stream:
stream.downloader.finished_deferred.addCallback(
stream.finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(
download_id, name, stream_info)
)
@ -1640,16 +1634,17 @@ class Daemon(AuthJSONRPCServer):
return d
@AuthJSONRPCServer.auth_required
def jsonrpc_publish(self, name, file_path, bid, metadata, fee=None):
def jsonrpc_publish(self, name, bid, metadata, file_path=None, fee=None):
"""
Make a new name claim and publish associated data to lbrynet
Args:
'name': name to be claimed, string
'file_path': path to file to be associated with name, string
'bid': amount of credits to commit in this claim, float
'metadata': metadata dictionary
optional 'fee'
'name': str, name to be claimed, string
'bid': float, amount of credits to commit in this claim,
'metadata': dict, Metadata compliant (can be missing sources if a file is provided)
'file_path' (optional): str, path to file to be associated with name, if not given
the stream from your existing claim for the name will be used
'fee' (optional): dict, FeeValidator compliant
Returns:
'success' : True if claim was succesful , False otherwise
'reason' : if not succesful, give reason
@ -1659,17 +1654,6 @@ class Daemon(AuthJSONRPCServer):
'claim_id' : claim id of the resulting transaction
"""
def _set_address(address, currency, m):
log.info("Generated new address for key fee: " + str(address))
m['fee'][currency]['address'] = address
return m
def _reflect_if_possible(sd_hash, claim_out):
d = self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
d.addCallback(self._reflect)
d.addCallback(lambda _: claim_out)
return d
log.info("Publish: %s", {
'name': name,
'file_path': file_path,
@ -1677,49 +1661,9 @@ class Daemon(AuthJSONRPCServer):
'metadata': metadata,
'fee': fee,
})
verify_name_characters(name)
if bid <= 0.0:
return defer.fail(Exception("Invalid bid"))
try:
metadata = Metadata(metadata)
make_lbry_file = False
sd_hash = metadata['sources']['lbry_sd_hash']
log.info("Update publish for %s using existing stream", name)
except ValidationError:
make_lbry_file = True
sd_hash = None
if not file_path:
raise Exception("No file given to publish")
if not os.path.isfile(file_path):
raise Exception("Specified file for publish doesnt exist: %s" % file_path)
self.looping_call_manager.start(Checker.PENDING_CLAIM, 30)
d = self._resolve_name(name, force_refresh=True)
d.addErrback(lambda _: None)
if fee is not None:
metadata['fee'] = fee
assert len(metadata['fee']) == 1, "Too many fees"
for c in metadata['fee']:
if 'address' not in metadata['fee'][c]:
d.addCallback(lambda _: self.session.wallet.get_new_address())
d.addCallback(lambda addr: _set_address(addr, c, metadata))
else:
d.addCallback(lambda _: metadata)
if make_lbry_file:
pub = Publisher(self.session, self.lbry_file_manager, self.session.wallet)
d.addCallback(lambda meta: pub.start(name, file_path, bid, meta))
else:
d.addCallback(lambda meta: self.session.wallet.claim_name(name, bid, meta))
if sd_hash:
d.addCallback(lambda claim_out: _reflect_if_possible(sd_hash, claim_out))
d.addCallback(lambda claim_out: self._add_to_pending_claims(name, claim_out))
d = self._publish_stream(name, bid, metadata, file_path, fee)
d.addCallback(lambda r: self._render_response(r))
return d
@AuthJSONRPCServer.auth_required
@ -1730,6 +1674,7 @@ class Daemon(AuthJSONRPCServer):
return self.jsonrpc_claim_abandon(**kwargs)
@AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_claim_abandon(self, txid, nout):
"""
Abandon a name and reclaim credits from the claim
@ -1741,15 +1686,14 @@ class Daemon(AuthJSONRPCServer):
txid : txid of resulting transaction if succesful
fee : fee paid for the transaction if succesful
"""
def _disp(x):
log.info("Abandoned name claim tx " + str(x))
return self._render_response(x)
d = defer.Deferred()
d.addCallback(lambda _: self.session.wallet.abandon_claim(txid, nout))
d.addCallback(_disp)
d.callback(None) # TODO: is this line necessary???
return d
try:
abandon_claim_tx = yield self.session.wallet.abandon_claim(txid, nout)
response = yield self._render_response(abandon_claim_tx)
except Exception as err:
log.warning(err)
response = yield self._render_response(err)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required
def jsonrpc_abandon_name(self, **kwargs):
@ -2279,7 +2223,7 @@ class Daemon(AuthJSONRPCServer):
"""
d = self.session.blob_manager.get_all_verified_blobs()
d.addCallback(self._reflect_blobs)
d.addCallback(reupload.reflect_blob_hashes, self.session.blob_manager)
d.addCallback(lambda r: self._render_response(r))
return d
@ -2388,9 +2332,7 @@ def get_sd_hash(stream_info):
class _DownloadNameHelper(object):
def __init__(self, daemon, name,
timeout=None,
download_directory=None, file_name=None,
def __init__(self, daemon, name, timeout=None, download_directory=None, file_name=None,
wait_for_write=True):
self.daemon = daemon
self.name = name
@ -2440,16 +2382,24 @@ class _DownloadNameHelper(object):
@defer.inlineCallbacks
def _get_stream(self, stream_info):
was_successful, sd_hash, download_path = yield self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
if not was_successful:
log.warning("lbry://%s timed out, removing from streams", self.name)
try:
download_path = yield self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info)
except (InsufficientFundsError, Exception) as err:
if Failure(err).check(InsufficientFundsError):
log.warning("Insufficient funds to download lbry://%s", self.name)
self.remove_from_wait("Insufficient funds")
else:
log.warning("lbry://%s timed out, removing from streams", self.name)
self.remove_from_wait("Timed out")
if self.daemon.streams[self.name].downloader is not None:
yield self.daemon._delete_lbry_file(self.daemon.streams[self.name].downloader)
del self.daemon.streams[self.name]
self.remove_from_wait("Timed out")
raise Exception("Timed out")
raise err
if self.wait_for_write:
yield self._wait_for_write()
defer.returnValue((sd_hash, download_path))
defer.returnValue((self.daemon.streams[self.name].sd_hash, download_path))
def _wait_for_write(self):
d = defer.succeed(None)
@ -2699,13 +2649,3 @@ def get_lbry_file_search_value(search_fields):
if value:
return searchtype, value
raise NoValidSearch('{} is missing a valid search type'.format(search_fields))
def run_reflector_factory(factory):
reflector_server = random.choice(conf.settings['reflector_servers'])
reflector_address, reflector_port = reflector_server
log.info("Start reflector client")
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d

View file

@ -1,7 +1,5 @@
import logging
import os
from copy import deepcopy
from twisted.internet import defer
from twisted.internet.task import LoopingCall
@ -15,7 +13,6 @@ INITIALIZING_CODE = 'initializing'
DOWNLOAD_METADATA_CODE = 'downloading_metadata'
DOWNLOAD_TIMEOUT_CODE = 'timeout'
DOWNLOAD_RUNNING_CODE = 'running'
# TODO: is this ever used?
DOWNLOAD_STOPPED_CODE = 'stopped'
STREAM_STAGES = [
(INITIALIZING_CODE, 'Initializing'),
@ -29,144 +26,160 @@ STREAM_STAGES = [
log = logging.getLogger(__name__)
def safe_start(looping_call):
if not looping_call.running:
looping_call.start(1)
def safe_stop(looping_call):
if looping_call.running:
looping_call.stop()
class GetStream(object):
def __init__(self, sd_identifier, session, wallet,
lbry_file_manager, exchange_rate_manager,
max_key_fee, data_rate=0.5, timeout=None,
download_directory=None, file_name=None):
if timeout is None:
timeout = conf.settings['download_timeout']
self.wallet = wallet
self.resolved_name = None
self.description = None
self.fee = None
self.data_rate = data_rate
def __init__(self, sd_identifier, session, wallet, lbry_file_manager, exchange_rate_manager,
max_key_fee, data_rate=None, timeout=None, download_directory=None,
file_name=None):
self.timeout = timeout or conf.settings['download_timeout']
self.data_rate = data_rate or conf.settings['data_rate']
self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1]
self.download_directory = download_directory or conf.settings['download_directory']
self.file_name = file_name
self.timeout_counter = 0
self.code = None
self.sd_hash = None
self.wallet = wallet
self.session = session
self.exchange_rate_manager = exchange_rate_manager
self.payment_rate_manager = self.session.payment_rate_manager
self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier
self.sd_hash = None
self.max_key_fee = max_key_fee
self.stream_info = None
self.stream_info_manager = None
self._d = defer.Deferred(None)
self.timeout = timeout
self.timeout_counter = 0
self.download_directory = download_directory
self.download_path = None
self.downloader = None
# fired after the metadata has been downloaded and the
# actual file has been started
self.finished = defer.Deferred(None)
self.checker = LoopingCall(self.check_status)
self.code = STREAM_STAGES[0]
# fired when the download is complete
self.finished_deferred = defer.Deferred(None)
# fired after the metadata and the first data blob have been downloaded
self.data_downloading_deferred = defer.Deferred(None)
@property
def download_path(self):
return os.path.join(self.download_directory, self.downloader.file_name)
def _check_status(self, status):
if status.num_completed and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called:
safe_stop(self.checker)
else:
log.info("Downloading stream data (%i seconds)", self.timeout_counter)
def check_status(self):
"""
Check if we've got the first data blob in the stream yet
"""
self.timeout_counter += 1
if self.timeout_counter >= self.timeout:
if not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(Exception("Timeout"))
safe_stop(self.checker)
elif self.downloader:
d = self.downloader.status()
d.addCallback(self._check_status)
else:
log.info("Downloading stream descriptor blob (%i seconds)", self.timeout_counter)
# download_path is set after the sd blob has been downloaded
if self.download_path:
self.checker.stop()
self.finished.callback((True, self.sd_hash, self.download_path))
elif self.timeout_counter >= self.timeout:
log.info("Timeout downloading lbry://%s", self.resolved_name)
self.checker.stop()
self._d.cancel()
self.code = STREAM_STAGES[4]
self.finished.callback((False, None, None))
def _convert_max_fee(self):
def convert_max_fee(self):
max_fee = FeeValidator(self.max_key_fee)
if max_fee.currency_symbol == "LBC":
return max_fee.amount
return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount
def set_status(self, status, name):
log.info("Download lbry://%s status changed to %s" % (name, status))
self.code = next(s for s in STREAM_STAGES if s[0] == status)
def check_fee(self, fee):
validated_fee = FeeValidator(fee)
max_key_fee = self.convert_max_fee()
converted_fee = self.exchange_rate_manager.to_lbc(validated_fee).amount
if converted_fee > self.wallet.get_balance():
raise InsufficientFundsError('Unable to pay the key fee of %s' % converted_fee)
if converted_fee > max_key_fee:
raise KeyFeeAboveMaxAllowed('Key fee %s above max allowed %s' % (converted_fee,
max_key_fee))
return validated_fee
def get_downloader_factory(self, factories):
for factory in factories:
if isinstance(factory, ManagedEncryptedFileDownloaderFactory):
return factory
raise Exception('No suitable factory was found in {}'.format(factories))
@defer.inlineCallbacks
def get_downloader(self, factory, stream_metadata):
downloader_options = [self.data_rate, True]
downloader = yield factory.make_downloader(stream_metadata, downloader_options,
self.payment_rate_manager,
download_directory=self.download_directory,
file_name=self.file_name)
defer.returnValue(downloader)
def _pay_key_fee(self, address, fee_lbc, name):
log.info("Pay key fee %f --> %s", fee_lbc, address)
reserved_points = self.wallet.reserve_points(address, fee_lbc)
if reserved_points is None:
raise InsufficientFundsError('Unable to pay the key fee of %s for %s' % (fee_lbc, name))
return self.wallet.send_points_to_address(reserved_points, fee_lbc)
@defer.inlineCallbacks
def pay_key_fee(self, fee, name):
if fee is not None:
fee_lbc = self.exchange_rate_manager.to_lbc(fee).amount
yield self._pay_key_fee(fee.address, fee_lbc, name)
else:
defer.returnValue(None)
@defer.inlineCallbacks
def finish(self, results, name):
self.set_status(DOWNLOAD_STOPPED_CODE, name)
log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6],
self.download_path)
safe_stop(self.checker)
status = yield self.downloader.status()
self._check_status(status)
defer.returnValue(self.download_path)
@defer.inlineCallbacks
def download(self, stream_info, name):
self.set_status(INITIALIZING_CODE, name)
self.sd_hash = stream_info['sources']['lbry_sd_hash']
if 'fee' in stream_info:
fee = self.check_fee(stream_info['fee'])
else:
fee = None
self.set_status(DOWNLOAD_METADATA_CODE, name)
sd_blob = yield download_sd_blob(self.session, self.sd_hash, self.payment_rate_manager)
stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob)
factory = self.get_downloader_factory(stream_metadata.factories)
self.downloader = yield self.get_downloader(factory, stream_metadata)
self.set_status(DOWNLOAD_RUNNING_CODE, name)
if fee:
yield self.pay_key_fee(fee, name)
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallback(self.finish, name)
yield self.data_downloading_deferred
@defer.inlineCallbacks
def start(self, stream_info, name):
def _cancel(err):
# this callback sequence gets cancelled in check_status if
# it takes too long when that happens, we want the logic
# to live in check_status
if err.check(defer.CancelledError):
return
if self.checker:
self.checker.stop()
self.finished.errback(err)
def _set_status(x, status):
log.info("Download lbry://%s status changed to %s" % (self.resolved_name, status))
self.code = next(s for s in STREAM_STAGES if s[0] == status)
return x
def get_downloader_factory(metadata):
for factory in metadata.factories:
if isinstance(factory, ManagedEncryptedFileDownloaderFactory):
return factory, metadata
raise Exception('No suitable factory was found in {}'.format(metadata.factories))
def make_downloader(args):
factory, metadata = args
return factory.make_downloader(metadata,
[self.data_rate, True],
self.payment_rate_manager,
download_directory=self.download_directory,
file_name=self.file_name)
self.resolved_name = name
self.stream_info = deepcopy(stream_info)
self.description = self.stream_info['description']
self.sd_hash = self.stream_info['sources']['lbry_sd_hash']
if 'fee' in self.stream_info:
self.fee = FeeValidator(self.stream_info['fee'])
max_key_fee = self._convert_max_fee()
converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount
if converted_fee > self.wallet.get_balance():
msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format(
self.resolved_name, converted_fee, self.wallet.get_balance())
raise InsufficientFundsError(msg)
if converted_fee > max_key_fee:
msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format(
converted_fee, max_key_fee, self.resolved_name)
raise KeyFeeAboveMaxAllowed(msg)
log.info(
"Key fee %f below limit of %f, downloading lbry://%s",
converted_fee, max_key_fee, self.resolved_name)
self.checker.start(1)
self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE))
self._d.addCallback(lambda _: download_sd_blob(
self.session, self.sd_hash, self.payment_rate_manager))
self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
self._d.addCallback(get_downloader_factory)
self._d.addCallback(make_downloader)
self._d.addCallbacks(self._start_download, _cancel)
self._d.callback(None)
return self.finished
def _start_download(self, downloader):
log.info('Starting download for %s', self.resolved_name)
self.downloader = downloader
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
d = self._pay_key_fee()
d.addCallback(lambda _: log.info(
"Downloading %s --> %s", self.sd_hash, self.downloader.file_name))
d.addCallback(lambda _: self.downloader.start())
def _pay_key_fee(self):
if self.fee is not None:
fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount
reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc)
if reserved_points is None:
log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name)
# TODO: If we get here, nobody will know that there was an error
# as nobody actually cares about self._d
return defer.fail(InsufficientFundsError())
return self.wallet.send_points_to_address(reserved_points, fee_lbc)
return defer.succeed(None)
try:
safe_start(self.checker)
yield self.download(stream_info, name)
defer.returnValue(self.download_path)
except Exception as err:
safe_stop(self.checker)
raise err

View file

@ -1,16 +1,13 @@
import logging
import mimetypes
import os
import random
from twisted.internet import threads, defer, reactor
from twisted.internet import defer
from lbrynet.core import file_utils
from lbrynet.lbryfilemanager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob
from lbrynet.metadata.Metadata import Metadata
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet import reflector
from lbrynet import conf
from lbrynet.metadata.Fee import FeeValidator
log = logging.getLogger(__name__)
@ -21,154 +18,52 @@ class Publisher(object):
self.session = session
self.lbry_file_manager = lbry_file_manager
self.wallet = wallet
self.received_file_name = False
self.file_path = None
self.file_name = None
self.publish_name = None
self.bid_amount = None
self.verified = False
self.lbry_file = None
self.txid = None
self.nout = None
self.claim_id = None
self.fee = None
self.stream_hash = None
# TODO: this needs to be passed into the constructor
reflector_server = random.choice(conf.settings['reflector_servers'])
self.reflector_server, self.reflector_port = reflector_server[0], reflector_server[1]
self.metadata = {}
def start(self, name, file_path, bid, metadata):
@defer.inlineCallbacks
def add_fee_to_metadata(self, metadata, fee):
metadata['fee'] = FeeValidator(fee)
assert len(fee) == 1, "Too many fees"
for currency in fee:
if 'address' not in fee[currency]:
new_address = yield self.session.wallet.get_new_address()
fee[currency]['address'] = new_address
metadata['fee'] = FeeValidator(fee)
defer.returnValue(metadata)
@defer.inlineCallbacks
def publish_stream(self, name, file_path, bid, metadata):
log.info('Starting publish for %s', name)
def _show_result():
log.info(
"Success! Published %s --> lbry://%s txid: %s nout: %d",
self.file_name, self.publish_name, self.txid, self.nout
)
return defer.succeed({
'nout': self.nout,
'txid': self.txid,
'claim_id': self.claim_id,
'fee': self.fee,
})
self.publish_name = name
self.file_path = file_path
self.bid_amount = bid
self.metadata = metadata
# TODO: we cannot have this sort of code scattered throughout
# our code base. Use polymorphism instead
if os.name == "nt":
file_mode = 'rb'
else:
file_mode = 'r'
d = self._check_file_path(self.file_path)
# TODO: ensure that we aren't leaving this resource open
d.addCallback(lambda _: create_lbry_file(self.session, self.lbry_file_manager,
self.file_name, open(self.file_path, file_mode)))
d.addCallback(self.add_to_lbry_files)
d.addCallback(lambda _: self._create_sd_blob())
d.addCallback(lambda _: self._claim_name())
d.addCallback(lambda _: self.set_status())
d.addCallback(lambda _: self.start_reflector())
d.addCallbacks(
lambda _: _show_result(),
errback=log.fail(self._throw_publish_error),
errbackArgs=(
"An error occurred publishing %s to %s", self.file_name, self.publish_name)
)
return d
def start_reflector(self):
# TODO: is self.reflector_server unused?
reflector_server = random.choice(conf.settings['reflector_servers'])
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
log.info("Reflecting new publication")
factory = reflector.ClientFactory(
self.session.blob_manager,
self.lbry_file_manager.stream_info_manager,
self.stream_hash,
self.publish_name
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d
def _check_file_path(self, file_path):
def check_file_threaded():
f = open(file_path)
f.close()
self.file_name = os.path.basename(self.file_path)
return True
return threads.deferToThread(check_file_threaded)
def set_lbry_file(self, lbry_file_downloader):
self.lbry_file = lbry_file_downloader
return defer.succeed(None)
def add_to_lbry_files(self, stream_hash):
self.stream_hash = stream_hash
file_name = os.path.basename(file_path)
with file_utils.get_read_handle(file_path) as read_handle:
stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, file_name,
read_handle)
prm = self.session.payment_rate_manager
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
d.addCallback(self.set_lbry_file)
return d
self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, prm)
sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager,
self.session.blob_manager, self.lbry_file.stream_hash)
if 'sources' not in metadata:
metadata['sources'] = {}
metadata['sources']['lbry_sd_hash'] = sd_hash
metadata['content_type'] = get_content_type(file_path)
metadata['ver'] = Metadata.current_version
claim_out = yield self.make_claim(name, bid, metadata)
defer.returnValue(claim_out)
def _create_sd_blob(self):
log.debug('Creating stream descriptor blob')
d = publish_sd_blob(self.lbry_file_manager.stream_info_manager,
self.session.blob_manager,
self.lbry_file.stream_hash)
@defer.inlineCallbacks
def update_stream(self, name, bid, metadata):
my_claim = yield self.wallet.get_my_claim(name)
updated_metadata = my_claim['value']
for meta_key in metadata:
updated_metadata[meta_key] = metadata[meta_key]
claim_out = yield self.make_claim(name, bid, updated_metadata)
defer.returnValue(claim_out)
def set_sd_hash(sd_hash):
log.debug('stream descriptor hash: %s', sd_hash)
if 'sources' not in self.metadata:
self.metadata['sources'] = {}
self.metadata['sources']['lbry_sd_hash'] = sd_hash
d.addCallback(set_sd_hash)
return d
def set_status(self):
log.debug('Setting status')
d = self.lbry_file_manager.change_lbry_file_status(
self.lbry_file, ManagedEncryptedFileDownloader.STATUS_FINISHED)
d.addCallback(lambda _: self.lbry_file.restore())
return d
def _claim_name(self):
log.debug('Claiming name')
self._update_metadata()
m = Metadata(self.metadata)
def set_claim_out(claim_out):
log.debug('Name claimed using txid: %s, nout: %d, claim_id: %s, fee :%f',
claim_out['txid'], claim_out['nout'],
claim_out['claim_id'], claim_out['fee'])
self.txid = claim_out['txid']
self.nout = claim_out['nout']
self.claim_id = claim_out['claim_id']
self.fee = claim_out['fee']
d = self.wallet.claim_name(self.publish_name, self.bid_amount, m)
d.addCallback(set_claim_out)
return d
def _update_metadata(self):
filename = os.path.join(self.lbry_file.download_directory, self.lbry_file.file_name)
self.metadata['content_type'] = get_content_type(filename)
self.metadata['ver'] = Metadata.current_version
def _throw_publish_error(self, err):
# TODO: I'm not a fan of the log and re-throw, especially when
# the new exception is more generic. Look over this to
# see if there is a reason not to remove the errback
# handler and allow the original exception to move up
# the stack.
raise Exception("Publish failed")
@defer.inlineCallbacks
def make_claim(self, name, bid, metadata):
validated_metadata = Metadata(metadata)
claim_out = yield self.wallet.claim_name(name, bid, validated_metadata)
defer.returnValue(claim_out)
def get_content_type(filename):

View file

@ -16,7 +16,6 @@ class EncryptedFileReflectorClient(Protocol):
# Protocol stuff
def connectionMade(self):
log.debug("Connected to reflector")
self.blob_manager = self.factory.blob_manager
self.response_buff = ''
self.outgoing_buff = ''
self.blob_hashes_to_send = []
@ -25,8 +24,6 @@ class EncryptedFileReflectorClient(Protocol):
self.read_handle = None
self.sent_stream_info = False
self.received_descriptor_response = False
self.protocol_version = self.factory.protocol_version
self.lbry_uri = "lbry://%s" % self.factory.lbry_uri
self.received_server_version = False
self.server_version = None
self.stream_descriptor = None
@ -41,6 +38,26 @@ class EncryptedFileReflectorClient(Protocol):
d.addErrback(
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
@property
def lbry_uri(self):
return "lbry://%s" % self.factory.lbry_uri
@property
def blob_manager(self):
return self.factory.blob_manager
@property
def protocol_version(self):
return self.factory.protocol_version
@property
def stream_info_manager(self):
return self.factory.stream_info_manager
@property
def stream_hash(self):
return self.factory.stream_hash
def dataReceived(self, data):
self.response_buff += data
try:
@ -56,16 +73,22 @@ class EncryptedFileReflectorClient(Protocol):
def connectionLost(self, reason):
if reason.check(error.ConnectionDone):
if not self.needed_blobs:
log.info("Reflector has all blobs for %s", self.lbry_uri)
log.info("Reflector has all blobs for %s (%s)",
self.lbry_uri, self.stream_descriptor)
elif not self.reflected_blobs:
log.info("No more completed blobs for %s to reflect, %i are still needed",
self.lbry_uri, len(self.needed_blobs))
log.info("No more completed blobs for %s (%s) to reflect, %i are still needed",
self.lbry_uri, self.stream_descriptor, len(self.needed_blobs))
else:
log.info('Finished sending reflector %i blobs for %s',
len(self.reflected_blobs), self.lbry_uri)
log.info('Finished sending reflector %i blobs for %s (%s)',
len(self.reflected_blobs), self.lbry_uri, self.stream_descriptor)
self.factory.finished_deferred.callback(self.reflected_blobs)
elif reason.check(error.ConnectionLost):
log.warning("Stopped reflecting %s (%s) after sending %i blobs", self.lbry_uri,
self.stream_descriptor, len(self.reflected_blobs))
self.factory.finished_deferred.callback(self.reflected_blobs)
else:
log.info('Reflector finished for %s: %s', self.lbry_uri, reason)
log.info('Reflector finished for %s (%s): %s', self.lbry_uri, self.stream_descriptor,
reason)
self.factory.finished_deferred.callback(reason)
# IConsumer stuff
@ -118,6 +141,7 @@ class EncryptedFileReflectorClient(Protocol):
[blob for blob in filtered if blob.blob_hash in self.needed_blobs])
d.addCallback(_show_missing_blobs)
d.addCallback(self.set_blobs_to_send)
d.addCallback(lambda _: None if self.descriptor_needed else self.set_not_uploading())
return d
def send_request(self, request_dict):
@ -158,8 +182,9 @@ class EncryptedFileReflectorClient(Protocol):
self.next_blob_to_send.close_read_handle(self.read_handle)
self.read_handle = None
self.next_blob_to_send = None
self.file_sender.stopProducing()
self.file_sender = None
if self.file_sender is not None:
self.file_sender.stopProducing()
self.file_sender = None
return defer.succeed(None)
def start_transfer(self):
@ -292,15 +317,31 @@ class EncryptedFileReflectorClient(Protocol):
class EncryptedFileReflectorClientFactory(ClientFactory):
protocol = EncryptedFileReflectorClient
def __init__(self, blob_manager, stream_info_manager, stream_hash, lbry_uri):
self.protocol_version = REFLECTOR_V2
self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash
self.lbry_uri = lbry_uri
def __init__(self, lbry_file):
self._lbry_file = lbry_file
self.p = None
self.finished_deferred = defer.Deferred()
@property
def blob_manager(self):
return self._lbry_file.blob_manager
@property
def stream_info_manager(self):
return self._lbry_file.stream_info_manager
@property
def stream_hash(self):
return self._lbry_file.stream_hash
@property
def lbry_uri(self):
return self._lbry_file.uri
@property
def protocol_version(self):
return REFLECTOR_V2
def buildProtocol(self, addr):
p = self.protocol()
p.factory = self

View file

@ -1,45 +1,33 @@
import logging
from twisted.internet import reactor
from twisted.internet.error import ConnectionLost, ConnectionDone
from lbrynet.reflector import BlobClientFactory, ClientFactory
import random
log = logging.getLogger(__name__)
def _check_if_reflector_has_stream(lbry_file, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = BlobClientFactory(
lbry_file.blob_manager,
[lbry_file.sd_hash]
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
d.addCallback(lambda _: not factory.sent_blobs)
return d
from twisted.internet import reactor, defer
from lbrynet import conf
from lbrynet.reflector import ClientFactory, BlobClientFactory
@defer.inlineCallbacks
def _reflect_stream(lbry_file, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = ClientFactory(
lbry_file.blob_manager,
lbry_file.stream_info_manager,
lbry_file.stream_hash,
lbry_file.uri
)
d = reactor.resolve(reflector_address)
d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
d.addCallback(lambda _: factory.finished_deferred)
return d
factory = ClientFactory(lbry_file)
ip = yield reactor.resolve(reflector_address)
yield reactor.connectTCP(ip, reflector_port, factory)
yield factory.finished_deferred
def _catch_error(err, uri):
msg = "An error occurred while checking availability for lbry://%s: %s"
log.error(msg, uri, err.getTraceback())
@defer.inlineCallbacks
def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = BlobClientFactory(blob_manager, blob_hashes)
ip = yield reactor.resolve(reflector_address)
yield reactor.connectTCP(ip, reflector_port, factory)
yield factory.finished_deferred
def check_and_restore_availability(lbry_file, reflector_server):
d = _reflect_stream(lbry_file, reflector_server)
d.addErrback(lambda err: err.trap(ConnectionDone, ConnectionLost))
d.addErrback(_catch_error, lbry_file.uri)
return d
def reflect_stream(lbry_file):
reflector_server = random.choice(conf.settings['reflector_servers'])
return _reflect_stream(lbry_file, reflector_server)
def reflect_blob_hashes(blob_hashes, blob_manager):
reflector_server = random.choice(conf.settings['reflector_servers'])
return _reflect_blobs(blob_manager, blob_hashes, reflector_server)

View file

@ -170,12 +170,10 @@ class TestReflector(unittest.TestCase):
return d
def send_to_server():
factory = reflector.ClientFactory(
self.session.blob_manager,
self.stream_info_manager,
self.stream_hash,
"fake_uri"
)
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager,
self.stream_info_manager,
self.stream_hash)
factory = reflector.ClientFactory(fake_lbry_file)
from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory)

View file

@ -10,6 +10,15 @@ from lbrynet import conf
KB = 2**10
class FakeLBRYFile(object):
def __init__(self, blob_manager, stream_info_manager, stream_hash, uri="fake_uri"):
self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash
self.uri = "fake_uri"
class Node(object):
def __init__(self, *args, **kwargs):
pass