lbry-sdk/lbry/blob_exchange/server.py

195 lines
9 KiB
Python
Raw Permalink Normal View History

2019-01-22 18:52:32 +01:00
import asyncio
import binascii
import logging
import socket
2019-01-22 18:52:32 +01:00
import typing
from json.decoder import JSONDecodeError
2019-06-21 02:55:47 +02:00
from lbry.blob_exchange.serialization import BlobResponse, BlobRequest, blob_response_types
from lbry.blob_exchange.serialization import BlobAvailabilityResponse, BlobPriceResponse, BlobDownloadResponse, \
2019-01-22 18:52:32 +01:00
BlobPaymentAddressResponse
if typing.TYPE_CHECKING:
2019-06-21 02:55:47 +02:00
from lbry.blob.blob_manager import BlobManager
2019-01-22 18:52:32 +01:00
# module-level logger, named after this module
log = logging.getLogger(__name__)
2019-08-12 04:26:21 +02:00
# a standard request will be 295 bytes; a connection whose buffered request
# reaches MAX_REQUEST_SIZE bytes is closed by the server protocol
MAX_REQUEST_SIZE = 1200
2019-01-22 18:52:32 +01:00
2019-01-25 19:10:40 +01:00
class BlobServerProtocol(asyncio.Protocol):
    """
    Server side of the blob exchange protocol for one incoming connection.

    Received bytes are buffered until they contain a closing ``}``, decoded into a
    ``BlobRequest`` and answered by ``handle_request``. The connection is closed when no
    transfer starts within ``idle_timeout`` seconds, when sending a blob takes longer than
    ``transfer_timeout`` seconds, or when a request is oversized or cannot be decoded.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
                 idle_timeout: float = 30.0, transfer_timeout: float = 60.0):
        """
        :param loop: event loop used to schedule the idle watchdog and the request handlers
        :param blob_manager: source of blobs (``get_blob``, ``completed_blob_hashes``) and of the
            ``connection_manager`` that is notified about connections and transferred bytes
        :param lbrycrd_address: address returned in payment address responses
        :param idle_timeout: seconds to wait for a transfer to start before closing the connection
        :param transfer_timeout: seconds a single blob transfer may take before it is aborted
        """
        self.loop = loop
        self.blob_manager = blob_manager
        self.idle_timeout = idle_timeout
        self.transfer_timeout = transfer_timeout
        self.server_task: typing.Optional[asyncio.Task] = None
        # NOTE: asyncio primitives no longer accept ``loop=`` (removed in Python 3.10)
        self.started_listening = asyncio.Event()
        self.buf = b''
        self.transport: typing.Optional[asyncio.Transport] = None
        self.lbrycrd_address = lbrycrd_address
        self.peer_address_and_port: typing.Optional[str] = None
        self.started_transfer = asyncio.Event()
        self.transfer_finished = asyncio.Event()
        self.close_on_idle_task: typing.Optional[asyncio.Task] = None

    async def close_on_idle(self):
        """
        Watchdog: close the connection if no transfer starts within ``idle_timeout`` seconds.

        While a transfer is in progress the watchdog waits for it to finish (the transfer has
        its own timeout) and then starts waiting for the next one.
        """
        while self.transport:
            try:
                await asyncio.wait_for(self.started_transfer.wait(), self.idle_timeout)
            except asyncio.TimeoutError:
                log.debug("closing idle connection from %s", self.peer_address_and_port)
                return self.close()
            self.started_transfer.clear()
            await self.transfer_finished.wait()
            self.transfer_finished.clear()

    def close(self):
        """Close the transport if the connection is still open."""
        if self.transport:
            self.transport.close()

    def connection_made(self, transport):
        """Remember the transport, start the idle watchdog and register the new peer."""
        self.transport = transport
        self.close_on_idle_task = self.loop.create_task(self.close_on_idle())
        # IPv6 peer names are 4-tuples (host, port, flowinfo, scope_id); only host and port are used
        peername = transport.get_extra_info('peername')
        self.peer_address_and_port = "%s:%i" % tuple(peername[:2])
        self.blob_manager.connection_manager.connection_received(self.peer_address_and_port)
        log.debug("received connection from %s", self.peer_address_and_port)

    def connection_lost(self, exc: typing.Optional[Exception]) -> None:
        """Unregister the peer, drop the transport and cancel the idle watchdog."""
        log.debug("lost connection from %s", self.peer_address_and_port)
        self.blob_manager.connection_manager.incoming_connection_lost(self.peer_address_and_port)
        self.transport = None
        if self.close_on_idle_task and not self.close_on_idle_task.done():
            self.close_on_idle_task.cancel()
        self.close_on_idle_task = None

    def send_response(self, responses: typing.List[blob_response_types]):
        """
        Serialize ``responses`` into one ``BlobResponse`` and write it to the peer.

        The list is emptied in place (its items are sent in reverse order), so a caller that
        holds on to the same list cannot send the same responses twice.
        """
        to_send = list(reversed(responses))
        responses.clear()
        serialized = BlobResponse(to_send).serialize()
        self.transport.write(serialized)
        self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, len(serialized))

    async def handle_request(self, request: BlobRequest):
        """
        Answer one decoded request.

        Address, availability and price requests are answered with a single response message.
        If a verified blob was requested, the response is sent first and the blob is then
        streamed with ``blob.sendfile``; the connection is closed if the transfer fails, times
        out or sends nothing.
        """
        transport = self.transport
        if transport is None:
            # the connection was lost between scheduling this handler and running it
            return
        peer_address, peer_port = transport.get_extra_info('peername')[:2]
        responses = []
        if request.get_address_request():
            responses.append(BlobPaymentAddressResponse(lbrycrd_address=self.lbrycrd_address))
        availability_request = request.get_availability_request()
        if availability_request:
            completed = self.blob_manager.completed_blob_hashes
            responses.append(BlobAvailabilityResponse(available_blobs=list({
                blob_hash for blob_hash in availability_request.requested_blobs if blob_hash in completed
            })))
        if request.get_price_request():
            responses.append(BlobPriceResponse(blob_data_payment_rate='RATE_ACCEPTED'))
        download_request = request.get_blob_request()
        if download_request:
            blob = self.blob_manager.get_blob(download_request.requested_blob)
            if blob.get_is_verified():
                incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
                responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
                # send_response() drains `responses`, so nothing is re-sent at the end
                self.send_response(responses)
                blob_hash = blob.blob_hash[:8]
                log.debug("send %s to %s:%i", blob_hash, peer_address, peer_port)
                self.started_transfer.set()
                try:
                    sent = await asyncio.wait_for(blob.sendfile(self), self.transfer_timeout)
                    if sent and sent > 0:
                        self.blob_manager.connection_manager.sent_data(self.peer_address_and_port, sent)
                        log.info("sent %s (%i bytes) to %s:%i", blob_hash, sent, peer_address, peer_port)
                    else:
                        self.close()
                        log.debug("stopped sending %s to %s:%i", blob_hash, peer_address, peer_port)
                        return
                except asyncio.TimeoutError:
                    # must be handled before OSError: on Python 3.11+ it is an OSError subclass
                    log.debug("timed out sending blob %s to %s", blob_hash, peer_address)
                    self.close()
                    return
                except (OSError, ValueError):
                    log.warning("could not read blob %s to send %s:%i", blob_hash, peer_address, peer_port)
                    self.close()
                    return
                finally:
                    # always release the idle watchdog, whatever the outcome of the transfer
                    self.transfer_finished.set()
            else:
                log.info("don't have %s to send %s:%i", blob.blob_hash[:8], peer_address, peer_port)
        # the connection may have been lost while the blob was being sent
        transport = self.transport
        if responses and transport is not None and not transport.is_closing():
            self.send_response(responses)

    def data_received(self, data):
        """
        Buffer incoming bytes and schedule ``handle_request`` once a request can be decoded.

        Bytes are accumulated until a chunk contains ``}``. Everything up to and including the
        last ``}`` is decoded as the request; whatever follows it is kept in the buffer for the
        next request. Oversized, undecodable or empty requests close the connection.
        """
        if not data:
            return
        if len(self.buf) + len(data) >= MAX_REQUEST_SIZE:
            log.warning("request from %s is too large", self.peer_address_and_port)
            self.close()
            return
        self.blob_manager.connection_manager.received_data(self.peer_address_and_port, len(data))
        _, separator, remainder = data.rpartition(b'}')
        if not separator:
            self.buf += data
            return
        # everything received so far, kept for logging; only the part up to the last '}' is parsed
        received = self.buf + data
        payload = received[:len(received) - len(remainder)]
        try:
            request = BlobRequest.deserialize(payload)
        except (UnicodeDecodeError, JSONDecodeError):
            log.error("request from %s is not valid json (%i bytes): %s", self.peer_address_and_port,
                      len(received), binascii.hexlify(received).decode())
            self.close()
            return
        self.buf = remainder
        if not request.requests:
            log.error("failed to decode request from %s (%i bytes): %s", self.peer_address_and_port,
                      len(received), binascii.hexlify(received).decode())
            self.close()
            return
        self.loop.create_task(self.handle_request(request))
2019-01-25 19:10:40 +01:00
class BlobServer:
    """
    TCP server that answers blob exchange requests.

    Every incoming connection is handled by a new ``server_protocol_class`` instance
    (``BlobServerProtocol`` by default).
    """

    def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', lbrycrd_address: str,
                 idle_timeout: float = 30.0, transfer_timeout: float = 60.0):
        """
        :param loop: event loop the server task and its listener are created on
        :param blob_manager: passed on to every protocol instance
        :param lbrycrd_address: passed on to every protocol instance
        :param idle_timeout: passed on to every protocol instance
        :param transfer_timeout: passed on to every protocol instance
        """
        self.loop = loop
        self.blob_manager = blob_manager
        self.server_task: typing.Optional[asyncio.Task] = None
        # NOTE: asyncio primitives no longer accept ``loop=`` (removed in Python 3.10)
        self.started_listening = asyncio.Event()
        self.lbrycrd_address = lbrycrd_address
        self.idle_timeout = idle_timeout
        self.transfer_timeout = transfer_timeout
        self.server_protocol_class = BlobServerProtocol

    def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
        """
        Start listening on ``interface:port`` in a background task.

        ``started_listening`` is set once the listener has been created.

        :raises Exception: if the server has already been started
        """
        if self.server_task is not None:
            raise Exception("already running")

        async def _start_server():
            # checking if the port is in use
            # thx https://stackoverflow.com/a/52872579
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                if s.connect_ex(('localhost', port)) == 0:
                    # the port is already in use! this is only logged, the bind is
                    # still attempted by create_server() below
                    log.error("Failed to bind TCP %s:%d", interface, port)
            server = await self.loop.create_server(
                lambda: self.server_protocol_class(self.loop, self.blob_manager, self.lbrycrd_address,
                                                   self.idle_timeout, self.transfer_timeout),
                interface, port
            )
            self.started_listening.set()
            log.info("Blob server listening on TCP %s:%i", interface, port)
            async with server:
                await server.serve_forever()

        self.server_task = self.loop.create_task(_start_server())

    def stop_server(self):
        """Cancel the server task, if any; does nothing when the server is not running."""
        if self.server_task:
            self.server_task.cancel()
            self.server_task = None
            # reset the flag so that a later start_server() can be awaited again
            self.started_listening.clear()
            log.info("Stopped blob server")