"""Metadata handler that discovers the blob infos of a live stream from peers."""

from collections import defaultdict
import logging

from zope.interface import implements
from twisted.internet import defer
from twisted.python.failure import Failure

from lbrynet.conf import MAX_BLOB_INFOS_TO_REQUEST
from lbrynet.core.client.ClientRequest import ClientRequest, ClientPaidRequest
from lbrynet.lbrylive.LiveBlob import LiveBlobInfo
from lbrynet.core.cryptoutils import get_lbry_hash_obj, verify_signature
from lbrynet.interfaces import IRequestCreator, IMetadataHandler
from lbrynet.core.Error import InsufficientFundsError, InvalidResponseError, RequestCanceledError
from lbrynet.core.Error import NoResponseError, ConnectionClosedBeforeResponseError


log = logging.getLogger(__name__)


class LiveStreamMetadataHandler(object):
    """Request, verify and record blob infos for a single live stream.

    Blob infos received from peers are signature-checked against
    ``stream_pub_key``, stored through ``stream_info_manager`` and handed to
    ``download_manager``. Points for each request are reserved through
    ``wallet`` and either sent or released once the response is handled.
    """
    implements(IRequestCreator, IMetadataHandler)

    def __init__(self, stream_hash, stream_info_manager, peer_finder, stream_pub_key, download_whole,
                 payment_rate_manager, wallet, download_manager, max_before_skip_ahead=None):
        """Store the collaborators and initialise per-peer bookkeeping.

        ``max_before_skip_ahead`` must be provided when ``download_whole`` is
        False; it bounds how many blob infos are requested at a time in that
        mode.
        """
        self.stream_hash = stream_hash
        self.stream_info_manager = stream_info_manager
        self.payment_rate_manager = payment_rate_manager
        self.wallet = wallet
        self.peer_finder = peer_finder
        self.stream_pub_key = stream_pub_key
        self.download_whole = download_whole
        self.max_before_skip_ahead = max_before_skip_ahead
        if self.download_whole is False:
            assert self.max_before_skip_ahead is not None, \
                "If download whole is False, max_before_skip_ahead must be set"
        self.download_manager = download_manager
        self._peers = defaultdict(int)  # {Peer: score}
        self._protocol_prices = {}  # {protocol: rate offered on that connection}
        self._final_blob_num = None
        self._price_disagreements = []  # [Peer]
        self._incompatible_peers = []  # [Peer]

    ######### IMetadataHandler #########

    def get_initial_blobs(self):
        """Return a Deferred firing with the already-stored LiveBlobInfos of the stream."""
        d = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
        d.addCallback(self._format_initial_blobs_for_download_manager)
        return d

    def final_blob_num(self):
        """Return the number of the last blob of the stream, or None if not yet known."""
        return self._final_blob_num

    ######## IRequestCreator #########

    def send_next_request(self, peer, protocol):
        """Queue a blob-info request (and, if needed, a price offer) on ``protocol``.

        Returns a Deferred that fires with True when a request was queued,
        False when discovery is finished or the peer should not be asked, and
        fails with InsufficientFundsError when points cannot be reserved.
        """
        if self._finished_discovery() or not self._should_send_request_to(peer):
            return defer.succeed(False)

        price_request = None
        if not self._price_settled(protocol):
            # Also records the offered rate, which _reserve_points relies on.
            price_request = self._get_price_request(peer, protocol)
        discover_request = self._get_discover_request(peer)
        reserved_points = self._reserve_points(peer, protocol, discover_request.max_pay_units)
        if reserved_points is None:
            return defer.fail(InsufficientFundsError())

        d1 = protocol.add_request(discover_request)
        d1.addCallback(self._handle_discover_response, peer, discover_request)
        # addBoth: the reservation must be settled on success and on failure.
        d1.addBoth(self._pay_or_cancel_payment, protocol, reserved_points)
        d1.addErrback(self._request_failed, peer)
        if price_request is not None:
            d2 = protocol.add_request(price_request)
            d2.addCallback(self._handle_price_response, peer, price_request, protocol)
            d2.addErrback(self._request_failed, peer)
        return defer.succeed(True)

    def get_new_peers(self):
        """Return a Deferred firing with usable peers, or None if discovery is finished."""
        d = self._get_hash_for_peer_search()
        d.addCallback(self._find_peers_for_hash)
        return d

    ######### internal calls #########

    def _get_hash_for_peer_search(self):
        """Return a Deferred firing with the stream hash while discovery is unfinished, else None."""
        r = None
        if not self._finished_discovery():
            r = self.stream_hash
        log.debug("Info finder peer search response for stream %s: %s", str(self.stream_hash), str(r))
        return defer.succeed(r)

    def _find_peers_for_hash(self, h):
        """Look up peers for hash ``h`` and drop the ones considered bad.

        Returns None when ``h`` is None, otherwise the peer finder's Deferred
        with a filtering callback attached.
        """
        if h is None:
            return None

        def choose_best_peers(peers):
            bad_peers = self._get_bad_peers()
            return [p for p in peers if p not in bad_peers]

        d = self.peer_finder.find_peers_for_blob(h)
        d.addCallback(choose_best_peers)
        return d

    def _format_initial_blobs_for_download_manager(self, blob_infos):
        """Convert stored blob rows into LiveBlobInfo objects.

        A row whose blob hash is None is not converted; it marks the end of the
        stream and sets the final blob number to the preceding blob.
        """
        infos = []
        for blob_hash, blob_num, revision, iv, length, signature in blob_infos:
            if blob_hash is not None:
                infos.append(LiveBlobInfo(blob_hash, blob_num, length, iv, revision, signature))
            else:
                log.debug("Setting _final_blob_num to %s", str(blob_num - 1))
                self._final_blob_num = blob_num - 1
        return infos

    def _should_send_request_to(self, peer):
        """Return False for peers with a local score below -5, a price disagreement,
        or a recorded failure to respond; True otherwise."""
        if self._peers[peer] < -5.0:
            return False
        if peer in self._price_disagreements:
            return False
        # Peers are recorded here by _request_failed after a NoResponseError;
        # without this check that list would never be consulted.
        if peer in self._incompatible_peers:
            return False
        return True

    def _get_bad_peers(self):
        """Return the scored peers that should not be sent requests."""
        return [p for p in self._peers.iterkeys() if not self._should_send_request_to(p)]

    def _finished_discovery(self):
        """Return True when there is nothing left to request."""
        return self._get_discovery_params() is None

    def _get_discover_request(self, peer):
        """Build the paid 'further_blobs' request, or return None if discovery is finished."""
        discovery_params = self._get_discovery_params()
        if not discovery_params:
            return None
        reference, start, end, count = discovery_params
        further_blobs_request = {'reference': reference}
        if start is not None:
            further_blobs_request['start'] = start
        if end is not None:
            further_blobs_request['end'] = end
        if count is not None:
            further_blobs_request['count'] = count
        else:
            further_blobs_request['count'] = MAX_BLOB_INFOS_TO_REQUEST
        log.debug("Requesting %s blob infos from %s", str(further_blobs_request['count']), str(peer))
        r_dict = {'further_blobs': further_blobs_request}
        response_identifier = 'further_blobs'
        # The requested count is passed as the request's third argument
        # (read back by the caller as max_pay_units).
        return ClientPaidRequest(r_dict, response_identifier, further_blobs_request['count'])

    def _get_discovery_params(self):
        """Work out which range of blob infos to ask for next.

        Returns a ``(reference, start, end, count)`` tuple, where ``reference``
        is the stream hash and ``start``/``end`` are a blob hash, the literal
        'beginning' / 'end', or None. Returns None when no blob info is missing.
        """
        log.debug("In _get_discovery_params")
        stream_position = self.download_manager.stream_position()
        blobs = self.download_manager.blobs
        if blobs:
            last_blob_num = max(blobs.iterkeys())
        else:
            last_blob_num = -1
        final_blob_num = self.final_blob_num()
        if final_blob_num is not None:
            last_blob_num = final_blob_num

        if self.download_whole is False:
            log.debug("download_whole is False")
            if final_blob_num is not None:
                # End of stream known: ask for the gap from the first missing blob,
                # capped at max_before_skip_ahead.
                for i in xrange(stream_position, final_blob_num + 1):
                    if i not in blobs:
                        count = min(self.max_before_skip_ahead, (final_blob_num - i + 1))
                        return self.stream_hash, None, 'end', count
                return None
            # End of stream unknown: a negative count is sent together with
            # end='end' (its interpretation is up to the peer).
            skip_count = -1 * self.max_before_skip_ahead
            if not blobs:
                return self.stream_hash, None, 'end', skip_count
            for i in xrange(stream_position, last_blob_num + 1):
                if i not in blobs:
                    if i == 0:
                        return self.stream_hash, 'beginning', 'end', skip_count
                    return self.stream_hash, blobs[i-1].blob_hash, 'end', skip_count
            return self.stream_hash, blobs[last_blob_num].blob_hash, 'end', skip_count

        log.debug("download_whole is True")
        # Find the first gap: `beginning` is the known blob just before it (or
        # 'beginning'), `end` is the first known blob after it (if any).
        beginning = None
        end = None
        for i in xrange(stream_position, last_blob_num + 1):
            if i not in blobs:
                if beginning is None:
                    if i == 0:
                        beginning = 'beginning'
                    else:
                        beginning = blobs[i-1].blob_hash
            elif beginning is not None:
                end = blobs[i].blob_hash
                break

        if beginning is not None:
            log.info("Discovery is not finished. Not all blobs are known.")
            return self.stream_hash, beginning, end, None
        if final_blob_num is not None:
            log.debug("Discovery is finished. stream_position: %s, last_blob_num + 1: %s",
                      str(stream_position), str(last_blob_num + 1))
            return None
        log.debug("Discovery is not finished. final blob num is unknown.")
        if last_blob_num != -1:
            return self.stream_hash, blobs[last_blob_num].blob_hash, None, None
        return self.stream_hash, 'beginning', None, None

    def _price_settled(self, protocol):
        """Return True if a rate has already been recorded for ``protocol``."""
        return protocol in self._protocol_prices

    def _update_local_score(self, peer, amount):
        """Add ``amount`` to the locally tracked score of ``peer``."""
        self._peers[peer] += amount

    def _reserve_points(self, peer, protocol, max_infos):
        """Reserve the points needed to pay for up to ``max_infos`` blob infos.

        Returns whatever ``wallet.reserve_points`` returns; the caller treats
        None as insufficient funds.
        """
        assert protocol in self._protocol_prices
        # The recorded rate is divided by 1000 per info (presumably a
        # per-thousand rate -- confirm against the payment rate manager).
        point_amount = 1.0 * max_infos * self._protocol_prices[protocol] / 1000.0
        return self.wallet.reserve_points(peer, point_amount)

    def _pay_or_cancel_payment(self, arg, protocol, reserved_points):
        """Settle the reservation: cancel on failure or zero infos, pay otherwise.

        ``arg`` is passed through unchanged so the Deferred chain continues
        with the same result or Failure.
        """
        if isinstance(arg, Failure) or arg == 0:
            self._cancel_points(reserved_points)
        else:
            self._pay_peer(protocol, arg, reserved_points)
        return arg

    def _pay_peer(self, protocol, num_infos, reserved_points):
        """Send points for ``num_infos`` received blob infos and record the payment."""
        assert num_infos != 0
        assert protocol in self._protocol_prices
        point_amount = 1.0 * num_infos * self._protocol_prices[protocol] / 1000.0
        self.wallet.send_points(reserved_points, point_amount)
        self.payment_rate_manager.record_points_paid(point_amount)

    def _cancel_points(self, reserved_points):
        """Release a point reservation without paying."""
        return self.wallet.cancel_point_reservation(reserved_points)

    def _get_price_request(self, peer, protocol):
        """Record the rate to offer on ``protocol`` and build the request carrying it."""
        self._protocol_prices[protocol] = self.payment_rate_manager.get_rate_live_blob_info(peer)
        request_dict = {'blob_info_payment_rate': self._protocol_prices[protocol]}
        return ClientRequest(request_dict, 'blob_info_payment_rate')

    def _handle_price_response(self, response_dict, peer, request, protocol):
        """Process the peer's answer to a rate offer.

        On anything other than "RATE_ACCEPTED" the recorded rate is dropped and
        the peer is added to the price-disagreement list. Returns True.

        Raises InvalidResponseError if the response lacks the expected key.
        """
        if request.response_identifier not in response_dict:
            # Must be raised, not returned: a returned exception instance is
            # treated by the Deferred as a successful result.
            raise InvalidResponseError("response identifier not in response")
        assert protocol in self._protocol_prices
        response = response_dict[request.response_identifier]
        if response != "RATE_ACCEPTED":
            log.info("Rate offer has been rejected by %s", str(peer))
            del self._protocol_prices[protocol]
            self._price_disagreements.append(peer)
        return True

    def _handle_discover_response(self, response_dict, peer, request):
        """Verify and store the blob infos contained in a 'further_blobs' response.

        Returns a Deferred firing with the number of new blob infos accepted
        (0 when the peer reports 'RATE_UNSET').

        Raises InvalidResponseError for a malformed response or unknown peer
        error, and ValueError when a blob info fails signature verification.
        """
        if request.response_identifier not in response_dict:
            # Raised (not returned) so the failure reaches the errback chain,
            # which cancels the point reservation and penalises the peer.
            raise InvalidResponseError("response identifier not in response")
        response = response_dict[request.response_identifier]
        blob_infos = []
        if 'error' in response:
            if response['error'] == 'RATE_UNSET':
                return defer.succeed(0)
            raise InvalidResponseError("Got an unknown error from the peer: %s" %
                                       (response['error'],))
        if 'blob_infos' not in response:
            raise InvalidResponseError("Missing the required field 'blob_infos'")
        raw_blob_infos = response['blob_infos']
        log.info("Handling %s further blobs from %s", str(len(raw_blob_infos)), str(peer))
        log.debug("blobs: %s", str(raw_blob_infos))
        for raw_blob_info in raw_blob_infos:
            length = raw_blob_info['length']
            # A zero-length entry carries no blob hash; it marks the end of the stream.
            if length != 0:
                blob_hash = raw_blob_info['blob_hash']
            else:
                blob_hash = None
            num = raw_blob_info['blob_num']
            revision = raw_blob_info['revision']
            iv = raw_blob_info['iv']
            signature = raw_blob_info['signature']
            blob_info = LiveBlobInfo(blob_hash, num, length, iv, revision, signature)
            log.debug("Learned about a potential blob: %s", str(blob_hash))
            if not self._verify_blob(blob_info):
                raise ValueError("Peer sent an invalid blob info")
            if blob_hash is None:
                log.info("Setting _final_blob_num to %s", str(num - 1))
                self._final_blob_num = num - 1
            else:
                blob_infos.append(blob_info)

        d = self.stream_info_manager.add_blobs_to_stream(self.stream_hash, blob_infos)

        def add_blobs_to_download_manager():
            blob_nums = [b.blob_num for b in blob_infos]
            log.info("Adding the following blob nums to the download manager: %s", str(blob_nums))
            self.download_manager.add_blobs_to_download(blob_infos)

        d.addCallback(lambda _: add_blobs_to_download_manager())

        def pay_or_penalize_peer():
            if len(blob_infos):
                self._update_local_score(peer, len(blob_infos))
                peer.update_stats('downloaded_crypt_blob_infos', len(blob_infos))
                peer.update_score(len(blob_infos))
            else:
                # Small penalty for a response that contained nothing new.
                self._update_local_score(peer, -.0001)
            return len(blob_infos)

        d.addCallback(lambda _: pay_or_penalize_peer())
        return d

    def _verify_blob(self, blob):
        """Return True if ``blob``'s signature matches its fields under the stream's public key.

        The signed digest covers the stream hash, the blob hash (omitted for
        zero-length blobs), blob number, revision, iv and length, in that order.
        """
        log.debug("Got an unverified blob to check:")
        log.debug("blob_hash: %s", blob.blob_hash)
        log.debug("blob_num: %s", str(blob.blob_num))
        log.debug("revision: %s", str(blob.revision))
        log.debug("iv: %s", blob.iv)
        log.debug("length: %s", str(blob.length))
        hashsum = get_lbry_hash_obj()
        hashsum.update(self.stream_hash)
        if blob.length != 0:
            hashsum.update(blob.blob_hash)
        hashsum.update(str(blob.blob_num))
        hashsum.update(str(blob.revision))
        hashsum.update(blob.iv)
        hashsum.update(str(blob.length))
        log.debug("hexdigest to be verified: %s", hashsum.hexdigest())
        if verify_signature(hashsum.digest(), blob.signature, self.stream_pub_key):
            log.debug("Blob info is valid")
            return True
        log.debug("The blob info is invalid")
        return False

    def _request_failed(self, reason, peer):
        """Errback for failed requests: penalise the peer and decide whether to propagate.

        Cancelled requests are swallowed without penalty. Any other failure
        lowers the peer's scores; a closed connection is then swallowed, and
        everything else is returned so the failure keeps propagating.
        """
        if reason.check(RequestCanceledError):
            return
        if reason.check(NoResponseError):
            self._incompatible_peers.append(peer)
        log.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage())
        self._update_local_score(peer, -5.0)
        peer.update_score(-10.0)
        if reason.check(ConnectionClosedBeforeResponseError):
            return
        return reason