Merge pull request #367 from lbryio/analytics-on-get

Analytics on get
This commit is contained in:
Job Evers‐Meltzer 2016-12-30 15:39:06 -06:00 committed by GitHub
commit 621530b3b8
14 changed files with 298 additions and 276 deletions

View file

@ -48,13 +48,18 @@ class Events(object):
def heartbeat(self): def heartbeat(self):
return self._event('Heartbeat') return self._event('Heartbeat')
def download_started(self, name, stream_info=None): def download_started(self, *args, **kwargs):
properties = { properties = download_properties(*args, **kwargs)
'name': name,
'stream_info': get_sd_hash(stream_info)
}
return self._event('Download Started', properties) return self._event('Download Started', properties)
def download_errored(self, *args, **kwargs):
properties = download_properties(*args, **kwargs)
return self._event('Download Errored', properties)
def download_finished(self, *args, **kwargs):
properties = download_properties(*args, **kwargs)
return self._event('Download Finished', properties)
def error(self, message, sd_hash=None): def error(self, message, sd_hash=None):
properties = { properties = {
'message': message, 'message': message,
@ -110,3 +115,11 @@ def make_context(platform, wallet):
'version': '1.0.0' 'version': '1.0.0'
}, },
} }
def download_properties(id_, name, stream_info=None):
return {
'download_id': id_,
'name': name,
'stream_info': get_sd_hash(stream_info)
}

View file

@ -70,8 +70,16 @@ class Manager(object):
event = self.events_generator.server_startup_error(message) event = self.events_generator.server_startup_error(message)
self.analytics_api.track(event) self.analytics_api.track(event)
def send_download_started(self, name, stream_info=None): def send_download_started(self, id_, name, stream_info=None):
event = self.events_generator.download_started(name, stream_info) event = self.events_generator.download_started(id_, name, stream_info)
self.analytics_api.track(event)
def send_download_errored(self, id_, name, stream_info=None):
event = self.events_generator.download_errored(id_, name, stream_info)
self.analytics_api.track(event)
def send_download_finished(self, id_, name, stream_info=None):
event = self.events_generator.download_finished(id_, name, stream_info)
self.analytics_api.track(event) self.analytics_api.track(event)
def send_error(self, message, sd_hash=None): def send_error(self, message, sd_hash=None):

View file

@ -176,11 +176,12 @@ class ClientProtocol(Protocol):
def _handle_response(self, response): def _handle_response(self, response):
ds = [] ds = []
log.debug("Handling a response. Current expected responses: %s", self._response_deferreds) log.debug(
"Handling a response from %s. Expected responses: %s. Actual responses: %s",
self.peer, self._response_deferreds.keys(), response.keys())
for key, val in response.items(): for key, val in response.items():
if key in self._response_deferreds: if key in self._response_deferreds:
d = self._response_deferreds[key] d = self._response_deferreds.pop(key)
del self._response_deferreds[key]
d.callback({key: val}) d.callback({key: val})
ds.append(d) ds.append(d)
for k, d in self._response_deferreds.items(): for k, d in self._response_deferreds.items():
@ -194,6 +195,7 @@ class ClientProtocol(Protocol):
d.addErrback(self._handle_response_error) d.addErrback(self._handle_response_error)
ds.append(d) ds.append(d)
# TODO: are we sure we want to consume errors here
dl = defer.DeferredList(ds, consumeErrors=True) dl = defer.DeferredList(ds, consumeErrors=True)
def get_next_request(results): def get_next_request(results):

View file

@ -72,13 +72,19 @@ class ConnectionManager(object):
closing_deferreds.append(close_connection(peer)) closing_deferreds.append(close_connection(peer))
return defer.DeferredList(closing_deferreds) return defer.DeferredList(closing_deferreds)
@defer.inlineCallbacks
def get_next_request(self, peer, protocol): def get_next_request(self, peer, protocol):
log.debug("Trying to get the next request for peer %s", peer) log.debug("Trying to get the next request for peer %s", peer)
if not peer in self._peer_connections or self.stopped is True: if not peer in self._peer_connections or self.stopped is True:
log.debug("The peer has already been told to shut down.") log.debug("The peer has already been told to shut down.")
return defer.succeed(False) defer.returnValue(False)
requests = yield self._send_primary_requests(peer, protocol)
have_request = any(r[1] for r in requests if r[0] is True)
if have_request:
yield self._send_secondary_requests(peer, protocol)
defer.returnValue(have_request)
def _send_primary_requests(self, peer, protocol):
def handle_error(err): def handle_error(err):
err.trap(InsufficientFundsError) err.trap(InsufficientFundsError)
@ -97,34 +103,20 @@ class ConnectionManager(object):
self._peer_connections[peer].request_creators.append(request_creator) self._peer_connections[peer].request_creators.append(request_creator)
return request_sent return request_sent
def check_requests(requests):
have_request = True in [r[1] for r in requests if r[0] is True]
return have_request
def get_secondary_requests_if_necessary(have_request):
if have_request is True:
ds = []
for s_r_c in self._secondary_request_creators:
d = s_r_c.send_next_request(peer, protocol)
ds.append(d)
dl = defer.DeferredList(ds)
else:
dl = defer.succeed(None)
dl.addCallback(lambda _: have_request)
return dl
ds = [] ds = []
for p_r_c in self._primary_request_creators: for p_r_c in self._primary_request_creators:
d = p_r_c.send_next_request(peer, protocol) d = p_r_c.send_next_request(peer, protocol)
d.addErrback(handle_error) d.addErrback(handle_error)
d.addCallback(check_if_request_sent, p_r_c) d.addCallback(check_if_request_sent, p_r_c)
ds.append(d) ds.append(d)
return defer.DeferredList(ds, fireOnOneErrback=True)
dl = defer.DeferredList(ds, fireOnOneErrback=True) def _send_secondary_requests(self, peer, protocol):
dl.addCallback(check_requests) ds = [
dl.addCallback(get_secondary_requests_if_necessary) s_r_c.send_next_request(peer, protocol)
return dl for s_r_c in self._secondary_request_creators
]
return defer.DeferredList(ds)
def protocol_disconnected(self, peer, protocol): def protocol_disconnected(self, peer, protocol):
if peer in self._peer_connections: if peer in self._peer_connections:
@ -147,49 +139,52 @@ class ConnectionManager(object):
return sorted(self._primary_request_creators, key=count_peers) return sorted(self._primary_request_creators, key=count_peers)
def _connect_to_peer(self, peer): def _connect_to_peer(self, peer):
if peer is None or self.stopped:
return
from twisted.internet import reactor from twisted.internet import reactor
if peer is not None and self.stopped is False: log.debug("Trying to connect to %s", peer)
log.debug("Trying to connect to %s", peer) factory = ClientProtocolFactory(peer, self.rate_limiter, self)
factory = ClientProtocolFactory(peer, self.rate_limiter, self) self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], factory)
factory) connection = reactor.connectTCP(peer.host, peer.port, factory)
connection = reactor.connectTCP(peer.host, peer.port, factory) self._peer_connections[peer].connection = connection
self._peer_connections[peer].connection = connection
@defer.inlineCallbacks
def _manage(self): def _manage(self):
from twisted.internet import reactor from twisted.internet import reactor
def get_new_peers(request_creators):
log.debug("Trying to get a new peer to connect to")
if len(request_creators) > 0:
log.debug("Got a creator to check: %s", request_creators[0])
d = request_creators[0].get_new_peers()
d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:]))
return d
else:
return defer.succeed(None)
def pick_best_peer(peers):
# TODO: Eventually rank them based on past performance/reputation. For now
# TODO: just pick the first to which we don't have an open connection
log.debug("Got a list of peers to choose from: %s", peers)
if peers is None:
return None
for peer in peers:
if not peer in self._peer_connections:
log.debug("Got a good peer. Returning peer %s", peer)
return peer
log.debug("Couldn't find a good peer to connect to")
return None
if len(self._peer_connections) < conf.settings.max_connections_per_stream: if len(self._peer_connections) < conf.settings.max_connections_per_stream:
ordered_request_creators = self._rank_request_creator_connections() try:
d = get_new_peers(ordered_request_creators) ordered_request_creators = self._rank_request_creator_connections()
d.addCallback(pick_best_peer) peers = yield self._get_new_peers(ordered_request_creators)
d.addCallback(self._connect_to_peer) peer = self._pick_best_peer(peers)
yield self._connect_to_peer(peer)
except Exception:
# log this otherwise it will just end up as an unhandled error in deferred
log.exception('Something bad happened picking a peer')
self._next_manage_call = reactor.callLater(1, self._manage) self._next_manage_call = reactor.callLater(1, self._manage)
@defer.inlineCallbacks
def _get_new_peers(self, request_creators):
log.debug("Trying to get a new peer to connect to")
if not request_creators:
defer.returnValue(None)
log.debug("Got a creator to check: %s", request_creators[0])
new_peers = yield request_creators[0].get_new_peers()
if not new_peers:
new_peers = yield self._get_new_peers(request_creators[1:])
defer.returnValue(new_peers)
def _pick_best_peer(self, peers):
# TODO: Eventually rank them based on past performance/reputation. For now
# TODO: just pick the first to which we don't have an open connection
log.debug("Got a list of peers to choose from: %s", peers)
if peers is None:
return None
for peer in peers:
if not peer in self._peer_connections:
log.debug("Got a good peer. Returning peer %s", peer)
return peer
log.debug("Couldn't find a good peer to connect to")
return None

View file

@ -14,7 +14,6 @@ class DownloadManager(object):
def __init__(self, blob_manager, upload_allowed): def __init__(self, blob_manager, upload_allowed):
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.upload_allowed = upload_allowed self.upload_allowed = upload_allowed
self.blob_requester = None
self.blob_info_finder = None self.blob_info_finder = None
self.progress_manager = None self.progress_manager = None
self.blob_handler = None self.blob_handler = None

View file

@ -369,7 +369,7 @@ class Logger(logging.Logger):
self.name, level, fn, lno, msg, msg_args, exc_info, func, msg_kwargs) self.name, level, fn, lno, msg, msg_args, exc_info, func, msg_kwargs)
self.handle(record) self.handle(record)
if callback: if callback:
callback(err, *args, **kwargs) return callback(err, *args, **kwargs)
return _fail return _fail
def trace(self, msg, *args, **kwargs): def trace(self, msg, *args, **kwargs):

View file

@ -4,6 +4,7 @@ import logging
import random import random
import os import os
import socket import socket
import string
import sys import sys
import pkg_resources import pkg_resources
@ -92,3 +93,7 @@ def setup_certs_for_windows():
if getattr(sys, 'frozen', False) and os.name == "nt": if getattr(sys, 'frozen', False) and os.name == "nt":
cert_path = os.path.join(os.path.dirname(sys.executable), "cacert.pem") cert_path = os.path.join(os.path.dirname(sys.executable), "cacert.pem")
os.environ["REQUESTS_CA_BUNDLE"] = cert_path os.environ["REQUESTS_CA_BUNDLE"] = cert_path
def random_string(length=10, chars=string.ascii_lowercase):
return ''.join([random.choice(chars) for _ in range(length)])

View file

@ -62,19 +62,16 @@ class CryptStreamDownloader(object):
self.payment_rate_manager = payment_rate_manager self.payment_rate_manager = payment_rate_manager
self.wallet = wallet self.wallet = wallet
self.upload_allowed = upload_allowed self.upload_allowed = upload_allowed
self.key = None self.key = None
self.stream_name = None self.stream_name = None
self.completed = False self.completed = False
self.stopped = True self.stopped = True
self.stopping = False self.stopping = False
self.starting = False self.starting = False
self.download_manager = None self.download_manager = None
self.finished_deferred = None self.finished_deferred = None
self.points_paid = 0.0 self.points_paid = 0.0
self.blob_requester = None
def __str__(self): def __str__(self):
return str(self.stream_name) return str(self.stream_name)
@ -86,7 +83,6 @@ class CryptStreamDownloader(object):
return self.stop() return self.stop()
def start(self): def start(self):
if self.starting is True: if self.starting is True:
raise CurrentlyStartingError() raise CurrentlyStartingError()
if self.stopping is True: if self.stopping is True:
@ -97,30 +93,24 @@ class CryptStreamDownloader(object):
self.starting = True self.starting = True
self.completed = False self.completed = False
self.finished_deferred = defer.Deferred() self.finished_deferred = defer.Deferred()
fd = self.finished_deferred
d = self._start() d = self._start()
d.addCallback(lambda _: fd) d.addCallback(lambda _: self.finished_deferred)
return d return d
@defer.inlineCallbacks
def stop(self, err=None): def stop(self, err=None):
def check_if_stop_succeeded(success):
self.stopping = False
if success is True:
self.stopped = True
self._remove_download_manager()
return success
if self.stopped is True: if self.stopped is True:
raise AlreadyStoppedError() raise AlreadyStoppedError()
if self.stopping is True: if self.stopping is True:
raise CurrentlyStoppingError() raise CurrentlyStoppingError()
assert self.download_manager is not None assert self.download_manager is not None
self.stopping = True self.stopping = True
d = self.download_manager.stop_downloading() success = yield self.download_manager.stop_downloading()
d.addCallback(check_if_stop_succeeded) self.stopping = False
d.addCallback(lambda _: self._fire_completed_deferred(err)) if success is True:
return d self.stopped = True
self._remove_download_manager()
yield self._fire_completed_deferred(err)
def _start_failed(self): def _start_failed(self):
@ -155,20 +145,19 @@ class CryptStreamDownloader(object):
return d return d
def _get_download_manager(self): def _get_download_manager(self):
assert self.blob_requester is None
download_manager = DownloadManager(self.blob_manager, self.upload_allowed) download_manager = DownloadManager(self.blob_manager, self.upload_allowed)
download_manager.blob_info_finder = self._get_metadata_handler(download_manager) download_manager.blob_info_finder = self._get_metadata_handler(download_manager)
download_manager.blob_requester = self._get_blob_requester(download_manager)
download_manager.progress_manager = self._get_progress_manager(download_manager) download_manager.progress_manager = self._get_progress_manager(download_manager)
download_manager.blob_handler = self._get_blob_handler(download_manager) download_manager.blob_handler = self._get_blob_handler(download_manager)
download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger() download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger()
# blob_requester needs to be set before the connection manager is setup
self.blob_requester = self._get_blob_requester(download_manager)
download_manager.connection_manager = self._get_connection_manager(download_manager) download_manager.connection_manager = self._get_connection_manager(download_manager)
#return DownloadManager(self.blob_manager, self.blob_requester, self.metadata_handler,
# self.progress_manager, self.blob_handler, self.connection_manager)
return download_manager return download_manager
def _remove_download_manager(self): def _remove_download_manager(self):
self.download_manager.blob_info_finder = None self.download_manager.blob_info_finder = None
self.download_manager.blob_requester = None
self.download_manager.progress_manager = None self.download_manager.progress_manager = None
self.download_manager.blob_handler = None self.download_manager.blob_handler = None
self.download_manager.wallet_info_exchanger = None self.download_manager.wallet_info_exchanger = None
@ -176,7 +165,7 @@ class CryptStreamDownloader(object):
self.download_manager = None self.download_manager = None
def _get_primary_request_creators(self, download_manager): def _get_primary_request_creators(self, download_manager):
return [download_manager.blob_requester] return [self.blob_requester]
def _get_secondary_request_creators(self, download_manager): def _get_secondary_request_creators(self, download_manager):
return [download_manager.wallet_info_exchanger] return [download_manager.wallet_info_exchanger]
@ -210,7 +199,8 @@ class CryptStreamDownloader(object):
if err is not None: if err is not None:
d.errback(err) d.errback(err)
else: else:
d.callback(self._get_finished_deferred_callback_value()) value = self._get_finished_deferred_callback_value()
d.callback(value)
else: else:
log.debug("Not firing the completed deferred because d is None") log.debug("Not firing the completed deferred because d is None")

View file

@ -49,7 +49,7 @@ class DBEncryptedFileMetadataManager(object):
def get_blobs_for_stream(self, stream_hash, start_blob=None, def get_blobs_for_stream(self, stream_hash, start_blob=None,
end_blob=None, count=None, reverse=False): end_blob=None, count=None, reverse=False):
log.debug("Getting blobs for a stream. Count is %s", str(count)) log.debug("Getting blobs for stream %s. Count is %s", stream_hash, count)
def get_positions_of_start_and_end(): def get_positions_of_start_and_end():
if start_blob is not None: if start_blob is not None:

View file

@ -137,32 +137,20 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
d.addCallback(make_full_status) d.addCallback(make_full_status)
return d return d
@defer.inlineCallbacks
def _start(self): def _start(self):
yield EncryptedFileSaver._start(self)
d = EncryptedFileSaver._start(self) sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
d.addCallback( if len(sd_hash):
lambda _: self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)) self.sd_hash = sd_hash[0]
maybe_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash)
def _save_sd_hash(sd_hash): if maybe_metadata:
if len(sd_hash): name, txid, nout = maybe_metadata
self.sd_hash = sd_hash[0] self.uri = name
d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) self.txid = txid
else: self.nout = nout
d = defer.succeed(None) status = yield self._save_status()
defer.returnValue(status)
return d
def _save_claim(name, txid, nout):
self.uri = name
self.txid = txid
self.nout = nout
return defer.succeed(None)
d.addCallback(_save_sd_hash)
d.addCallback(lambda r: _save_claim(r[0], r[1], r[2]) if r else None)
d.addCallback(lambda _: self._save_status())
return d
def _get_finished_deferred_callback_value(self): def _get_finished_deferred_callback_value(self):
if self.completed is True: if self.completed is True:

View file

@ -70,6 +70,7 @@ STARTUP_STAGES = [
(WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits...') (WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits...')
] ]
# TODO: make this consistent with the stages in Downloader.py
DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_METADATA_CODE = 'downloading_metadata'
DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_TIMEOUT_CODE = 'timeout'
DOWNLOAD_RUNNING_CODE = 'running' DOWNLOAD_RUNNING_CODE = 'running'
@ -778,9 +779,9 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(BlobStreamDescriptorReader) d.addCallback(BlobStreamDescriptorReader)
d.addCallback(lambda blob: blob.get_info()) d.addCallback(lambda blob: blob.get_info())
d.addCallback(cb) d.addCallback(cb)
return r return r
@defer.inlineCallbacks
def _download_name(self, name, timeout=None, download_directory=None, def _download_name(self, name, timeout=None, download_directory=None,
file_name=None, stream_info=None, wait_for_write=True): file_name=None, stream_info=None, wait_for_write=True):
""" """
@ -788,20 +789,17 @@ class Daemon(AuthJSONRPCServer):
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
""" """
timeout = timeout if timeout is not None else conf.settings.download_timeout timeout = timeout if timeout is not None else conf.settings.download_timeout
self.analytics_manager.send_download_started(name, stream_info)
helper = _DownloadNameHelper( helper = _DownloadNameHelper(
self, name, timeout, download_directory, file_name, wait_for_write) self, name, timeout, download_directory, file_name, wait_for_write)
if not stream_info: if not stream_info:
self.waiting_on[name] = True self.waiting_on[name] = True
d = self._resolve_name(name) stream_info = yield self._resolve_name(name)
else: del self.waiting_on[name]
d = defer.succeed(stream_info) lbry_file = yield helper.setup_stream(stream_info)
d.addCallback(helper._setup_stream) sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file)
d.addCallback(helper.wait_or_get_stream) defer.returnValue((sd_hash, file_path))
if not stream_info:
d.addCallback(helper._remove_from_wait)
return d
def add_stream(self, name, timeout, download_directory, file_name, stream_info): def add_stream(self, name, timeout, download_directory, file_name, stream_info):
"""Makes, adds and starts a stream""" """Makes, adds and starts a stream"""
@ -1399,8 +1397,6 @@ class Daemon(AuthJSONRPCServer):
return self._render_response(None, BAD_REQUEST) return self._render_response(None, BAD_REQUEST)
d = self._resolve_name(name, force_refresh=force) d = self._resolve_name(name, force_refresh=force)
# TODO: this is the rpc call that returns a server.failure.
# what is up with that?
d.addCallbacks( d.addCallbacks(
lambda info: self._render_response(info, OK_CODE), lambda info: self._render_response(info, OK_CODE),
# TODO: Is server.failure a module? It looks like it: # TODO: Is server.failure a module? It looks like it:
@ -1483,6 +1479,7 @@ class Daemon(AuthJSONRPCServer):
) )
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
@defer.inlineCallbacks
def jsonrpc_get(self, p): def jsonrpc_get(self, p):
"""Download stream from a LBRY uri. """Download stream from a LBRY uri.
@ -1492,28 +1489,63 @@ class Daemon(AuthJSONRPCServer):
'file_name': optional, a user specified name for the downloaded file 'file_name': optional, a user specified name for the downloaded file
'stream_info': optional, specified stream info overrides name 'stream_info': optional, specified stream info overrides name
'timeout': optional 'timeout': optional
'wait_for_write': optional, defaults to True 'wait_for_write': optional, defaults to True. When set, waits for the file to
only start to be written before returning any results.
Returns: Returns:
'stream_hash': hex string 'stream_hash': hex string
'path': path of download 'path': path of download
""" """
params = self._process_get_parameters(p) params = self._process_get_parameters(p)
if not params.name: if not params.name:
return server.failure # TODO: return a useful error message here, like "name argument is required"
defer.returnValue(server.failure)
if params.name in self.waiting_on: if params.name in self.waiting_on:
return server.failure # TODO: return a useful error message here, like "already
d = self._download_name(name=params.name, # waiting for name to be resolved"
timeout=params.timeout, defer.returnValue(server.failure)
download_directory=params.download_directory, name = params.name
stream_info=params.stream_info, stream_info = params.stream_info
file_name=params.file_name,
wait_for_write=params.wait_for_write) # first check if we already have this
# TODO: downloading can timeout. Not sure what to do when that happens lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False)
d.addCallbacks( if lbry_file:
get_output_callback(params), log.info('Already have a file for %s', name)
lambda err: str(err)) message = {
d.addCallback(lambda message: self._render_response(message, OK_CODE)) 'stream_hash': params.sd_hash if params.stream_info else lbry_file.sd_hash,
return d 'path': os.path.join(lbry_file.download_directory, lbry_file.file_name)
}
response = yield self._render_response(message, OK_CODE)
defer.returnValue(response)
download_id = utils.random_string()
self.analytics_manager.send_download_started(download_id, name, stream_info)
try:
sd_hash, file_path = yield self._download_name(
name=params.name,
timeout=params.timeout,
download_directory=params.download_directory,
stream_info=params.stream_info,
file_name=params.file_name,
wait_for_write=params.wait_for_write
)
except Exception as e:
self.analytics_manager.send_download_errored(download_id, name, stream_info)
log.exception('Failed to get %s', params.name)
response = yield self._render_response(str(e), OK_CODE)
else:
# TODO: should stream_hash key be changed to sd_hash?
message = {
'stream_hash': params.sd_hash if params.stream_info else sd_hash,
'path': file_path
}
stream = self.streams.get(name)
if stream:
stream.downloader.finished_deferred.addCallback(
lambda _: self.analytics_manager.send_download_finished(
download_id, name, stream_info)
)
response = yield self._render_response(message, OK_CODE)
defer.returnValue(response)
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_stop_lbry_file(self, p): def jsonrpc_stop_lbry_file(self, p):
@ -1721,6 +1753,7 @@ class Daemon(AuthJSONRPCServer):
txid = p['txid'] txid = p['txid']
nout = p['nout'] nout = p['nout']
else: else:
# TODO: return a useful error message
return server.failure return server.failure
def _disp(x): def _disp(x):
@ -1915,6 +1948,7 @@ class Daemon(AuthJSONRPCServer):
amount = p['amount'] amount = p['amount']
address = p['address'] address = p['address']
else: else:
# TODO: return a useful error message
return server.failure return server.failure
reserved_points = self.session.wallet.reserve_points(address, amount) reserved_points = self.session.wallet.reserve_points(address, amount)
@ -1956,6 +1990,7 @@ class Daemon(AuthJSONRPCServer):
d = self.session.wallet.get_block_info(height) d = self.session.wallet.get_block_info(height)
d.addCallback(lambda blockhash: self.session.wallet.get_block(blockhash)) d.addCallback(lambda blockhash: self.session.wallet.get_block(blockhash))
else: else:
# TODO: return a useful error message
return server.failure return server.failure
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d return d
@ -1973,6 +2008,7 @@ class Daemon(AuthJSONRPCServer):
if 'txid' in p.keys(): if 'txid' in p.keys():
txid = p['txid'] txid = p['txid']
else: else:
# TODO: return a useful error message
return server.failure return server.failure
d = self.session.wallet.get_claims_from_tx(txid) d = self.session.wallet.get_claims_from_tx(txid)
@ -2317,15 +2353,6 @@ def get_sd_hash(stream_info):
return stream_info.get('stream_hash') return stream_info.get('stream_hash')
def get_output_callback(params):
def callback(l):
return {
'stream_hash': params.sd_hash if params.stream_info else l.sd_hash,
'path': os.path.join(params.download_directory, l.file_name)
}
return callback
class _DownloadNameHelper(object): class _DownloadNameHelper(object):
def __init__(self, daemon, name, def __init__(self, daemon, name,
timeout=None, timeout=None,
@ -2341,102 +2368,89 @@ class _DownloadNameHelper(object):
self.file_name = file_name self.file_name = file_name
self.wait_for_write = wait_for_write self.wait_for_write = wait_for_write
def _setup_stream(self, stream_info): @defer.inlineCallbacks
stream_hash = get_sd_hash(stream_info) def setup_stream(self, stream_info):
d = self.daemon._get_lbry_file_by_sd_hash(stream_hash) sd_hash = get_sd_hash(stream_info)
d.addCallback(self._prepend_stream_info, stream_info) lbry_file = yield self.daemon._get_lbry_file_by_sd_hash(sd_hash)
return d if self._does_lbry_file_exists(lbry_file):
defer.returnValue(lbry_file)
else:
defer.returnValue(None)
def _prepend_stream_info(self, lbry_file, stream_info): def _does_lbry_file_exists(self, lbry_file):
if lbry_file: return lbry_file and os.path.isfile(self._full_path(lbry_file))
if os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)):
return defer.succeed((stream_info, lbry_file))
return defer.succeed((stream_info, None))
def wait_or_get_stream(self, args): def _full_path(self, lbry_file):
stream_info, lbry_file = args return os.path.join(self.download_directory, lbry_file.file_name)
@defer.inlineCallbacks
def wait_or_get_stream(self, stream_info, lbry_file):
if lbry_file: if lbry_file:
log.debug('Wait on lbry_file') log.debug('Wait on lbry_file')
return self._wait_on_lbry_file(lbry_file) # returns the lbry_file
yield self._wait_on_lbry_file(lbry_file)
defer.returnValue((lbry_file.sd_hash, self._full_path(lbry_file)))
else: else:
log.debug('No lbry_file, need to get stream') log.debug('No lbry_file, need to get stream')
return self._get_stream(stream_info) # returns an instance of ManagedEncryptedFileDownloaderFactory
sd_hash, file_path = yield self._get_stream(stream_info)
defer.returnValue((sd_hash, file_path))
def _wait_on_lbry_file(self, f):
file_path = self._full_path(f)
written_bytes = self._get_written_bytes(file_path)
if written_bytes:
log.info("File has bytes: %s --> %s", f.sd_hash, file_path)
return defer.succeed(True)
return task.deferLater(reactor, 1, self._wait_on_lbry_file, f)
@defer.inlineCallbacks
def _get_stream(self, stream_info): def _get_stream(self, stream_info):
d = self.daemon.add_stream( was_successful, sd_hash, download_path = yield self.daemon.add_stream(
self.name, self.timeout, self.download_directory, self.file_name, stream_info) self.name, self.timeout, self.download_directory, self.file_name, stream_info)
if not was_successful:
def _handle_timeout(args): log.warning("lbry://%s timed out, removing from streams", self.name)
was_successful, _, _ = args del self.daemon.streams[self.name]
if not was_successful: self.remove_from_wait("Timed out")
log.warning("lbry://%s timed out, removing from streams", self.name) raise Exception("Timed out")
del self.daemon.streams[self.name]
d.addCallback(_handle_timeout)
if self.wait_for_write: if self.wait_for_write:
d.addCallback(lambda _: self._wait_for_write()) yield self._wait_for_write()
defer.returnValue((sd_hash, download_path))
def _get_stream_for_return():
stream = self.daemon.streams.get(self.name, None)
if stream:
return stream.downloader
else:
self._remove_from_wait("Timed out")
return defer.fail(Exception("Timed out"))
d.addCallback(lambda _: _get_stream_for_return())
return d
def _wait_for_write(self): def _wait_for_write(self):
d = defer.succeed(None) d = defer.succeed(None)
if not self.has_downloader_wrote(): if not self._has_downloader_wrote():
d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write)) d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write))
return d return d
def has_downloader_wrote(self): def _has_downloader_wrote(self):
stream = self.daemon.streams.get(self.name, False) stream = self.daemon.streams.get(self.name, False)
if stream: if stream:
downloader = stream.downloader file_path = self._full_path(stream.downloader)
return self._get_written_bytes(file_path)
else: else:
downloader = False
if not downloader:
return False return False
return self.get_written_bytes(downloader.file_name)
def _wait_on_lbry_file(self, f): def _get_written_bytes(self, file_path):
written_bytes = self.get_written_bytes(f.file_name) """Returns the number of bytes written to `file_path`.
if written_bytes:
return defer.succeed(self._disp_file(f))
return task.deferLater(reactor, 1, self._wait_on_lbry_file, f)
def get_written_bytes(self, file_name): Returns False if there were issues reading `file_path`.
"""Returns the number of bytes written to `file_name`.
Returns False if there were issues reading `file_name`.
""" """
try: try:
file_path = os.path.join(self.download_directory, file_name)
if os.path.isfile(file_path): if os.path.isfile(file_path):
written_file = file(file_path) with open(file_path) as written_file:
written_file.seek(0, os.SEEK_END) written_file.seek(0, os.SEEK_END)
written_bytes = written_file.tell() written_bytes = written_file.tell()
written_file.close()
else: else:
written_bytes = False written_bytes = False
except Exception: except Exception:
writen_bytes = False writen_bytes = False
return written_bytes return written_bytes
def _disp_file(self, f): def remove_from_wait(self, reason):
file_path = os.path.join(self.download_directory, f.file_name)
log.info("Already downloaded: %s --> %s", f.sd_hash, file_path)
return f
def _remove_from_wait(self, r):
if self.name in self.daemon.waiting_on: if self.name in self.daemon.waiting_on:
del self.daemon.waiting_on[self.name] del self.daemon.waiting_on[self.name]
return r return reason
class _ResolveNameHelper(object): class _ResolveNameHelper(object):

View file

@ -15,6 +15,7 @@ INITIALIZING_CODE = 'initializing'
DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_METADATA_CODE = 'downloading_metadata'
DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_TIMEOUT_CODE = 'timeout'
DOWNLOAD_RUNNING_CODE = 'running' DOWNLOAD_RUNNING_CODE = 'running'
# TODO: is this ever used?
DOWNLOAD_STOPPED_CODE = 'stopped' DOWNLOAD_STOPPED_CODE = 'stopped'
STREAM_STAGES = [ STREAM_STAGES = [
(INITIALIZING_CODE, 'Initializing...'), (INITIALIZING_CODE, 'Initializing...'),
@ -46,16 +47,18 @@ class GetStream(object):
self.payment_rate_manager = self.session.payment_rate_manager self.payment_rate_manager = self.session.payment_rate_manager
self.lbry_file_manager = lbry_file_manager self.lbry_file_manager = lbry_file_manager
self.sd_identifier = sd_identifier self.sd_identifier = sd_identifier
self.stream_hash = None self.sd_hash = None
self.max_key_fee = max_key_fee self.max_key_fee = max_key_fee
self.stream_info = None self.stream_info = None
self.stream_info_manager = None self.stream_info_manager = None
self.d = defer.Deferred(None) self._d = defer.Deferred(None)
self.timeout = timeout self.timeout = timeout
self.timeout_counter = 0 self.timeout_counter = 0
self.download_directory = download_directory self.download_directory = download_directory
self.download_path = None self.download_path = None
self.downloader = None self.downloader = None
# fired after the metadata has been downloaded and the
# actual file has been started
self.finished = defer.Deferred(None) self.finished = defer.Deferred(None)
self.checker = LoopingCall(self.check_status) self.checker = LoopingCall(self.check_status)
self.code = STREAM_STAGES[0] self.code = STREAM_STAGES[0]
@ -63,15 +66,15 @@ class GetStream(object):
def check_status(self): def check_status(self):
self.timeout_counter += 1 self.timeout_counter += 1
# TODO: Why is this the stopping condition for the finished callback? # download_path is set after the sd blob has been downloaded
if self.download_path: if self.download_path:
self.checker.stop() self.checker.stop()
self.finished.callback((True, self.stream_hash, self.download_path)) self.finished.callback((True, self.sd_hash, self.download_path))
elif self.timeout_counter >= self.timeout: elif self.timeout_counter >= self.timeout:
log.info("Timeout downloading lbry://%s" % self.resolved_name) log.info("Timeout downloading lbry://%s" % self.resolved_name)
self.checker.stop() self.checker.stop()
self.d.cancel() self._d.cancel()
self.code = STREAM_STAGES[4] self.code = STREAM_STAGES[4]
self.finished.callback((False, None, None)) self.finished.callback((False, None, None))
@ -108,35 +111,35 @@ class GetStream(object):
self.resolved_name = name self.resolved_name = name
self.stream_info = deepcopy(stream_info) self.stream_info = deepcopy(stream_info)
self.description = self.stream_info['description'] self.description = self.stream_info['description']
self.stream_hash = self.stream_info['sources']['lbry_sd_hash'] self.sd_hash = self.stream_info['sources']['lbry_sd_hash']
if 'fee' in self.stream_info: if 'fee' in self.stream_info:
self.fee = FeeValidator(self.stream_info['fee']) self.fee = FeeValidator(self.stream_info['fee'])
max_key_fee = self._convert_max_fee() max_key_fee = self._convert_max_fee()
converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount
if converted_fee > self.wallet.wallet_balance: if converted_fee > self.wallet.wallet_balance:
log.warning("Insufficient funds to download lbry://%s", self.resolved_name) msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format(
return defer.fail(InsufficientFundsError()) self.resolved_name, converted_fee, self.wallet.wallet_balance)
raise InsufficientFundsError(msg)
if converted_fee > max_key_fee: if converted_fee > max_key_fee:
log.warning( msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format(
"Key fee %f above limit of %f didn't download lbry://%s",
converted_fee, max_key_fee, self.resolved_name) converted_fee, max_key_fee, self.resolved_name)
return defer.fail(KeyFeeAboveMaxAllowed()) raise KeyFeeAboveMaxAllowed(msg)
log.info( log.info(
"Key fee %f below limit of %f, downloading lbry://%s", "Key fee %f below limit of %f, downloading lbry://%s",
converted_fee, max_key_fee, self.resolved_name) converted_fee, max_key_fee, self.resolved_name)
self.checker.start(1) self.checker.start(1)
self.d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE))
self.d.addCallback(lambda _: download_sd_blob( self._d.addCallback(lambda _: download_sd_blob(
self.session, self.stream_hash, self.payment_rate_manager)) self.session, self.sd_hash, self.payment_rate_manager))
self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
self.d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE))
self.d.addCallback(get_downloader_factory) self._d.addCallback(get_downloader_factory)
self.d.addCallback(make_downloader) self._d.addCallback(make_downloader)
self.d.addCallbacks(self._start_download, _cause_timeout) self._d.addCallbacks(self._start_download, _cause_timeout)
self.d.callback(None) self._d.callback(None)
return self.finished return self.finished
@ -147,7 +150,7 @@ class GetStream(object):
d = self._pay_key_fee() d = self._pay_key_fee()
d.addCallback(lambda _: log.info( d.addCallback(lambda _: log.info(
"Downloading %s --> %s", self.stream_hash, self.downloader.file_name)) "Downloading %s --> %s", self.sd_hash, self.downloader.file_name))
d.addCallback(lambda _: self.downloader.start()) d.addCallback(lambda _: self.downloader.start())
def _pay_key_fee(self): def _pay_key_fee(self):
@ -155,6 +158,9 @@ class GetStream(object):
fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount
reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc) reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc)
if reserved_points is None: if reserved_points is None:
log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name)
# TODO: If we get here, nobody will know that there was an error
# as nobody actually cares about self._d
return defer.fail(InsufficientFundsError()) return defer.fail(InsufficientFundsError())
return self.wallet.send_points_to_address(reserved_points, fee_lbc) return self.wallet.send_points_to_address(reserved_points, fee_lbc)
return defer.succeed(None) return defer.succeed(None)

View file

@ -652,7 +652,8 @@ class TestTransfer(TestCase):
options = metadata.options options = metadata.options
factories = metadata.factories factories = metadata.factories
chosen_options = [ chosen_options = [
o.default_value for o in options.get_downloader_options(info_validator, prm)] o.default_value for o in options.get_downloader_options(info_validator, prm)
]
return factories[0].make_downloader(metadata, chosen_options, prm) return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(sd_hash): def download_file(sd_hash):
@ -669,17 +670,14 @@ class TestTransfer(TestCase):
hashsum.update(f.read()) hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be")
@defer.inlineCallbacks
def start_transfer(sd_hash): def start_transfer(sd_hash):
logging.debug("Starting the transfer") logging.debug("Starting the transfer")
yield self.session.setup()
d = self.session.setup() yield add_lbry_file_to_sd_identifier(sd_identifier)
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) yield self.lbry_file_manager.setup()
d.addCallback(lambda _: self.lbry_file_manager.setup()) yield download_file(sd_hash)
d.addCallback(lambda _: download_file(sd_hash)) yield check_md5_sum()
d.addCallback(lambda _: check_md5_sum())
return d
def stop(arg): def stop(arg):
if isinstance(arg, Failure): if isinstance(arg, Failure):
@ -914,25 +912,30 @@ class TestTransfer(TestCase):
self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir)
self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier) self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier)
@defer.inlineCallbacks
def make_downloader(metadata, prm): def make_downloader(metadata, prm):
info_validator = metadata.validator info_validator = metadata.validator
options = metadata.options options = metadata.options
factories = metadata.factories factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] chosen_options = [
return factories[0].make_downloader(metadata, chosen_options, prm) o.default_value for o in options.get_downloader_options(info_validator, prm)
]
downloader = yield factories[0].make_downloader(metadata, chosen_options, prm)
defer.returnValue(downloader)
def append_downloader(downloader): def append_downloader(downloader):
downloaders.append(downloader) downloaders.append(downloader)
return downloader return downloader
@defer.inlineCallbacks
def download_file(sd_hash): def download_file(sd_hash):
prm = self.session.payment_rate_manager prm = self.session.payment_rate_manager
d = download_sd_blob(self.session, sd_hash, prm) sd_blob = yield download_sd_blob(self.session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob) metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob)
d.addCallback(make_downloader, prm) downloader = yield make_downloader(metadata, prm)
d.addCallback(append_downloader) downloaders.append(downloader)
d.addCallback(lambda downloader: downloader.start()) finished_value = yield downloader.start()
return d defer.returnValue(finished_value)
def check_md5_sum(): def check_md5_sum():
f = open('test_file') f = open('test_file')
@ -959,20 +962,18 @@ class TestTransfer(TestCase):
d.addCallback(check_status_report) d.addCallback(check_status_report)
return d return d
@defer.inlineCallbacks
def start_transfer(sd_hash): def start_transfer(sd_hash):
logging.debug("Starting the transfer") logging.debug("Starting the transfer")
yield self.session.setup()
d = self.session.setup() yield self.stream_info_manager.setup()
d.addCallback(lambda _: self.stream_info_manager.setup()) yield add_lbry_file_to_sd_identifier(sd_identifier)
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) yield self.lbry_file_manager.setup()
d.addCallback(lambda _: self.lbry_file_manager.setup()) yield download_file(sd_hash)
d.addCallback(lambda _: download_file(sd_hash)) yield check_md5_sum()
d.addCallback(lambda _: check_md5_sum()) yield download_file(sd_hash)
d.addCallback(lambda _: download_file(sd_hash)) yield delete_lbry_file()
d.addCallback(lambda _: delete_lbry_file()) yield check_lbry_file()
d.addCallback(lambda _: check_lbry_file())
return d
def stop(arg): def stop(arg):
if isinstance(arg, Failure): if isinstance(arg, Failure):

View file

@ -22,7 +22,7 @@ class EventsTest(unittest.TestCase):
self.assertEqual(desired_result, result) self.assertEqual(desired_result, result)
def test_download_started(self): def test_download_started(self):
result = self.event_generator.download_started('great gatsby') result = self.event_generator.download_started('1', 'great gatsby')
desired_result = { desired_result = {
'context': 'any valid json datatype', 'context': 'any valid json datatype',
'event': 'Download Started', 'event': 'Download Started',
@ -31,6 +31,7 @@ class EventsTest(unittest.TestCase):
'session_id': 'session456', 'session_id': 'session456',
'name': 'great gatsby', 'name': 'great gatsby',
'stream_info': None, 'stream_info': None,
'download_id': '1'
}, },
'timestamp': '2016-01-01T00:00:00Z', 'timestamp': '2016-01-01T00:00:00Z',
'userId': 'lbry' 'userId': 'lbry'