lbry-sdk/lbrynet/lbrynet_daemon/LBRYDownloader.py
Jack c2ec066c85 add LBRYFileProducer
-Add LBRYFileProducer, to host the contents of a download without
having to keep re-opening it as it is added to

-included sd hash in ManagedLBRYFileDownloader, to make comparing the
contents of the file manager against name claims easier

-add get_lbry_file function, which returns information about a LBRY
file found by sd_hash, file name, or lbry uri
2016-04-24 04:42:42 -04:00

243 lines
9.7 KiB
Python

import json
import logging
import os
from datetime import datetime
from twisted.internet import defer
from twisted.internet.task import LoopingCall
from lbrynet.core.Error import InvalidStreamInfoError, InsufficientFundsError
from lbrynet.core.PaymentRateManager import PaymentRateManager
from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloaderFactory
from lbrynet.conf import DEFAULT_TIMEOUT
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
class GetStream(object):
def __init__(self, sd_identifier, session, wallet, lbry_file_manager, max_key_fee, pay_key=True, data_rate=0.5,
timeout=DEFAULT_TIMEOUT, download_directory=None):
self.wallet = wallet
self.resolved_name = None
self.description = None
self.key_fee = None
self.key_fee_address = None
self.data_rate = data_rate
self.pay_key = pay_key
self.name = None
self.session = session
self.payment_rate_manager = PaymentRateManager(self.session.base_payment_rate_manager)
self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier
self.stream_hash = None
self.max_key_fee = max_key_fee
self.stream_info = None
self.stream_info_manager = None
self.d = defer.Deferred(None)
self.timeout = timeout
self.timeout_counter = 0
self.download_directory = download_directory
self.download_path = None
self.downloader = None
self.finished = defer.Deferred()
self.checker = LoopingCall(self.check_status)
def check_status(self):
self.timeout_counter += 1
if self.download_path:
self.checker.stop()
self.finished.callback((self.stream_hash, self.download_path))
elif self.timeout_counter >= self.timeout:
log.info("Timeout downloading lbry://" + self.resolved_name + ", " + str(self.stream_info))
self.checker.stop()
self.d.cancel()
self.finished.callback(False)
def start(self, stream_info, name):
self.resolved_name = name
self.stream_info = stream_info
if 'stream_hash' in self.stream_info.keys():
self.description = self.stream_info['description']
if 'key_fee' in self.stream_info.keys():
self.key_fee = float(self.stream_info['key_fee'])
if 'key_fee_address' in self.stream_info.keys():
self.key_fee_address = self.stream_info['key_fee_address']
else:
self.key_fee_address = None
else:
self.key_fee = None
self.key_fee_address = None
self.stream_hash = self.stream_info['stream_hash']
if isinstance(self.stream_hash, dict):
self.stream_hash = self.stream_hash['sd_hash']
else:
log.error("InvalidStreamInfoError in autofetcher: ", stream_info)
raise InvalidStreamInfoError(self.stream_info)
if self.key_fee > self.max_key_fee:
if self.pay_key:
log.info("Key fee (" + str(self.key_fee) + ") above limit of " + str(
self.max_key_fee) + ", didn't download lbry://" + str(self.resolved_name))
return defer.fail(None)
else:
pass
def _cause_timeout():
self.timeout_counter = self.timeout * 2
self.checker.start(1)
self.d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self.d.addCallback(lambda metadata: (
next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)),
metadata))
self.d.addCallback(lambda (factory, metadata): factory.make_downloader(metadata,
[self.data_rate, True],
self.payment_rate_manager,
download_directory=self.download_directory))
self.d.addCallbacks(self._start_download, lambda _: _cause_timeout())
self.d.callback(None)
return self.finished
def _start_download(self, downloader):
def _pay_key_fee():
if self.key_fee is not None and self.key_fee_address is not None:
reserved_points = self.wallet.reserve_points(self.key_fee_address, self.key_fee)
if reserved_points is None:
return defer.fail(InsufficientFundsError())
log.info("Key fee: " + str(self.key_fee) + " | " + str(self.key_fee_address))
return self.wallet.send_points_to_address(reserved_points, self.key_fee)
return defer.succeed(None)
if self.pay_key:
d = _pay_key_fee()
else:
d = defer.Deferred()
self.downloader = downloader
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
d.addCallback(lambda _: log.info("Downloading " + str(self.stream_hash) + " --> " + str(self.download_path)))
d.addCallback(lambda _: self.downloader.start())
class FetcherDaemon(object):
def __init__(self, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier, autofetcher_conf,
verbose=False):
self.autofetcher_conf = autofetcher_conf
self.max_key_fee = 0.0
self.sd_identifier = sd_identifier
self.wallet = wallet
self.session = session
self.lbry_file_manager = lbry_file_manager
self.lbry_metadata_manager = lbry_file_metadata_manager
self.seen = []
self.lastbestblock = None
self.search = None
self.first_run = True
self.is_running = False
self.verbose = verbose
self._get_autofetcher_conf()
def start(self):
if not self.is_running:
self.is_running = True
self.search = LoopingCall(self._looped_search)
self.search.start(1)
log.info("Starting autofetcher")
else:
log.info("Autofetcher is already running")
def stop(self):
if self.is_running:
self.search.stop()
self.is_running = False
else:
log.info("Autofetcher isn't running, there's nothing to stop")
def check_if_running(self):
if self.is_running:
msg = "Autofetcher is running\n"
msg += "Last block hash: " + str(self.lastbestblock)
else:
msg = "Autofetcher is not running"
return msg
def _get_names(self):
d = self.wallet.get_best_blockhash()
d.addCallback(lambda blockhash: get_new_streams(blockhash) if blockhash != self.lastbestblock else [])
def get_new_streams(blockhash):
self.lastbestblock = blockhash
d = self.wallet.get_block(blockhash)
d.addCallback(lambda block: get_new_streams_in_txes(block['tx'], blockhash))
return d
def get_new_streams_in_txes(txids, blockhash):
ds = []
for t in txids:
d = self.wallet.get_claims_from_tx(t)
d.addCallback(get_new_streams_in_tx, t, blockhash)
ds.append(d)
d = defer.DeferredList(ds, consumeErrors=True)
d.addCallback(lambda result: [r[1] for r in result if r[0]])
d.addCallback(lambda stream_lists: [stream for streams in stream_lists for stream in streams])
return d
def get_new_streams_in_tx(claims, t, blockhash):
rtn = []
if claims:
for claim in claims:
if claim not in self.seen:
msg = "[" + str(datetime.now()) + "] New claim | lbry://" + str(claim['name']) + \
" | stream hash: " + str(json.loads(claim['value'])['stream_hash'])
log.info(msg)
if self.verbose:
print msg
rtn.append((claim['name'], t))
self.seen.append(claim)
else:
if self.verbose:
print "[" + str(datetime.now()) + "] No claims in block", blockhash
return rtn
d.addCallback(lambda streams: defer.DeferredList(
[self.wallet.get_stream_info_from_txid(name, t) for name, t in streams]))
return d
def _download_claims(self, claims):
if claims:
for claim in claims:
stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager,
self.max_key_fee, pay_key=False)
stream.start(claim[1])
return defer.succeed(None)
def _looped_search(self):
d = self._get_names()
d.addCallback(self._download_claims)
return d
def _get_autofetcher_conf(self):
settings = {"maxkey": "0.0"}
if os.path.exists(self.autofetcher_conf):
conf = open(self.autofetcher_conf)
for l in conf:
if l.startswith("maxkey="):
settings["maxkey"] = float(l[7:].rstrip('\n'))
conf.close()
else:
conf = open(self.autofetcher_conf, "w")
conf.write("maxkey=10.0")
conf.close()
settings["maxkey"] = 10.0
log.info("No autofetcher conf file found, making one with max key fee of 10.0")
self.max_key_fee = settings["maxkey"]