get rid of autofetcher in daemon
the purpose of the autofetcher was to automatically back up and host published content, it is simpler to do this in a separate script that uses existing daemon functions than to have it be built in
This commit is contained in:
parent
019de08c64
commit
c1d0f9cf1b
2 changed files with 0 additions and 182 deletions
|
@ -143,7 +143,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
self.wallet_type = wallet_type
|
self.wallet_type = wallet_type
|
||||||
self.first_run = None
|
self.first_run = None
|
||||||
self.log_file = LOG_FILENAME
|
self.log_file = LOG_FILENAME
|
||||||
self.fetcher = None
|
|
||||||
self.current_db_revision = 1
|
self.current_db_revision = 1
|
||||||
self.run_server = True
|
self.run_server = True
|
||||||
self.session = None
|
self.session = None
|
||||||
|
@ -680,11 +679,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
|
|
||||||
return defer.succeed(True)
|
return defer.succeed(True)
|
||||||
|
|
||||||
def _setup_fetcher(self):
|
|
||||||
self.fetcher = FetcherDaemon(self.session, self.lbry_file_manager, self.lbry_file_metadata_manager,
|
|
||||||
self.session.wallet, self.sd_identifier, self.autofetcher_conf)
|
|
||||||
return defer.succeed(None)
|
|
||||||
|
|
||||||
def _setup_data_directory(self):
|
def _setup_data_directory(self):
|
||||||
self.startup_status = STARTUP_STAGES[1]
|
self.startup_status = STARTUP_STAGES[1]
|
||||||
log.info("Loading databases...")
|
log.info("Loading databases...")
|
||||||
|
@ -1288,48 +1282,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
else:
|
else:
|
||||||
return self._render_response(self.jsonrpc_help.__doc__, OK_CODE)
|
return self._render_response(self.jsonrpc_help.__doc__, OK_CODE)
|
||||||
|
|
||||||
def jsonrpc_start_fetcher(self):
|
|
||||||
"""
|
|
||||||
Start automatically downloading new name claims as they occur (off by default)
|
|
||||||
|
|
||||||
Args:
|
|
||||||
None
|
|
||||||
Returns:
|
|
||||||
confirmation message
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.fetcher.start()
|
|
||||||
log.info('[' + str(datetime.now()) + '] Start autofetcher')
|
|
||||||
# self._log_to_slack('[' + str(datetime.now()) + '] Start autofetcher')
|
|
||||||
return self._render_response("Started autofetching claims", OK_CODE)
|
|
||||||
|
|
||||||
def jsonrpc_stop_fetcher(self):
|
|
||||||
"""
|
|
||||||
Stop automatically downloading new name claims as they occur
|
|
||||||
|
|
||||||
Args:
|
|
||||||
None
|
|
||||||
Returns:
|
|
||||||
confirmation message
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.fetcher.stop()
|
|
||||||
log.info('[' + str(datetime.now()) + '] Stop autofetcher')
|
|
||||||
return self._render_response("Stopped autofetching claims", OK_CODE)
|
|
||||||
|
|
||||||
def jsonrpc_fetcher_status(self):
|
|
||||||
"""
|
|
||||||
Get fetcher status
|
|
||||||
|
|
||||||
Args:
|
|
||||||
None
|
|
||||||
Returns:
|
|
||||||
True/False
|
|
||||||
"""
|
|
||||||
|
|
||||||
log.info("[" + str(datetime.now()) + "] Get fetcher status")
|
|
||||||
return self._render_response(self.fetcher.check_if_running(), OK_CODE)
|
|
||||||
|
|
||||||
def jsonrpc_get_balance(self):
|
def jsonrpc_get_balance(self):
|
||||||
"""
|
"""
|
||||||
Get balance
|
Get balance
|
||||||
|
@ -1865,7 +1817,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
d.addCallback(lambda r: self._render_response(r, OK_CODE))
|
d.addCallback(lambda r: self._render_response(r, OK_CODE))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
def jsonrpc_get_nametrie(self):
|
def jsonrpc_get_nametrie(self):
|
||||||
"""
|
"""
|
||||||
Get the nametrie
|
Get the nametrie
|
||||||
|
@ -1901,23 +1852,6 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
||||||
#
|
#
|
||||||
# return d
|
# return d
|
||||||
|
|
||||||
def jsonrpc_toggle_fetcher_verbose(self):
|
|
||||||
"""
|
|
||||||
Toggle fetcher verbose mode
|
|
||||||
|
|
||||||
Args:
|
|
||||||
None
|
|
||||||
Returns:
|
|
||||||
Fetcher verbose status, bool
|
|
||||||
"""
|
|
||||||
|
|
||||||
if self.fetcher.verbose:
|
|
||||||
self.fetcher.verbose = False
|
|
||||||
else:
|
|
||||||
self.fetcher.verbose = True
|
|
||||||
|
|
||||||
return self._render_response(self.fetcher.verbose, OK_CODE)
|
|
||||||
|
|
||||||
def jsonrpc_check_for_new_version(self):
|
def jsonrpc_check_for_new_version(self):
|
||||||
"""
|
"""
|
||||||
Checks local version against versions in __init__.py and version.py in the lbrynet and lbryum repos
|
Checks local version against versions in __init__.py and version.py in the lbrynet and lbryum repos
|
||||||
|
|
|
@ -138,119 +138,3 @@ class GetStream(object):
|
||||||
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
|
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
|
||||||
d.addCallback(lambda _: log.info("[" + str(datetime.now()) + "] Downloading " + str(self.stream_hash) + " --> " + str(self.download_path)))
|
d.addCallback(lambda _: log.info("[" + str(datetime.now()) + "] Downloading " + str(self.stream_hash) + " --> " + str(self.download_path)))
|
||||||
d.addCallback(lambda _: self.downloader.start())
|
d.addCallback(lambda _: self.downloader.start())
|
||||||
|
|
||||||
|
|
||||||
class FetcherDaemon(object):
|
|
||||||
def __init__(self, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier, autofetcher_conf,
|
|
||||||
verbose=False):
|
|
||||||
self.autofetcher_conf = autofetcher_conf
|
|
||||||
self.max_key_fee = 0.0
|
|
||||||
self.sd_identifier = sd_identifier
|
|
||||||
self.wallet = wallet
|
|
||||||
self.session = session
|
|
||||||
self.lbry_file_manager = lbry_file_manager
|
|
||||||
self.lbry_metadata_manager = lbry_file_metadata_manager
|
|
||||||
self.seen = []
|
|
||||||
self.lastbestblock = None
|
|
||||||
self.search = None
|
|
||||||
self.first_run = True
|
|
||||||
self.is_running = False
|
|
||||||
self.verbose = verbose
|
|
||||||
self._get_autofetcher_conf()
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
if not self.is_running:
|
|
||||||
self.is_running = True
|
|
||||||
self.search = LoopingCall(self._looped_search)
|
|
||||||
self.search.start(1)
|
|
||||||
log.info("Starting autofetcher")
|
|
||||||
else:
|
|
||||||
log.info("Autofetcher is already running")
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
if self.is_running:
|
|
||||||
self.search.stop()
|
|
||||||
self.is_running = False
|
|
||||||
else:
|
|
||||||
log.info("Autofetcher isn't running, there's nothing to stop")
|
|
||||||
|
|
||||||
def check_if_running(self):
|
|
||||||
if self.is_running:
|
|
||||||
msg = "Autofetcher is running\n"
|
|
||||||
msg += "Last block hash: " + str(self.lastbestblock)
|
|
||||||
else:
|
|
||||||
msg = "Autofetcher is not running"
|
|
||||||
return msg
|
|
||||||
|
|
||||||
def _get_names(self):
|
|
||||||
d = self.wallet.get_best_blockhash()
|
|
||||||
d.addCallback(lambda blockhash: get_new_streams(blockhash) if blockhash != self.lastbestblock else [])
|
|
||||||
|
|
||||||
def get_new_streams(blockhash):
|
|
||||||
self.lastbestblock = blockhash
|
|
||||||
d = self.wallet.get_block(blockhash)
|
|
||||||
d.addCallback(lambda block: get_new_streams_in_txes(block['tx'], blockhash))
|
|
||||||
return d
|
|
||||||
|
|
||||||
def get_new_streams_in_txes(txids, blockhash):
|
|
||||||
ds = []
|
|
||||||
for t in txids:
|
|
||||||
d = self.wallet.get_claims_from_tx(t)
|
|
||||||
d.addCallback(get_new_streams_in_tx, t, blockhash)
|
|
||||||
ds.append(d)
|
|
||||||
d = defer.DeferredList(ds, consumeErrors=True)
|
|
||||||
d.addCallback(lambda result: [r[1] for r in result if r[0]])
|
|
||||||
d.addCallback(lambda stream_lists: [stream for streams in stream_lists for stream in streams])
|
|
||||||
return d
|
|
||||||
|
|
||||||
def get_new_streams_in_tx(claims, t, blockhash):
|
|
||||||
rtn = []
|
|
||||||
if claims:
|
|
||||||
for claim in claims:
|
|
||||||
if claim not in self.seen:
|
|
||||||
msg = "[" + str(datetime.now()) + "] New claim | lbry://" + str(claim['name']) + \
|
|
||||||
" | stream hash: " + str(json.loads(claim['value'])['stream_hash'])
|
|
||||||
log.info(msg)
|
|
||||||
if self.verbose:
|
|
||||||
print msg
|
|
||||||
rtn.append((claim['name'], t))
|
|
||||||
self.seen.append(claim)
|
|
||||||
else:
|
|
||||||
if self.verbose:
|
|
||||||
print "[" + str(datetime.now()) + "] No claims in block", blockhash
|
|
||||||
return rtn
|
|
||||||
|
|
||||||
d.addCallback(lambda streams: defer.DeferredList(
|
|
||||||
[self.wallet.get_stream_info_from_txid(name, t) for name, t in streams]))
|
|
||||||
return d
|
|
||||||
|
|
||||||
def _download_claims(self, claims):
|
|
||||||
if claims:
|
|
||||||
for claim in claims:
|
|
||||||
stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager,
|
|
||||||
self.max_key_fee, pay_key=False)
|
|
||||||
stream.start(claim[1])
|
|
||||||
|
|
||||||
return defer.succeed(None)
|
|
||||||
|
|
||||||
def _looped_search(self):
|
|
||||||
d = self._get_names()
|
|
||||||
d.addCallback(self._download_claims)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def _get_autofetcher_conf(self):
|
|
||||||
settings = {"maxkey": "0.0"}
|
|
||||||
if os.path.exists(self.autofetcher_conf):
|
|
||||||
conf = open(self.autofetcher_conf)
|
|
||||||
for l in conf:
|
|
||||||
if l.startswith("maxkey="):
|
|
||||||
settings["maxkey"] = float(l[7:].rstrip('\n'))
|
|
||||||
conf.close()
|
|
||||||
else:
|
|
||||||
conf = open(self.autofetcher_conf, "w")
|
|
||||||
conf.write("maxkey=10.0")
|
|
||||||
conf.close()
|
|
||||||
settings["maxkey"] = 10.0
|
|
||||||
log.info("No autofetcher conf file found, making one with max key fee of 10.0")
|
|
||||||
|
|
||||||
self.max_key_fee = settings["maxkey"]
|
|
||||||
|
|
Loading…
Reference in a new issue