Merge branch 'stream-availability'

This commit is contained in:
Jack Robison 2017-12-20 21:29:07 -05:00
commit f509f354b4
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
6 changed files with 300 additions and 75 deletions

View file

@ -21,7 +21,7 @@ at anytime.
### Deprecated
* `channel_list_mine`, replaced with `channel_list`
*
* `get_availability`, replaced with `stream_availability`
### Changed
* Check claim schema in `publish` before trying to make the claim, return better error messages
@ -57,6 +57,7 @@ at anytime.
* Added a table to the lbry file database to store the outpoint of the claim downloaded from
* Added `wallet_unlock`, a command available during startup to unlock an encrypted wallet
* Added support for wallet encryption via new commands `wallet_decrypt` and `wallet_encrypt`
* Added `blob_availability` and `stream_availability` commands for debugging download issues
### Removed
* Removed claim related filter arguments `name`, `claim_id`, and `outpoint` from `file_list`, `file_delete`, `file_set_status`, and `file_reflect`

View file

@ -0,0 +1,111 @@
import logging
import shutil
import tempfile
from twisted.internet import defer, threads, reactor
from lbrynet.blob import BlobFile
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
from lbrynet.core.RateLimiter import DummyRateLimiter
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.core.PeerFinder import DummyPeerFinder
from lbrynet.core.client.BlobRequester import BlobRequester
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
from lbrynet.core.client.ConnectionManager import ConnectionManager
log = logging.getLogger(__name__)
class TempBlobManager(DiskBlobManager):
def stop(self):
self.db_conn.close()
return defer.succeed(True)
class SinglePeerFinder(DummyPeerFinder):
def __init__(self, peer):
DummyPeerFinder.__init__(self)
self.peer = peer
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False):
return defer.succeed([self.peer])
class BlobCallback(BlobFile):
def __init__(self, blob_dir, blob_hash, timeout):
BlobFile.__init__(self, blob_dir, blob_hash)
self.callback = defer.Deferred()
reactor.callLater(timeout, self._cancel)
def _cancel(self):
if not self.callback.called:
self.callback.callback(False)
def save_verified_blob(self, writer):
result = BlobFile.save_verified_blob(self, writer)
if not self.callback.called:
self.callback.callback(True)
return result
class SingleBlobDownloadManager(object):
def __init__(self, blob):
self.blob = blob
def needed_blobs(self):
if self.blob.verified:
return []
else:
return [self.blob]
def get_head_blob_hash(self):
return self.blob.blob_hash
class SinglePeerDownloader(object):
def __init__(self):
self._payment_rate_manager = OnlyFreePaymentsManager()
self._announcer = DummyHashAnnouncer()
self._rate_limiter = DummyRateLimiter()
self._wallet = None
self._blob_manager = None
def setup(self, wallet, blob_manager=None):
if not self._wallet:
self._wallet = wallet
if not self._blob_manager:
self._blob_manager = blob_manager
@defer.inlineCallbacks
def download_blob_from_peer(self, peer, timeout, blob_hash, blob_manager):
log.debug("Try to download %s from %s", blob_hash, peer.host)
blob_manager = blob_manager
blob = BlobCallback(blob_manager.blob_dir, blob_hash, timeout)
download_manager = SingleBlobDownloadManager(blob)
peer_finder = SinglePeerFinder(peer)
requester = BlobRequester(blob_manager, peer_finder, self._payment_rate_manager,
self._wallet, download_manager)
downloader = StandaloneBlobDownloader(blob_hash, blob_manager, peer_finder,
self._rate_limiter, self._payment_rate_manager,
self._wallet, timeout=timeout)
info_exchanger = self._wallet.get_info_exchanger()
connection_manager = ConnectionManager(downloader, self._rate_limiter, [requester],
[info_exchanger])
connection_manager.start()
result = yield blob.callback
if not result:
log.debug("Failed to downloaded %s from %s", blob_hash[:16], peer.host)
yield connection_manager.stop()
defer.returnValue(result)
@defer.inlineCallbacks
def download_temp_blob_from_peer(self, peer, timeout, blob_hash):
tmp_dir = yield threads.deferToThread(tempfile.mkdtemp)
tmp_blob_manager = TempBlobManager(self._announcer, tmp_dir, tmp_dir)
try:
result = yield self.download_blob_from_peer(peer, timeout, blob_hash, tmp_blob_manager)
finally:
yield tmp_blob_manager.stop()
yield threads.deferToThread(shutil.rmtree, tmp_dir)
defer.returnValue(result)

View file

@ -27,8 +27,10 @@ from lbrynet import conf
from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import RequestCanceledError, InsufficientFundsError, UnknownNameError
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint
from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError
from twisted.internet.error import ConnectionAborted
log = logging.getLogger(__name__)
@ -1610,7 +1612,7 @@ class LBRYcrdAddressRequester(object):
self.wallet.update_peer_address(peer, address)
def _request_failed(self, err, peer):
if not err.check(RequestCanceledError):
if not err.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted):
log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s",
err.getErrorMessage(), str(peer))
return err

View file

@ -484,7 +484,7 @@ class DownloadRequest(RequestHelper):
blob_details.cancel_func,
blob
)
log.debug("Requesting blob %s from %s", blob.blob_hash, self.peer)
log.info("Requesting blob %s from %s", blob.blob_hash, self.peer)
return request
def _handle_download_request(self, client_blob_request):
@ -508,10 +508,10 @@ class DownloadRequest(RequestHelper):
self._download_failed,
callbackArgs=(client_blob_request.blob,),
)
client_blob_request.finished_deferred.addBoth(
self._pay_or_cancel_payment, reserved_points, client_blob_request.blob)
client_blob_request.finished_deferred.addErrback(
_handle_download_error, self.peer, client_blob_request.blob)
client_blob_request.finished_deferred.addBoth(self._pay_or_cancel_payment,
reserved_points, client_blob_request.blob)
client_blob_request.finished_deferred.addErrback(_handle_download_error, self.peer,
client_blob_request.blob)
def _pay_or_cancel_payment(self, arg, reserved_points, blob):
if self._can_pay_peer(blob, arg):

View file

@ -39,7 +39,7 @@ class ConnectionManager(object):
# a deferred that gets fired when a _manage call is set
self._manage_deferred = None
self.stopped = True
log.info("%s initialized", self._get_log_name())
log.debug("%s initialized", self._get_log_name())
# this identifies what the connection manager is for,
# used for logging purposes only

View file

@ -7,7 +7,6 @@ import requests
import urllib
import json
import textwrap
import random
import signal
from copy import deepcopy
from twisted.web import server
@ -20,6 +19,7 @@ from lbryschema.uri import parse_lbry_uri
from lbryschema.error import URIParseError, DecodeError
from lbryschema.validator import validate_claim_id
from lbryschema.address import decode_address
from lbryschema.decode import smart_decode
# TODO: importing this when internet is disabled raises a socket.gaierror
from lbrynet.core.system_info import get_lbrynet_version
@ -48,6 +48,8 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout
from lbrynet.core.Error import NullFundsError, NegativeFundsError
from lbrynet.core.Peer import Peer
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
log = logging.getLogger(__name__)
@ -190,7 +192,6 @@ class Daemon(AuthJSONRPCServer):
self.use_upnp = conf.settings['use_upnp']
self.auto_renew_claim_height_delta = conf.settings['auto_renew_claim_height_delta']
self.startup_status = STARTUP_STAGES[0]
self.connected_to_internet = True
self.connection_status_code = None
@ -295,10 +296,10 @@ class Daemon(AuthJSONRPCServer):
for outpoint, result in results.iteritems():
if result['success']:
log.info("Renewed claim at outpoint:%s claim ID:%s, paid fee:%s",
outpoint, result['claim_id'], result['fee'])
outpoint, result['claim_id'], result['fee'])
else:
log.info("Failed to renew claim at outpoint:%s, reason:%s",
outpoint, result['reason'])
outpoint, result['reason'])
def _start_server(self):
if self.peer_port is not None:
@ -667,10 +668,10 @@ class Daemon(AuthJSONRPCServer):
self.analytics_manager.send_download_started(download_id, name, claim_dict)
self.streams[sd_hash] = GetStream(self.sd_identifier, self.session,
self.exchange_rate_manager, self.max_key_fee,
self.disable_max_key_fee,
conf.settings['data_rate'], timeout,
file_name)
self.exchange_rate_manager, self.max_key_fee,
self.disable_max_key_fee,
conf.settings['data_rate'], timeout,
file_name)
try:
lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name)
yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout)
@ -951,6 +952,52 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(self.get_blobs_for_stream_hash)
return d
def _get_single_peer_downloader(self):
downloader = SinglePeerDownloader()
downloader.setup(self.session.wallet)
return downloader
@defer.inlineCallbacks
def _blob_availability(self, blob_hash, search_timeout, blob_timeout, downloader=None):
if not downloader:
downloader = self._get_single_peer_downloader()
result = {}
search_timeout = search_timeout or conf.settings['peer_search_timeout']
blob_timeout = blob_timeout or conf.settings['sd_download_timeout']
is_available = False
reachable_peers = []
unreachable_peers = []
try:
peers = yield self.jsonrpc_peer_list(blob_hash, search_timeout)
peer_infos = [{"peer": Peer(x[0], x[1]),
"blob_hash": blob_hash,
"timeout": blob_timeout} for x in peers if x[2]]
dl = []
dl_peers = []
dl_results = []
for peer_info in peer_infos:
d = downloader.download_temp_blob_from_peer(**peer_info)
dl.append(d)
dl_peers.append("%s:%i" % (peer_info['peer'].host, peer_info['peer'].port))
for dl_peer, (success, download_result) in zip(dl_peers,
(yield defer.DeferredList(dl))):
if success:
if download_result:
reachable_peers.append(dl_peer)
else:
unreachable_peers.append(dl_peer)
dl_results.append(download_result)
is_available = any(dl_results)
except Exception as err:
result['error'] = "Failed to get peers for blob: %s" % err
response = {
'is_available': is_available,
'reachable_peers': reachable_peers,
'unreachable_peers': unreachable_peers,
}
defer.returnValue(response)
############################################################################
# #
# JSON-RPC API methods start here #
@ -1514,7 +1561,7 @@ class Daemon(AuthJSONRPCServer):
for u in uris:
try:
parse_lbry_uri(u)
valid_uris += (u, )
valid_uris += (u,)
except URIParseError:
results[u] = {"error": "%s is not a valid uri" % u}
@ -2099,7 +2146,7 @@ class Daemon(AuthJSONRPCServer):
else:
raise Exception("invalid outpoint")
result = yield self.session.wallet.claim_renew(txid, nout)
result = {outpoint:result}
result = {outpoint: result}
else:
height = int(height)
result = yield self.session.wallet.claim_renew_all_before_expiration(height)
@ -2247,7 +2294,7 @@ class Daemon(AuthJSONRPCServer):
uris = tuple(uris)
if uri is not None:
uris += (uri, )
uris += (uri,)
results = {}
@ -2260,7 +2307,7 @@ class Daemon(AuthJSONRPCServer):
elif parsed.path:
results[chan_uri] = {"error": "%s is a claim in a channel" % parsed.path}
else:
valid_uris += (chan_uri, )
valid_uris += (chan_uri,)
except URIParseError:
results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri}
@ -2271,8 +2318,8 @@ class Daemon(AuthJSONRPCServer):
results[u] = resolved[u]
else:
results[u] = {
'claims_in_channel': resolved[u]['claims_in_channel']
}
'claims_in_channel': resolved[u]['claims_in_channel']
}
if page:
results[u]['returned_page'] = page
results[u]['claims_in_channel'] = resolved[u].get('claims_in_channel', [])
@ -2919,7 +2966,29 @@ class Daemon(AuthJSONRPCServer):
result['node_id'] = self.session.dht_node.node_id.encode('hex')
return self._render_response(result)
@defer.inlineCallbacks
def jsonrpc_blob_availability(self, blob_hash, search_timeout=None, blob_timeout=None):
"""
Get blob availability
Usage:
blob_availability (<blob_hash>) [<search_timeout> | --search_timeout=<search_timeout>]
[<blob_timeout> | --blob_timeout=<blob_timeout>]
Options:
<search_timeout> : how long to search for peers for the blob in the dht
<blob_timeout> : how long to try downloading from a peer
Returns:
(dict) {
"is_available": <bool, true if blob is available from a peer from peer list>
"reachable_peers": ["<ip>:<port>"],
"unreachable_peers": ["<ip>:<port>"]
}
"""
return self._blob_availability(blob_hash, search_timeout, blob_timeout)
@AuthJSONRPCServer.deprecated("stream_availability")
def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None):
"""
Get stream availability for lbry uri
@ -2936,63 +3005,105 @@ class Daemon(AuthJSONRPCServer):
(float) Peers per blob / total blobs
"""
def _get_mean(blob_availabilities):
peer_counts = []
for blob_availability in blob_availabilities:
for blob, peers in blob_availability.iteritems():
peer_counts.append(peers)
if peer_counts:
return round(1.0 * sum(peer_counts) / len(peer_counts), 2)
else:
return 0.0
return self.jsonrpc_stream_availability(uri, peer_timeout, sd_timeout)
def read_sd_blob(sd_blob):
sd_blob_file = sd_blob.open_for_reading()
decoded_sd_blob = json.loads(sd_blob_file.read())
sd_blob_file.close()
return decoded_sd_blob
@defer.inlineCallbacks
def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None):
"""
Get stream availability for lbry uri
resolved_result = yield self.session.wallet.resolve(uri)
if resolved_result and uri in resolved_result:
resolved = resolved_result[uri]
else:
defer.returnValue(None)
Usage:
stream_availability (<uri>) [<search_timeout> | --search_timeout=<search_timeout>]
[<blob_timeout> | --blob_timeout=<blob_timeout>]
if 'claim' in resolved:
metadata = resolved['claim']['value']
else:
defer.returnValue(None)
Options:
<search_timeout> : how long to search for peers for the blob in the dht
<blob_timeout> : how long to try downloading from a peer
Returns:
(dict) {
'is_available': <bool>,
'did_decode': <bool>,
'did_resolve': <bool>,
'is_stream': <bool>,
'num_blobs_in_stream': <int>,
'sd_hash': <str>,
'sd_blob_availability': <dict> see `blob_availability`,
'head_blob_hash': <str>,
'head_blob_availability': <dict> see `blob_availability`,
'use_upnp': <bool>,
'upnp_redirect_is_set': <bool>,
'error': <None> | <str> error message
}
"""
search_timeout = search_timeout or conf.settings['peer_search_timeout']
blob_timeout = blob_timeout or conf.settings['sd_download_timeout']
response = {
'is_available': False,
'did_decode': False,
'did_resolve': False,
'is_stream': False,
'num_blobs_in_stream': None,
'sd_hash': None,
'sd_blob_availability': {},
'head_blob_hash': None,
'head_blob_availability': {},
'use_upnp': conf.settings['use_upnp'],
'upnp_redirect_is_set': len(self.session.upnp_redirects) > 0,
'error': None
}
sd_hash = utils.get_sd_hash(metadata)
sd_timeout = sd_timeout or conf.settings['sd_download_timeout']
peer_timeout = peer_timeout or conf.settings['peer_search_timeout']
blobs = []
try:
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
need_sd_blob = False
log.info("Already have sd blob")
except NoSuchSDHash:
need_sd_blob = True
log.info("Need sd blob")
resolved_result = yield self.session.wallet.resolve(uri)
response['did_resolve'] = True
except UnknownNameError:
response['error'] = "Failed to resolve name"
defer.returnValue(response)
except URIParseError:
response['error'] = "Invalid URI"
defer.returnValue(response)
blob_hashes = [blob.blob_hash for blob in blobs]
if need_sd_blob:
# we don't want to use self._download_descriptor here because it would create a stream
try:
sd_blob = yield self._download_blob(sd_hash, timeout=sd_timeout)
except Exception as err:
response = yield self._render_response(0.0)
log.warning(err)
defer.returnValue(response)
decoded = read_sd_blob(sd_blob)
blob_hashes = [blob.get("blob_hash") for blob in decoded['blobs']
if blob.get("blob_hash")]
sample = random.sample(blob_hashes, min(len(blob_hashes), 5))
log.info("check peers for %i of %i blobs in stream", len(sample), len(blob_hashes))
availabilities = yield self.session.blob_tracker.get_availability_for_blobs(sample,
peer_timeout)
mean_availability = _get_mean(availabilities)
response = yield self._render_response(mean_availability)
try:
claim_obj = smart_decode(resolved_result[uri]['claim']['hex'])
response['did_decode'] = True
except DecodeError:
response['error'] = "Failed to decode claim value"
defer.returnValue(response)
response['is_stream'] = claim_obj.is_stream
if not claim_obj.is_stream:
response['error'] = "Claim for \"%s\" does not contain a stream" % uri
defer.returnValue(response)
sd_hash = claim_obj.source_hash
response['sd_hash'] = sd_hash
head_blob_hash = None
downloader = self._get_single_peer_downloader()
have_sd_blob = sd_hash in self.session.blob_manager.blobs
try:
sd_blob = yield self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout,
encoding="json")
if not have_sd_blob:
yield self.jsonrpc_blob_delete(sd_hash)
if sd_blob and 'blobs' in sd_blob:
response['num_blobs_in_stream'] = len(sd_blob['blobs']) - 1
head_blob_hash = sd_blob['blobs'][0]['blob_hash']
head_blob_availability = yield self._blob_availability(head_blob_hash,
search_timeout,
blob_timeout,
downloader)
response['head_blob_availability'] = head_blob_availability
except Exception as err:
response['error'] = err
response['head_blob_hash'] = head_blob_hash
response['sd_blob_availability'] = yield self._blob_availability(sd_hash,
search_timeout,
blob_timeout,
downloader)
response['is_available'] = response['sd_blob_availability'].get('is_available') and \
response['head_blob_availability'].get('is_available')
defer.returnValue(response)
@defer.inlineCallbacks