add stream_availability and blob_availability, deprecate get_availability

Jack Robison 2017-12-20 20:46:41 -05:00
parent 7c6c666342
commit 09c57675a7
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
3 changed files with 290 additions and 67 deletions

CHANGELOG.md (View file)

@@ -21,7 +21,7 @@ at anytime.
### Deprecated
* `channel_list_mine`, replaced with `channel_list`
* `get_availability`, replaced with `stream_availability`

### Changed
* Check claim schema in `publish` before trying to make the claim, return better error messages
@@ -57,6 +57,7 @@ at anytime.
* Added a table to the lbry file database to store the outpoint of the claim downloaded from
* Added `wallet_unlock`, a command available during startup to unlock an encrypted wallet
* Added support for wallet encryption via new commands `wallet_decrypt` and `wallet_encrypt`
* Added `blob_availability` and `stream_availability` commands for debugging download issues (a sample call is sketched after this list)
### Removed
* Removed claim related filter arguments `name`, `claim_id`, and `outpoint` from `file_list`, `file_delete`, `file_set_status`, and `file_reflect`
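As a quick illustration of the two new commands, here is a minimal sketch of calling them over the daemon's JSON-RPC API. It assumes the daemon's usual default endpoint (http://localhost:5279) with non-authenticated API settings, and the blob hash and URI are placeholders:

import json
import requests

def api_call(method, params=None):
    # POST a JSON-RPC request to a running lbrynet daemon (default port assumed)
    response = requests.post("http://localhost:5279",
                             data=json.dumps({"method": method, "params": params or {}}))
    return response.json()

print(api_call("blob_availability", {"blob_hash": "<blob hash>"}))
print(api_call("stream_availability", {"uri": "lbry://<name>"}))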

lbrynet/core/SinglePeerDownloader.py (View file)

@@ -0,0 +1,111 @@
import logging
import shutil
import tempfile

from twisted.internet import defer, threads, reactor

from lbrynet.blob import BlobFile
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.core.HashAnnouncer import DummyHashAnnouncer
from lbrynet.core.RateLimiter import DummyRateLimiter
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.core.PeerFinder import DummyPeerFinder
from lbrynet.core.client.BlobRequester import BlobRequester
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
from lbrynet.core.client.ConnectionManager import ConnectionManager

log = logging.getLogger(__name__)


class TempBlobManager(DiskBlobManager):
    def stop(self):
        self.db_conn.close()
        return defer.succeed(True)


class SinglePeerFinder(DummyPeerFinder):
    """Peer finder that always returns the single peer it was given."""

    def __init__(self, peer):
        DummyPeerFinder.__init__(self)
        self.peer = peer

    def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False):
        return defer.succeed([self.peer])


class BlobCallback(BlobFile):
    """BlobFile whose `callback` deferred fires True once verified, or False on timeout."""

    def __init__(self, blob_dir, blob_hash, timeout):
        BlobFile.__init__(self, blob_dir, blob_hash)
        self.callback = defer.Deferred()
        reactor.callLater(timeout, self._cancel)

    def _cancel(self):
        if not self.callback.called:
            self.callback.callback(False)

    def save_verified_blob(self, writer):
        result = BlobFile.save_verified_blob(self, writer)
        if not self.callback.called:
            self.callback.callback(True)
        return result


class SingleBlobDownloadManager(object):
    def __init__(self, blob):
        self.blob = blob

    def needed_blobs(self):
        if self.blob.verified:
            return []
        else:
            return [self.blob]

    def get_head_blob_hash(self):
        return self.blob.blob_hash


class SinglePeerDownloader(object):
    def __init__(self):
        self._payment_rate_manager = OnlyFreePaymentsManager()
        self._announcer = DummyHashAnnouncer()
        self._rate_limiter = DummyRateLimiter()
        self._wallet = None
        self._blob_manager = None

    def setup(self, wallet, blob_manager=None):
        if not self._wallet:
            self._wallet = wallet
        if not self._blob_manager:
            self._blob_manager = blob_manager

    @defer.inlineCallbacks
    def download_blob_from_peer(self, peer, timeout, blob_hash, blob_manager):
        log.debug("Try to download %s from %s", blob_hash, peer.host)
        # prefer the explicitly passed manager, falling back to the one from setup()
        blob_manager = blob_manager or self._blob_manager
        blob = BlobCallback(blob_manager.blob_dir, blob_hash, timeout)
        download_manager = SingleBlobDownloadManager(blob)
        peer_finder = SinglePeerFinder(peer)
        requester = BlobRequester(blob_manager, peer_finder, self._payment_rate_manager,
                                  self._wallet, download_manager)
        downloader = StandaloneBlobDownloader(blob_hash, blob_manager, peer_finder,
                                              self._rate_limiter, self._payment_rate_manager,
                                              self._wallet, timeout=timeout)
        info_exchanger = self._wallet.get_info_exchanger()
        connection_manager = ConnectionManager(downloader, self._rate_limiter, [requester],
                                               [info_exchanger])
        connection_manager.start()
        result = yield blob.callback
        if not result:
            log.debug("Failed to download %s from %s", blob_hash[:16], peer.host)
        yield connection_manager.stop()
        defer.returnValue(result)

    @defer.inlineCallbacks
    def download_temp_blob_from_peer(self, peer, timeout, blob_hash):
        tmp_dir = yield threads.deferToThread(tempfile.mkdtemp)
        tmp_blob_manager = TempBlobManager(self._announcer, tmp_dir, tmp_dir)
        try:
            result = yield self.download_blob_from_peer(peer, timeout, blob_hash,
                                                        tmp_blob_manager)
        finally:
            yield tmp_blob_manager.stop()
            yield threads.deferToThread(shutil.rmtree, tmp_dir)
        defer.returnValue(result)
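As a usage sketch, this helper can be driven from a running session to probe one specific peer; the `session` object, peer address, and 30-second timeout below are illustrative assumptions, not part of this file:

from twisted.internet import defer
from lbrynet.core.Peer import Peer
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader

@defer.inlineCallbacks
def probe_peer(session, host, port, blob_hash):
    # fetch one blob from one specific peer into a throwaway blob manager
    downloader = SinglePeerDownloader()
    downloader.setup(session.wallet)
    available = yield downloader.download_temp_blob_from_peer(Peer(host, port), 30, blob_hash)
    defer.returnValue(available)  # True if the peer served the blob before the timeout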

lbrynet/daemon/Daemon.py (View file)

@@ -7,7 +7,6 @@ import requests
import urllib
import json
import textwrap
import signal
from copy import deepcopy
from twisted.web import server
@@ -20,6 +19,7 @@ from lbryschema.uri import parse_lbry_uri
from lbryschema.error import URIParseError, DecodeError
from lbryschema.validator import validate_claim_id
from lbryschema.address import decode_address
from lbryschema.decode import smart_decode
# TODO: importing this when internet is disabled raises a socket.gaierror
from lbrynet.core.system_info import get_lbrynet_version
@@ -48,6 +48,8 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash
from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout
from lbrynet.core.Error import NullFundsError, NegativeFundsError
from lbrynet.core.Peer import Peer
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
log = logging.getLogger(__name__)
@@ -190,7 +192,6 @@ class Daemon(AuthJSONRPCServer):
        self.use_upnp = conf.settings['use_upnp']
        self.auto_renew_claim_height_delta = conf.settings['auto_renew_claim_height_delta']
        self.startup_status = STARTUP_STAGES[0]
        self.connection_status_code = None
@@ -295,10 +296,10 @@ class Daemon(AuthJSONRPCServer):
        for outpoint, result in results.iteritems():
            if result['success']:
                log.info("Renewed claim at outpoint:%s claim ID:%s, paid fee:%s",
                         outpoint, result['claim_id'], result['fee'])
            else:
                log.info("Failed to renew claim at outpoint:%s, reason:%s",
                         outpoint, result['reason'])
    def _start_server(self):
        if self.peer_port is not None:
@@ -667,10 +668,10 @@ class Daemon(AuthJSONRPCServer):
            self.analytics_manager.send_download_started(download_id, name, claim_dict)
            self.streams[sd_hash] = GetStream(self.sd_identifier, self.session,
                                              self.exchange_rate_manager, self.max_key_fee,
                                              self.disable_max_key_fee,
                                              conf.settings['data_rate'], timeout,
                                              file_name)
            try:
                lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name)
                yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout)
@@ -951,6 +952,52 @@ class Daemon(AuthJSONRPCServer):
        d.addCallback(self.get_blobs_for_stream_hash)
        return d

    def _get_single_peer_downloader(self):
        downloader = SinglePeerDownloader()
        downloader.setup(self.session.wallet)
        return downloader

    @defer.inlineCallbacks
    def _blob_availability(self, blob_hash, search_timeout, blob_timeout, downloader=None):
        if not downloader:
            downloader = self._get_single_peer_downloader()
        search_timeout = search_timeout or conf.settings['peer_search_timeout']
        blob_timeout = blob_timeout or conf.settings['sd_download_timeout']
        is_available = False
        reachable_peers = []
        unreachable_peers = []
        error = None
        try:
            peers = yield self.jsonrpc_peer_list(blob_hash, search_timeout)
            peer_infos = [{"peer": Peer(x[0], x[1]),
                           "blob_hash": blob_hash,
                           "timeout": blob_timeout} for x in peers if x[2]]
            dl = []
            dl_peers = []
            dl_results = []
            for peer_info in peer_infos:
                d = downloader.download_temp_blob_from_peer(**peer_info)
                dl.append(d)
                dl_peers.append("%s:%i" % (peer_info['peer'].host, peer_info['peer'].port))
            for dl_peer, (success, download_result) in zip(dl_peers,
                                                           (yield defer.DeferredList(dl))):
                if success:
                    if download_result:
                        reachable_peers.append(dl_peer)
                    else:
                        unreachable_peers.append(dl_peer)
                    dl_results.append(download_result)
            is_available = any(dl_results)
        except Exception as err:
            # record the search failure so it can be reported in the response
            error = "Failed to get peers for blob: %s" % err

        response = {
            'is_available': is_available,
            'reachable_peers': reachable_peers,
            'unreachable_peers': unreachable_peers,
        }
        if error is not None:
            response['error'] = error
        defer.returnValue(response)
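A note on the loop above: `defer.DeferredList` fires with a list of `(success, result)` tuples, so `success` only says the per-peer deferred did not errback, while `result` is the boolean returned by `download_temp_blob_from_peer`. A minimal standalone illustration of that contract:

from twisted.internet import defer

d1 = defer.succeed(True)   # peer connected and served the blob
d2 = defer.succeed(False)  # peer connected but the download timed out
dl = defer.DeferredList([d1, d2])
# dl fires with [(True, True), (True, False)]: both deferreds succeeded,
# but only the first peer actually delivered the blob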
    ############################################################################
    #                                                                          #
    #                    JSON-RPC API methods start here                       #
@@ -1514,7 +1561,7 @@ class Daemon(AuthJSONRPCServer):
        for u in uris:
            try:
                parse_lbry_uri(u)
                valid_uris += (u,)
            except URIParseError:
                results[u] = {"error": "%s is not a valid uri" % u}
@@ -2099,7 +2146,7 @@ class Daemon(AuthJSONRPCServer):
            else:
                raise Exception("invalid outpoint")
            result = yield self.session.wallet.claim_renew(txid, nout)
            result = {outpoint: result}
        else:
            height = int(height)
            result = yield self.session.wallet.claim_renew_all_before_expiration(height)
@@ -2247,7 +2294,7 @@ class Daemon(AuthJSONRPCServer):
        uris = tuple(uris)
        if uri is not None:
            uris += (uri,)

        results = {}
@@ -2260,7 +2307,7 @@ class Daemon(AuthJSONRPCServer):
                elif parsed.path:
                    results[chan_uri] = {"error": "%s is a claim in a channel" % parsed.path}
                else:
                    valid_uris += (chan_uri,)
            except URIParseError:
                results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri}
@@ -2271,8 +2318,8 @@ class Daemon(AuthJSONRPCServer):
                results[u] = resolved[u]
            else:
                results[u] = {
                    'claims_in_channel': resolved[u]['claims_in_channel']
                }
                if page:
                    results[u]['returned_page'] = page
                    results[u]['claims_in_channel'] = resolved[u].get('claims_in_channel', [])
@@ -2919,7 +2966,29 @@ class Daemon(AuthJSONRPCServer):
            result['node_id'] = self.session.dht_node.node_id.encode('hex')
        return self._render_response(result)
    def jsonrpc_blob_availability(self, blob_hash, search_timeout=None, blob_timeout=None):
        """
        Get blob availability

        Usage:
            blob_availability (<blob_hash>) [<search_timeout> | --search_timeout=<search_timeout>]
                              [<blob_timeout> | --blob_timeout=<blob_timeout>]

        Options:
            <search_timeout> : how long to search for peers for the blob in the dht
            <blob_timeout>   : how long to try downloading from a peer

        Returns:
            (dict) {
                "is_available": <bool, true if blob is available from a peer from peer list>,
                "reachable_peers": ["<ip>:<port>"],
                "unreachable_peers": ["<ip>:<port>"]
            }
        """
        return self._blob_availability(blob_hash, search_timeout, blob_timeout)
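For callers, a small sketch of interpreting the returned dict; the field names come from the docstring above, while the helper name is made up for illustration:

def describe_blob_availability(result):
    # turn a blob_availability response into a one-line summary
    if result.get("is_available"):
        return "blob served by %i peer(s)" % len(result["reachable_peers"])
    if result.get("unreachable_peers"):
        return "peers announced the blob but none served it"
    return "no peers found for this blob"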
    @AuthJSONRPCServer.deprecated("stream_availability")
    def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None):
        """
        Get stream availability for lbry uri
@@ -2936,63 +3005,105 @@
            (float) Peers per blob / total blobs
        """
        return self.jsonrpc_stream_availability(uri, peer_timeout, sd_timeout)

    @defer.inlineCallbacks
    def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None):
        """
        Get stream availability for lbry uri

        Usage:
            stream_availability (<uri>) [<search_timeout> | --search_timeout=<search_timeout>]
                                [<blob_timeout> | --blob_timeout=<blob_timeout>]

        Options:
            <search_timeout> : how long to search for peers for the blob in the dht
            <blob_timeout>   : how long to try downloading from a peer

        Returns:
            (dict) {
                'is_available': <bool>,
                'did_decode': <bool>,
                'did_resolve': <bool>,
                'is_stream': <bool>,
                'num_blobs_in_stream': <int>,
                'sd_hash': <str>,
                'sd_blob_availability': <dict> see `blob_availability`,
                'head_blob_hash': <str>,
                'head_blob_availability': <dict> see `blob_availability`,
                'use_upnp': <bool>,
                'upnp_redirect_is_set': <bool>,
                'error': <None> | <str> error message
            }
        """
        search_timeout = search_timeout or conf.settings['peer_search_timeout']
        blob_timeout = blob_timeout or conf.settings['sd_download_timeout']

        response = {
            'is_available': False,
            'did_decode': False,
            'did_resolve': False,
            'is_stream': False,
            'num_blobs_in_stream': None,
            'sd_hash': None,
            'sd_blob_availability': {},
            'head_blob_hash': None,
            'head_blob_availability': {},
            'use_upnp': conf.settings['use_upnp'],
            'upnp_redirect_is_set': len(self.session.upnp_redirects) > 0,
            'error': None
        }

        try:
            resolved_result = yield self.session.wallet.resolve(uri)
            response['did_resolve'] = True
        except UnknownNameError:
            response['error'] = "Failed to resolve name"
            defer.returnValue(response)
        except URIParseError:
            response['error'] = "Invalid URI"
            defer.returnValue(response)

        try:
            claim_obj = smart_decode(resolved_result[uri]['claim']['hex'])
            response['did_decode'] = True
        except DecodeError:
            response['error'] = "Failed to decode claim value"
            defer.returnValue(response)

        response['is_stream'] = claim_obj.is_stream
        if not claim_obj.is_stream:
            response['error'] = "Claim for \"%s\" does not contain a stream" % uri
            defer.returnValue(response)

        sd_hash = claim_obj.source_hash
        response['sd_hash'] = sd_hash
        head_blob_hash = None
        downloader = self._get_single_peer_downloader()
        have_sd_blob = sd_hash in self.session.blob_manager.blobs
        try:
            sd_blob = yield self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout,
                                                  encoding="json")
            if not have_sd_blob:
                # don't keep an sd blob we only fetched for the check
                yield self.jsonrpc_blob_delete(sd_hash)
            if sd_blob and 'blobs' in sd_blob:
                response['num_blobs_in_stream'] = len(sd_blob['blobs']) - 1
                head_blob_hash = sd_blob['blobs'][0]['blob_hash']
                head_blob_availability = yield self._blob_availability(head_blob_hash,
                                                                       search_timeout,
                                                                       blob_timeout,
                                                                       downloader)
                response['head_blob_availability'] = head_blob_availability
        except Exception as err:
            response['error'] = str(err)
        response['head_blob_hash'] = head_blob_hash
        response['sd_blob_availability'] = yield self._blob_availability(sd_hash,
                                                                         search_timeout,
                                                                         blob_timeout,
                                                                         downloader)
        response['is_available'] = response['sd_blob_availability'].get('is_available') and \
                                   response['head_blob_availability'].get('is_available')
        defer.returnValue(response)
@defer.inlineCallbacks
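To close the loop on debugging with these commands, here is a sketch of walking a stream_availability response from resolution down to blob fetching; the helper is hypothetical and relies only on the fields documented above:

def diagnose_stream(status):
    # report the first stage that failed, in the order stream_availability checks them
    if not status["did_resolve"]:
        return "URI did not resolve: %s" % status["error"]
    if not status["did_decode"]:
        return "claim value could not be decoded"
    if not status["is_stream"]:
        return "claim does not contain a stream"
    if not status["sd_blob_availability"].get("is_available"):
        return "sd blob %s is not downloadable" % status["sd_hash"]
    if not status["head_blob_availability"].get("is_available"):
        return "head blob %s is not downloadable" % status["head_blob_hash"]
    return "stream looks available (%s content blobs)" % status["num_blobs_in_stream"]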