fix files

-don’t automatically remove files that can’t be found from
LBRYFileManager

-add timeout to GetStream
This commit is contained in:
Jack 2016-03-17 00:44:04 -04:00
parent 4e34f861c9
commit 36e299b4cf
5 changed files with 127 additions and 71 deletions

View file

@@ -297,8 +297,8 @@ class LBRYWallet(object):
except (ValueError, TypeError):
return Failure(InvalidStreamInfoError(name))
known_fields = ['stream_hash', 'name', 'description', 'key_fee', 'key_fee_address', 'thumbnail',
'content_license', 'sources', 'fee']
known_sources = ['lbry_sd_hash']
'content_license', 'sources', 'fee', 'author']
known_sources = ['lbry_sd_hash', 'btih', 'url']
known_fee_types = {'LBC': ['amount', 'address']}
for field in known_fields:
if field in value_dict:
@@ -334,7 +334,7 @@ class LBRYWallet(object):
return Failure(UnknownNameError(name))
def claim_name(self, name, sd_hash, amount, description=None, key_fee=None,
key_fee_address=None, thumbnail=None, content_license=None):
key_fee_address=None, thumbnail=None, content_license=None, author=None, sources=None):
value = {"sources": {'lbry_sd_hash': sd_hash}}
if description is not None:
value['description'] = description
@@ -344,6 +344,10 @@ class LBRYWallet(object):
value['thumbnail'] = thumbnail
if content_license is not None:
value['content_license'] = content_license
if author is not None:
value['author'] = author
if sources is not None:
value['sources'] = sources
d = self._send_name_claim(name, json.dumps(value), amount)

View file

@@ -34,8 +34,8 @@ class LBRYFileManager(object):
self.sd_identifier = sd_identifier
self.lbry_files = []
self.sql_db = None
self.delete_data = delete_data
self.check_exists_loop = LoopingCall(self.check_files_exist)
# self.delete_data = delete_data
# self.check_exists_loop = LoopingCall(self.check_files_exist)
if sys.platform.startswith("darwin"):
self.download_directory = os.path.join(os.path.expanduser("~"), 'Downloads')
else:
@@ -43,34 +43,34 @@ class LBRYFileManager(object):
log.debug("Download directory for LBRYFileManager: %s", str(self.download_directory))
def setup(self):
self.check_exists_loop.start(10)
# self.check_exists_loop.start(10)
d = self._open_db()
d.addCallback(lambda _: self._add_to_sd_identifier())
d.addCallback(lambda _: self._start_lbry_files())
return d
def check_files_exist(self):
def _disp(deleted_files):
if deleted_files[0][0]:
for file in bad_files:
log.info("[" + str(datetime.now()) + "] Detected " + file.file_name + " was deleted, removing from file manager")
def _delete_stream_data(lbry_file):
s_h = lbry_file.stream_hash
d = self.get_count_for_stream_hash(s_h)
# TODO: could possibly be a timing issue here
d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True)
return d
bad_files = [lbry_file for lbry_file in self.lbry_files
if lbry_file.completed == True and
os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)) == False]
d = defer.DeferredList([self.delete_lbry_file(lbry_file) for lbry_file in bad_files], consumeErrors=True)
d.addCallback(lambda files: _disp(files) if len(files) else defer.succeed(None))
if self.delete_data:
d2 = defer.DeferredList([_delete_stream_data(lbry_file) for lbry_file in bad_files], consumeErrors=True)
# def check_files_exist(self):
# def _disp(deleted_files):
# if deleted_files[0][0]:
# for file in bad_files:
# log.info("[" + str(datetime.now()) + "] Detected " + file.file_name + " was deleted, removing from file manager")
#
# def _delete_stream_data(lbry_file):
# s_h = lbry_file.stream_hash
# d = self.get_count_for_stream_hash(s_h)
# # TODO: could possibly be a timing issue here
# d.addCallback(lambda c: self.stream_info_manager.delete_stream(s_h) if c == 0 else True)
# return d
#
# bad_files = [lbry_file for lbry_file in self.lbry_files
# if lbry_file.completed == True and
# os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)) == False]
# d = defer.DeferredList([self.delete_lbry_file(lbry_file) for lbry_file in bad_files], consumeErrors=True)
# d.addCallback(lambda files: _disp(files) if len(files) else defer.succeed(None))
#
# if self.delete_data:
# d2 = defer.DeferredList([_delete_stream_data(lbry_file) for lbry_file in bad_files], consumeErrors=True)
def get_lbry_file_status(self, lbry_file):
return self._get_lbry_file_status(lbry_file.rowid)
@@ -180,7 +180,7 @@ class LBRYFileManager(object):
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
def stop(self):
self.check_exists_loop.stop()
# self.check_exists_loop.stop()
ds = []

View file

@@ -6,10 +6,14 @@ import binascii
import subprocess
import logging
import requests
# import rumps
# import httplib2
from twisted.web import server, resource, static
from twisted.internet import defer, threads, error, reactor
from txjsonrpc.web import jsonrpc
from jsonrpc.proxy import JSONRPCProxy
from datetime import datetime
from decimal import Decimal
from StringIO import StringIO
@@ -537,10 +541,10 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return defer.succeed(True)
def _download_name(self, name):
def _disp_file(file):
log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(file.stream_hash))
d = self._path_from_lbry_file(file)
return d
def _disp_file(f):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.stream_hash) + " --> " + file_path)
return defer.succeed(f)
def _get_stream(name):
def _disp(stream):
@@ -558,12 +562,9 @@ class LBRYDaemon(jsonrpc.JSONRPC):
d = self._check_history(name)
d.addCallback(lambda lbry_file: _get_stream(name) if not lbry_file else _disp_file(lbry_file))
d.addCallback(lambda _: self._check_history(name))
d.addCallback(lambda lbry_file: ({'stream_hash': lbry_file.stream_hash,
'path': os.path.join(self.download_directory,
lbry_file.file_name)})
if lbry_file else defer.fail(NOT_FOUND))
d.addCallback(lambda _: self._path_from_name(name))
d.addErrback(lambda err: defer.fail(NOT_FOUND))
return d
def _resolve_name(self, name):
@@ -586,12 +587,19 @@ class LBRYDaemon(jsonrpc.JSONRPC):
f = open(path, 'r')
l = json.loads(f.read())
f.close()
file_name = l['stream_name'].decode('hex')
lbry_file = [file for file in self.lbry_file_manager.lbry_files if file.stream_name == file_name]
if lbry_file:
return lbry_file[0]
for lbry_file in self.lbry_file_manager.lbry_files:
if lbry_file.stream_name == file_name:
if sys.platform == "darwin":
if os.path.isfile(os.path.join(self.download_directory, lbry_file.stream_name)):
return lbry_file
else:
return None
return False
else:
return lbry_file
else:
return False
def _check(info):
stream_hash = info['stream_hash']
@@ -604,7 +612,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
return defer.succeed(False)
d = self._resolve_name(name)
d.addCallbacks(_check, lambda _: False)
d.addCallback(_check)
d.callback(None)
return d
@@ -673,6 +681,11 @@ class LBRYDaemon(jsonrpc.JSONRPC):
def _render_response(self, result, code):
return json.dumps({'result': result, 'code': code})
# def _log_to_slack(self, msg):
# URL = "https://hooks.slack.com/services/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"
# h = httplib2.Http()
# h.request(URL, 'POST', json.dumps({"text": msg}), headers={'Content-Type': 'application/json'})
def jsonrpc_is_running(self):
"""
Returns a startup message when the daemon starts, after which it will return True
@@ -722,6 +735,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
self.fetcher.start()
log.info('[' + str(datetime.now()) + '] Start autofetcher')
# self._log_to_slack('[' + str(datetime.now()) + '] Start autofetcher')
return self._render_response("Started autofetching claims", OK_CODE)
def jsonrpc_stop_fetcher(self):
@@ -829,6 +843,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
lambda err: self._render_response('error', NOT_FOUND))
else:
d = self._render_response('error', BAD_REQUEST)
return d
def jsonrpc_stop_lbry_file(self, p):
@@ -933,27 +948,40 @@ class LBRYDaemon(jsonrpc.JSONRPC):
@param:
@return:
"""
params = Bunch(p)
metadata_fields = {"name": str, "file_path": str, "bid": float, "author": str, "title": str,
"description": str, "thumbnail": str, "key_fee": float, "key_fee_address": str,
"content_license": str}
log.info(params)
log.info(params.__dict__)
metadata_fields = {"name": unicode, "file_path": unicode, "bid": float, "author": unicode, "title": unicode,
"description": unicode, "thumbnail": unicode, "key_fee": float, "key_fee_address": unicode,
"content_license": unicode, "sources": dict}
for k in metadata_fields.keys():
if k in params.__dict__.keys():
assert isinstance(params.__dict__[k], metadata_fields[k])
if isinstance(params.__dict__[k], metadata_fields[k]):
if type(params.__dict__[k]) == unicode:
metadata_fields[k] = str(params.__dict__[k])
else:
metadata_fields[k] = params.__dict__[k]
else:
metadata_fields[k] = None
else:
metadata_fields[k] = None
log.info("[" + str(datetime.now()) + "] Publish: ", metadata_fields)
p = Publisher(self.session, self.lbry_file_manager, self.session.wallet)
d = p.start(metadata_fields['name'], metadata_fields['file_path'], metadata_fields['bid'],
metadata_fields['title'], metadata_fields['description'], metadata_fields['thumbnail'],
metadata_fields['key_fee'], metadata_fields['key_fee_address'], metadata_fields['content_license'])
d = p.start(name=metadata_fields['name'],
file_path=metadata_fields['file_path'],
bid=metadata_fields['bid'],
title=metadata_fields['title'],
description=metadata_fields['description'],
thumbnail=metadata_fields['thumbnail'],
key_fee=metadata_fields['key_fee'],
key_fee_address=metadata_fields['key_fee_address'],
content_license=metadata_fields['content_license'],
author=metadata_fields['author'],
sources=metadata_fields['sources'])
d.addCallbacks(lambda msg: self._render_response(msg, OK_CODE),
lambda err: self._render_response(err.getTraceback(), BAD_REQUEST))

View file

@@ -13,7 +13,8 @@ log = logging.getLogger(__name__)
class GetStream(object):
def __init__(self, sd_identifier, session, wallet, lbry_file_manager, max_key_fee, pay_key=True, data_rate=0.5):
def __init__(self, sd_identifier, session, wallet, lbry_file_manager, max_key_fee, pay_key=True, data_rate=0.5,
timeout=30):
self.wallet = wallet
self.resolved_name = None
self.description = None
@@ -30,8 +31,25 @@ class GetStream(object):
self.max_key_fee = max_key_fee
self.stream_info = None
self.stream_info_manager = None
self.d = defer.Deferred(None)
self.timeout = timeout
self.timeout_counter = 0
self.download_path = None
self.checker = LoopingCall(self.check_status)
def check_status(self):
self.timeout_counter += 1
if self.download_path and os.path.isfile(self.download_path):
self.checker.stop()
return defer.succeed(True)
elif self.timeout_counter >= self.timeout:
log.info("Timeout downloading " + str(self.stream_info))
self.checker.stop()
self.d.cancel()
def start(self, stream_info):
self.stream_info = stream_info
if 'stream_hash' in self.stream_info.keys():
@@ -60,17 +78,18 @@ class GetStream(object):
else:
pass
d = defer.Deferred(None)
d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
d.addCallback(lambda metadata: (next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)), metadata))
d.addCallback(lambda (factory, metadata): factory.make_downloader(metadata, [self.data_rate, True], self.payment_rate_manager))
d.addErrback(lambda err: err.trap(defer.CancelledError))
d.addErrback(lambda err: log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback()))
d.addCallback(self._start_download)
d.callback(None)
self.checker.start(1)
return d
self.d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self.d.addCallback(lambda metadata: (next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)), metadata))
self.d.addCallback(lambda (factory, metadata): factory.make_downloader(metadata, [self.data_rate, True], self.payment_rate_manager))
self.d.addErrback(lambda err: err.trap(defer.CancelledError))
self.d.addErrback(lambda err: log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback()))
self.d.addCallback(self._start_download)
self.d.callback(None)
return self.d
def _start_download(self, downloader):
def _pay_key_fee():
@@ -87,9 +106,9 @@ class GetStream(object):
else:
d = defer.Deferred()
downloader.start()
log.info("Downloading", self.stream_hash, "-->", os.path.join(downloader.download_directory, downloader.file_name))
self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
d.addCallback(lambda _: downloader.start())
d.addCallback(lambda _: log.info("Downloading " + str(self.stream_hash) + " --> " + str(self.download_path)))
return d

View file

@@ -33,9 +33,11 @@ class Publisher(object):
self.sd_hash = None
self.tx_hash = None
self.content_license = None
self.author = None
self.sources = None
def start(self, name, file_path, bid, title=None, description=None, thumbnail=None,
key_fee=None, key_fee_address=None, content_license=None):
key_fee=None, key_fee_address=None, content_license=None, author=None, sources=None):
def _show_result():
message = "[" + str(datetime.now()) + "] Published " + self.file_name + " --> lbry://" + \
@@ -52,6 +54,8 @@ class Publisher(object):
self.key_fee = key_fee
self.key_fee_address = key_fee_address
self.content_license = content_license
self.author = author
self.sources = sources
d = self._check_file_path(self.file_path)
d.addCallback(lambda _: create_lbry_file(self.session, self.lbry_file_manager,
@@ -104,10 +108,11 @@ class Publisher(object):
return d
def _claim_name(self):
d = self.wallet.claim_name(self.publish_name, self.sd_hash, self.bid_amount,
d = self.wallet.claim_name(self.publish_name, {'sd_hash': self.sd_hash}, self.bid_amount,
description=self.description, key_fee=self.key_fee,
key_fee_address=self.key_fee_address, thumbnail=self.thumbnail,
content_license=self.content_license)
content_license=self.content_license, author=self.author,
sources=self.sources)
def set_tx_hash(tx_hash):
self.tx_hash = tx_hash