Refactor blob requester to better seperate out the three requests
Each blob requests has three parts - A request to see what blobs are available - A request to download and pay for one of those blobs - A request to negotiate data costs Before the refactoring all of the code to accomplish this was mixed into one class. They are now three seperate classes. There is still a lot of room for improvement, but it is at least better than it was.
This commit is contained in:
parent
6803bfae99
commit
4334cdace1
5 changed files with 408 additions and 202 deletions
|
@ -33,4 +33,8 @@ class Peer(object):
|
||||||
self.stats[stat_type] += count
|
self.stats[stat_type] += count
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.host + ":" + str(self.port)
|
return '{}:{}'.format(self.host, self.port)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return 'Peer({!r}, {!r})'.format(self.host, self.port)
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,21 @@ from lbrynet.interfaces import IRequestCreator
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def get_points(num_bytes, rate):
|
||||||
|
return 1.0 * num_bytes * rate / 2**20
|
||||||
|
|
||||||
|
|
||||||
|
def cache(fn):
|
||||||
|
"""Caches the function call for each instance"""
|
||||||
|
attr = '__{}_value'.format(fn.__name__)
|
||||||
|
def helper(self):
|
||||||
|
if not hasattr(self, attr):
|
||||||
|
value = fn(self)
|
||||||
|
setattr(self, attr, value)
|
||||||
|
return getattr(self, attr)
|
||||||
|
return helper
|
||||||
|
|
||||||
|
|
||||||
class BlobRequester(object):
|
class BlobRequester(object):
|
||||||
implements(IRequestCreator)
|
implements(IRequestCreator)
|
||||||
|
|
||||||
|
@ -30,56 +45,11 @@ class BlobRequester(object):
|
||||||
self._incompatible_peers = []
|
self._incompatible_peers = []
|
||||||
|
|
||||||
######## IRequestCreator #########
|
######## IRequestCreator #########
|
||||||
|
|
||||||
def send_next_request(self, peer, protocol):
|
def send_next_request(self, peer, protocol):
|
||||||
sent_request = False
|
"""Makes an availability request, download request and price request"""
|
||||||
if self._blobs_to_download() and self._should_send_request_to(peer):
|
if not self.should_send_next_request(peer):
|
||||||
a_r = self._get_availability_request(peer)
|
return defer.succeed(False)
|
||||||
d_r = self._get_download_request(peer)
|
return self._send_next_request(peer, protocol)
|
||||||
p_r = None
|
|
||||||
|
|
||||||
if a_r is not None or d_r is not None:
|
|
||||||
p_r = self._get_price_request(peer, protocol)
|
|
||||||
|
|
||||||
if a_r is not None:
|
|
||||||
d1 = protocol.add_request(a_r)
|
|
||||||
d1.addCallback(self._handle_availability, peer, a_r)
|
|
||||||
d1.addErrback(self._request_failed, "availability request", peer)
|
|
||||||
sent_request = True
|
|
||||||
if d_r is not None:
|
|
||||||
reserved_points = self._reserve_points(peer, protocol, d_r.max_pay_units)
|
|
||||||
if reserved_points is not None:
|
|
||||||
# Note: The following three callbacks will be called when the blob has been
|
|
||||||
# fully downloaded or canceled
|
|
||||||
d_r.finished_deferred.addCallbacks(self._download_succeeded, self._download_failed,
|
|
||||||
callbackArgs=(peer, d_r.blob),
|
|
||||||
errbackArgs=(peer,))
|
|
||||||
d_r.finished_deferred.addBoth(self._pay_or_cancel_payment, protocol, reserved_points, d_r.blob)
|
|
||||||
d_r.finished_deferred.addErrback(self._handle_download_error, peer, d_r.blob)
|
|
||||||
|
|
||||||
d2 = protocol.add_blob_request(d_r)
|
|
||||||
# Note: The following two callbacks will be called as soon as the peer sends its
|
|
||||||
# response, which will be before the blob has finished downloading, but may be
|
|
||||||
# after the blob has been canceled. For example,
|
|
||||||
# 1) client sends request to Peer A
|
|
||||||
# 2) the blob is finished downloading from peer B, and therefore this one is canceled
|
|
||||||
# 3) client receives response from Peer A
|
|
||||||
# Therefore, these callbacks shouldn't rely on there being a blob about to be
|
|
||||||
# downloaded.
|
|
||||||
d2.addCallback(self._handle_incoming_blob, peer, d_r)
|
|
||||||
d2.addErrback(self._request_failed, "download request", peer)
|
|
||||||
|
|
||||||
sent_request = True
|
|
||||||
else:
|
|
||||||
d_r.cancel(InsufficientFundsError())
|
|
||||||
d_r.finished_deferred.addErrback(lambda _: True)
|
|
||||||
return defer.fail(InsufficientFundsError())
|
|
||||||
if sent_request is True:
|
|
||||||
if p_r is not None:
|
|
||||||
d3 = protocol.add_request(p_r)
|
|
||||||
d3.addCallback(self._handle_price_response, peer, p_r, protocol)
|
|
||||||
d3.addErrback(self._request_failed, "price request", peer)
|
|
||||||
return defer.succeed(sent_request)
|
|
||||||
|
|
||||||
def get_new_peers(self):
|
def get_new_peers(self):
|
||||||
d = self._get_hash_for_peer_search()
|
d = self._get_hash_for_peer_search()
|
||||||
|
@ -87,36 +57,34 @@ class BlobRequester(object):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
######### internal calls #########
|
######### internal calls #########
|
||||||
|
def should_send_next_request(self, peer):
|
||||||
|
return (
|
||||||
|
self._blobs_to_download() and
|
||||||
|
self._should_send_request_to(peer)
|
||||||
|
)
|
||||||
|
|
||||||
def _download_succeeded(self, arg, peer, blob):
|
def _send_next_request(self, peer, protocol):
|
||||||
log.info("Blob %s has been successfully downloaded from %s", str(blob), str(peer))
|
log.debug('Sending a blob request for %s and %s', peer, protocol)
|
||||||
self._update_local_score(peer, 5.0)
|
availability = AvailabilityRequest(self, peer, protocol)
|
||||||
peer.update_stats('blobs_downloaded', 1)
|
download = DownloadRequest(self, peer, protocol, self.wallet, self.payment_rate_manager)
|
||||||
peer.update_score(5.0)
|
price = PriceRequest(self, peer, protocol)
|
||||||
self.blob_manager.blob_completed(blob)
|
|
||||||
return arg
|
|
||||||
|
|
||||||
def _download_failed(self, reason, peer):
|
sent_request = False
|
||||||
if not reason.check(DownloadCanceledError, PriceDisagreementError):
|
if availability.can_make_request():
|
||||||
self._update_local_score(peer, -10.0)
|
availability.make_request_and_handle_response()
|
||||||
return reason
|
sent_request = True
|
||||||
|
|
||||||
def _pay_or_cancel_payment(self, arg, protocol, reserved_points, blob):
|
if download.can_make_request():
|
||||||
if blob.length != 0 and (not isinstance(arg, Failure) or arg.check(DownloadCanceledError)):
|
try:
|
||||||
self._pay_peer(protocol, blob.length, reserved_points)
|
download.make_request_and_handle_response()
|
||||||
else:
|
sent_request = True
|
||||||
self._cancel_points(reserved_points)
|
except InsufficientFundsError as err:
|
||||||
return arg
|
return defer.fail(err)
|
||||||
|
if sent_request and price.can_make_request():
|
||||||
def _handle_download_error(self, err, peer, blob_to_download):
|
# TODO: document why a PriceRequest is only made if an
|
||||||
if not err.check(DownloadCanceledError, PriceDisagreementError, RequestCanceledError):
|
# Availability or Download request was made
|
||||||
log.warning("An error occurred while downloading %s from %s. Error: %s",
|
price.make_request_and_handle_response()
|
||||||
blob_to_download.blob_hash, str(peer), err.getTraceback())
|
return defer.succeed(sent_request)
|
||||||
if err.check(PriceDisagreementError):
|
|
||||||
# Don't kill the whole connection just because a price couldn't be agreed upon.
|
|
||||||
# Other information might be desired by other request creators at a better rate.
|
|
||||||
return True
|
|
||||||
return err
|
|
||||||
|
|
||||||
def _get_hash_for_peer_search(self):
|
def _get_hash_for_peer_search(self):
|
||||||
r = None
|
r = None
|
||||||
|
@ -178,96 +146,71 @@ class BlobRequester(object):
|
||||||
return sorted(needed_blobs, key=lambda b: b.is_downloading())
|
return sorted(needed_blobs, key=lambda b: b.is_downloading())
|
||||||
|
|
||||||
def _blobs_without_sources(self):
|
def _blobs_without_sources(self):
|
||||||
return [b for b in self.download_manager.needed_blobs() if not self._hash_available(b.blob_hash)]
|
return [
|
||||||
|
b for b in self.download_manager.needed_blobs()
|
||||||
def _get_availability_request(self, peer):
|
if not self._hash_available(b.blob_hash)
|
||||||
all_needed = [b.blob_hash for b in self._blobs_to_download() if not b.blob_hash in self._available_blobs[peer]]
|
]
|
||||||
# sort them so that the peer will be asked first for blobs it hasn't said it doesn't have
|
|
||||||
to_request = sorted(all_needed, key=lambda b: b in self._unavailable_blobs[peer])[:20]
|
|
||||||
if to_request:
|
|
||||||
r_dict = {'requested_blobs': to_request}
|
|
||||||
response_identifier = 'available_blobs'
|
|
||||||
request = ClientRequest(r_dict, response_identifier)
|
|
||||||
return request
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _get_download_request(self, peer):
|
|
||||||
request = None
|
|
||||||
to_download = [b for b in self._blobs_to_download() if self._hash_available_on(b.blob_hash, peer)]
|
|
||||||
while to_download and request is None:
|
|
||||||
blob_to_download = to_download[0]
|
|
||||||
to_download = to_download[1:]
|
|
||||||
if not blob_to_download.is_validated():
|
|
||||||
d, write_func, cancel_func = blob_to_download.open_for_writing(peer)
|
|
||||||
|
|
||||||
def counting_write_func(data):
|
|
||||||
peer.update_stats('blob_bytes_downloaded', len(data))
|
|
||||||
return write_func(data)
|
|
||||||
|
|
||||||
if d is not None:
|
|
||||||
|
|
||||||
request_dict = {'requested_blob': blob_to_download.blob_hash}
|
|
||||||
response_identifier = 'incoming_blob'
|
|
||||||
|
|
||||||
request = ClientBlobRequest(request_dict, response_identifier, counting_write_func, d,
|
|
||||||
cancel_func, blob_to_download)
|
|
||||||
|
|
||||||
log.info("Requesting blob %s from %s", str(blob_to_download), str(peer))
|
|
||||||
return request
|
|
||||||
|
|
||||||
def _price_settled(self, protocol):
|
def _price_settled(self, protocol):
|
||||||
if protocol in self._protocol_prices:
|
if protocol in self._protocol_prices:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _get_price_request(self, peer, protocol):
|
|
||||||
request = None
|
|
||||||
if not protocol in self._protocol_prices:
|
|
||||||
self._protocol_prices[protocol] = self.payment_rate_manager.get_rate_blob_data(peer)
|
|
||||||
request_dict = {'blob_data_payment_rate': self._protocol_prices[protocol]}
|
|
||||||
request = ClientRequest(request_dict, 'blob_data_payment_rate')
|
|
||||||
return request
|
|
||||||
|
|
||||||
def _update_local_score(self, peer, amount):
|
def _update_local_score(self, peer, amount):
|
||||||
self._peers[peer] += amount
|
self._peers[peer] += amount
|
||||||
|
|
||||||
def _reserve_points(self, peer, protocol, max_bytes):
|
|
||||||
assert protocol in self._protocol_prices
|
|
||||||
points_to_reserve = 1.0 * max_bytes * self._protocol_prices[protocol] / 2**20
|
|
||||||
return self.wallet.reserve_points(peer, points_to_reserve)
|
|
||||||
|
|
||||||
def _pay_peer(self, protocol, num_bytes, reserved_points):
|
class RequestHelper(object):
|
||||||
assert num_bytes != 0
|
def __init__(self, requestor, peer, protocol):
|
||||||
assert protocol in self._protocol_prices
|
self.requestor = requestor
|
||||||
point_amount = 1.0 * num_bytes * self._protocol_prices[protocol] / 2**20
|
self.peer = peer
|
||||||
self.wallet.send_points(reserved_points, point_amount)
|
self.protocol = protocol
|
||||||
self.payment_rate_manager.record_points_paid(point_amount)
|
|
||||||
|
|
||||||
def _cancel_points(self, reserved_points):
|
@property
|
||||||
self.wallet.cancel_point_reservation(reserved_points)
|
def protocol_prices(self):
|
||||||
|
return self.requestor._protocol_prices
|
||||||
|
|
||||||
def _handle_availability(self, response_dict, peer, request):
|
@property
|
||||||
if not request.response_identifier in response_dict:
|
def available_blobs(self):
|
||||||
raise InvalidResponseError("response identifier not in response")
|
return self.requestor._available_blobs[self.peer]
|
||||||
log.debug("Received a response to the availability request")
|
|
||||||
blob_hashes = response_dict[request.response_identifier]
|
|
||||||
for blob_hash in blob_hashes:
|
|
||||||
if blob_hash in request.request_dict['requested_blobs']:
|
|
||||||
log.debug("The server has indicated it has the following blob available: %s", blob_hash)
|
|
||||||
self._available_blobs[peer].append(blob_hash)
|
|
||||||
if blob_hash in self._unavailable_blobs[peer]:
|
|
||||||
self._unavailable_blobs[peer].remove(blob_hash)
|
|
||||||
request.request_dict['requested_blobs'].remove(blob_hash)
|
|
||||||
for blob_hash in request.request_dict['requested_blobs']:
|
|
||||||
self._unavailable_blobs[peer].append(blob_hash)
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _handle_incoming_blob(self, response_dict, peer, request):
|
@property
|
||||||
|
def unavailable_blobs(self):
|
||||||
|
return self.requestor._unavailable_blobs[self.peer]
|
||||||
|
|
||||||
|
def update_local_score(self, score):
|
||||||
|
self.requestor._update_local_score(self.peer, score)
|
||||||
|
|
||||||
|
def _request_failed(self, reason, request_type):
|
||||||
|
if reason.check(RequestCanceledError):
|
||||||
|
return
|
||||||
|
if reason.check(NoResponseError):
|
||||||
|
self.requestor._incompatible_peers.append(self.peer)
|
||||||
|
log.warning("A request of type '%s' failed. Reason: %s, Error type: %s",
|
||||||
|
request_type, reason.getErrorMessage(), reason.type)
|
||||||
|
self.update_local_score(-10.0)
|
||||||
|
if isinstance(reason, InvalidResponseError) or isinstance(reason, NoResponseError):
|
||||||
|
self.peer.update_score(-10.0)
|
||||||
|
else:
|
||||||
|
self.peer.update_score(-2.0)
|
||||||
|
if reason.check(ConnectionClosedBeforeResponseError):
|
||||||
|
return
|
||||||
|
return reason
|
||||||
|
|
||||||
|
def get_and_save_rate_for_protocol(self):
|
||||||
|
rate = self.protocol_prices.get(self.protocol)
|
||||||
|
if rate is None:
|
||||||
|
rate = self.requestor.payment_rate_manager.get_rate_blob_data(self.peer)
|
||||||
|
self.protocol_prices[self.protocol] = rate
|
||||||
|
return rate
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_incoming_blob(response_dict, peer, request):
|
||||||
if not request.response_identifier in response_dict:
|
if not request.response_identifier in response_dict:
|
||||||
return InvalidResponseError("response identifier not in response")
|
return InvalidResponseError("response identifier not in response")
|
||||||
if not type(response_dict[request.response_identifier]) == dict:
|
if not type(response_dict[request.response_identifier]) == dict:
|
||||||
return InvalidResponseError("response not a dict. got %s" %
|
return InvalidResponseError("response not a dict. got %s" %
|
||||||
(type(response_dict[request.response_identifier]),))
|
type(response_dict[request.response_identifier]))
|
||||||
response = response_dict[request.response_identifier]
|
response = response_dict[request.response_identifier]
|
||||||
if 'error' in response:
|
if 'error' in response:
|
||||||
# This means we're not getting our blob for some reason
|
# This means we're not getting our blob for some reason
|
||||||
|
@ -282,38 +225,295 @@ class BlobRequester(object):
|
||||||
if not 'blob_hash' in response:
|
if not 'blob_hash' in response:
|
||||||
return InvalidResponseError("Missing the required field 'blob_hash'")
|
return InvalidResponseError("Missing the required field 'blob_hash'")
|
||||||
if not response['blob_hash'] == request.request_dict['requested_blob']:
|
if not response['blob_hash'] == request.request_dict['requested_blob']:
|
||||||
return InvalidResponseError("Incoming blob does not match expected. Incoming: %s. Expected: %s" %
|
return InvalidResponseError(
|
||||||
(response['blob_hash'], request.request_dict['requested_blob']))
|
"Incoming blob does not match expected. Incoming: %s. Expected: %s" %
|
||||||
|
(response['blob_hash'], request.request_dict['requested_blob'])
|
||||||
|
)
|
||||||
if not 'length' in response:
|
if not 'length' in response:
|
||||||
return InvalidResponseError("Missing the required field 'length'")
|
return InvalidResponseError("Missing the required field 'length'")
|
||||||
if not request.blob.set_length(response['length']):
|
if not request.blob.set_length(response['length']):
|
||||||
return InvalidResponseError("Could not set the length of the blob")
|
return InvalidResponseError("Could not set the length of the blob")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _handle_price_response(self, response_dict, peer, request, protocol):
|
|
||||||
if not request.response_identifier in response_dict:
|
def _handle_download_error(err, peer, blob_to_download):
|
||||||
|
if not err.check(DownloadCanceledError, PriceDisagreementError, RequestCanceledError):
|
||||||
|
log.warning("An error occurred while downloading %s from %s. Error: %s",
|
||||||
|
blob_to_download.blob_hash, str(peer), err.getTraceback())
|
||||||
|
if err.check(PriceDisagreementError):
|
||||||
|
# Don't kill the whole connection just because a price couldn't be agreed upon.
|
||||||
|
# Other information might be desired by other request creators at a better rate.
|
||||||
|
return True
|
||||||
|
return err
|
||||||
|
|
||||||
|
|
||||||
|
class AvailabilityRequest(RequestHelper):
|
||||||
|
"""Ask a peer what blobs it has available.
|
||||||
|
|
||||||
|
Results are saved in `_available_blobs` and `_unavailable_blobs`
|
||||||
|
on the parent BlobRequester.
|
||||||
|
"""
|
||||||
|
def can_make_request(self):
|
||||||
|
return self.get_top_needed_blobs()
|
||||||
|
|
||||||
|
def make_request_and_handle_response(self):
|
||||||
|
request = self._get_request()
|
||||||
|
self._handle_request(request)
|
||||||
|
|
||||||
|
def _get_request(self):
|
||||||
|
to_request = self.get_top_needed_blobs()
|
||||||
|
if not to_request:
|
||||||
|
raise Exception('Unable to make a request without available blobs')
|
||||||
|
return self._make_request(to_request)
|
||||||
|
|
||||||
|
@cache
|
||||||
|
def get_top_needed_blobs(self, limit=20):
|
||||||
|
all_needed = [
|
||||||
|
b.blob_hash for b in self.requestor._blobs_to_download()
|
||||||
|
if not self.is_available(b)
|
||||||
|
]
|
||||||
|
# sort them so that the peer will be asked first for blobs it
|
||||||
|
# hasn't said it doesn't have
|
||||||
|
sorted_needed = sorted(
|
||||||
|
all_needed,
|
||||||
|
key=lambda b: b in self.unavailable_blobs
|
||||||
|
)
|
||||||
|
return sorted_needed[:limit]
|
||||||
|
|
||||||
|
def is_available(self, blob):
|
||||||
|
return blob.blob_hash in self.available_blobs
|
||||||
|
|
||||||
|
def _make_request(self, to_request):
|
||||||
|
log.debug('Requesting blobs: %s', to_request)
|
||||||
|
r_dict = {'requested_blobs': to_request}
|
||||||
|
response_identifier = 'available_blobs'
|
||||||
|
request = ClientRequest(r_dict, response_identifier)
|
||||||
|
return request
|
||||||
|
|
||||||
|
def _handle_request(self, a_r):
|
||||||
|
log.debug('making an availability request')
|
||||||
|
d1 = self.protocol.add_request(a_r)
|
||||||
|
d1.addCallback(self._handle_availability, a_r)
|
||||||
|
d1.addErrback(self._request_failed, "availability request")
|
||||||
|
|
||||||
|
def _handle_availability(self, response_dict, request):
|
||||||
|
assert request.response_identifier == 'available_blobs'
|
||||||
|
if 'available_blobs' not in response_dict:
|
||||||
|
raise InvalidResponseError("response identifier not in response")
|
||||||
|
log.debug("Received a response to the availability request")
|
||||||
|
# save available blobs
|
||||||
|
blob_hashes = response_dict['available_blobs']
|
||||||
|
for blob_hash in blob_hashes:
|
||||||
|
if blob_hash in request.request_dict['requested_blobs']:
|
||||||
|
self.process_available_blob_hash(blob_hash, request)
|
||||||
|
# everything left in the request is missing
|
||||||
|
for blob_hash in request.request_dict['requested_blobs']:
|
||||||
|
self.unavailable_blobs.append(blob_hash)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def process_available_blob_hash(self, blob_hash, request):
|
||||||
|
log.debug("The server has indicated it has the following blob available: %s", blob_hash)
|
||||||
|
self.available_blobs.append(blob_hash)
|
||||||
|
self.remove_from_unavailable_blobs(blob_hash)
|
||||||
|
request.request_dict['requested_blobs'].remove(blob_hash)
|
||||||
|
|
||||||
|
def remove_from_unavailable_blobs(self, blob_hash):
|
||||||
|
if blob_hash in self.unavailable_blobs:
|
||||||
|
self.unavailable_blobs.remove(blob_hash)
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadRequest(RequestHelper):
|
||||||
|
"""Choose a blob and download it from a peer and also pay the peer for the data."""
|
||||||
|
def __init__(self, requestor, peer, protocol, wallet, payment_rate_manager):
|
||||||
|
RequestHelper.__init__(self, requestor, peer, protocol)
|
||||||
|
self.wallet = wallet
|
||||||
|
self.payment_rate_manager = payment_rate_manager
|
||||||
|
|
||||||
|
def can_make_request(self):
|
||||||
|
return self.get_blob_details()
|
||||||
|
|
||||||
|
def make_request_and_handle_response(self):
|
||||||
|
request = self._get_request()
|
||||||
|
self._handle_download_request(request)
|
||||||
|
|
||||||
|
def _get_request(self):
|
||||||
|
blob_details = self.get_blob_details()
|
||||||
|
if not blob_details:
|
||||||
|
raise Exception('No blobs available to download')
|
||||||
|
return self._make_request(blob_details)
|
||||||
|
|
||||||
|
@cache
|
||||||
|
def get_blob_details(self):
|
||||||
|
"""Open a blob for writing and return the details.
|
||||||
|
|
||||||
|
If no blob can be opened, returns None.
|
||||||
|
"""
|
||||||
|
to_download = self.get_available_blobs()
|
||||||
|
return self.find_blob(to_download)
|
||||||
|
|
||||||
|
def get_available_blobs(self):
|
||||||
|
available_blobs = [
|
||||||
|
b for b in self.requestor._blobs_to_download()
|
||||||
|
if self.requestor._hash_available_on(b.blob_hash, self.peer)
|
||||||
|
]
|
||||||
|
log.debug('available blobs: %s', available_blobs)
|
||||||
|
return available_blobs
|
||||||
|
|
||||||
|
def find_blob(self, to_download):
|
||||||
|
"""Return the first blob in `to_download` that is successfully opened for write."""
|
||||||
|
for blob in to_download:
|
||||||
|
if blob.is_validated():
|
||||||
|
log.debug('Skipping blob %s as its already validated', blob)
|
||||||
|
continue
|
||||||
|
d, write_func, cancel_func = blob.open_for_writing(self.peer)
|
||||||
|
if d is not None:
|
||||||
|
return BlobDownloadDetails(blob, d, write_func, cancel_func, self.peer)
|
||||||
|
log.debug('Skipping blob %s as there was an issue opening it for writing', blob)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _make_request(self, blob_details):
|
||||||
|
blob = blob_details.blob
|
||||||
|
request = ClientBlobRequest(
|
||||||
|
{'requested_blob': blob.blob_hash},
|
||||||
|
'incoming_blob',
|
||||||
|
blob_details.counting_write_func,
|
||||||
|
blob_details.deferred,
|
||||||
|
blob_details.cancel_func,
|
||||||
|
blob
|
||||||
|
)
|
||||||
|
log.info("Requesting blob %s from %s", blob.blob_hash, self.peer)
|
||||||
|
return request
|
||||||
|
|
||||||
|
def _handle_download_request(self, client_blob_request):
|
||||||
|
reserved_points = self.reserve_funds_or_cancel(client_blob_request)
|
||||||
|
self.add_callbacks_to_download_request(client_blob_request, reserved_points)
|
||||||
|
self.create_add_blob_request(client_blob_request)
|
||||||
|
|
||||||
|
def reserve_funds_or_cancel(self, client_blob_request):
|
||||||
|
reserved_points = self._reserve_points(client_blob_request.max_pay_units)
|
||||||
|
if reserved_points is not None:
|
||||||
|
return reserved_points
|
||||||
|
client_blob_request.cancel(InsufficientFundsError())
|
||||||
|
client_blob_request.finished_deferred.addErrback(lambda _: True)
|
||||||
|
raise InsufficientFundsError()
|
||||||
|
|
||||||
|
def add_callbacks_to_download_request(self, client_blob_request, reserved_points):
|
||||||
|
# Note: The following three callbacks will be called when the blob has been
|
||||||
|
# fully downloaded or canceled
|
||||||
|
client_blob_request.finished_deferred.addCallbacks(
|
||||||
|
self._download_succeeded,
|
||||||
|
self._download_failed,
|
||||||
|
callbackArgs=(client_blob_request.blob,),
|
||||||
|
)
|
||||||
|
client_blob_request.finished_deferred.addBoth(
|
||||||
|
self._pay_or_cancel_payment, reserved_points, client_blob_request.blob)
|
||||||
|
client_blob_request.finished_deferred.addErrback(
|
||||||
|
_handle_download_error, self.peer, client_blob_request.blob)
|
||||||
|
|
||||||
|
def _pay_or_cancel_payment(self, arg, reserved_points, blob):
|
||||||
|
if self._can_pay_peer(blob, arg):
|
||||||
|
self._pay_peer(blob.length, reserved_points)
|
||||||
|
else:
|
||||||
|
self._cancel_points(reserved_points)
|
||||||
|
return arg
|
||||||
|
|
||||||
|
def _can_pay_peer(self, blob, arg):
|
||||||
|
return (
|
||||||
|
blob.length != 0 and
|
||||||
|
(not isinstance(arg, Failure) or arg.check(DownloadCanceledError))
|
||||||
|
)
|
||||||
|
|
||||||
|
def _pay_peer(self, num_bytes, reserved_points):
|
||||||
|
assert num_bytes != 0
|
||||||
|
rate = self.get_and_save_rate_for_protocol()
|
||||||
|
point_amount = get_points(num_bytes, rate)
|
||||||
|
self.wallet.send_points(reserved_points, point_amount)
|
||||||
|
self.payment_rate_manager.record_points_paid(point_amount)
|
||||||
|
|
||||||
|
def _cancel_points(self, reserved_points):
|
||||||
|
self.wallet.cancel_point_reservation(reserved_points)
|
||||||
|
|
||||||
|
def create_add_blob_request(self, client_blob_request):
|
||||||
|
d = self.protocol.add_blob_request(client_blob_request)
|
||||||
|
# Note: The following two callbacks will be called as soon as the peer sends its
|
||||||
|
# response, which will be before the blob has finished downloading, but may be
|
||||||
|
# after the blob has been canceled. For example,
|
||||||
|
# 1) client sends request to Peer A
|
||||||
|
# 2) the blob is finished downloading from peer B, and therefore this one is canceled
|
||||||
|
# 3) client receives response from Peer A
|
||||||
|
# Therefore, these callbacks shouldn't rely on there being a blob about to be
|
||||||
|
# downloaded.
|
||||||
|
d.addCallback(_handle_incoming_blob, self.peer, client_blob_request)
|
||||||
|
d.addErrback(self._request_failed, "download request")
|
||||||
|
|
||||||
|
def _reserve_points(self, num_bytes):
|
||||||
|
# jobevers: there was an assertion here, but I don't think it
|
||||||
|
# was a valid assertion to make. It is possible for a rate to
|
||||||
|
# not yet been set for this protocol or for it to have been
|
||||||
|
# removed so instead I switched it to check if a rate has been set
|
||||||
|
# and calculate it if it has not
|
||||||
|
rate = self.get_and_save_rate_for_protocol()
|
||||||
|
points_to_reserve = get_points(num_bytes, rate)
|
||||||
|
return self.wallet.reserve_points(self.peer, points_to_reserve)
|
||||||
|
|
||||||
|
def _download_succeeded(self, arg, blob):
|
||||||
|
log.info("Blob %s has been successfully downloaded from %s", blob, self.peer)
|
||||||
|
self.update_local_score(5.0)
|
||||||
|
self.peer.update_stats('blobs_downloaded', 1)
|
||||||
|
self.peer.update_score(5.0)
|
||||||
|
self.requestor.blob_manager.blob_completed(blob)
|
||||||
|
return arg
|
||||||
|
|
||||||
|
def _download_failed(self, reason):
|
||||||
|
if not reason.check(DownloadCanceledError, PriceDisagreementError):
|
||||||
|
self.update_local_score(-10.0)
|
||||||
|
return reason
|
||||||
|
|
||||||
|
|
||||||
|
class BlobDownloadDetails(object):
|
||||||
|
"""Contains the information needed to make a ClientBlobRequest from an open blob"""
|
||||||
|
def __init__(self, blob, deferred, write_func, cancel_func, peer):
|
||||||
|
self.blob = blob
|
||||||
|
self.deferred = deferred
|
||||||
|
self.write_func = write_func
|
||||||
|
self.cancel_func = cancel_func
|
||||||
|
self.peer = peer
|
||||||
|
|
||||||
|
def counting_write_func(self, data):
|
||||||
|
self.peer.update_stats('blob_bytes_downloaded', len(data))
|
||||||
|
return self.write_func(data)
|
||||||
|
|
||||||
|
|
||||||
|
class PriceRequest(RequestHelper):
|
||||||
|
"""Ask a peer if a certain price is acceptable"""
|
||||||
|
def can_make_request(self):
|
||||||
|
return self.get_and_save_rate_for_protocol() is not None
|
||||||
|
|
||||||
|
def make_request_and_handle_response(self):
|
||||||
|
request = self._get_price_request()
|
||||||
|
self._handle_price_request(request)
|
||||||
|
|
||||||
|
def _get_price_request(self):
|
||||||
|
rate = self.get_and_save_rate_for_protocol()
|
||||||
|
if rate is None:
|
||||||
|
raise Exception('Cannot make a price request without a payment rate')
|
||||||
|
request_dict = {'blob_data_payment_rate': rate}
|
||||||
|
return ClientRequest(request_dict, 'blob_data_payment_rate')
|
||||||
|
|
||||||
|
def _handle_price_request(self, price_request):
|
||||||
|
d = self.protocol.add_request(price_request)
|
||||||
|
d.addCallback(self._handle_price_response, price_request)
|
||||||
|
d.addErrback(self._request_failed, "price request")
|
||||||
|
|
||||||
|
def _handle_price_response(self, response_dict, request):
|
||||||
|
assert request.response_identifier == 'blob_data_payment_rate'
|
||||||
|
if 'blob_data_payment_rate' not in response_dict:
|
||||||
return InvalidResponseError("response identifier not in response")
|
return InvalidResponseError("response identifier not in response")
|
||||||
assert protocol in self._protocol_prices
|
assert self.protocol in self.protocol_prices
|
||||||
response = response_dict[request.response_identifier]
|
response = response_dict['blob_data_payment_rate']
|
||||||
if response == "RATE_ACCEPTED":
|
if response == "RATE_ACCEPTED":
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
del self._protocol_prices[protocol]
|
del self.protocol_prices[self.protocol]
|
||||||
self._price_disagreements.append(peer)
|
self.requestor._price_disagreements.append(self.peer)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _request_failed(self, reason, request_type, peer):
|
|
||||||
if reason.check(RequestCanceledError):
|
|
||||||
return
|
|
||||||
if reason.check(NoResponseError):
|
|
||||||
self._incompatible_peers.append(peer)
|
|
||||||
log.warning("Blob requester: a request of type '%s' failed. Reason: %s, Error type: %s",
|
|
||||||
str(request_type), reason.getErrorMessage(), reason.type)
|
|
||||||
self._update_local_score(peer, -10.0)
|
|
||||||
if isinstance(reason, InvalidResponseError) or isinstance(reason, NoResponseError):
|
|
||||||
peer.update_score(-10.0)
|
|
||||||
else:
|
|
||||||
peer.update_score(-2.0)
|
|
||||||
if reason.check(ConnectionClosedBeforeResponseError):
|
|
||||||
return
|
|
||||||
return reason
|
|
|
@ -49,8 +49,8 @@ class ConnectionManager(object):
|
||||||
for peer in self._peer_connections.keys():
|
for peer in self._peer_connections.keys():
|
||||||
|
|
||||||
def close_connection(p):
|
def close_connection(p):
|
||||||
log.info("Abruptly closing a connection to %s due to downloading being paused",
|
log.info(
|
||||||
str(p))
|
"Abruptly closing a connection to %s due to downloading being paused", p)
|
||||||
|
|
||||||
if self._peer_connections[p].factory.p is not None:
|
if self._peer_connections[p].factory.p is not None:
|
||||||
d = self._peer_connections[p].factory.p.cancel_requests()
|
d = self._peer_connections[p].factory.p.cancel_requests()
|
||||||
|
@ -73,7 +73,7 @@ class ConnectionManager(object):
|
||||||
|
|
||||||
def get_next_request(self, peer, protocol):
|
def get_next_request(self, peer, protocol):
|
||||||
|
|
||||||
log.debug("Trying to get the next request for peer %s", str(peer))
|
log.debug("Trying to get the next request for peer %s", peer)
|
||||||
|
|
||||||
if not peer in self._peer_connections or self.stopped is True:
|
if not peer in self._peer_connections or self.stopped is True:
|
||||||
log.debug("The peer has already been told to shut down.")
|
log.debug("The peer has already been told to shut down.")
|
||||||
|
@ -148,7 +148,7 @@ class ConnectionManager(object):
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
|
|
||||||
if peer is not None and self.stopped is False:
|
if peer is not None and self.stopped is False:
|
||||||
log.debug("Trying to connect to %s", str(peer))
|
log.debug("Trying to connect to %s", peer)
|
||||||
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
|
factory = ClientProtocolFactory(peer, self.rate_limiter, self)
|
||||||
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
||||||
factory)
|
factory)
|
||||||
|
@ -162,7 +162,7 @@ class ConnectionManager(object):
|
||||||
def get_new_peers(request_creators):
|
def get_new_peers(request_creators):
|
||||||
log.debug("Trying to get a new peer to connect to")
|
log.debug("Trying to get a new peer to connect to")
|
||||||
if len(request_creators) > 0:
|
if len(request_creators) > 0:
|
||||||
log.debug("Got a creator to check: %s", str(request_creators[0]))
|
log.debug("Got a creator to check: %s", request_creators[0])
|
||||||
d = request_creators[0].get_new_peers()
|
d = request_creators[0].get_new_peers()
|
||||||
d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:]))
|
d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:]))
|
||||||
return d
|
return d
|
||||||
|
@ -173,12 +173,12 @@ class ConnectionManager(object):
|
||||||
# TODO: Eventually rank them based on past performance/reputation. For now
|
# TODO: Eventually rank them based on past performance/reputation. For now
|
||||||
# TODO: just pick the first to which we don't have an open connection
|
# TODO: just pick the first to which we don't have an open connection
|
||||||
|
|
||||||
log.debug("Got a list of peers to choose from: %s", str(peers))
|
log.debug("Got a list of peers to choose from: %s", peers)
|
||||||
if peers is None:
|
if peers is None:
|
||||||
return None
|
return None
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
if not peer in self._peer_connections:
|
if not peer in self._peer_connections:
|
||||||
log.debug("Got a good peer. Returning peer %s", str(peer))
|
log.debug("Got a good peer. Returning peer %s", peer)
|
||||||
return peer
|
return peer
|
||||||
log.debug("Couldn't find a good peer to connect to")
|
log.debug("Couldn't find a good peer to connect to")
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -73,10 +73,9 @@ def _log_decorator(fn):
|
||||||
|
|
||||||
def disable_third_party_loggers():
|
def disable_third_party_loggers():
|
||||||
logging.getLogger('requests').setLevel(logging.WARNING)
|
logging.getLogger('requests').setLevel(logging.WARNING)
|
||||||
|
logging.getLogger('BitcoinRPC').setLevel(logging.INFO)
|
||||||
|
|
||||||
def disable_noisy_loggers():
|
def disable_noisy_loggers():
|
||||||
logging.getLogger('BitcoinRPC').setLevel(logging.INFO)
|
|
||||||
logging.getLogger('lbrynet.analytics.api').setLevel(logging.INFO)
|
logging.getLogger('lbrynet.analytics.api').setLevel(logging.INFO)
|
||||||
logging.getLogger('lbrynet.core.client').setLevel(logging.INFO)
|
logging.getLogger('lbrynet.core.client').setLevel(logging.INFO)
|
||||||
logging.getLogger('lbrynet.core.server').setLevel(logging.INFO)
|
logging.getLogger('lbrynet.core.server').setLevel(logging.INFO)
|
||||||
|
|
|
@ -73,6 +73,8 @@ def start():
|
||||||
parser.add_argument('--no-launch', dest='launchui', action="store_false")
|
parser.add_argument('--no-launch', dest='launchui', action="store_false")
|
||||||
parser.add_argument('--log-to-console', dest='logtoconsole', action="store_true")
|
parser.add_argument('--log-to-console', dest='logtoconsole', action="store_true")
|
||||||
parser.add_argument('--quiet', dest='quiet', action="store_true")
|
parser.add_argument('--quiet', dest='quiet', action="store_true")
|
||||||
|
parser.add_argument('--verbose', action='store_true',
|
||||||
|
help='enable more debug output for the console')
|
||||||
parser.set_defaults(branch=False, launchui=True, logtoconsole=False, quiet=False)
|
parser.set_defaults(branch=False, launchui=True, logtoconsole=False, quiet=False)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
@ -81,6 +83,7 @@ def start():
|
||||||
if args.logtoconsole:
|
if args.logtoconsole:
|
||||||
log_support.configure_console(level='DEBUG')
|
log_support.configure_console(level='DEBUG')
|
||||||
log_support.disable_third_party_loggers()
|
log_support.disable_third_party_loggers()
|
||||||
|
if not args.verbose:
|
||||||
log_support.disable_noisy_loggers()
|
log_support.disable_noisy_loggers()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in a new issue