forked from LBRYCommunity/lbry-sdk
283 lines
8.9 KiB
Python
283 lines
8.9 KiB
Python
import typing
|
|
import json
|
|
import logging
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class BlobMessage:
|
|
key = ''
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
raise NotImplementedError()
|
|
|
|
|
|
class BlobPriceRequest(BlobMessage):
|
|
key = 'blob_data_payment_rate'
|
|
|
|
def __init__(self, blob_data_payment_rate: float, **kwargs) -> None:
|
|
self.blob_data_payment_rate = blob_data_payment_rate
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
return {
|
|
self.key: self.blob_data_payment_rate
|
|
}
|
|
|
|
|
|
class BlobPriceResponse(BlobMessage):
|
|
key = 'blob_data_payment_rate'
|
|
rate_accepted = 'RATE_ACCEPTED'
|
|
rate_too_low = 'RATE_TOO_LOW'
|
|
rate_unset = 'RATE_UNSET'
|
|
|
|
def __init__(self, blob_data_payment_rate: str, **kwargs) -> None:
|
|
if blob_data_payment_rate not in (self.rate_accepted, self.rate_too_low, self.rate_unset):
|
|
raise ValueError(blob_data_payment_rate)
|
|
self.blob_data_payment_rate = blob_data_payment_rate
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
return {
|
|
self.key: self.blob_data_payment_rate
|
|
}
|
|
|
|
|
|
class BlobAvailabilityRequest(BlobMessage):
|
|
key = 'requested_blobs'
|
|
|
|
def __init__(self, requested_blobs: typing.List[str], lbrycrd_address: typing.Optional[bool] = True,
|
|
**kwargs) -> None:
|
|
assert len(requested_blobs)
|
|
self.requested_blobs = requested_blobs
|
|
self.lbrycrd_address = lbrycrd_address
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
return {
|
|
self.key: self.requested_blobs,
|
|
'lbrycrd_address': self.lbrycrd_address
|
|
}
|
|
|
|
|
|
class BlobAvailabilityResponse(BlobMessage):
|
|
key = 'available_blobs'
|
|
|
|
def __init__(self, available_blobs: typing.List[str], lbrycrd_address: typing.Optional[str] = True,
|
|
**kwargs) -> None:
|
|
self.available_blobs = available_blobs
|
|
self.lbrycrd_address = lbrycrd_address
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
d = {
|
|
self.key: self.available_blobs
|
|
}
|
|
if self.lbrycrd_address:
|
|
d['lbrycrd_address'] = self.lbrycrd_address
|
|
return d
|
|
|
|
|
|
class BlobDownloadRequest(BlobMessage):
|
|
key = 'requested_blob'
|
|
|
|
def __init__(self, requested_blob: str, **kwargs) -> None:
|
|
self.requested_blob = requested_blob
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
return {
|
|
self.key: self.requested_blob
|
|
}
|
|
|
|
|
|
class BlobDownloadResponse(BlobMessage):
|
|
key = 'incoming_blob'
|
|
|
|
def __init__(self, **response: typing.Dict) -> None:
|
|
incoming_blob = response[self.key]
|
|
self.error = None
|
|
self.incoming_blob = None
|
|
if 'error' in incoming_blob:
|
|
self.error = incoming_blob['error']
|
|
else:
|
|
self.incoming_blob = {'blob_hash': incoming_blob['blob_hash'], 'length': incoming_blob['length']}
|
|
self.length = None if not self.incoming_blob else self.incoming_blob['length']
|
|
self.blob_hash = None if not self.incoming_blob else self.incoming_blob['blob_hash']
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
return {
|
|
self.key if not self.error else 'error': self.incoming_blob or self.error,
|
|
}
|
|
|
|
|
|
class BlobPaymentAddressRequest(BlobMessage):
|
|
key = 'lbrycrd_address'
|
|
|
|
def __init__(self, lbrycrd_address: str, **kwargs) -> None:
|
|
self.lbrycrd_address = lbrycrd_address
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
return {
|
|
self.key: self.lbrycrd_address
|
|
}
|
|
|
|
|
|
class BlobPaymentAddressResponse(BlobPaymentAddressRequest):
|
|
pass
|
|
|
|
|
|
class BlobErrorResponse(BlobMessage):
|
|
key = 'error'
|
|
|
|
def __init__(self, error: str, **kwargs) -> None:
|
|
self.error = error
|
|
|
|
def to_dict(self) -> typing.Dict:
|
|
return {
|
|
self.key: self.error
|
|
}
|
|
|
|
|
|
blob_request_types = typing.Union[BlobPriceRequest, BlobAvailabilityRequest, BlobDownloadRequest,
|
|
BlobPaymentAddressRequest]
|
|
blob_response_types = typing.Union[BlobPriceResponse, BlobAvailabilityResponse, BlobDownloadResponse,
|
|
BlobErrorResponse, BlobPaymentAddressResponse]
|
|
|
|
|
|
def _parse_blob_response(response_msg: bytes) -> typing.Tuple[typing.Optional[typing.Dict], bytes]:
|
|
# scenarios:
|
|
# <json>
|
|
# <blob bytes>
|
|
# <json><blob bytes>
|
|
|
|
curr_pos = 0
|
|
while True:
|
|
next_close_paren = response_msg.find(b'}', curr_pos)
|
|
if next_close_paren == -1:
|
|
return None, response_msg
|
|
curr_pos = next_close_paren + 1
|
|
try:
|
|
response = json.loads(response_msg[:curr_pos])
|
|
except ValueError:
|
|
continue
|
|
possible_response_keys = {
|
|
BlobPaymentAddressResponse.key,
|
|
BlobAvailabilityResponse.key,
|
|
BlobPriceResponse.key,
|
|
BlobDownloadResponse.key
|
|
}
|
|
if isinstance(response, dict) and response.keys():
|
|
if set(response.keys()).issubset(possible_response_keys):
|
|
return response, response_msg[curr_pos:]
|
|
return None, response_msg
|
|
|
|
|
|
class BlobRequest:
|
|
def __init__(self, requests: typing.List[blob_request_types]) -> None:
|
|
self.requests = requests
|
|
|
|
def to_dict(self):
|
|
d = {}
|
|
for request in self.requests:
|
|
d.update(request.to_dict())
|
|
return d
|
|
|
|
def _get_request(self, request_type: blob_request_types):
|
|
request = tuple(filter(lambda r: type(r) == request_type, self.requests))
|
|
if request:
|
|
return request[0]
|
|
|
|
def get_availability_request(self) -> typing.Optional[BlobAvailabilityRequest]:
|
|
response = self._get_request(BlobAvailabilityRequest)
|
|
if response:
|
|
return response
|
|
|
|
def get_price_request(self) -> typing.Optional[BlobPriceRequest]:
|
|
response = self._get_request(BlobPriceRequest)
|
|
if response:
|
|
return response
|
|
|
|
def get_blob_request(self) -> typing.Optional[BlobDownloadRequest]:
|
|
response = self._get_request(BlobDownloadRequest)
|
|
if response:
|
|
return response
|
|
|
|
def get_address_request(self) -> typing.Optional[BlobPaymentAddressRequest]:
|
|
response = self._get_request(BlobPaymentAddressRequest)
|
|
if response:
|
|
return response
|
|
|
|
def serialize(self) -> bytes:
|
|
return json.dumps(self.to_dict()).encode()
|
|
|
|
@classmethod
|
|
def deserialize(cls, data: bytes) -> 'BlobRequest':
|
|
request = json.loads(data)
|
|
return cls([
|
|
request_type(**request)
|
|
for request_type in (BlobPriceRequest, BlobAvailabilityRequest, BlobDownloadRequest,
|
|
BlobPaymentAddressRequest)
|
|
if request_type.key in request
|
|
])
|
|
|
|
@classmethod
|
|
def make_request_for_blob_hash(cls, blob_hash: str) -> 'BlobRequest':
|
|
return cls(
|
|
[BlobAvailabilityRequest([blob_hash]), BlobPriceRequest(0.0), BlobDownloadRequest(blob_hash)]
|
|
)
|
|
|
|
|
|
class BlobResponse:
|
|
def __init__(self, responses: typing.List[blob_response_types], blob_data: typing.Optional[bytes] = None) -> None:
|
|
self.responses = responses
|
|
self.blob_data = blob_data
|
|
|
|
def to_dict(self):
|
|
d = {}
|
|
for response in self.responses:
|
|
d.update(response.to_dict())
|
|
return d
|
|
|
|
def _get_response(self, response_type: blob_response_types):
|
|
response = tuple(filter(lambda r: type(r) == response_type, self.responses))
|
|
if response:
|
|
return response[0]
|
|
|
|
def get_error_response(self) -> typing.Optional[BlobErrorResponse]:
|
|
error = self._get_response(BlobErrorResponse)
|
|
if error:
|
|
log.error(error)
|
|
return error
|
|
|
|
def get_availability_response(self) -> typing.Optional[BlobAvailabilityResponse]:
|
|
response = self._get_response(BlobAvailabilityResponse)
|
|
if response:
|
|
return response
|
|
|
|
def get_price_response(self) -> typing.Optional[BlobPriceResponse]:
|
|
response = self._get_response(BlobPriceResponse)
|
|
if response:
|
|
return response
|
|
|
|
def get_blob_response(self) -> typing.Optional[BlobDownloadResponse]:
|
|
response = self._get_response(BlobDownloadResponse)
|
|
if response:
|
|
return response
|
|
|
|
def get_address_response(self) -> typing.Optional[BlobPaymentAddressResponse]:
|
|
response = self._get_response(BlobPaymentAddressResponse)
|
|
if response:
|
|
return response
|
|
|
|
def serialize(self) -> bytes:
|
|
return json.dumps(self.to_dict()).encode()
|
|
|
|
@classmethod
|
|
def deserialize(cls, data: bytes) -> 'BlobResponse':
|
|
response, extra = _parse_blob_response(data)
|
|
requests = []
|
|
if response:
|
|
requests.extend([
|
|
response_type(**response)
|
|
for response_type in (BlobPriceResponse, BlobAvailabilityResponse, BlobDownloadResponse,
|
|
BlobErrorResponse, BlobPaymentAddressResponse)
|
|
if response_type.key in response
|
|
])
|
|
return cls(requests, extra)
|
|
|