diff --git a/lbrynet/blob/client/CryptBlobHandler.py b/lbrynet/blob/client/CryptBlobHandler.py deleted file mode 100644 index 436901dcd..000000000 --- a/lbrynet/blob/client/CryptBlobHandler.py +++ /dev/null @@ -1,22 +0,0 @@ -import binascii -from twisted.internet import defer -from lbrynet.blob.CryptBlob import StreamBlobDecryptor - - -class CryptBlobHandler: - #implements(IBlobHandler) - - def __init__(self, key, write_func): - self.key = key - self.write_func = write_func - - ######## IBlobHandler ######### - - def handle_blob(self, blob, blob_info): - try: - blob_decryptor = StreamBlobDecryptor(blob, self.key, binascii.unhexlify(blob_info.iv), - blob_info.length) - except ValueError as err: - return defer.fail(err) - d = blob_decryptor.decrypt(self.write_func) - return d diff --git a/lbrynet/blob/client/__init__.py b/lbrynet/blob_exchange/__init__.py similarity index 100% rename from lbrynet/blob/client/__init__.py rename to lbrynet/blob_exchange/__init__.py diff --git a/lbrynet/blob_exchange/client.py b/lbrynet/blob_exchange/client.py new file mode 100644 index 000000000..dc458081c --- /dev/null +++ b/lbrynet/blob_exchange/client.py @@ -0,0 +1,168 @@ +import asyncio +import logging +import typing +from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest +if typing.TYPE_CHECKING: + from lbrynet.blob.blob_file import BlobFile + from lbrynet.blob.writer import HashBlobWriter + +log = logging.getLogger(__name__) + + +class BlobExchangeClientProtocol(asyncio.Protocol): + def __init__(self, loop: asyncio.BaseEventLoop, peer_timeout: typing.Optional[float] = 10): + self.loop = loop + self.peer_port: typing.Optional[int] = None + self.peer_address: typing.Optional[str] = None + self.peer_timeout = peer_timeout + self.transport: asyncio.Transport = None + + self.writer: 'HashBlobWriter' = None + self.blob: 'BlobFile' = None + self.download_running = asyncio.Event(loop=self.loop) + + self._blob_bytes_received = 0 + self._response_fut: asyncio.Future = None + self._request_lock = asyncio.Lock(loop=self.loop) + + def handle_data_received(self, data: bytes): + if self.transport.is_closing(): + if self._response_fut and not (self._response_fut.done() or self._response_fut.cancelled()): + self._response_fut.cancel() + return + + response = BlobResponse.deserialize(data) + + if response.responses and self.blob: + blob_response = response.get_blob_response() + if blob_response and not blob_response.error and blob_response.blob_hash == self.blob.blob_hash: + self.blob.set_length(blob_response.length) + elif blob_response and not blob_response.error and self.blob.blob_hash != blob_response.blob_hash: + log.warning("mismatch with self.blob %s", self.blob.blob_hash) + return + if response.responses: + self._response_fut.set_result(response) + if response.blob_data and self.writer and not self.writer.closed(): + self._blob_bytes_received += len(response.blob_data) + try: + self.writer.write(response.blob_data) + except IOError as err: + log.error("error downloading blob: %s", err) + + def data_received(self, data): + try: + return self.handle_data_received(data) + except (asyncio.CancelledError, asyncio.TimeoutError) as err: + if self._response_fut and not self._response_fut.done(): + self._response_fut.set_exception(err) + + async def _download_blob(self) -> typing.Tuple[bool, bool]: + request = BlobRequest.make_request_for_blob_hash(self.blob.blob_hash) + try: + self.transport.write(request.serialize()) + response: BlobResponse = await asyncio.wait_for(self._response_fut, self.peer_timeout, loop=self.loop) + availability_response = response.get_availability_response() + price_response = response.get_price_response() + blob_response = response.get_blob_response() + if (not blob_response or blob_response.error) and\ + (not availability_response or not availability_response.available_blobs): + log.warning("blob not in availability response") + return False, True + elif availability_response.available_blobs and \ + availability_response.available_blobs != [self.blob.blob_hash]: + log.warning("blob availability response doesn't match our request") + return False, False + if not price_response or price_response.blob_data_payment_rate != 'RATE_ACCEPTED': + log.warning("data rate rejected") + return False, False + if not blob_response or blob_response.error: + log.warning("blob cant be downloaded from this peer") + return False, True + if not blob_response.error and blob_response.blob_hash != self.blob.blob_hash: + log.warning("incoming blob hash mismatch") + return False, False + if self.blob.length is not None and self.blob.length != blob_response.length: + log.warning("incoming blob unexpected length") + return False, False + msg = f"downloading {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}," \ + f" timeout in {self.peer_timeout}" + log.info(msg) + await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) + await self.blob.finished_writing.wait() + msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}" + log.info(msg) + return True, True + except asyncio.CancelledError: + return False, True + except asyncio.TimeoutError: + return False, False + finally: + await self.close() + + async def close(self): + if self._response_fut and not self._response_fut.done(): + self._response_fut.cancel() + if self.writer and not self.writer.closed(): + self.writer.close_handle() + if self.blob: + await self.blob.close() + self.download_running.clear() + self._response_fut = None + self.writer = None + self.blob = None + if self.transport: + self.transport.close() + self.transport = None + + async def download_blob(self, blob: 'BlobFile') -> typing.Tuple[bool, bool]: + if blob.get_is_verified(): + return False, True + async with self._request_lock: + try: + if self.download_running.is_set(): + log.info("wait for download already running") + await self.download_running.wait() + self.blob, self.writer, self._blob_bytes_received = blob, blob.open_for_writing(), 0 + self.download_running.set() + self._response_fut = asyncio.Future(loop=self.loop) + return await self._download_blob() + except OSError: + log.error("race happened") + # i'm not sure how to fix this race condition - jack + return False, True + except asyncio.TimeoutError: + if self._response_fut and not self._response_fut.done(): + self._response_fut.cancel() + return False, False + except asyncio.CancelledError: + if self._response_fut and not self._response_fut.done(): + self._response_fut.cancel() + return False, True + + def connection_made(self, transport: asyncio.Transport): + self.transport = transport + self.peer_address, self.peer_port = self.transport.get_extra_info('peername') + # log.info("connection made to %s: %s", self.peer_address, transport) + + def connection_lost(self, reason): + # log.info("connection lost to %s (reason: %s)", self.peer_address, reason) + self.transport = None + self.loop.create_task(self.close()) + + +async def request_blob(loop: asyncio.BaseEventLoop, blob: 'BlobFile', protocol: 'BlobExchangeClientProtocol', + address: str, tcp_port: int, peer_connect_timeout: float) -> typing.Tuple[bool, bool]: + """ + Returns [, ] + """ + if blob.get_is_verified(): + return False, True + if blob.get_is_verified(): + log.info("already verified") + return False, True + try: + await asyncio.wait_for(loop.create_connection(lambda: protocol, address, tcp_port), + peer_connect_timeout, loop=loop) + return await protocol.download_blob(blob) + except (asyncio.TimeoutError, asyncio.CancelledError, ConnectionRefusedError, ConnectionAbortedError, OSError): + return False, False diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py new file mode 100644 index 000000000..53f458b5e --- /dev/null +++ b/lbrynet/blob_exchange/downloader.py @@ -0,0 +1,132 @@ +import asyncio +import typing +import logging +from lbrynet import conf +from lbrynet.utils import drain_tasks +from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob +if typing.TYPE_CHECKING: + from lbrynet.dht.node import Node + from lbrynet.dht.peer import KademliaPeer + from lbrynet.blob.blob_manager import BlobFileManager + from lbrynet.blob.blob_file import BlobFile + +log = logging.getLogger(__name__) + + +def drain_into(a: list, b: list): + while a: + b.append(a.pop()) + + +class BlobDownloader: # TODO: refactor to be the base class used by StreamDownloader + """A single blob downloader""" + def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', config: conf.Config): + self.loop = loop + self.blob_manager = blob_manager + self.new_peer_event = asyncio.Event(loop=self.loop) + self.active_connections: typing.Dict['KademliaPeer', BlobExchangeClientProtocol] = {} + self.running_download_requests: typing.List[asyncio.Task] = [] + self.requested_from: typing.Dict[str, typing.Dict['KademliaPeer', asyncio.Task]] = {} + self.lock = asyncio.Lock(loop=self.loop) + self.blob: 'BlobFile' = None + self.blob_queue = asyncio.Queue(loop=self.loop) + + self.blob_download_timeout = config.get('blob_download_timeout') + self.peer_connect_timeout = config.get('peer_connect_timeout') + self.max_connections = config.get('max_connections_per_stream') + + async def _request_blob(self, peer: 'KademliaPeer'): + if self.blob.get_is_verified(): + log.info("already verified") + return + if peer not in self.active_connections: + log.warning("not active, adding: %s", str(peer)) + self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.blob_download_timeout) + protocol = self.active_connections[peer] + success, keep_connection = await request_blob(self.loop, self.blob, protocol, peer.address, peer.tcp_port, + self.peer_connect_timeout) + await protocol.close() + if not keep_connection: + log.info("drop peer %s:%i", peer.address, peer.tcp_port) + if peer in self.active_connections: + async with self.lock: + del self.active_connections[peer] + return + log.info("keep peer %s:%i", peer.address, peer.tcp_port) + + def _update_requests(self): + self.new_peer_event.clear() + if self.blob.blob_hash not in self.requested_from: + self.requested_from[self.blob.blob_hash] = {} + to_add = [] + for peer in self.active_connections.keys(): + if peer not in self.requested_from[self.blob.blob_hash] and peer not in to_add: + to_add.append(peer) + if to_add or self.running_download_requests: + log.info("adding download probes for %i peers to %i already active", + min(len(to_add), 8 - len(self.running_download_requests)), + len(self.running_download_requests)) + else: + log.info("downloader idle...") + for peer in to_add: + if len(self.running_download_requests) >= 8: + break + task = self.loop.create_task(self._request_blob(peer)) + self.requested_from[self.blob.blob_hash][peer] = task + self.running_download_requests.append(task) + + def _add_peer_protocols(self, peers: typing.List['KademliaPeer']): + added = 0 + for peer in peers: + if peer not in self.active_connections: + self.active_connections[peer] = BlobExchangeClientProtocol(self.loop, self.blob_download_timeout) + added += 1 + if added: + if not self.new_peer_event.is_set(): + log.info("added %i new peers", len(peers)) + self.new_peer_event.set() + + async def _accumulate_connections(self, node: 'Node'): + try: + async with node.stream_peer_search_junction(self.blob_queue) as search_junction: + async for peers in search_junction: + if not isinstance(peers, list): # TODO: what's up with this? + log.error("not a list: %s %s", peers, str(type(peers))) + else: + self._add_peer_protocols(peers) + return + except asyncio.CancelledError: + pass + + async def get_blob(self, blob_hash: str, node: 'Node') -> 'BlobFile': + self.blob = self.blob_manager.get_blob(blob_hash) + if self.blob.get_is_verified(): + return self.blob + accumulator = self.loop.create_task(self._accumulate_connections(node)) + self.blob_queue.put_nowait(blob_hash) + try: + while not self.blob.get_is_verified(): + if len(self.running_download_requests) < self.max_connections: + self._update_requests() + + # drain the tasks into a temporary list + download_tasks = [] + drain_into(self.running_download_requests, download_tasks) + got_new_peer = self.loop.create_task(self.new_peer_event.wait()) + + # wait for a new peer to be added or for a download attempt to finish + await asyncio.wait([got_new_peer] + download_tasks, return_when='FIRST_COMPLETED', + loop=self.loop) + if got_new_peer and not got_new_peer.done(): + got_new_peer.cancel() + if self.blob.get_is_verified(): + if got_new_peer and not got_new_peer.done(): + got_new_peer.cancel() + drain_tasks(download_tasks) + return self.blob + except asyncio.CancelledError: + drain_tasks(self.running_download_requests) + raise + finally: + if accumulator and not accumulator.done(): + accumulator.cancel() diff --git a/lbrynet/blob_exchange/serialization.py b/lbrynet/blob_exchange/serialization.py new file mode 100644 index 000000000..0b80ea1f9 --- /dev/null +++ b/lbrynet/blob_exchange/serialization.py @@ -0,0 +1,289 @@ +import typing +import json +import logging + +log = logging.getLogger(__name__) + + +class BlobMessage: + key = '' + + def to_dict(self) -> typing.Dict: + raise NotImplementedError() + + +class BlobPriceRequest(BlobMessage): + key = 'blob_data_payment_rate' + + def __init__(self, blob_data_payment_rate: float, **kwargs) -> None: + self.blob_data_payment_rate = blob_data_payment_rate + + def to_dict(self) -> typing.Dict: + return { + self.key: self.blob_data_payment_rate + } + + +class BlobPriceResponse(BlobMessage): + key = 'blob_data_payment_rate' + rate_accepted = 'RATE_ACCEPTED' + rate_too_low = 'RATE_TOO_LOW' + rate_unset = 'RATE_UNSET' + + def __init__(self, blob_data_payment_rate: str, **kwargs) -> None: + if blob_data_payment_rate not in (self.rate_accepted, self.rate_too_low, self.rate_unset): + raise ValueError(blob_data_payment_rate) + self.blob_data_payment_rate = blob_data_payment_rate + + def to_dict(self) -> typing.Dict: + return { + self.key: self.blob_data_payment_rate + } + + +class BlobAvailabilityRequest(BlobMessage): + key = 'requested_blobs' + + def __init__(self, requested_blobs: typing.List[str], lbrycrd_address: typing.Optional[bool] = True, + **kwargs) -> None: + assert len(requested_blobs) + self.requested_blobs = requested_blobs + self.lbrycrd_address = lbrycrd_address + + def to_dict(self) -> typing.Dict: + return { + self.key: self.requested_blobs, + 'lbrycrd_address': self.lbrycrd_address + } + + +class BlobAvailabilityResponse(BlobMessage): + key = 'available_blobs' + + def __init__(self, available_blobs: typing.List[str], lbrycrd_address: typing.Optional[str] = True, + **kwargs) -> None: + self.available_blobs = available_blobs + self.lbrycrd_address = lbrycrd_address + + def to_dict(self) -> typing.Dict: + d = { + self.key: self.available_blobs + } + if self.lbrycrd_address: + d['lbrycrd_address'] = self.lbrycrd_address + return d + + +class BlobDownloadRequest(BlobMessage): + key = 'requested_blob' + + def __init__(self, requested_blob: str, **kwargs) -> None: + self.requested_blob = requested_blob + + def to_dict(self) -> typing.Dict: + return { + self.key: self.requested_blob + } + + +class BlobDownloadResponse(BlobMessage): + key = 'incoming_blob' + + def __init__(self, **response: typing.Dict) -> None: + incoming_blob = response[self.key] + self.error = None + self.incoming_blob = None + if 'error' in incoming_blob: + self.error = incoming_blob['error'] + else: + self.incoming_blob = {'blob_hash': incoming_blob['blob_hash'], 'length': incoming_blob['length']} + self.length = None if not self.incoming_blob else self.incoming_blob['length'] + self.blob_hash = None if not self.incoming_blob else self.incoming_blob['blob_hash'] + + def to_dict(self) -> typing.Dict: + return { + self.key if not self.error else 'error': self.incoming_blob or self.error, + } + + +class BlobPaymentAddressRequest(BlobMessage): + key = 'lbrycrd_address' + + def __init__(self, lbrycrd_address: str, **kwargs) -> None: + self.lbrycrd_address = lbrycrd_address + + def to_dict(self) -> typing.Dict: + return { + self.key: self.lbrycrd_address + } + + +class BlobPaymentAddressResponse(BlobPaymentAddressRequest): + pass + + +class BlobErrorResponse(BlobMessage): + key = 'error' + + def __init__(self, error: str, **kwargs) -> None: + self.error = error + + def to_dict(self) -> typing.Dict: + return { + self.key: self.error + } + + +blob_request_types = typing.Union[BlobPriceRequest, BlobAvailabilityRequest, BlobDownloadRequest, + BlobPaymentAddressRequest] +blob_response_types = typing.Union[BlobPriceResponse, BlobAvailabilityResponse, BlobDownloadResponse, + BlobErrorResponse, BlobPaymentAddressResponse] + + +def _parse_blob_response(response_msg: bytes) -> typing.Tuple[typing.Optional[typing.Dict], bytes]: + # scenarios: + # + # + # + + extra_data = b'' + response = None + curr_pos = 0 + while True: + next_close_paren = response_msg.find(b'}', curr_pos) + if next_close_paren == -1: + break + curr_pos = next_close_paren + 1 + try: + response = json.loads(response_msg[:curr_pos]) + if not isinstance(response, dict): + raise ValueError() + for key in response.keys(): + if key not in [ + BlobPaymentAddressResponse.key, + BlobAvailabilityResponse.key, + BlobPriceResponse.key, + BlobDownloadResponse.key]: + raise ValueError() + extra_data = response_msg[curr_pos:] + break + except ValueError: + response = None + if response is None: + extra_data = response_msg + return response, extra_data + + +class BlobRequest: + def __init__(self, requests: typing.List[blob_request_types]) -> None: + self.requests = requests + + def to_dict(self): + d = {} + for request in self.requests: + d.update(request.to_dict()) + return d + + def _get_request(self, request_type: blob_request_types): + request = tuple(filter(lambda r: type(r) == request_type, self.requests)) + if request: + return request[0] + + def get_availability_request(self) -> typing.Optional[BlobAvailabilityRequest]: + response = self._get_request(BlobAvailabilityRequest) + if response: + return response + + def get_price_request(self) -> typing.Optional[BlobPriceRequest]: + response = self._get_request(BlobPriceRequest) + if response: + return response + + def get_blob_request(self) -> typing.Optional[BlobDownloadRequest]: + response = self._get_request(BlobDownloadRequest) + if response: + return response + + def get_address_request(self) -> typing.Optional[BlobPaymentAddressRequest]: + response = self._get_request(BlobPaymentAddressRequest) + if response: + return response + + def serialize(self) -> bytes: + return json.dumps(self.to_dict()).encode() + + @classmethod + def deserialize(cls, data: bytes) -> 'BlobRequest': + request = json.loads(data) + return cls([ + request_type(**request) + for request_type in (BlobPriceRequest, BlobAvailabilityRequest, BlobDownloadRequest, + BlobPaymentAddressRequest) + if request_type.key in request + ]) + + @classmethod + def make_request_for_blob_hash(cls, blob_hash: str) -> 'BlobRequest': + return cls( + [BlobAvailabilityRequest([blob_hash]), BlobPriceRequest(0.0), BlobDownloadRequest(blob_hash)] + ) + + +class BlobResponse: + def __init__(self, responses: typing.List[blob_response_types], blob_data: typing.Optional[bytes] = None) -> None: + self.responses = responses + self.blob_data = blob_data + + def to_dict(self): + d = {} + for response in self.responses: + d.update(response.to_dict()) + return d + + def _get_response(self, response_type: blob_response_types): + response = tuple(filter(lambda r: type(r) == response_type, self.responses)) + if response: + return response[0] + + def get_error_response(self) -> typing.Optional[BlobErrorResponse]: + error = self._get_response(BlobErrorResponse) + if error: + log.error(error) + return error + + def get_availability_response(self) -> typing.Optional[BlobAvailabilityResponse]: + response = self._get_response(BlobAvailabilityResponse) + if response: + return response + + def get_price_response(self) -> typing.Optional[BlobPriceResponse]: + response = self._get_response(BlobPriceResponse) + if response: + return response + + def get_blob_response(self) -> typing.Optional[BlobDownloadResponse]: + response = self._get_response(BlobDownloadResponse) + if response: + return response + + def get_address_response(self) -> typing.Optional[BlobPaymentAddressResponse]: + response = self._get_response(BlobPaymentAddressResponse) + if response: + return response + + def serialize(self) -> bytes: + return json.dumps(self.to_dict()).encode() + + @classmethod + def deserialize(cls, data: bytes) -> 'BlobResponse': + response, extra = _parse_blob_response(data) + requests = [] + if response: + requests.extend([ + response_type(**response) + for response_type in (BlobPriceResponse, BlobAvailabilityResponse, BlobDownloadResponse, + BlobErrorResponse, BlobPaymentAddressResponse) + if response_type.key in response + ]) + return cls(requests, extra) + diff --git a/lbrynet/blob_exchange/server.py b/lbrynet/blob_exchange/server.py new file mode 100644 index 000000000..2f6c8b5c1 --- /dev/null +++ b/lbrynet/blob_exchange/server.py @@ -0,0 +1,107 @@ +import asyncio +import binascii +import logging +import typing +from json.decoder import JSONDecodeError +from lbrynet.blob_exchange.serialization import BlobResponse, BlobRequest, blob_response_types +from lbrynet.blob_exchange.serialization import BlobAvailabilityResponse, BlobPriceResponse, BlobDownloadResponse, \ + BlobPaymentAddressResponse + +if typing.TYPE_CHECKING: + from lbrynet.blob.blob_manager import BlobFileManager + +log = logging.getLogger(__name__) + + +class BlobServer(asyncio.Protocol): + def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', lbrycrd_address: str): + self.loop = loop + self.blob_manager = blob_manager + self.server_task: asyncio.Task = None + self.started_listening = asyncio.Event(loop=self.loop) + self.buf = b'' + self.transport = None + self.lbrycrd_address = lbrycrd_address + + def connection_made(self, transport): + self.transport = transport + + def send_response(self, responses: typing.List[blob_response_types]): + to_send = [] + while responses: + to_send.append(responses.pop()) + self.transport.write(BlobResponse(to_send).serialize()) + + async def handle_request(self, request: BlobRequest): + addr = self.transport.get_extra_info('peername') + peer_address, peer_port = addr + + responses = [] + address_request = request.get_address_request() + if address_request: + responses.append(BlobPaymentAddressResponse(lbrycrd_address=self.lbrycrd_address)) + availability_request = request.get_availability_request() + if availability_request: + responses.append(BlobAvailabilityResponse(available_blobs=list(set(( + filter(lambda blob_hash: blob_hash in self.blob_manager.completed_blob_hashes, + availability_request.requested_blobs) + ))))) + price_request = request.get_price_request() + if price_request: + responses.append(BlobPriceResponse(blob_data_payment_rate='RATE_ACCEPTED')) + download_request = request.get_blob_request() + + if download_request: + blob = self.blob_manager.get_blob(download_request.requested_blob) + if blob.get_is_verified(): + incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length} + responses.append(BlobDownloadResponse(incoming_blob=incoming_blob)) + self.send_response(responses) + log.info("send %s to %s:%i", blob.blob_hash[:8], peer_address, peer_port) + sent = await blob.sendfile(self) + log.info("sent %s (%i bytes) to %s:%i", blob.blob_hash[:8], sent, peer_address, peer_port) + if responses: + self.send_response(responses) + # self.transport.close() + + def data_received(self, data): + request = None + if data: + message, separator, remainder = data.rpartition(b'}') + if not separator: + self.buf += data + return + try: + request = BlobRequest.deserialize(data) + self.buf = remainder + except JSONDecodeError: + addr = self.transport.get_extra_info('peername') + peer_address, peer_port = addr + log.error("failed to decode blob request from %s:%i (%i bytes): %s", peer_address, peer_port, + len(data), '' if not data else binascii.hexlify(data).decode()) + if not request: + addr = self.transport.get_extra_info('peername') + peer_address, peer_port = addr + log.warning("failed to decode blob request from %s:%i", peer_address, peer_port) + self.transport.close() + return + self.loop.create_task(self.handle_request(request)) + + def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'): + if self.server_task is not None: + raise Exception("already running") + + async def _start_server(): + server = await self.loop.create_server(lambda: self, interface, port) + self.started_listening.set() + log.info("Blob server listening on TCP %s:%i", interface, port) + async with server: + await server.serve_forever() + + self.server_task = self.loop.create_task(_start_server()) + + def stop_server(self): + if self.server_task: + self.server_task.cancel() + self.server_task = None + log.info("Stopped blob server") diff --git a/lbrynet/extras/wallet/manager.py b/lbrynet/extras/wallet/manager.py index ca12d8952..7373ec92f 100644 --- a/lbrynet/extras/wallet/manager.py +++ b/lbrynet/extras/wallet/manager.py @@ -6,15 +6,12 @@ from binascii import unhexlify from datetime import datetime from typing import Optional -from twisted.internet import defer - from lbrynet.schema.schema import SECP256k1 from torba.client.basemanager import BaseWalletManager from torba.rpc.jsonrpc import CodeMessageError from lbrynet.schema.claim import ClaimDict -from lbrynet.extras.compat import f2d from lbrynet.extras.wallet.ledger import MainNetLedger from lbrynet.extras.wallet.account import BaseAccount, generate_certificate from lbrynet.extras.wallet.transaction import Transaction @@ -260,12 +257,6 @@ class LbryWalletManager(BaseWalletManager): destination_address: bytes = reserved.identifier.encode('latin1') return self.send_amount_to_address(amount, destination_address, account) - def get_wallet_info_query_handler_factory(self): - return LBRYcrdAddressQueryHandlerFactory(self) - - def get_info_exchanger(self): - return LBRYcrdAddressRequester(self) - async def resolve(self, *uris, **kwargs): page = kwargs.get('page', 0) page_size = kwargs.get('page_size', 10) @@ -516,10 +507,10 @@ class LbryWalletManager(BaseWalletManager): pass # TODO: Data payments is disabled def send_points(self, reserved_points, amount): - defer.succeed(True) # TODO: Data payments is disabled + pass # TODO: Data payments is disabled def cancel_point_reservation(self, reserved_points): - pass # fixme: disabled for now. + pass # fixme: disabled for now. def save(self): for wallet in self.wallets: @@ -537,83 +528,3 @@ class LbryWalletManager(BaseWalletManager): def get_claim_by_outpoint(self, txid, nout): return self.ledger.get_claim_by_outpoint(txid, nout) - - -class ClientRequest: - def __init__(self, request_dict, response_identifier=None): - self.request_dict = request_dict - self.response_identifier = response_identifier - - -class LBRYcrdAddressRequester: - - def __init__(self, wallet): - self.wallet = wallet - self._protocols = [] - - def send_next_request(self, peer, protocol): - if not protocol in self._protocols: - r = ClientRequest({'lbrycrd_address': True}, 'lbrycrd_address') - d = protocol.add_request(r) - d.addCallback(self._handle_address_response, peer, r, protocol) - d.addErrback(self._request_failed, peer) - self._protocols.append(protocol) - return defer.succeed(True) - else: - return defer.succeed(False) - - def _handle_address_response(self, response_dict, peer, request, protocol): - if request.response_identifier not in response_dict: - raise ValueError( - f"Expected {request.response_identifier} in response but did not get it") - assert protocol in self._protocols, "Responding protocol is not in our list of protocols" - address = response_dict[request.response_identifier] - self.wallet.update_peer_address(peer, address) - - def _request_failed(self, error, peer): - raise Exception( - "A peer failed to send a valid public key response. Error: {}, peer: {}".format( - error.getErrorMessage(), str(peer) - ) - ) - - -class LBRYcrdAddressQueryHandlerFactory: - - def __init__(self, wallet): - self.wallet = wallet - - def build_query_handler(self): - q_h = LBRYcrdAddressQueryHandler(self.wallet) - return q_h - - def get_primary_query_identifier(self): - return 'lbrycrd_address' - - def get_description(self): - return "LBRYcrd Address - an address for receiving payments via LBRYcrd" - - -class LBRYcrdAddressQueryHandler: - - def __init__(self, wallet): - self.wallet = wallet - self.query_identifiers = ['lbrycrd_address'] - self.address = None - self.peer = None - - def register_with_request_handler(self, request_handler, peer): - self.peer = peer - request_handler.register_query_handler(self, self.query_identifiers) - - @defer.inlineCallbacks - def handle_queries(self, queries): - if self.query_identifiers[0] in queries: - address = yield f2d(self.wallet.get_unused_address_for_peer(self.peer)) - self.address = address - fields = {'lbrycrd_address': address} - return fields - if self.address is None: - raise Exception("Expected a request for an address, but did not receive one") - else: - return {} diff --git a/lbrynet/p2p/BlobAvailability.py b/lbrynet/p2p/BlobAvailability.py deleted file mode 100644 index 0ddb7072e..000000000 --- a/lbrynet/p2p/BlobAvailability.py +++ /dev/null @@ -1,95 +0,0 @@ -import logging -import random -import time -from decimal import Decimal -from twisted.internet import defer -from twisted.internet.task import LoopingCall - -log = logging.getLogger(__name__) - - -class BlobAvailabilityTracker: - """ - Class to track peer counts for known blobs, and to discover new popular blobs - - Attributes: - availability (dict): dictionary of peers for known blobs - """ - - def __init__(self, blob_manager, peer_finder, dht_node): - self.availability = {} - self._last_mean_availability = Decimal(0.0) - self._blob_manager = blob_manager - self._peer_finder = peer_finder - self._dht_node = dht_node - self._check_mine = LoopingCall(self._update_mine) - - def start(self): - log.info("Starting blob availability tracker.") - self._check_mine.start(600) - - def stop(self): - log.info("Stopping blob availability tracker.") - if self._check_mine.running: - self._check_mine.stop() - - def get_blob_availability(self, blob, timeout=None): - def _get_peer_count(peers): - have_blob = sum(1 for peer in peers if peer.is_available()) - return {blob: have_blob} - - d = self._peer_finder.find_peers_for_blob(blob, timeout) - d.addCallback(_get_peer_count) - return d - - def get_availability_for_blobs(self, blobs, timeout=None): - dl = [self.get_blob_availability(blob, timeout) for blob in blobs if blob] - d = defer.DeferredList(dl) - d.addCallback(lambda results: [val for success, val in results if success]) - return d - - @property - def last_mean_availability(self): - return max(Decimal(0.01), self._last_mean_availability) - - def _update_peers_for_blob(self, blob): - def _save_peer_info(blob_hash, peers): - v = {blob_hash: peers} - self.availability.update(v) - return v - - d = self._peer_finder.find_peers_for_blob(blob) - d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) - d.addCallback(lambda peers: _save_peer_info(blob, peers)) - return d - - def _update_mine(self): - def _get_peers(blobs): - dl = [] - for hash in blobs: - dl.append(self._update_peers_for_blob(hash)) - return defer.DeferredList(dl) - - def sample(blobs): - return random.sample(blobs, min(len(blobs), 10)) - - start = time.time() - log.debug('==> Updating the peers for my blobs') - d = self._blob_manager.get_all_verified_blobs() - # as far as I can tell, this only is used to set _last_mean_availability - # which... seems like a very expensive operation for such little payoff. - # so taking a sample should get about the same effect as querying the entire - # list of blobs - d.addCallback(sample) - d.addCallback(_get_peers) - d.addCallback(lambda _: self._set_mean_peers()) - d.addCallback(lambda _: log.debug('<== Done updating peers for my blobs. Took %s seconds', - time.time() - start)) - # although unused, need to return or else the looping call - # could overrun on a previous call - return d - - def _set_mean_peers(self): - num_peers = [len(self.availability[blob]) for blob in self.availability] - mean = Decimal(sum(num_peers)) / Decimal(max(1, len(num_peers))) - self._last_mean_availability = mean diff --git a/lbrynet/p2p/PriceModel.py b/lbrynet/p2p/PriceModel.py deleted file mode 100644 index 33b6b0e86..000000000 --- a/lbrynet/p2p/PriceModel.py +++ /dev/null @@ -1,56 +0,0 @@ -from decimal import Decimal - - -def get_default_price_model(blob_tracker, base_price, **kwargs): - return MeanAvailabilityWeightedPrice(blob_tracker, base_price, **kwargs) - - -class ZeroPrice: - def __init__(self): - self.base_price = 0.0 - - def calculate_price(self, blob): - return 0.0 - - -class MeanAvailabilityWeightedPrice: - """Calculate mean-blob-availability and stream-position weighted price for a blob - - Attributes: - base_price (float): base price - alpha (float): constant, > 0.0 and <= 1.0, used to more highly - value blobs at the beginning of a stream. - alpha defaults to 1.0, which has a null effect - blob_tracker (BlobAvailabilityTracker): blob availability tracker - - """ - - def __init__(self, tracker, base_price, alpha=1.0): - self.blob_tracker = tracker - self.base_price = Decimal(base_price) - self.alpha = Decimal(alpha) - - def calculate_price(self, blob): - mean_availability = self.blob_tracker.last_mean_availability - availability = self.blob_tracker.availability.get(blob, []) - index = 0 # blob.index - availability_mult = self._get_availability_multiplier(mean_availability, availability) - price = self.base_price * availability_mult / self._frontload(index) - return round(price, 5) - - def _get_availability_multiplier(self, mean_availability, availability): - return Decimal(max(1, mean_availability) / Decimal(max(1, len(availability)))) - - def _frontload(self, index): - """Get front-load multiplier, used to weight prices of blobs in a - stream towards the front of the stream. - - At index 0, returns 1.0 - As index increases, return value approaches 2.0 - - @param index: blob position in stream - @return: front-load multiplier - - """ - - return Decimal(2.0) - (self.alpha ** index) diff --git a/lbrynet/p2p/RateLimiter.py b/lbrynet/p2p/RateLimiter.py deleted file mode 100644 index 136b533b0..000000000 --- a/lbrynet/p2p/RateLimiter.py +++ /dev/null @@ -1,154 +0,0 @@ -import logging - -from twisted.internet import task - - -log = logging.getLogger(__name__) - - -class DummyRateLimiter: - def __init__(self): - self.dl_bytes_this_second = 0 - self.ul_bytes_this_second = 0 - self.total_dl_bytes = 0 - self.total_ul_bytes = 0 - self.target_dl = 0 - self.target_ul = 0 - self.tick_call = None - - def start(self): - self.tick_call = task.LoopingCall(self.tick) - self.tick_call.start(1) - - def tick(self): - self.dl_bytes_this_second = 0 - self.ul_bytes_this_second = 0 - - def stop(self): - if self.tick_call is not None: - self.tick_call.stop() - self.tick_call = None - - def set_dl_limit(self, limit): - pass - - def set_ul_limit(self, limit): - pass - - def report_dl_bytes(self, num_bytes): - self.dl_bytes_this_second += num_bytes - self.total_dl_bytes += num_bytes - - def report_ul_bytes(self, num_bytes): - self.ul_bytes_this_second += num_bytes - self.total_ul_bytes += num_bytes - - -class RateLimiter: - """This class ensures that upload and download rates don't exceed specified maximums""" - - #implements(IRateLimiter) - - #called by main application - - def __init__(self, max_dl_bytes=None, max_ul_bytes=None): - self.max_dl_bytes = max_dl_bytes - self.max_ul_bytes = max_ul_bytes - self.dl_bytes_this_interval = 0 - self.ul_bytes_this_interval = 0 - self.total_dl_bytes = 0 - self.total_ul_bytes = 0 - self.tick_call = None - self.tick_interval = 0.1 - - self.dl_throttled = False - self.ul_throttled = False - - self.protocols = [] - - def start(self): - log.info("Starting rate limiter.") - self.tick_call = task.LoopingCall(self.tick) - self.tick_call.start(self.tick_interval) - - def tick(self): - self.dl_bytes_this_interval = 0 - self.ul_bytes_this_interval = 0 - self.unthrottle_dl() - self.unthrottle_ul() - - def stop(self): - log.info("Stopping rate limiter.") - if self.tick_call is not None: - self.tick_call.stop() - self.tick_call = None - - def set_dl_limit(self, limit): - self.max_dl_bytes = limit - - def set_ul_limit(self, limit): - self.max_ul_bytes = limit - - #throttling - - def check_dl(self): - need_throttle = (self.max_dl_bytes is not None and - self.dl_bytes_this_interval > self.max_dl_bytes * self.tick_interval) - if need_throttle: - from twisted.internet import reactor - reactor.callLater(0, self.throttle_dl) - - def check_ul(self): - need_throttle = (self.max_ul_bytes is not None and - self.ul_bytes_this_interval > self.max_ul_bytes * self.tick_interval) - if need_throttle: - from twisted.internet import reactor - reactor.callLater(0, self.throttle_ul) - - def throttle_dl(self): - if self.dl_throttled is False: - for protocol in self.protocols: - protocol.throttle_download() - self.dl_throttled = True - - def throttle_ul(self): - if self.ul_throttled is False: - for protocol in self.protocols: - protocol.throttle_upload() - self.ul_throttled = True - - def unthrottle_dl(self): - if self.dl_throttled is True: - for protocol in self.protocols: - protocol.unthrottle_download() - self.dl_throttled = False - - def unthrottle_ul(self): - if self.ul_throttled is True: - for protocol in self.protocols: - protocol.unthrottle_upload() - self.ul_throttled = False - - #called by protocols - - def report_dl_bytes(self, num_bytes): - self.dl_bytes_this_interval += num_bytes - self.total_dl_bytes += num_bytes - self.check_dl() - - def report_ul_bytes(self, num_bytes): - self.ul_bytes_this_interval += num_bytes - self.total_ul_bytes += num_bytes - self.check_ul() - - def register_protocol(self, protocol): - if protocol not in self.protocols: - self.protocols.append(protocol) - if self.dl_throttled is True: - protocol.throttle_download() - if self.ul_throttled is True: - protocol.throttle_upload() - - def unregister_protocol(self, protocol): - if protocol in self.protocols: - self.protocols.remove(protocol) diff --git a/lbrynet/p2p/SinglePeerDownloader.py b/lbrynet/p2p/SinglePeerDownloader.py deleted file mode 100644 index ca166c6db..000000000 --- a/lbrynet/p2p/SinglePeerDownloader.py +++ /dev/null @@ -1,109 +0,0 @@ -import logging -import shutil -import tempfile - -from twisted.internet import defer, reactor - -from lbrynet.extras.compat import d2f -from lbrynet.blob.blob_file import BlobFile -from lbrynet.p2p.BlobManager import DiskBlobManager -from lbrynet.p2p.RateLimiter import DummyRateLimiter -from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager -from lbrynet.p2p.client.BlobRequester import BlobRequester -from lbrynet.p2p.client.StandaloneBlobDownloader import StandaloneBlobDownloader -from lbrynet.p2p.client.ConnectionManager import ConnectionManager -from lbrynet.extras.daemon.storage import SQLiteStorage -from lbrynet.extras.daemon.PeerFinder import DummyPeerFinder -from lbrynet.conf import Config - - -log = logging.getLogger(__name__) - - -class SinglePeerFinder(DummyPeerFinder): - def __init__(self, peer): - super().__init__() - self.peer = peer - - def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False): - return defer.succeed([self.peer]) - - -class BlobCallback(BlobFile): - def __init__(self, blob_dir, blob_hash, timeout): - super().__init__(blob_dir, blob_hash) - self.callback = defer.Deferred() - reactor.callLater(timeout, self._cancel) - - def _cancel(self): - if not self.callback.called: - self.callback.callback(False) - - def save_verified_blob(self, writer): - result = BlobFile.save_verified_blob(self, writer) - if not self.callback.called: - self.callback.callback(True) - return result - - -class SingleBlobDownloadManager: - def __init__(self, blob): - self.blob = blob - - def needed_blobs(self): - if self.blob.verified: - return [] - else: - return [self.blob] - - def get_head_blob_hash(self): - return self.blob.blob_hash - - -class SinglePeerDownloader: - def __init__(self, conf: Config): - self.conf = conf - self._payment_rate_manager = OnlyFreePaymentsManager() - self._rate_limiter = DummyRateLimiter() - self._wallet = None - self._blob_manager = None - - def setup(self, wallet, blob_manager=None): - if not self._wallet: - self._wallet = wallet - if not self._blob_manager: - self._blob_manager = blob_manager - - @defer.inlineCallbacks - def download_blob_from_peer(self, peer, timeout, blob_hash, blob_manager): - log.debug("Try to download %s from %s", blob_hash, peer.host) - blob_manager = blob_manager - blob = BlobCallback(blob_manager.blob_dir, blob_hash, timeout) - download_manager = SingleBlobDownloadManager(blob) - peer_finder = SinglePeerFinder(peer) - requester = BlobRequester(blob_manager, peer_finder, self._payment_rate_manager, - self._wallet, download_manager) - downloader = StandaloneBlobDownloader(self.conf, blob_hash, blob_manager, peer_finder, - self._rate_limiter, self._payment_rate_manager, - self._wallet, timeout=timeout) - info_exchanger = self._wallet.get_info_exchanger() - connection_manager = ConnectionManager(downloader, self._rate_limiter, [requester], - [info_exchanger]) - connection_manager.start() - - result = yield blob.callback - if not result: - log.debug("Failed to downloaded %s from %s", blob_hash[:16], peer.host) - yield connection_manager.stop() - defer.returnValue(result) - - async def download_temp_blob_from_peer(self, peer, timeout, blob_hash): - tmp_storage = SQLiteStorage(Config(), ':memory:') - await tmp_storage.open() - tmp_dir = tempfile.mkdtemp() - tmp_blob_manager = DiskBlobManager(tmp_dir, tmp_storage) - try: - return await d2f(self.download_blob_from_peer(peer, timeout, blob_hash, tmp_blob_manager)) - finally: - await tmp_storage.close() - shutil.rmtree(tmp_dir) diff --git a/lbrynet/p2p/Strategy.py b/lbrynet/p2p/Strategy.py deleted file mode 100644 index 052410040..000000000 --- a/lbrynet/p2p/Strategy.py +++ /dev/null @@ -1,143 +0,0 @@ -from decimal import Decimal -from lbrynet.p2p.Offer import Offer -from lbrynet.p2p.PriceModel import MeanAvailabilityWeightedPrice, ZeroPrice - - -def get_default_strategy(blob_tracker, base_price, is_generous, **kwargs): - return BasicAvailabilityWeightedStrategy(blob_tracker, base_price, is_generous, **kwargs) - - -class Strategy: - """ - Base for negotiation strategies - """ - - def __init__(self, price_model, max_rate, min_rate, is_generous): - self.price_model = price_model - self.is_generous = is_generous - self.accepted_offers = {} - self.pending_sent_offers = {} - self.offers_sent = {} - self.offers_received = {} - self.max_rate = max_rate or Decimal(self.price_model.base_price * 50) - self.min_rate = Decimal(min_rate) - - def _make_rate_offer(self, rates, offer_count): - return NotImplementedError() - - def _get_response_rate(self, rates, offer_count): - return NotImplementedError() - - def make_offer(self, peer, blobs): - offer_count = self.offers_sent.get(peer, 0) - self._add_offer_sent(peer) - if peer in self.accepted_offers: - # if there was a previous accepted offer, use that - offer = self.accepted_offers[peer] - if peer in self.pending_sent_offers: - del self.pending_sent_offers[peer] - elif offer_count == 0 and self.is_generous: - # Try asking for it for free - offer = Offer(Decimal(0.0)) - self.pending_sent_offers.update({peer: offer}) - else: - rates = [self.price_model.calculate_price(blob) for blob in blobs] - price = self._make_rate_offer(rates, offer_count) - offer = Offer(price) - self.pending_sent_offers.update({peer: offer}) - return offer - - def respond_to_offer(self, offer, peer, blobs): - offer_count = self.offers_received.get(peer, 0) - self._add_offer_received(peer) - rates = [self.price_model.calculate_price(blob) for blob in blobs] - price = self._get_response_rate(rates, offer_count) - if peer in self.accepted_offers: - offer = self.accepted_offers[peer] - elif offer.rate == 0.0 and offer_count == 0 and self.is_generous: - # give blobs away for free by default on the first request - offer.accept() - self.accepted_offers.update({peer: offer}) - elif offer.rate >= price: - offer.accept() - self.accepted_offers.update({peer: offer}) - else: - offer.reject() - if peer in self.accepted_offers: - del self.accepted_offers[peer] - return offer - - def update_accepted_offers(self, peer, offer): - if not offer.is_accepted and peer in self.accepted_offers: - del self.accepted_offers[peer] - if offer.is_accepted: - self.accepted_offers.update({peer: offer}) - self.pending_sent_offers.update({peer: offer}) - - def _add_offer_sent(self, peer): - turn = self.offers_sent.get(peer, 0) + 1 - self.offers_sent.update({peer: turn}) - - def _add_offer_received(self, peer): - turn = self.offers_received.get(peer, 0) + 1 - self.offers_received.update({peer: turn}) - - def _bounded_price(self, price): - price_for_return = Decimal(min(self.max_rate, max(price, self.min_rate))) - return price_for_return - - -class BasicAvailabilityWeightedStrategy(Strategy): - """Basic strategy to target blob prices based on supply relative to mean supply - - Discount price target with each incoming request, and raise it - with each outgoing from the modeled price until the rate is - accepted or a threshold is reached - - """ - - def __init__(self, blob_tracker, base_price, is_generous, - acceleration=1.25, deceleration=0.9, max_rate=None, - min_rate=0.0, alpha=1.0): - price_model = MeanAvailabilityWeightedPrice( - blob_tracker, base_price, alpha=alpha) - super().__init__(price_model, max_rate, min_rate, is_generous) - self._acceleration = Decimal(acceleration) # rate of how quickly to ramp offer - self._deceleration = Decimal(deceleration) - - def _get_mean_rate(self, rates): - mean_rate = Decimal(sum(rates)) / Decimal(max(len(rates), 1)) - return mean_rate - - def _premium(self, rate, turn): - return rate * (self._acceleration ** Decimal(turn)) - - def _discount(self, rate, turn): - return rate * (self._deceleration ** Decimal(turn)) - - def _get_response_rate(self, rates, offer_count): - rate = self._get_mean_rate(rates) - discounted = self._discount(rate, offer_count) - rounded_price = round(discounted, 5) - return self._bounded_price(rounded_price) - - def _make_rate_offer(self, rates, offer_count): - rate = self._get_mean_rate(rates) - with_premium = self._premium(rate, offer_count) - rounded_price = round(with_premium, 5) - return self._bounded_price(rounded_price) - - -class OnlyFreeStrategy(Strategy): - def __init__(self, *args, **kwargs): - price_model = ZeroPrice() - super().__init__(price_model, 0.0, 0.0, True) - - def _get_mean_rate(self, rates): - return 0.0 - - def _get_response_rate(self, rates, offer_count): - return 0.0 - - def _make_rate_offer(self, rates, offer_count): - return 0.0 diff --git a/lbrynet/p2p/client/BlobRequester.py b/lbrynet/p2p/client/BlobRequester.py deleted file mode 100644 index d2506043a..000000000 --- a/lbrynet/p2p/client/BlobRequester.py +++ /dev/null @@ -1,590 +0,0 @@ -import logging -from collections import defaultdict -from decimal import Decimal - -from twisted.internet import defer -from twisted.python.failure import Failure -from twisted.internet.error import ConnectionAborted - -from lbrynet.p2p.Error import ConnectionClosedBeforeResponseError -from lbrynet.p2p.Error import InvalidResponseError, RequestCanceledError, NoResponseError -from lbrynet.p2p.Error import PriceDisagreementError, DownloadCanceledError, InsufficientFundsError -from lbrynet.p2p.client.ClientRequest import ClientRequest, ClientBlobRequest -from lbrynet.p2p.Offer import Offer - - -log = logging.getLogger(__name__) - - -def get_points(num_bytes, rate): - if isinstance(rate, float): - return 1.0 * num_bytes * rate / 2**20 - elif isinstance(rate, Decimal): - return 1.0 * num_bytes * float(rate) / 2**20 - else: - raise Exception("Unknown rate type") - - -def cache(fn): - """Caches the function call for each instance""" - attr = f'__{fn.__name__}_value' - - def helper(self): - if not hasattr(self, attr): - value = fn(self) - setattr(self, attr, value) - return getattr(self, attr) - return helper - - -class BlobRequester: - #implements(IRequestCreator) - - def __init__(self, blob_manager, peer_finder, payment_rate_manager, wallet, download_manager): - self.blob_manager = blob_manager - self.peer_finder = peer_finder - self.payment_rate_manager = payment_rate_manager - self.wallet = wallet - self._download_manager = download_manager - self._peers = defaultdict(int) # {Peer: score} - self._available_blobs = defaultdict(list) # {Peer: [blob_hash]} - self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}} - self._protocol_prices = {} # {ClientProtocol: price} - self._protocol_offers = {} - self._price_disagreements = [] # [Peer] - self._protocol_tries = {} - self._maxed_out_peers = [] - self._incompatible_peers = [] - - ######## IRequestCreator ######### - def send_next_request(self, peer, protocol): - """Makes an availability request, download request and price request""" - if not self.should_send_next_request(peer): - return defer.succeed(False) - return self._send_next_request(peer, protocol) - - @defer.inlineCallbacks - def get_new_peers_for_head_blob(self): - """ look for peers for the head blob """ - head_blob_hash = self._download_manager.get_head_blob_hash() - peers = yield self._find_peers_for_hash(head_blob_hash) - defer.returnValue(peers) - - @defer.inlineCallbacks - def get_new_peers_for_next_unavailable(self): - """ - Look for peers for the next unavailable blob, if we have - all blobs, return an empty list - """ - blob_hash = yield self._get_hash_for_peer_search() - if blob_hash is None: - defer.returnValue([]) - peers = yield self._find_peers_for_hash(blob_hash) - defer.returnValue(peers) - - ######### internal calls ######### - def should_send_next_request(self, peer): - return ( - self._blobs_to_download() and - self._should_send_request_to(peer) - ) - - def _send_next_request(self, peer, protocol): - log.debug('Sending a blob request for %s and %s', peer, protocol) - availability = AvailabilityRequest(self, peer, protocol, self.payment_rate_manager) - head_blob_hash = self._download_manager.get_head_blob_hash() - download = DownloadRequest(self, peer, protocol, self.payment_rate_manager, - self.wallet, head_blob_hash) - price = PriceRequest(self, peer, protocol, self.payment_rate_manager) - - sent_request = False - if availability.can_make_request(): - availability.make_request_and_handle_response() - sent_request = True - if price.can_make_request(): - # TODO: document why a PriceRequest is only made if an - # Availability or Download request was made - price.make_request_and_handle_response() - sent_request = True - if download.can_make_request(): - try: - download.make_request_and_handle_response() - sent_request = True - except InsufficientFundsError as err: - return defer.fail(err) - - return defer.succeed(sent_request) - - def _get_hash_for_peer_search(self): - """ - Get next unavailable hash for blob, - returns None if there is nothing left to download - """ - r = None - blobs_to_download = self._blobs_to_download() - if blobs_to_download: - blobs_without_sources = self._blobs_without_sources() - if not blobs_without_sources: - blob_hash = blobs_to_download[0].blob_hash - else: - blob_hash = blobs_without_sources[0].blob_hash - r = blob_hash - log.debug("Blob requester peer search response: %s", str(r)) - return defer.succeed(r) - - def _find_peers_for_hash(self, h): - d = self.peer_finder.find_peers_for_blob(h, filter_self=True) - - def choose_best_peers(peers): - bad_peers = self._get_bad_peers() - without_bad_peers = [p for p in peers if not p in bad_peers] - without_maxed_out_peers = [ - p for p in without_bad_peers if p not in self._maxed_out_peers] - return without_maxed_out_peers - - d.addCallback(choose_best_peers) - - def lookup_failed(err): - log.error("An error occurred looking up peers for a hash: %s", err.getTraceback()) - return [] - - d.addErrback(lookup_failed) - return d - - def _should_send_request_to(self, peer): - if self._peers[peer] < -5.0: - return False - if peer in self._price_disagreements: - return False - if peer in self._incompatible_peers: - return False - return True - - def _get_bad_peers(self): - return [p for p in self._peers.keys() if not self._should_send_request_to(p)] - - def _hash_available(self, blob_hash): - for peer in self._available_blobs: - if blob_hash in self._available_blobs[peer]: - return True - return False - - def _hash_available_on(self, blob_hash, peer): - if blob_hash in self._available_blobs[peer]: - return True - return False - - def _blobs_to_download(self): - needed_blobs = self._download_manager.needed_blobs() - return sorted(needed_blobs, key=lambda b: b.is_downloading()) - - def _blobs_without_sources(self): - return [ - b for b in self._download_manager.needed_blobs() - if not self._hash_available(b.blob_hash) - ] - - def _price_settled(self, protocol): - if protocol in self._protocol_prices: - return True - return False - - def _update_local_score(self, peer, amount): - self._peers[peer] += amount - - -class RequestHelper: - def __init__(self, requestor, peer, protocol, payment_rate_manager): - self.requestor = requestor - self.peer = peer - self.protocol = protocol - self.payment_rate_manager = payment_rate_manager - - @property - def protocol_prices(self): - return self.requestor._protocol_prices - - @property - def protocol_offers(self): - return self.requestor._protocol_offers - - @property - def available_blobs(self): - return self.requestor._available_blobs[self.peer] - - @property - def unavailable_blobs(self): - return self.requestor._unavailable_blobs[self.peer] - - @property - def maxed_out_peers(self): - return self.requestor._maxed_out_peers - - def update_local_score(self, score): - self.requestor._update_local_score(self.peer, score) - - def _request_failed(self, reason, request_type): - if reason.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted, - ConnectionClosedBeforeResponseError, ValueError): - return - if reason.check(NoResponseError): - self.requestor._incompatible_peers.append(self.peer) - log.warning("A request of type '%s' failed. Reason: %s, Error type: %s", - request_type, reason.getErrorMessage(), reason.type) - self.update_local_score(-10.0) - if isinstance(reason, (InvalidResponseError, NoResponseError)): - self.peer.update_score(-10.0) - else: - self.peer.update_score(-2.0) - return reason - - def get_rate(self): - if self.payment_rate_manager.price_limit_reached(self.peer): - if self.peer not in self.maxed_out_peers: - self.maxed_out_peers.append(self.peer) - return None - rate = self.protocol_prices.get(self.protocol) - if rate is None: - if self.peer in self.payment_rate_manager.strategy.pending_sent_offers: - pending = self.payment_rate_manager.strategy.pending_sent_offers[self.peer] - if not pending.is_too_low and not pending.is_accepted: - return pending.rate - rate = self.payment_rate_manager.get_rate_blob_data(self.peer, self.available_blobs) - return rate - - -def _handle_incoming_blob(response_dict, peer, request): - if request.response_identifier not in response_dict: - return InvalidResponseError("response identifier not in response") - if not isinstance(response_dict[request.response_identifier], dict): - return InvalidResponseError("response not a dict. got %s" % - type(response_dict[request.response_identifier])) - response = response_dict[request.response_identifier] - if 'error' in response: - # This means we're not getting our blob for some reason - if response['error'] == "RATE_UNSET": - # Stop the download with an error that won't penalize the peer - request.cancel(PriceDisagreementError()) - else: - # The peer has done something bad so we should get out of here - return InvalidResponseError("Got an unknown error from the peer: %s" % - (response['error'],)) - else: - if 'blob_hash' not in response: - return InvalidResponseError("Missing the required field 'blob_hash'") - if not response['blob_hash'] == request.request_dict['requested_blob']: - return InvalidResponseError( - "Incoming blob does not match expected. Incoming: %s. Expected: %s" % - (response['blob_hash'], request.request_dict['requested_blob']) - ) - if 'length' not in response: - return InvalidResponseError("Missing the required field 'length'") - if not request.blob.set_length(response['length']): - return InvalidResponseError("Could not set the length of the blob") - return True - - -def _handle_download_error(err, peer, blob_to_download): - if not err.check(DownloadCanceledError, PriceDisagreementError, RequestCanceledError, - ConnectionClosedBeforeResponseError): - log.warning("An error occurred while downloading %s from %s. Error: %s", - blob_to_download.blob_hash, str(peer), err.getTraceback()) - if err.check(PriceDisagreementError): - # Don't kill the whole connection just because a price couldn't be agreed upon. - # Other information might be desired by other request creators at a better rate. - return True - return err - - -class AvailabilityRequest(RequestHelper): - """Ask a peer what blobs it has available. - - Results are saved in `_available_blobs` and `_unavailable_blobs` - on the parent BlobRequester. - """ - def can_make_request(self): - return self.get_top_needed_blobs() - - def make_request_and_handle_response(self): - request = self._get_request() - self._handle_request(request) - - def _get_request(self): - to_request = self.get_top_needed_blobs() - if not to_request: - raise Exception('Unable to make a request without available blobs') - return self._make_request(to_request) - - @cache - def get_top_needed_blobs(self, limit=20): - all_needed = [ - b.blob_hash for b in self.requestor._blobs_to_download() - if not self.is_available(b) - ] - # sort them so that the peer will be asked first for blobs it - # hasn't said it doesn't have - sorted_needed = sorted( - all_needed, - key=lambda b: b in self.unavailable_blobs - ) - return sorted_needed[:limit] - - def is_available(self, blob): - return blob.blob_hash in self.available_blobs - - def _make_request(self, to_request): - log.debug('Requesting blobs: %s', to_request) - r_dict = {'requested_blobs': to_request} - response_identifier = 'available_blobs' - request = ClientRequest(r_dict, response_identifier) - return request - - def _handle_request(self, a_r): - log.debug('making an availability request') - d1 = self.protocol.add_request(a_r) - d1.addCallback(self._handle_availability, a_r) - d1.addErrback(self._request_failed, "availability request") - - def _handle_availability(self, response_dict, request): - assert request.response_identifier == 'available_blobs' - if 'available_blobs' not in response_dict: - raise InvalidResponseError("response identifier not in response") - log.debug("Received a response to the availability request") - # save available blobs - blob_hashes = response_dict['available_blobs'] - if not blob_hashes: - # should not send any more requests as it doesn't have any blob we need - self.update_local_score(-10.0) - return True - for blob_hash in blob_hashes: - if blob_hash in request.request_dict['requested_blobs']: - self.process_available_blob_hash(blob_hash, request) - # everything left in the request is missing - for blob_hash in request.request_dict['requested_blobs']: - self.unavailable_blobs.append(blob_hash) - return True - - def process_available_blob_hash(self, blob_hash, request): - log.debug("The server has indicated it has the following blob available: %s", blob_hash) - self.available_blobs.append(blob_hash) - self.remove_from_unavailable_blobs(blob_hash) - request.request_dict['requested_blobs'].remove(blob_hash) - - def remove_from_unavailable_blobs(self, blob_hash): - if blob_hash in self.unavailable_blobs: - self.unavailable_blobs.remove(blob_hash) - - -class PriceRequest(RequestHelper): - """Ask a peer if a certain price is acceptable""" - def can_make_request(self): - if len(self.available_blobs) and self.protocol not in self.protocol_prices: - return self.get_rate() is not None - return False - - def make_request_and_handle_response(self): - request = self._get_price_request() - self._handle_price_request(request) - - def _get_price_request(self): - rate = self.get_rate() - if rate is None: - log.debug("No blobs to request from %s", self.peer) - raise Exception('Cannot make a price request without a payment rate') - log.debug("Offer rate %s to %s for %i blobs", rate, self.peer, len(self.available_blobs)) - - request_dict = {'blob_data_payment_rate': rate} - assert self.protocol not in self.protocol_offers - self.protocol_offers[self.protocol] = rate - return ClientRequest(request_dict, 'blob_data_payment_rate') - - def _handle_price_request(self, price_request): - d = self.protocol.add_request(price_request) - d.addCallback(self._handle_price_response, price_request) - d.addErrback(self._request_failed, "price request") - - def _handle_price_response(self, response_dict, request): - assert request.response_identifier == 'blob_data_payment_rate' - if 'blob_data_payment_rate' not in response_dict: - return InvalidResponseError("response identifier not in response") - offer_value = self.protocol_offers.pop(self.protocol) - offer = Offer(offer_value) - offer.handle(response_dict['blob_data_payment_rate']) - self.payment_rate_manager.record_offer_reply(self.peer, offer) - if offer.is_accepted: - log.info("Offered rate %f/mb accepted by %s", offer.rate, self.peer.host) - self.protocol_prices[self.protocol] = offer.rate - return True - elif offer.is_too_low: - log.debug("Offered rate %f/mb rejected by %s", offer.rate, self.peer.host) - return not self.payment_rate_manager.price_limit_reached(self.peer) - else: - log.warning("Price disagreement") - self.requestor._price_disagreements.append(self.peer) - return False - - -class DownloadRequest(RequestHelper): - """Choose a blob and download it from a peer and also pay the peer for the data.""" - def __init__(self, requester, peer, protocol, payment_rate_manager, wallet, head_blob_hash): - super().__init__(requester, peer, protocol, payment_rate_manager) - self.wallet = wallet - self.head_blob_hash = head_blob_hash - - def can_make_request(self): - if self.protocol in self.protocol_prices: - return self.get_blob_details() - return False - - def make_request_and_handle_response(self): - request = self._get_request() - self._handle_download_request(request) - - def _get_request(self): - blob_details = self.get_blob_details() - if not blob_details: - raise Exception('No blobs available to download') - return self._make_request(blob_details) - - @cache - def get_blob_details(self): - """Open a blob for writing and return the details. - - If no blob can be opened, returns None. - """ - to_download = self.get_available_blobs() - return self.find_blob(to_download) - - def get_available_blobs(self): - available_blobs = [ - b for b in self.requestor._blobs_to_download() - if self.requestor._hash_available_on(b.blob_hash, self.peer) - ] - log.debug('available blobs: %s', available_blobs) - return available_blobs - - def find_blob(self, to_download): - """Return the first blob in `to_download` that is successfully opened for write.""" - for blob in to_download: - if blob.get_is_verified(): - log.debug('Skipping blob %s as its already validated', blob) - continue - writer, d = blob.open_for_writing(self.peer) - if d is not None: - return BlobDownloadDetails(blob, d, writer.write, writer.close, self.peer) - log.warning('Skipping blob %s as there was an issue opening it for writing', blob) - return None - - def _make_request(self, blob_details): - blob = blob_details.blob - request = ClientBlobRequest( - {'requested_blob': blob.blob_hash}, - 'incoming_blob', - blob_details.counting_write_func, - blob_details.deferred, - blob_details.cancel_func, - blob - ) - log.info("Requesting blob %s from %s", blob.blob_hash, self.peer) - return request - - def _handle_download_request(self, client_blob_request): - reserved_points = self.reserve_funds_or_cancel(client_blob_request) - self.add_callbacks_to_download_request(client_blob_request, reserved_points) - self.create_add_blob_request(client_blob_request) - - def reserve_funds_or_cancel(self, client_blob_request): - reserved_points = self._reserve_points(client_blob_request.max_pay_units) - if reserved_points is not None: - return reserved_points - client_blob_request.cancel(InsufficientFundsError()) - client_blob_request.finished_deferred.addErrback(lambda _: True) - raise InsufficientFundsError() - - def add_callbacks_to_download_request(self, client_blob_request, reserved_points): - # Note: The following three callbacks will be called when the blob has been - # fully downloaded or canceled - client_blob_request.finished_deferred.addCallbacks( - self._download_succeeded, - self._download_failed, - callbackArgs=(client_blob_request.blob,), - ) - client_blob_request.finished_deferred.addBoth(self._pay_or_cancel_payment, - reserved_points, client_blob_request.blob) - client_blob_request.finished_deferred.addErrback(_handle_download_error, self.peer, - client_blob_request.blob) - - def _pay_or_cancel_payment(self, arg, reserved_points, blob): - if self._can_pay_peer(blob, arg): - self._pay_peer(blob.length, reserved_points) - else: - self._cancel_points(reserved_points) - return arg - - def _can_pay_peer(self, blob, arg): - return ( - blob.length != 0 and - (not isinstance(arg, Failure) or arg.check(DownloadCanceledError)) - ) - - def _pay_peer(self, num_bytes, reserved_points): - assert num_bytes != 0 - rate = self.get_rate() - point_amount = get_points(num_bytes, rate) - self.wallet.send_points(reserved_points, point_amount) - self.payment_rate_manager.record_points_paid(point_amount) - - def _cancel_points(self, reserved_points): - self.wallet.cancel_point_reservation(reserved_points) - - def create_add_blob_request(self, client_blob_request): - d = self.protocol.add_blob_request(client_blob_request) - # Note: The following two callbacks will be called as soon as the peer sends its - # response, which will be before the blob has finished downloading, but may be - # after the blob has been canceled. For example, - # 1) client sends request to Peer A - # 2) the blob is finished downloading from peer B, and therefore this one is canceled - # 3) client receives response from Peer A - # Therefore, these callbacks shouldn't rely on there being a blob about to be - # downloaded. - d.addCallback(_handle_incoming_blob, self.peer, client_blob_request) - d.addErrback(self._request_failed, "download request") - - def _reserve_points(self, num_bytes): - # jobevers: there was an assertion here, but I don't think it - # was a valid assertion to make. It is possible for a rate to - # not yet been set for this protocol or for it to have been - # removed so instead I switched it to check if a rate has been set - # and calculate it if it has not - rate = self.get_rate() - points_to_reserve = get_points(num_bytes, rate) - return self.wallet.reserve_points(self.peer, points_to_reserve) - - def _download_succeeded(self, arg, blob): - log.info("Blob %s has been successfully downloaded from %s", blob, self.peer) - self.update_local_score(5.0) - self.peer.update_stats('blobs_downloaded', 1) - self.peer.update_score(5.0) - should_announce = blob.blob_hash == self.head_blob_hash - d = self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) - d.addCallback(lambda _: arg) - return d - - def _download_failed(self, reason): - if not reason.check(DownloadCanceledError, PriceDisagreementError): - self.update_local_score(-10.0) - return reason - - -class BlobDownloadDetails: - """Contains the information needed to make a ClientBlobRequest from an open blob""" - def __init__(self, blob, deferred, write_func, cancel_func, peer): - self.blob = blob - self.deferred = deferred - self.write_func = write_func - self.cancel_func = cancel_func - self.peer = peer - - def counting_write_func(self, data): - self.peer.update_stats('blob_bytes_downloaded', len(data)) - return self.write_func(data) diff --git a/lbrynet/p2p/client/ClientProtocol.py b/lbrynet/p2p/client/ClientProtocol.py deleted file mode 100644 index f941241d2..000000000 --- a/lbrynet/p2p/client/ClientProtocol.py +++ /dev/null @@ -1,284 +0,0 @@ -import json -import logging -from decimal import Decimal -from twisted.internet import error, defer -from twisted.internet.protocol import Protocol, ClientFactory -from twisted.protocols.policies import TimeoutMixin -from twisted.python import failure -from lbrynet import utils -from lbrynet.conf import MAX_RESPONSE_INFO_SIZE -from lbrynet.p2p.Error import ConnectionClosedBeforeResponseError, NoResponseError -from lbrynet.p2p.Error import DownloadCanceledError, MisbehavingPeerError -from lbrynet.p2p.Error import RequestCanceledError - - -log = logging.getLogger(__name__) - - -def encode_decimal(obj): - if isinstance(obj, Decimal): - return float(obj) - raise TypeError(repr(obj) + " is not JSON serializable") - - -class ClientProtocol(Protocol, TimeoutMixin): - #implements(IRequestSender, IRateLimited) - ######### Protocol ######### - PROTOCOL_TIMEOUT = 30 - - def connectionMade(self): - log.debug("Connection made to %s", self.factory.peer) - self._connection_manager = self.factory.connection_manager - self._rate_limiter = self.factory.rate_limiter - self.peer = self.factory.peer - self._response_deferreds = {} - self._response_buff = b'' - self._downloading_blob = False - self._blob_download_request = None - self._next_request = {} - self.connection_closed = False - self.connection_closing = False - # This needs to be set for TimeoutMixin - self.callLater = utils.call_later - self.peer.report_up() - - self._ask_for_request() - - def dataReceived(self, data): - log.debug("Received %d bytes from %s", len(data), self.peer) - self.setTimeout(None) - self._rate_limiter.report_dl_bytes(len(data)) - - if self._downloading_blob is True: - self._blob_download_request.write(data) - else: - self._response_buff += data - if len(self._response_buff) > MAX_RESPONSE_INFO_SIZE: - log.warning("Response is too large from %s. Size %s", - self.peer, len(self._response_buff)) - self.transport.loseConnection() - response, extra_data = self._get_valid_response(self._response_buff) - if response is not None: - self._response_buff = b'' - self._handle_response(response) - if self._downloading_blob is True and len(extra_data) != 0: - self._blob_download_request.write(extra_data) - - def timeoutConnection(self): - log.info("Connection timed out to %s", self.peer) - self.peer.report_down() - self.transport.abortConnection() - - def connectionLost(self, reason=None): - log.debug("Connection lost to %s: %s", self.peer, reason) - self.setTimeout(None) - self.connection_closed = True - if reason is None or reason.check(error.ConnectionDone): - err = failure.Failure(ConnectionClosedBeforeResponseError()) - else: - err = reason - for key, d in self._response_deferreds.items(): - d.errback(err) - self._response_deferreds.clear() - if self._blob_download_request is not None: - self._blob_download_request.cancel(err) - self.factory.connection_was_made_deferred.callback(True) - - ######### IRequestSender ######### - - def add_request(self, request): - if request.response_identifier in self._response_deferreds: - raise ValueError("There is already a request for that response active") - self._next_request.update(request.request_dict) - d = defer.Deferred() - log.debug("Adding a request for %s. Request: %s", self.peer, request.request_dict) - self._response_deferreds[request.response_identifier] = d - return d - - def add_blob_request(self, blob_request): - if self._blob_download_request is None: - d = self.add_request(blob_request) - self._blob_download_request = blob_request - blob_request.finished_deferred.addCallbacks(self._downloading_finished, - self._handle_response_error) - return d - else: - return defer.fail(ValueError("There is already a blob download request active")) - - def cancel_requests(self): - self.connection_closing = True - ds = [] - err = RequestCanceledError() - for key, d in list(self._response_deferreds.items()): - del self._response_deferreds[key] - d.errback(err) - ds.append(d) - if self._blob_download_request is not None: - ds.append(self._blob_download_request.finished_deferred) - self._blob_download_request.cancel(err) - self._blob_download_request = None - self._downloading_blob = False - return defer.DeferredList(ds) - - ######### Internal request handling ######### - - def _handle_request_error(self, err): - log.error("An unexpected error occurred creating or sending a request to %s. %s: %s", - self.peer, err.type, err) - self.transport.loseConnection() - - def _ask_for_request(self): - if self.connection_closed is True or self.connection_closing is True: - return - - def send_request_or_close(do_request): - if do_request is True: - request_msg, self._next_request = self._next_request, {} - self._send_request_message(request_msg) - else: - # The connection manager has indicated that this connection should be terminated - log.debug("Closing the connection to %s due to having no further requests to send", - self.peer) - self.peer.report_success() - self.transport.loseConnection() - d = self._connection_manager.get_next_request(self.peer, self) - d.addCallback(send_request_or_close) - d.addErrback(self._handle_request_error) - - def _send_request_message(self, request_msg): - self.setTimeout(self.PROTOCOL_TIMEOUT) - # TODO: compare this message to the last one. If they're the same, - # TODO: incrementally delay this message. - m = json.dumps(request_msg, default=encode_decimal).encode() - self.transport.write(m) - - def _get_valid_response(self, response_msg): - extra_data = None - response = None - curr_pos = 0 - while 1: - next_close_paren = response_msg.find(b'}', curr_pos) - if next_close_paren != -1: - curr_pos = next_close_paren + 1 - try: - response = json.loads(response_msg[:curr_pos]) - except ValueError: - pass - else: - extra_data = response_msg[curr_pos:] - break - else: - break - return response, extra_data - - def _handle_response_error(self, err): - # If an error gets to this point, log it and kill the connection. - if err.check(DownloadCanceledError, RequestCanceledError, error.ConnectionAborted, - ConnectionClosedBeforeResponseError): - # TODO: (wish-list) it seems silly to close the connection over this, and it shouldn't - # TODO: always be this way. it's done this way now because the client has no other way - # TODO: of telling the server it wants the download to stop. It would be great if the - # TODO: protocol had such a mechanism. - log.info("Closing the connection to %s because the download of blob %s was canceled", - self.peer, self._blob_download_request.blob) - result = None - elif err.check(MisbehavingPeerError): - log.warning("The connection to %s is closing due to: %s", self.peer, err) - result = err - else: - log.error("The connection to %s is closing due to an unexpected error: %s", - self.peer, err) - result = err - self._blob_download_request = None - self._downloading_blob = False - self.transport.loseConnection() - return result - - def _handle_response(self, response): - ds = [] - log.debug( - "Handling a response from %s. Expected responses: %s. Actual responses: %s", - self.peer, self._response_deferreds.keys(), response.keys()) - for key, val in response.items(): - if key in self._response_deferreds: - d = self._response_deferreds.pop(key) - d.callback({key: val}) - ds.append(d) - for k, d in self._response_deferreds.items(): - del self._response_deferreds[k] - d.errback(failure.Failure(NoResponseError())) - ds.append(d) - - if self._blob_download_request is not None: - self._downloading_blob = True - d = self._blob_download_request.finished_deferred - d.addErrback(self._handle_response_error) - ds.append(d) - - # TODO: are we sure we want to consume errors here - dl = defer.DeferredList(ds, consumeErrors=True) - - def get_next_request(results): - failed = False - for success, result in results: - if success is False: - failed = True - if not isinstance(result.value, DownloadCanceledError): - log.info(result.value) - log.info("The connection to %s is closing due to an error: %s", - self.peer, result.getTraceback()) - - self.peer.report_down() - if failed is False: - log.debug("Asking for another request from %s", self.peer) - self._ask_for_request() - else: - log.warning("Not asking for another request from %s", self.peer) - self.transport.loseConnection() - - dl.addCallback(get_next_request) - - def _downloading_finished(self, arg): - log.debug("The blob has finished downloading from %s", self.peer) - self._blob_download_request = None - self._downloading_blob = False - return arg - - ######### IRateLimited ######### - - def throttle_upload(self): - pass - - def unthrottle_upload(self): - pass - - def throttle_download(self): - self.transport.pauseProducing() - - def unthrottle_download(self): - self.transport.resumeProducing() - - -class ClientProtocolFactory(ClientFactory): - protocol = ClientProtocol - - def __init__(self, peer, rate_limiter, connection_manager): - self.peer = peer - self.rate_limiter = rate_limiter - self.connection_manager = connection_manager - self.p = None - # This defer fires and returns True when connection was - # made and completed, or fires and returns False if - # connection failed - self.connection_was_made_deferred = defer.Deferred() - - def clientConnectionFailed(self, connector, reason): - log.debug("Connection failed to %s: %s", self.peer, reason) - self.peer.report_down() - self.connection_was_made_deferred.callback(False) - - def buildProtocol(self, addr): - p = self.protocol() - p.factory = self - self.p = p - return p diff --git a/lbrynet/p2p/client/ClientRequest.py b/lbrynet/p2p/client/ClientRequest.py deleted file mode 100644 index a485a9980..000000000 --- a/lbrynet/p2p/client/ClientRequest.py +++ /dev/null @@ -1,27 +0,0 @@ -from lbrynet.blob.blob_file import MAX_BLOB_SIZE - - -class ClientRequest: - def __init__(self, request_dict, response_identifier=None): - self.request_dict = request_dict - self.response_identifier = response_identifier - - -class ClientPaidRequest(ClientRequest): - def __init__(self, request_dict, response_identifier, max_pay_units): - super().__init__(request_dict, response_identifier) - self.max_pay_units = max_pay_units - - -class ClientBlobRequest(ClientPaidRequest): - def __init__(self, request_dict, response_identifier, write_func, finished_deferred, - cancel_func, blob): - if blob.length is None: - max_pay_units = MAX_BLOB_SIZE - else: - max_pay_units = blob.length - super().__init__(request_dict, response_identifier, max_pay_units) - self.write = write_func - self.finished_deferred = finished_deferred - self.cancel = cancel_func - self.blob = blob diff --git a/lbrynet/p2p/client/StandaloneBlobDownloader.py b/lbrynet/p2p/client/StandaloneBlobDownloader.py deleted file mode 100644 index 07a95ade1..000000000 --- a/lbrynet/p2p/client/StandaloneBlobDownloader.py +++ /dev/null @@ -1,145 +0,0 @@ -import logging -from lbrynet.conf import Config -from lbrynet.p2p.BlobInfo import BlobInfo -from lbrynet.p2p.client.BlobRequester import BlobRequester -from lbrynet.p2p.client.ConnectionManager import ConnectionManager -from lbrynet.p2p.client.DownloadManager import DownloadManager -from lbrynet.p2p.Error import InvalidBlobHashError, DownloadSDTimeout -from lbrynet.blob.blob_file import is_valid_blobhash -from lbrynet.utils import safe_start_looping_call, safe_stop_looping_call -from twisted.python.failure import Failure -from twisted.internet import defer -from twisted.internet.task import LoopingCall - -log = logging.getLogger(__name__) - - -class SingleBlobMetadataHandler: - #implements(interfaces.IMetadataHandler) - - def __init__(self, blob_hash, download_manager): - self.blob_hash = blob_hash - self.download_manager = download_manager - - ######## IMetadataHandler ######### - - def get_initial_blobs(self): - log.debug("Returning the blob info") - return defer.succeed([BlobInfo(self.blob_hash, 0, None)]) - - def final_blob_num(self): - return 0 - - -class SingleProgressManager: - def __init__(self, download_manager, finished_callback, timeout_callback, timeout): - self.finished_callback = finished_callback - self.timeout_callback = timeout_callback - self.download_manager = download_manager - - self.timeout = timeout - self.timeout_counter = 0 - self.checker = LoopingCall(self._check_if_finished) - - def start(self): - safe_start_looping_call(self.checker, 1) - return defer.succeed(True) - - def stop(self): - safe_stop_looping_call(self.checker) - return defer.succeed(True) - - def _check_if_finished(self): - if self.stream_position() == 1: - blob_downloaded = self.download_manager.blobs[0] - log.debug("The blob %s has been downloaded. Calling the finished callback", - str(blob_downloaded)) - safe_stop_looping_call(self.checker) - self.finished_callback(blob_downloaded) - elif self.timeout is not None: - self.timeout_counter += 1 - if self.timeout_counter >= self.timeout: - safe_stop_looping_call(self.checker) - self.timeout_callback() - - def stream_position(self): - blobs = self.download_manager.blobs - if blobs and blobs[0].get_is_verified(): - return 1 - return 0 - - def needed_blobs(self): - blobs = self.download_manager.blobs - assert len(blobs) == 1 - return [b for b in blobs.values() if not b.get_is_verified()] - - -class DummyBlobHandler: - def __init__(self): - pass - - def handle_blob(self, blob, blob_info): - pass - - -class StandaloneBlobDownloader: - def __init__(self, conf: Config, blob_hash, blob_manager, peer_finder, - rate_limiter, payment_rate_manager, wallet, - timeout=None): - self.conf = conf - self.blob_hash = blob_hash - self.blob_manager = blob_manager - self.peer_finder = peer_finder - self.rate_limiter = rate_limiter - self.payment_rate_manager = payment_rate_manager - self.wallet = wallet - self.timeout = timeout - self.download_manager = None - self.finished_deferred = None - - def download(self): - if not is_valid_blobhash(self.blob_hash): - return defer.fail(Failure(InvalidBlobHashError(self.blob_hash))) - - def cancel_download(d): - self.stop() - - self.finished_deferred = defer.Deferred(canceller=cancel_download) - self.download_manager = DownloadManager(self.blob_manager) - self.download_manager.blob_requester = BlobRequester(self.blob_manager, self.peer_finder, - self.payment_rate_manager, self.wallet, - self.download_manager) - self.download_manager.blob_info_finder = SingleBlobMetadataHandler(self.blob_hash, - self.download_manager) - self.download_manager.progress_manager = SingleProgressManager(self.download_manager, - self._blob_downloaded, - self._download_timedout, - self.timeout) - self.download_manager.blob_handler = DummyBlobHandler() - self.download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger() - self.download_manager.connection_manager = ConnectionManager( - self, self.rate_limiter, - [self.download_manager.blob_requester], - [self.download_manager.wallet_info_exchanger] - ) - d = self.download_manager.start_downloading() - d.addCallback(lambda _: self.finished_deferred) - return d - - def stop(self): - return self.download_manager.stop_downloading() - - def _blob_downloaded(self, blob): - self.stop() - if not self.finished_deferred.called: - self.finished_deferred.callback(blob) - - def _download_timedout(self): - self.stop() - if not self.finished_deferred.called: - self.finished_deferred.errback(DownloadSDTimeout(self.blob_hash)) - - def insufficient_funds(self, err): - self.stop() - if not self.finished_deferred.called: - self.finished_deferred.errback(err) diff --git a/lbrynet/p2p/server/BlobAvailabilityHandler.py b/lbrynet/p2p/server/BlobAvailabilityHandler.py deleted file mode 100644 index 05cfa8287..000000000 --- a/lbrynet/p2p/server/BlobAvailabilityHandler.py +++ /dev/null @@ -1,49 +0,0 @@ -import logging - - -log = logging.getLogger(__name__) - - -class BlobAvailabilityHandlerFactory: - # implements(IQueryHandlerFactory) - - def __init__(self, blob_manager): - self.blob_manager = blob_manager - - ######### IQueryHandlerFactory ######### - - def build_query_handler(self): - q_h = BlobAvailabilityHandler(self.blob_manager) - return q_h - - def get_primary_query_identifier(self): - return 'requested_blobs' - - def get_description(self): - return "Blob Availability - blobs that are available to be uploaded" - - -class BlobAvailabilityHandler: - #implements(IQueryHandler) - - def __init__(self, blob_manager): - self.blob_manager = blob_manager - self.query_identifiers = ['requested_blobs'] - - ######### IQueryHandler ######### - - def register_with_request_handler(self, request_handler, peer): - request_handler.register_query_handler(self, self.query_identifiers) - - def handle_queries(self, queries): - if self.query_identifiers[0] in queries: - log.info("Received the client's list of requested blobs") - available_blobs = self._get_available_blobs(queries[self.query_identifiers[0]]) - log.debug("available blobs: %s", str(available_blobs)) - return {'available_blobs': available_blobs} - return {} - - ######### internal ######### - - def _get_available_blobs(self, requested_blobs): - return self.blob_manager.completed_blobs(requested_blobs) diff --git a/lbrynet/p2p/server/BlobRequestHandler.py b/lbrynet/p2p/server/BlobRequestHandler.py deleted file mode 100644 index 1bdc92954..000000000 --- a/lbrynet/p2p/server/BlobRequestHandler.py +++ /dev/null @@ -1,221 +0,0 @@ -import logging - -from twisted.internet import defer -from twisted.protocols.basic import FileSender -from twisted.python.failure import Failure - -from lbrynet.extras.daemon import analytics -from lbrynet.p2p.Offer import Offer - -log = logging.getLogger(__name__) - - -class BlobRequestHandlerFactory: - #implements(IQueryHandlerFactory) - - def __init__(self, blob_manager, wallet, payment_rate_manager, analytics_manager): - self.blob_manager = blob_manager - self.wallet = wallet - self.payment_rate_manager = payment_rate_manager - self.analytics_manager = analytics_manager - - ######### IQueryHandlerFactory ######### - - def build_query_handler(self): - q_h = BlobRequestHandler( - self.blob_manager, self.wallet, self.payment_rate_manager, self.analytics_manager) - return q_h - - def get_primary_query_identifier(self): - return 'requested_blob' - - def get_description(self): - return "Blob Uploader - uploads blobs" - - -class BlobRequestHandler: - #implements(IQueryHandler, IBlobSender) - PAYMENT_RATE_QUERY = 'blob_data_payment_rate' - BLOB_QUERY = 'requested_blob' - AVAILABILITY_QUERY = 'requested_blobs' - - def __init__(self, blob_manager, wallet, payment_rate_manager, analytics_manager): - self.blob_manager = blob_manager - self.payment_rate_manager = payment_rate_manager - self.wallet = wallet - self.query_identifiers = [self.PAYMENT_RATE_QUERY, self.BLOB_QUERY, self.AVAILABILITY_QUERY] - self.analytics_manager = analytics_manager - self.peer = None - self.blob_data_payment_rate = None - self.read_handle = None - self.currently_uploading = None - self.file_sender = None - self.blob_bytes_uploaded = 0 - self._blobs_requested = [] - - ######### IQueryHandler ######### - - def register_with_request_handler(self, request_handler, peer): - self.peer = peer - request_handler.register_query_handler(self, self.query_identifiers) - request_handler.register_blob_sender(self) - - def handle_queries(self, queries): - response = {} - log.debug("Handle query: %s", str(queries)) - - if self.AVAILABILITY_QUERY in queries: - self._blobs_requested = queries[self.AVAILABILITY_QUERY] - self._reply_to_availability(response, self._blobs_requested) - if self.PAYMENT_RATE_QUERY in queries: - offered_rate = queries[self.PAYMENT_RATE_QUERY] - offer = Offer(offered_rate) - if offer.rate is None: - log.warning("Empty rate offer") - self._handle_payment_rate_query(offer, response) - if self.BLOB_QUERY in queries: - incoming = queries[self.BLOB_QUERY] - self._reply_to_send_request(response, incoming) - return response - - ######### IBlobSender ######### - - def send_blob_if_requested(self, consumer): - if self.currently_uploading is not None: - return self.send_file(consumer) - return defer.succeed(True) - - def cancel_send(self, err): - if self.currently_uploading is not None: - self.read_handle.close() - self.read_handle = None - self.currently_uploading = None - return err - - ######### internal ######### - - def _reply_to_availability(self, request, blobs): - available_blobs = self._get_available_blobs(blobs) - log.debug("available blobs: %s", str(available_blobs)) - request.update({'available_blobs': available_blobs}) - return request - - def _handle_payment_rate_query(self, offer, request): - blobs = self._blobs_requested - log.debug("Offered rate %f LBC/mb for %i blobs", offer.rate, len(blobs)) - reply = self.payment_rate_manager.reply_to_offer(self.peer, blobs, offer) - if reply.is_accepted: - self.blob_data_payment_rate = offer.rate - request[self.PAYMENT_RATE_QUERY] = "RATE_ACCEPTED" - log.debug("Accepted rate: %f", offer.rate) - elif reply.is_too_low: - request[self.PAYMENT_RATE_QUERY] = "RATE_TOO_LOW" - log.debug("Reject rate: %f", offer.rate) - elif reply.is_unset: - log.warning("Rate unset") - request['incoming_blob'] = {'error': 'RATE_UNSET'} - log.debug("Returning rate query result: %s", str(request)) - - return request - - def _handle_blob_query(self, response, query): - log.debug("Received the client's request to send a blob") - response['incoming_blob'] = {} - - if self.blob_data_payment_rate is None: - response['incoming_blob'] = {'error': "RATE_UNSET"} - return response - else: - return self._send_blob(response, query) - - def _send_blob(self, response, query): - d = self.blob_manager.get_blob(query) - d.addCallback(self.open_blob_for_reading, response) - return d - - def open_blob_for_reading(self, blob, response): - response_fields = {} - d = defer.succeed(None) - if blob.get_is_verified(): - read_handle = blob.open_for_reading() - if read_handle is not None: - self.currently_uploading = blob - self.read_handle = read_handle - log.info("Sending %s to %s", str(blob), self.peer) - response_fields['blob_hash'] = blob.blob_hash - response_fields['length'] = blob.length - response['incoming_blob'] = response_fields - d.addCallback(lambda _: response) - return d - log.debug("We can not send %s", str(blob)) - response['incoming_blob'] = {'error': 'BLOB_UNAVAILABLE'} - d.addCallback(lambda _: response) - return d - - def _reply_to_send_request(self, response, incoming): - response_fields = {} - response['incoming_blob'] = response_fields - - if self.blob_data_payment_rate is None: - log.debug("Rate not set yet") - response['incoming_blob'] = {'error': 'RATE_UNSET'} - return defer.succeed(response) - else: - log.debug("Requested blob: %s", str(incoming)) - d = self.blob_manager.get_blob(incoming) - d.addCallback(lambda blob: self.open_blob_for_reading(blob, response)) - return d - - def _get_available_blobs(self, requested_blobs): - return self.blob_manager.completed_blobs(requested_blobs) - - def send_file(self, consumer): - - def _send_file(): - inner_d = start_transfer() - # TODO: if the transfer fails, check if it's because the connection was cut off. - # TODO: if so, perhaps bill the client - inner_d.addCallback(lambda _: set_expected_payment()) - inner_d.addBoth(set_not_uploading) - return inner_d - - def count_bytes(data): - uploaded = len(data) - self.blob_bytes_uploaded += uploaded - self.peer.update_stats('blob_bytes_uploaded', uploaded) - if self.analytics_manager is not None: - self.analytics_manager.add_observation(analytics.BLOB_BYTES_UPLOADED, uploaded) - return data - - def start_transfer(): - self.file_sender = FileSender() - log.debug("Starting the file upload") - assert self.read_handle is not None, \ - "self.read_handle was None when trying to start the transfer" - d = self.file_sender.beginFileTransfer(self.read_handle, consumer, count_bytes) - return d - - def set_expected_payment(): - log.debug("Setting expected payment") - if ( - self.blob_bytes_uploaded != 0 and self.blob_data_payment_rate is not None - and self.blob_data_payment_rate > 0 - ): - # TODO: explain why 2**20 - self.wallet.add_expected_payment(self.peer, - self.currently_uploading.length * 1.0 * - self.blob_data_payment_rate / 2 ** 20) - self.blob_bytes_uploaded = 0 - self.peer.update_stats('blobs_uploaded', 1) - return None - - def set_not_uploading(reason=None): - if self.currently_uploading is not None: - self.read_handle.close() - self.read_handle = None - self.currently_uploading = None - self.file_sender = None - if reason is not None and isinstance(reason, Failure): - log.warning("Upload has failed. Reason: %s", reason.getErrorMessage()) - - return _send_file() diff --git a/lbrynet/p2p/server/ServerProtocol.py b/lbrynet/p2p/server/ServerProtocol.py deleted file mode 100644 index 66c6b4b97..000000000 --- a/lbrynet/p2p/server/ServerProtocol.py +++ /dev/null @@ -1,92 +0,0 @@ -import logging -from twisted.internet import error -from twisted.internet.protocol import Protocol, ServerFactory -from twisted.python import failure -from lbrynet.p2p.server.ServerRequestHandler import ServerRequestHandler - - -log = logging.getLogger(__name__) - - -class ServerProtocol(Protocol): - """ServerProtocol needs to: - - 1) Receive requests from its transport - 2) Pass those requests on to its request handler - 3) Tell the request handler to pause/resume producing - 4) Tell its transport to pause/resume producing - 5) Hang up when the request handler is done producing - 6) Tell the request handler to stop producing if the connection is lost - 7) Upon creation, register with the rate limiter - 8) Upon connection loss, unregister with the rate limiter - 9) Report all uploaded and downloaded bytes to the rate limiter - 10) Pause/resume production when told by the rate limiter - """ - - #implements(interfaces.IConsumer) - - #Protocol stuff - - def connectionMade(self): - log.debug("Got a connection") - peer_info = self.transport.getPeer() - self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) - self.request_handler = ServerRequestHandler(self) - for query_handler_factory in self.factory.query_handler_factories.values(): - query_handler = query_handler_factory.build_query_handler() - query_handler.register_with_request_handler(self.request_handler, self.peer) - log.debug("Setting the request handler") - self.factory.rate_limiter.register_protocol(self) - - def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): - if self.request_handler is not None: - self.request_handler.stopProducing() - self.factory.rate_limiter.unregister_protocol(self) - if not reason.check(error.ConnectionDone): - log.warning("Closing a connection. Reason: %s", reason.getErrorMessage()) - - def dataReceived(self, data): - log.debug("Receiving %s bytes of data from the transport", str(len(data))) - self.factory.rate_limiter.report_dl_bytes(len(data)) - if self.request_handler is not None: - self.request_handler.data_received(data) - - #IConsumer stuff - - def registerProducer(self, producer, streaming): - log.debug("Registering the producer") - assert streaming is True - - def unregisterProducer(self): - self.request_handler = None - self.transport.loseConnection() - - def write(self, data): - log.trace("Writing %s bytes of data to the transport", len(data)) - self.transport.write(data) - self.factory.rate_limiter.report_ul_bytes(len(data)) - - #Rate limiter stuff - - def throttle_upload(self): - if self.request_handler is not None: - self.request_handler.pauseProducing() - - def unthrottle_upload(self): - if self.request_handler is not None: - self.request_handler.resumeProducing() - - def throttle_download(self): - self.transport.pauseProducing() - - def unthrottle_download(self): - self.transport.resumeProducing() - - -class ServerProtocolFactory(ServerFactory): - protocol = ServerProtocol - - def __init__(self, rate_limiter, query_handler_factories, peer_manager): - self.rate_limiter = rate_limiter - self.query_handler_factories = query_handler_factories - self.peer_manager = peer_manager diff --git a/lbrynet/p2p/server/ServerRequestHandler.py b/lbrynet/p2p/server/ServerRequestHandler.py deleted file mode 100644 index b969515c2..000000000 --- a/lbrynet/p2p/server/ServerRequestHandler.py +++ /dev/null @@ -1,184 +0,0 @@ -import json -import logging -from twisted.internet import defer - - -log = logging.getLogger(__name__) - - -class ServerRequestHandler: - """This class handles requests from clients. It can upload blobs and - return request for information about more blobs that are - associated with streams. - """ - #implements(interfaces.IPushProducer, interfaces.IConsumer, IRequestHandler) - - def __init__(self, consumer): - self.consumer = consumer - self.production_paused = False - self.request_buff = b'' - self.response_buff = b'' - self.producer = None - self.request_received = False - self.CHUNK_SIZE = 2**14 - self.query_handlers = {} # {IQueryHandler: [query_identifiers]} - self.blob_sender = None - self.consumer.registerProducer(self, True) - - #IPushProducer stuff - - def pauseProducing(self): - self.production_paused = True - - def stopProducing(self): - if self.producer is not None: - self.producer.stopProducing() - self.producer = None - self.production_paused = True - self.consumer.unregisterProducer() - - def resumeProducing(self): - - from twisted.internet import reactor - - self.production_paused = False - self._produce_more() - if self.producer is not None: - reactor.callLater(0, self.producer.resumeProducing) - - def _produce_more(self): - - from twisted.internet import reactor - - if self.production_paused: - return - chunk = self.response_buff[:self.CHUNK_SIZE] - self.response_buff = self.response_buff[self.CHUNK_SIZE:] - if chunk == b'': - return - log.trace("writing %s bytes to the client", len(chunk)) - self.consumer.write(chunk) - reactor.callLater(0, self._produce_more) - - #IConsumer stuff - - def registerProducer(self, producer, streaming): - self.producer = producer - assert streaming is False - producer.resumeProducing() - - def unregisterProducer(self): - self.producer = None - - def write(self, data): - - from twisted.internet import reactor - - self.response_buff = self.response_buff + data - self._produce_more() - - def get_more_data(): - if self.producer is not None: - log.trace("Requesting more data from the producer") - self.producer.resumeProducing() - - reactor.callLater(0, get_more_data) - - #From Protocol - - def data_received(self, data): - log.debug("Received data") - log.debug("%s", str(data)) - if self.request_received is False: - return self._parse_data_and_maybe_send_blob(data) - else: - log.warning( - "The client sent data when we were uploading a file. This should not happen") - - def _parse_data_and_maybe_send_blob(self, data): - self.request_buff = self.request_buff + data - msg = self.try_to_parse_request(self.request_buff) - if msg: - self.request_buff = b'' - self._process_msg(msg) - else: - log.debug("Request buff not a valid json message") - log.debug("Request buff: %s", self.request_buff) - - def _process_msg(self, msg): - d = self.handle_request(msg) - if self.blob_sender: - d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self)) - d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler) - - - ######### IRequestHandler ######### - - def register_query_handler(self, query_handler, query_identifiers): - self.query_handlers[query_handler] = query_identifiers - - def register_blob_sender(self, blob_sender): - self.blob_sender = blob_sender - - #response handling - - def request_failure_handler(self, err): - log.warning("An error occurred handling a request. Error: %s", err.getErrorMessage()) - self.stopProducing() - return err - - def finished_response(self): - self.request_received = False - self._produce_more() - - def send_response(self, msg): - m = json.dumps(msg).encode() - log.debug("Sending a response of length %s", str(len(m))) - log.debug("Response: %s", str(m)) - self.response_buff = self.response_buff + m - self._produce_more() - return True - - def handle_request(self, msg): - log.debug("Handling a request") - log.debug(str(msg)) - - def create_response_message(results): - response = {} - for success, result in results: - if success is True: - response.update(result) - else: - # result is a Failure - return result - log.debug("Finished making the response message. Response: %s", str(response)) - return response - - def log_errors(err): - log.warning( - "An error occurred handling a client request. Error message: %s", - err.getErrorMessage()) - return err - - def send_response(response): - self.send_response(response) - return True - - ds = [] - for query_handler, query_identifiers in self.query_handlers.items(): - queries = {q_i: msg[q_i] for q_i in query_identifiers if q_i in msg} - d = defer.maybeDeferred(query_handler.handle_queries(queries)) - d.addErrback(log_errors) - ds.append(d) - - dl = defer.DeferredList(ds) - dl.addCallback(create_response_message) - dl.addCallback(send_response) - return dl - - def try_to_parse_request(self, request_buff): - try: - msg = json.loads(request_buff) - return msg - except ValueError: - return None diff --git a/lbrynet/p2p/server/__init__.py b/lbrynet/p2p/server/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/integration/wallet/test_commands.py b/tests/integration/wallet/test_commands.py index 87efd26b1..2e00a6f93 100644 --- a/tests/integration/wallet/test_commands.py +++ b/tests/integration/wallet/test_commands.py @@ -60,7 +60,7 @@ class CommandTestCase(IntegrationTestCase): twisted.internet.reactor = sys.modules['twisted.internet.reactor'] = AsyncioSelectorReactor() - logging.getLogger('lbrynet.p2p').setLevel(self.VERBOSITY) + logging.getLogger('lbrynet.blob_exchange').setLevel(self.VERBOSITY) logging.getLogger('lbrynet.daemon').setLevel(self.VERBOSITY) conf = Config() diff --git a/lbrynet/p2p/client/__init__.py b/tests/unit/blob_exchange/__init__.py similarity index 100% rename from lbrynet/p2p/client/__init__.py rename to tests/unit/blob_exchange/__init__.py diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py new file mode 100644 index 000000000..2ed37635d --- /dev/null +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -0,0 +1,71 @@ +import asyncio +import tempfile +import shutil +import os +from torba.testcase import AsyncioTestCase +from lbrynet.extras.daemon.storage import SQLiteStorage +from lbrynet.blob.blob_manager import BlobFileManager +from lbrynet.blob_exchange.server import BlobServer +from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob +from lbrynet.dht.peer import KademliaPeer, PeerManager + +# import logging +# logging.getLogger("lbrynet").setLevel(logging.DEBUG) + + +class BlobExchangeTestBase(AsyncioTestCase): + async def asyncSetUp(self): + self.loop = asyncio.get_event_loop() + + self.client_dir = tempfile.mkdtemp() + self.server_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.client_dir) + self.addCleanup(shutil.rmtree, self.server_dir) + + self.server_storage = SQLiteStorage(os.path.join(self.server_dir, "lbrynet.sqlite")) + self.server_blob_manager = BlobFileManager(self.loop, self.server_dir, self.server_storage) + self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP') + + self.client_storage = SQLiteStorage(os.path.join(self.client_dir, "lbrynet.sqlite")) + self.client_blob_manager = BlobFileManager(self.loop, self.client_dir, self.client_storage) + self.client_peer_manager = PeerManager(self.loop) + self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) + + await self.client_storage.open() + await self.server_storage.open() + await self.client_blob_manager.setup() + await self.server_blob_manager.setup() + + self.server.start_server(33333, '127.0.0.1') + await self.server.started_listening.wait() + + +class TestBlobExchange(BlobExchangeTestBase): + async def _test_transfer_blob(self, blob_hash: str, blob_bytes: bytes): + # add the blob on the server + server_blob = self.server_blob_manager.get_blob(blob_hash, len(blob_bytes)) + writer = server_blob.open_for_writing() + writer.write(blob_bytes) + await server_blob.finished_writing.wait() + self.assertTrue(os.path.isfile(server_blob.file_path)) + self.assertEqual(server_blob.get_is_verified(), True) + + client_blob = self.client_blob_manager.get_blob(blob_hash) + protocol = BlobExchangeClientProtocol(self.loop, 2) + + # download the blob + downloaded = await request_blob(self.loop, client_blob, protocol, self.server_from_client.address, + self.server_from_client.tcp_port, 2) + await protocol.close() + self.assertEqual(client_blob.get_is_verified(), True) + self.assertTrue(downloaded) + + async def test_transfer_sd_blob(self): + sd_hash = "3e2706157a59aaa47ef52bc264fce488078b4026c0b9bab649a8f2fe1ecc5e5cad7182a2bb7722460f856831a1ac0f02" + mock_sd_blob_bytes = b"""{"blobs": [{"blob_hash": "6f53c72de100f6f007aa1b9720632e2d049cc6049e609ad790b556dba262159f739d5a14648d5701afc84b991254206a", "blob_num": 0, "iv": "3b6110c2d8e742bff66e4314863dee7e", "length": 2097152}, {"blob_hash": "18493bc7c5164b00596153859a0faffa45765e47a6c3f12198a4f7be4658111505b7f8a15ed0162306a0672c4a9b505d", "blob_num": 1, "iv": "df973fa64e73b4ff2677d682cdc32d3e", "length": 2097152}, {"blob_num": 2, "iv": "660d2dc2645da7c7d4540a466fcb0c60", "length": 0}], "key": "6465616462656566646561646265656664656164626565666465616462656566", "stream_hash": "22423c6786584974bd6b462af47ecb03e471da0ef372fe85a4e71a78bef7560c4afb0835c689f03916105404653b7bdf", "stream_name": "746573745f66696c65", "stream_type": "lbryfile", "suggested_file_name": "746573745f66696c65"}""" + return await self._test_transfer_blob(sd_hash, mock_sd_blob_bytes) + + async def test_transfer_blob(self): + blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed" + mock_blob_bytes = b'1' * ((2 * 2 ** 20) - 1) + return await self._test_transfer_blob(blob_hash, mock_blob_bytes)