lbry-sdk/lbrynet/lbrylive/client/LiveStreamMetadataHandler.py
2016-10-27 14:31:27 -05:00

346 lines
15 KiB
Python

from collections import defaultdict
import logging
from zope.interface import implements
from twisted.internet import defer
from twisted.python.failure import Failure
from lbrynet.conf import settings
from lbrynet.core.client.ClientRequest import ClientRequest, ClientPaidRequest
from lbrynet.lbrylive.LiveBlob import LiveBlobInfo
from lbrynet.core.cryptoutils import get_lbry_hash_obj, verify_signature
from lbrynet.interfaces import IRequestCreator, IMetadataHandler
from lbrynet.core.Error import InsufficientFundsError, InvalidResponseError, RequestCanceledError
from lbrynet.core.Error import NoResponseError, ConnectionClosedBeforeResponseError
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
class LiveStreamMetadataHandler(object):
    """Discover the blob infos of a live stream by requesting them from peers.

    Declares IMetadataHandler (initial blob list, final blob number) and
    IRequestCreator (building and queueing requests on a peer's protocol).

    Two kinds of request are produced:

    * a ``blob_info_payment_rate`` request offering a rate to the peer, and
    * a paid ``further_blobs`` request asking for more blob infos.

    Every blob info received is signature-checked against ``stream_pub_key``
    before it is stored or handed to the download manager.  Peers are scored
    locally in ``self._peers``; a peer whose score drops below -5.0, or that
    rejected the offered rate, is no longer sent requests.
    """
    implements(IRequestCreator, IMetadataHandler)

    def __init__(self, stream_hash, stream_info_manager, peer_finder, stream_pub_key, download_whole,
                 payment_rate_manager, wallet, download_manager, max_before_skip_ahead=None):
        """Store the collaborators and initialise the per-peer bookkeeping.

        ``max_before_skip_ahead`` must be given whenever ``download_whole`` is
        False, because it sizes the requests built by _get_discovery_params.

        Raises AssertionError if ``download_whole`` is False and
        ``max_before_skip_ahead`` is None.
        """
        self.stream_hash = stream_hash
        self.stream_info_manager = stream_info_manager
        self.payment_rate_manager = payment_rate_manager
        self.wallet = wallet
        self.peer_finder = peer_finder
        self.stream_pub_key = stream_pub_key
        self.download_whole = download_whole
        self.max_before_skip_ahead = max_before_skip_ahead
        if self.download_whole is False:
            assert self.max_before_skip_ahead is not None, \
                "If download whole is False, max_before_skip_ahead must be set"
        self.download_manager = download_manager
        self._peers = defaultdict(int)  # {Peer: score}
        self._protocol_prices = {}  # {protocol: rate offered on that protocol}
        self._final_blob_num = None  # None until the end-of-stream marker is seen
        self._price_disagreements = []  # [Peer]
        self._incompatible_peers = []  # [Peer]

    ######### IMetadataHandler #########

    def get_initial_blobs(self):
        """Return a Deferred firing with the LiveBlobInfos already stored for this stream."""
        d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
        d.addCallback(self._format_initial_blobs_for_download_manager)
        return d

    def final_blob_num(self):
        """Return the number of the stream's last blob, or None if not yet known."""
        return self._final_blob_num

    ######## IRequestCreator #########

    def send_next_request(self, peer, protocol):
        """Queue the next blob-info discovery request for ``peer``, if one is due.

        Returns a Deferred that
          * fires True when a discovery request was queued on ``protocol``,
          * fires False when nothing was sent (discovery is finished, or the
            peer is currently being avoided),
          * fails with InsufficientFundsError when the wallet could not
            reserve points for the request.
        """
        if self._finished_discovery() or not self._should_send_request_to(peer):
            return defer.succeed(False)

        price_request = None
        if not self._price_settled(protocol):
            price_request = self._get_price_request(peer, protocol)
        discover_request = self._get_discover_request(peer)
        reserved_points = self._reserve_points(peer, protocol, discover_request.max_pay_units)

        if reserved_points is None:
            if price_request is not None:
                # _get_price_request recorded the rate as if it had been
                # offered, but the offer is never sent on this path.  Forget
                # it so the next attempt negotiates the rate again instead of
                # treating it as settled.
                self._protocol_prices.pop(protocol, None)
            return defer.fail(InsufficientFundsError())

        d1 = protocol.add_request(discover_request)
        d1.addCallback(self._handle_discover_response, peer, discover_request)
        # addBoth: the reservation must be resolved on success and on failure.
        d1.addBoth(self._pay_or_cancel_payment, protocol, reserved_points)
        d1.addErrback(self._request_failed, peer)
        if price_request is not None:
            d2 = protocol.add_request(price_request)
            d2.addCallback(self._handle_price_response, peer, price_request, protocol)
            d2.addErrback(self._request_failed, peer)
        return defer.succeed(True)

    def get_new_peers(self):
        """Return a Deferred firing with candidate peers, or None if discovery is finished."""
        d = self._get_hash_for_peer_search()
        d.addCallback(self._find_peers_for_hash)
        return d

    ######### internal calls #########

    def _get_hash_for_peer_search(self):
        """Return a Deferred firing with the stream hash, or None once discovery is finished."""
        search_hash = None
        if not self._finished_discovery():
            search_hash = self.stream_hash
        log.debug("Info finder peer search response for stream %s: %s", str(self.stream_hash),
                  str(search_hash))
        return defer.succeed(search_hash)

    def _find_peers_for_hash(self, h):
        """Look up peers for hash ``h`` and drop the ones we are avoiding.

        Returns None when ``h`` is None; otherwise the peer finder's Deferred
        with a filtering callback attached.
        """
        if h is None:
            return None

        def choose_best_peers(peers):
            bad_peers = self._get_bad_peers()
            return [p for p in peers if p not in bad_peers]

        d = self.peer_finder.find_peers_for_blob(h)
        d.addCallback(choose_best_peers)
        return d

    def _format_initial_blobs_for_download_manager(self, blob_infos):
        """Convert stored blob-info rows into LiveBlobInfo objects.

        A row whose blob hash is None is not returned; it sets
        ``_final_blob_num`` to the number just before it.
        """
        infos = []
        for blob_hash, blob_num, revision, iv, length, signature in blob_infos:
            if blob_hash is None:
                log.debug("Setting _final_blob_num to %s", str(blob_num - 1))
                self._final_blob_num = blob_num - 1
            else:
                infos.append(LiveBlobInfo(blob_hash, blob_num, length, iv, revision, signature))
        return infos

    def _should_send_request_to(self, peer):
        """Return False for peers with a score below -5.0 or that rejected our rate."""
        # NOTE: indexing the defaultdict registers an unseen peer with score 0.
        # _get_bad_peers only walks self._peers, so this registration is what
        # lets it report peers that are listed in _price_disagreements.
        if self._peers[peer] < -5.0:
            return False
        return peer not in self._price_disagreements

    def _get_bad_peers(self):
        """Return the known peers that should currently not be sent requests."""
        return [p for p in self._peers if not self._should_send_request_to(p)]

    def _finished_discovery(self):
        """Return True when there is nothing left to ask peers for."""
        return self._get_discovery_params() is None

    def _get_discover_request(self, peer):
        """Build the paid ``further_blobs`` request for the current discovery state.

        Returns a ClientPaidRequest, or None when there are no discovery
        parameters.  ``peer`` is only used for logging.
        """
        discovery_params = self._get_discovery_params()
        if not discovery_params:
            return None

        reference, start, end, count = discovery_params
        further_blobs_request = {'reference': reference}
        if start is not None:
            further_blobs_request['start'] = start
        if end is not None:
            further_blobs_request['end'] = end
        if count is None:
            # No explicit size requested: fall back to the configured maximum.
            count = settings.MAX_BLOB_INFOS_TO_REQUEST
        further_blobs_request['count'] = count
        log.debug("Requesting %s blob infos from %s", str(further_blobs_request['count']), str(peer))
        r_dict = {'further_blobs': further_blobs_request}
        response_identifier = 'further_blobs'
        # The third argument is read back as max_pay_units by send_next_request.
        return ClientPaidRequest(r_dict, response_identifier, further_blobs_request['count'])

    def _get_discovery_params(self):
        """Work out which range of blob infos should be requested next.

        Returns None when discovery is finished, otherwise a tuple
        ``(reference, start, end, count)`` where ``reference`` is the stream
        hash, ``start``/``end`` are a blob hash, 'beginning', 'end' or None,
        and ``count`` is an int or None.
        """
        log.debug("In _get_discovery_params")
        stream_position = self.download_manager.stream_position()
        blobs = self.download_manager.blobs
        if blobs:
            last_blob_num = max(blobs.iterkeys())
        else:
            last_blob_num = -1
        final_blob_num = self.final_blob_num()
        if final_blob_num is not None:
            last_blob_num = final_blob_num

        if self.download_whole is False:
            log.debug("download_whole is False")
            # NOTE(review): the negative counts below presumably ask the peer
            # to count backwards from 'end' -- confirm against the server side.
            if final_blob_num is not None:
                # The end of the stream is known: request from the first
                # missing blob at or after the current position, capped at
                # max_before_skip_ahead infos.
                for i in xrange(stream_position, final_blob_num + 1):
                    if i not in blobs:
                        count = min(self.max_before_skip_ahead, (final_blob_num - i + 1))
                        return self.stream_hash, None, 'end', count
                return None
            if not blobs:
                return self.stream_hash, None, 'end', -1 * self.max_before_skip_ahead
            for i in xrange(stream_position, last_blob_num + 1):
                if i not in blobs:
                    if i == 0:
                        return self.stream_hash, 'beginning', 'end', -1 * self.max_before_skip_ahead
                    return self.stream_hash, blobs[i-1].blob_hash, 'end', -1 * self.max_before_skip_ahead
            return self.stream_hash, blobs[last_blob_num].blob_hash, 'end', -1 * self.max_before_skip_ahead

        log.debug("download_whole is True")
        # Find the first gap at or after the stream position: `beginning` is
        # the blob just before the gap, `end` the first known blob after it.
        beginning = None
        end = None
        for i in xrange(stream_position, last_blob_num + 1):
            if i not in blobs:
                if beginning is None:
                    if i == 0:
                        beginning = 'beginning'
                    else:
                        beginning = blobs[i-1].blob_hash
            elif beginning is not None:
                end = blobs[i].blob_hash
                break

        if beginning is not None:
            log.info("Discovery is not finished. Not all blobs are known.")
            return self.stream_hash, beginning, end, None
        if final_blob_num is not None:
            log.debug("Discovery is finished. stream_position: %s, last_blob_num + 1: %s", str(stream_position),
                      str(last_blob_num + 1))
            return None
        log.debug("Discovery is not finished. final blob num is unknown.")
        if last_blob_num != -1:
            return self.stream_hash, blobs[last_blob_num].blob_hash, None, None
        return self.stream_hash, 'beginning', None, None

    def _price_settled(self, protocol):
        """Return True if a rate is recorded for ``protocol``."""
        return protocol in self._protocol_prices

    def _update_local_score(self, peer, amount):
        """Add ``amount`` (may be negative) to the local score of ``peer``."""
        self._peers[peer] += amount

    def _reserve_points(self, peer, protocol, max_infos):
        """Reserve enough points to pay for up to ``max_infos`` blob infos.

        The recorded rate is applied per 1000 infos.  Returns whatever the
        wallet's reserve_points returns; send_next_request treats None as
        insufficient funds.
        """
        assert protocol in self._protocol_prices
        point_amount = 1.0 * max_infos * self._protocol_prices[protocol] / 1000.0
        return self.wallet.reserve_points(peer, point_amount)

    def _pay_or_cancel_payment(self, arg, protocol, reserved_points):
        """Settle the reservation: cancel on failure or zero infos, otherwise pay.

        ``arg`` is passed through unchanged so later callbacks/errbacks still
        see the original result or Failure.
        """
        if isinstance(arg, Failure) or arg == 0:
            self._cancel_points(reserved_points)
        else:
            self._pay_peer(protocol, arg, reserved_points)
        return arg

    def _pay_peer(self, protocol, num_infos, reserved_points):
        """Pay for ``num_infos`` received infos out of ``reserved_points``."""
        assert num_infos != 0
        assert protocol in self._protocol_prices
        point_amount = 1.0 * num_infos * self._protocol_prices[protocol] / 1000.0
        self.wallet.send_points(reserved_points, point_amount)
        self.payment_rate_manager.record_points_paid(point_amount)

    def _cancel_points(self, reserved_points):
        """Release a point reservation without paying."""
        return self.wallet.cancel_point_reservation(reserved_points)

    def _get_price_request(self, peer, protocol):
        """Record the rate to offer on ``protocol`` and build the request offering it."""
        rate = self.payment_rate_manager.get_rate_live_blob_info(peer)
        self._protocol_prices[protocol] = rate
        request_dict = {'blob_info_payment_rate': rate}
        return ClientRequest(request_dict, 'blob_info_payment_rate')

    def _handle_price_response(self, response_dict, peer, request, protocol):
        """Process the peer's answer to a rate offer.

        On rejection the recorded rate is dropped and the peer is added to
        ``_price_disagreements``.  Returns True in both cases.

        Raises InvalidResponseError if the response lacks the expected key.
        """
        if request.response_identifier not in response_dict:
            # Raise rather than return: a returned exception instance is an
            # ordinary callback result and would never reach the errback.
            raise InvalidResponseError("response identifier not in response")
        assert protocol in self._protocol_prices
        response = response_dict[request.response_identifier]
        if response != "RATE_ACCEPTED":
            log.info("Rate offer has been rejected by %s", str(peer))
            del self._protocol_prices[protocol]
            self._price_disagreements.append(peer)
        return True

    def _handle_discover_response(self, response_dict, peer, request):
        """Validate and store the blob infos a peer sent for a discovery request.

        Returns a Deferred firing with the number of new blob infos accepted
        (0 when the peer answered with the 'RATE_UNSET' error).  That number
        is what _pay_or_cancel_payment uses to pay or cancel.

        Raises InvalidResponseError for a malformed response or an unknown
        peer error, and ValueError if any blob info fails verification.
        """
        if request.response_identifier not in response_dict:
            # Raise rather than return so the reservation is cancelled via the
            # Failure path instead of being "paid" with an exception object.
            raise InvalidResponseError("response identifier not in response")
        response = response_dict[request.response_identifier]
        blob_infos = []
        if 'error' in response:
            if response['error'] == 'RATE_UNSET':
                return defer.succeed(0)
            raise InvalidResponseError("Got an unknown error from the peer: %s" %
                                       (response['error'],))
        if 'blob_infos' not in response:
            raise InvalidResponseError("Missing the required field 'blob_infos'")
        raw_blob_infos = response['blob_infos']
        log.info("Handling %s further blobs from %s", str(len(raw_blob_infos)), str(peer))
        log.debug("blobs: %s", str(raw_blob_infos))
        for raw_blob_info in raw_blob_infos:
            length = raw_blob_info['length']
            # A zero-length info carries no blob hash; it marks the stream end.
            if length != 0:
                blob_hash = raw_blob_info['blob_hash']
            else:
                blob_hash = None
            num = raw_blob_info['blob_num']
            revision = raw_blob_info['revision']
            iv = raw_blob_info['iv']
            signature = raw_blob_info['signature']
            blob_info = LiveBlobInfo(blob_hash, num, length, iv, revision, signature)
            log.debug("Learned about a potential blob: %s", str(blob_hash))
            if not self._verify_blob(blob_info):
                raise ValueError("Peer sent an invalid blob info")
            if blob_hash is None:
                log.info("Setting _final_blob_num to %s", str(num - 1))
                self._final_blob_num = num - 1
            else:
                blob_infos.append(blob_info)

        d = self.stream_info_manager.add_blobs_to_stream(self.stream_hash, blob_infos)

        def add_blobs_to_download_manager():
            blob_nums = [b.blob_num for b in blob_infos]
            log.info("Adding the following blob nums to the download manager: %s", str(blob_nums))
            self.download_manager.add_blobs_to_download(blob_infos)

        d.addCallback(lambda _: add_blobs_to_download_manager())

        def pay_or_penalize_peer():
            num_infos = len(blob_infos)
            if num_infos:
                self._update_local_score(peer, num_infos)
                peer.update_stats('downloaded_crypt_blob_infos', num_infos)
                peer.update_score(num_infos)
            else:
                # An empty answer costs the peer a very small amount of score.
                self._update_local_score(peer, -.0001)
            return num_infos

        d.addCallback(lambda _: pay_or_penalize_peer())
        return d

    def _verify_blob(self, blob):
        """Return True if ``blob``'s signature matches its fields.

        The digest covers the stream hash, the blob hash (only when the
        length is non-zero), blob number, revision, iv and length, and is
        checked against ``blob.signature`` with ``self.stream_pub_key``.
        """
        log.debug("Got an unverified blob to check:")
        log.debug("blob_hash: %s", blob.blob_hash)
        log.debug("blob_num: %s", str(blob.blob_num))
        log.debug("revision: %s", str(blob.revision))
        log.debug("iv: %s", blob.iv)
        log.debug("length: %s", str(blob.length))
        hashsum = get_lbry_hash_obj()
        hashsum.update(self.stream_hash)
        if blob.length != 0:
            hashsum.update(blob.blob_hash)
        hashsum.update(str(blob.blob_num))
        hashsum.update(str(blob.revision))
        hashsum.update(blob.iv)
        hashsum.update(str(blob.length))
        log.debug("hexdigest to be verified: %s", hashsum.hexdigest())
        if verify_signature(hashsum.digest(), blob.signature, self.stream_pub_key):
            log.debug("Blob info is valid")
            return True
        log.debug("The blob info is invalid")
        return False

    def _request_failed(self, reason, peer):
        """Errback for failed requests: penalise the peer and decide whether to propagate.

        A cancelled request is swallowed without penalty.  Any other failure
        lowers the peer's local score by 5.0 and its global score by 10.0.
        The failure is then swallowed if the connection closed before a
        response arrived, and returned (propagated) otherwise.
        """
        if reason.check(RequestCanceledError):
            return
        if reason.check(NoResponseError):
            self._incompatible_peers.append(peer)
        log.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage())
        self._update_local_score(peer, -5.0)
        peer.update_score(-10.0)
        if reason.check(ConnectionClosedBeforeResponseError):
            return
        return reason