Merge pull request #110 from lbryio/lighthouse-client
Lighthouse client
This commit is contained in:
commit
2d0a892e70
4 changed files with 72 additions and 6 deletions
|
@ -48,6 +48,7 @@ DEFAULT_TIMEOUT = 30
|
|||
DEFAULT_MAX_SEARCH_RESULTS = 25
|
||||
DEFAULT_MAX_KEY_FEE = {'USD': {'amount': 25.0, 'address': ''}}
|
||||
DEFAULT_SEARCH_TIMEOUT = 3.0
|
||||
DEFAULT_SD_DOWNLOAD_TIMEOUT = 3
|
||||
DEFAULT_CACHE_TIME = 3600
|
||||
DEFAULT_UI_BRANCH = "master"
|
||||
|
||||
|
|
|
@ -173,14 +173,23 @@ class LBRYSession(object):
|
|||
self.upnp_redirects.append((self.peer_port, 'TCP'))
|
||||
log.info("Set UPnP redirect for TCP port %d", self.peer_port)
|
||||
else:
|
||||
# see comment below
|
||||
log.warning("UPnP redirect already set for TCP port %d", self.peer_port)
|
||||
self.upnp_redirects.append((self.peer_port, 'TCP'))
|
||||
if self.dht_node_port is not None:
|
||||
if u.getspecificportmapping(self.dht_node_port, 'UDP') is None:
|
||||
u.addportmapping(self.dht_node_port, 'UDP', u.lanaddr, self.dht_node_port, 'LBRY DHT port', '')
|
||||
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
|
||||
log.info("Set UPnP redirect for UDP port %d", self.dht_node_port)
|
||||
else:
|
||||
# TODO: check that the existing redirect was put up by an old lbrynet session before grabbing it
|
||||
# if such a disconnected redirect exists, then upnp won't work unless the redirect is appended
|
||||
# or is torn down and set back up. a bad shutdown of lbrynet could leave such a redirect up
|
||||
# and cause problems on the next start.
|
||||
# this could be problematic if a previous lbrynet session didn't make the redirect, and it was
|
||||
# made by another application
|
||||
log.warning("UPnP redirect already set for UDP port %d", self.dht_node_port)
|
||||
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
|
||||
return True
|
||||
return False
|
||||
|
||||
|
|
|
@ -41,13 +41,14 @@ from lbrynet.lbrynet_daemon.LBRYUIManager import LBRYUIManager
|
|||
from lbrynet.lbrynet_daemon.LBRYDownloader import GetStream
|
||||
from lbrynet.lbrynet_daemon.LBRYPublisher import Publisher
|
||||
from lbrynet.lbrynet_daemon.LBRYExchangeRateManager import ExchangeRateManager
|
||||
from lbrynet.lbrynet_daemon.Lighthouse import LighthouseClient
|
||||
from lbrynet.core import utils
|
||||
from lbrynet.core.LBRYMetadata import verify_name_characters
|
||||
from lbrynet.core.utils import generate_id
|
||||
from lbrynet.lbrynet_console.LBRYSettings import LBRYSettings
|
||||
from lbrynet.conf import MIN_BLOB_DATA_PAYMENT_RATE, DEFAULT_MAX_SEARCH_RESULTS, KNOWN_DHT_NODES, DEFAULT_MAX_KEY_FEE, \
|
||||
DEFAULT_WALLET, DEFAULT_SEARCH_TIMEOUT, DEFAULT_CACHE_TIME, DEFAULT_UI_BRANCH, LOG_POST_URL, LOG_FILE_NAME, SOURCE_TYPES
|
||||
from lbrynet.conf import SEARCH_SERVERS
|
||||
from lbrynet.conf import DEFAULT_SD_DOWNLOAD_TIMEOUT
|
||||
from lbrynet.conf import DEFAULT_TIMEOUT, WALLET_TYPES
|
||||
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob, BlobStreamDescriptorReader
|
||||
from lbrynet.core.Session import LBRYSession
|
||||
|
@ -160,6 +161,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
self.run_server = True
|
||||
self.session = None
|
||||
self.exchange_rate_manager = ExchangeRateManager()
|
||||
self.lighthouse_client = LighthouseClient()
|
||||
self.waiting_on = {}
|
||||
self.streams = {}
|
||||
self.pending_claims = {}
|
||||
|
@ -375,6 +377,9 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
f.write("rpcpassword=" + password)
|
||||
log.info("Done writing lbrycrd.conf")
|
||||
|
||||
def _responseFailed(self, err, call):
|
||||
call.cancel()
|
||||
|
||||
def render(self, request):
|
||||
request.content.seek(0, 0)
|
||||
# Unmarshal the JSON-RPC data.
|
||||
|
@ -414,6 +419,10 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
d = defer.maybeDeferred(function)
|
||||
else:
|
||||
d = defer.maybeDeferred(function, *args)
|
||||
|
||||
# cancel the response if the connection is broken
|
||||
request.notifyFinish().addErrback(self._responseFailed, d)
|
||||
|
||||
d.addErrback(self._ebRender, id)
|
||||
d.addCallback(self._cbRender, request, id, version)
|
||||
return server.NOT_DONE_YET
|
||||
|
@ -438,6 +447,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
except:
|
||||
f = jsonrpclib.Fault(self.FAILURE, "can't serialize output")
|
||||
s = jsonrpclib.dumps(f, version=version)
|
||||
|
||||
request.setHeader("content-length", str(len(s)))
|
||||
request.write(s)
|
||||
request.finish()
|
||||
|
@ -625,6 +635,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
# TODO: this was blatantly copied from jsonrpc_start_lbry_file. Be DRY.
|
||||
def _start_file(f):
|
||||
d = self.lbry_file_manager.toggle_lbry_file_running(f)
|
||||
d.addCallback(lambda _: self.lighthouse_client.announce_sd(f.sd_hash))
|
||||
return defer.succeed("Started LBRY file")
|
||||
|
||||
def _get_and_start_file(name):
|
||||
|
@ -1072,7 +1083,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
self.sd_identifier.add_stream_downloader_factory(LBRYFileStreamType, downloader_factory)
|
||||
return defer.succeed(True)
|
||||
|
||||
def _download_sd_blob(self, sd_hash):
|
||||
def _download_sd_blob(self, sd_hash, timeout=DEFAULT_SD_DOWNLOAD_TIMEOUT):
|
||||
def cb(result):
|
||||
if not r.called:
|
||||
r.callback(result)
|
||||
|
@ -1082,7 +1093,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
r.errback(Exception("sd timeout"))
|
||||
|
||||
r = defer.Deferred(None)
|
||||
reactor.callLater(3, eb)
|
||||
reactor.callLater(timeout, eb)
|
||||
d = download_sd_blob(self.session, sd_hash, PaymentRateManager(self.session.base_payment_rate_manager))
|
||||
d.addCallback(BlobStreamDescriptorReader)
|
||||
d.addCallback(lambda blob: blob.get_info())
|
||||
|
@ -1444,8 +1455,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
return defer.succeed(None)
|
||||
|
||||
def _search(self, search):
|
||||
proxy = Proxy(random.choice(SEARCH_SERVERS))
|
||||
return proxy.callRemote('search', search)
|
||||
return self.lighthouse_client.search(search)
|
||||
|
||||
def _render_response(self, result, code):
|
||||
return defer.succeed({'result': result, 'code': code})
|
||||
|
@ -2250,8 +2260,9 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
sd blob, dict
|
||||
"""
|
||||
sd_hash = p['sd_hash']
|
||||
timeout = p.get('timeout', DEFAULT_SD_DOWNLOAD_TIMEOUT)
|
||||
|
||||
d = self._download_sd_blob(sd_hash)
|
||||
d = self._download_sd_blob(sd_hash, timeout)
|
||||
d.addCallbacks(lambda r: self._render_response(r, OK_CODE), lambda _: self._render_response(False, OK_CODE))
|
||||
return d
|
||||
|
||||
|
@ -2401,6 +2412,23 @@ class LBRYDaemon(jsonrpc.JSONRPC):
|
|||
d.addCallback(lambda _: self._render_response(True, OK_CODE))
|
||||
return d
|
||||
|
||||
def jsonrpc_get_peers_for_hash(self, p):
|
||||
"""
|
||||
Get peers for blob hash
|
||||
|
||||
Args:
|
||||
'blob_hash': blob hash
|
||||
Returns:
|
||||
List of contacts
|
||||
"""
|
||||
|
||||
blob_hash = p['blob_hash']
|
||||
|
||||
d = self.session.peer_finder.find_peers_for_blob(blob_hash)
|
||||
d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r])
|
||||
d.addCallback(lambda r: self._render_response(r, OK_CODE))
|
||||
return d
|
||||
|
||||
|
||||
def get_lbrynet_version_from_github():
|
||||
"""Return the latest released version from github."""
|
||||
|
|
28
lbrynet/lbrynet_daemon/Lighthouse.py
Normal file
28
lbrynet/lbrynet_daemon/Lighthouse.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
import logging
|
||||
import random
|
||||
from txjsonrpc.web.jsonrpc import Proxy
|
||||
from twisted.internet import defer
|
||||
from lbrynet.conf import SEARCH_SERVERS
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LighthouseClient(object):
    """Thin JSON-RPC client for the lighthouse search servers.

    Each request is dispatched to a randomly chosen server from the
    configured pool, spreading load across the available endpoints.
    """

    def __init__(self, servers=None):
        # Fall back to the globally configured SEARCH_SERVERS when no
        # explicit pool is supplied.
        self.servers = servers if servers else SEARCH_SERVERS

    def _get_random_server(self):
        # Build a fresh Proxy per call so successive queries may hit
        # different servers in the pool.
        chosen = random.choice(self.servers)
        return Proxy(chosen)

    def _run_query(self, func, arg):
        # Single choke point for all remote calls.
        proxy = self._get_random_server()
        return proxy.callRemote(func, arg)

    def search(self, search):
        """Query a lighthouse server for results matching *search*."""
        return self._run_query('search', search)

    def announce_sd(self, sd_hash):
        """Announce availability of the stream descriptor *sd_hash*."""
        log.info("Announce sd to lighthouse")
        return self._run_query('announce_sd', sd_hash)

    def check_available(self, sd_hash):
        """Ask a lighthouse server whether *sd_hash* is available."""
        return self._run_query('check_available', sd_hash)
|
Loading…
Reference in a new issue