lbry-sdk/lbrynet/reflector/server/server.py

414 lines
16 KiB
Python
Raw Normal View History

2016-08-04 05:03:14 +02:00
import logging
import json
2016-08-04 05:03:14 +02:00
from twisted.python import failure
from twisted.internet import error, defer
2016-08-04 05:03:14 +02:00
from twisted.internet.protocol import Protocol, ServerFactory
from lbrynet.core.utils import is_valid_blobhash
from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader
from lbrynet.lbry_file.StreamDescriptor import save_sd_info
from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError
2016-08-04 05:03:14 +02:00
log = logging.getLogger(__name__)
2016-08-04 05:03:14 +02:00
MAXIMUM_QUERY_SIZE = 200
SEND_SD_BLOB = 'send_sd_blob'
SEND_BLOB = 'send_blob'
RECEIVED_SD_BLOB = 'received_sd_blob'
RECEIVED_BLOB = 'received_blob'
NEEDED_BLOBS = 'needed_blobs'
VERSION = 'version'
BLOB_SIZE = 'blob_size'
BLOB_HASH = 'blob_hash'
SD_BLOB_SIZE = 'sd_blob_size'
SD_BLOB_HASH = 'sd_blob_hash'
2016-08-04 05:03:14 +02:00
class ReflectorServer(Protocol):
2016-08-04 05:03:14 +02:00
def connectionMade(self):
peer_info = self.transport.getPeer()
2016-08-11 07:14:21 +02:00
log.debug('Connection made to %s', peer_info)
2016-08-04 05:03:14 +02:00
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
self.blob_manager = self.factory.blob_manager
self.stream_info_manager = self.factory.stream_info_manager
self.lbry_file_manager = self.factory.lbry_file_manager
self.protocol_version = self.factory.protocol_version
2016-08-04 05:03:14 +02:00
self.received_handshake = False
self.peer_version = None
self.receiving_blob = False
self.incoming_blob = None
2016-08-04 05:03:14 +02:00
self.blob_finished_d = None
self.request_buff = ""
self.blob_writer = None
2016-08-04 05:03:14 +02:00
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
2016-08-21 05:47:41 +02:00
log.info("Reflector upload from %s finished" % self.peer.host)
2016-08-04 05:03:14 +02:00
def handle_error(self, err):
log.error(err.getTraceback())
self.transport.loseConnection()
def send_response(self, response_dict):
self.transport.write(json.dumps(response_dict))
############################
# Incoming blob file stuff #
############################
def clean_up_failed_upload(self, err, blob):
log.warning("Failed to receive %s", blob)
if err.check(DownloadCanceledError):
self.blob_manager.delete_blobs([blob.blob_hash])
else:
log.exception(err)
@defer.inlineCallbacks
def check_head_blob_announce(self, stream_hash):
blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash)
blob_hash, blob_num, blob_iv, blob_length = blob_infos[0]
if blob_hash in self.blob_manager.blobs:
head_blob = self.blob_manager.blobs[blob_hash]
if head_blob.get_is_verified():
should_announce = yield self.blob_manager.get_should_announce(blob_hash)
if should_announce == 0:
yield self.blob_manager.set_should_announce(blob_hash, 1)
log.info("Discovered previously completed head blob (%s), "
"setting it to be announced", blob_hash[:8])
defer.returnValue(None)
@defer.inlineCallbacks
def check_sd_blob_announce(self, sd_hash):
if sd_hash in self.blob_manager.blobs:
sd_blob = self.blob_manager.blobs[sd_hash]
if sd_blob.get_is_verified():
should_announce = yield self.blob_manager.get_should_announce(sd_hash)
if should_announce == 0:
yield self.blob_manager.set_should_announce(sd_hash, 1)
log.info("Discovered previously completed sd blob (%s), "
"setting it to be announced", sd_hash[:8])
try:
yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash)
except NoSuchSDHash:
log.info("Adding blobs to stream")
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
yield save_sd_info(self.stream_info_manager, sd_info)
yield self.stream_info_manager.save_sd_blob_hash_to_stream(
sd_info['stream_hash'],
sd_hash)
defer.returnValue(None)
@defer.inlineCallbacks
def _on_completed_blob(self, blob, response_key):
should_announce = False
if response_key == RECEIVED_SD_BLOB:
sd_info = yield BlobStreamDescriptorReader(blob).get_info()
yield save_sd_info(self.stream_info_manager, sd_info)
yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'],
blob.blob_hash)
self.lbry_file_manager.add_lbry_file(sd_info['stream_hash'])
should_announce = True
# if we already have the head blob, set it to be announced now that we know it's
# a head blob
d = self.check_head_blob_announce(sd_info['stream_hash'])
else:
d = defer.succeed(None)
stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash)
if stream_hash is not None:
blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash,
blob.blob_hash)
if blob_num == 0:
should_announce = True
sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(
stream_hash)
# if we already have the sd blob, set it to be announced now that we know it's
# a sd blob
for sd_hash in sd_hashes:
d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash))
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
yield self.close_blob()
yield d
log.info("Received %s", blob)
yield self.send_response({response_key: True})
@defer.inlineCallbacks
def _on_failed_blob(self, err, response_key):
yield self.clean_up_failed_upload(err, self.incoming_blob)
yield self.send_response({response_key: False})
def handle_incoming_blob(self, response_key):
"""
Open blob for writing and send a response indicating if the transfer was
successful when finished.
response_key will either be received_blob or received_sd_blob
"""
blob = self.incoming_blob
self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer)
self.blob_finished_d.addCallback(self._on_completed_blob, response_key)
self.blob_finished_d.addErrback(self._on_failed_blob, response_key)
def close_blob(self):
self.blob_writer.close()
self.blob_writer = None
self.blob_finished_d = None
self.incoming_blob = None
self.receiving_blob = False
####################
# Request handling #
####################
2016-08-04 05:03:14 +02:00
def dataReceived(self, data):
2016-08-11 07:25:45 +02:00
if self.receiving_blob:
self.blob_writer.write(data)
2016-08-11 07:25:45 +02:00
else:
log.debug('Not yet recieving blob, data needs further processing')
2016-08-04 05:03:14 +02:00
self.request_buff += data
msg, extra_data = self._get_valid_response(self.request_buff)
if msg is not None:
2016-08-04 05:03:14 +02:00
self.request_buff = ''
d = self.handle_request(msg)
d.addErrback(self.handle_error)
2016-08-11 07:25:45 +02:00
if self.receiving_blob and extra_data:
2016-08-18 11:36:17 +02:00
log.debug('Writing extra data to blob')
self.blob_writer.write(extra_data)
2016-08-04 05:03:14 +02:00
def _get_valid_response(self, response_msg):
extra_data = None
response = None
curr_pos = 0
while not self.receiving_blob:
next_close_paren = response_msg.find('}', curr_pos)
if next_close_paren != -1:
curr_pos = next_close_paren + 1
try:
response = json.loads(response_msg[:curr_pos])
except ValueError:
if curr_pos > MAXIMUM_QUERY_SIZE:
raise ValueError("Error decoding response: %s" % str(response_msg))
else:
pass
else:
extra_data = response_msg[curr_pos:]
break
else:
break
return response, extra_data
2016-08-04 05:03:14 +02:00
def need_handshake(self):
return self.received_handshake is False
def is_descriptor_request(self, request_dict):
if SD_BLOB_HASH not in request_dict or SD_BLOB_SIZE not in request_dict:
return False
if not is_valid_blobhash(request_dict[SD_BLOB_HASH]):
raise InvalidBlobHashError(request_dict[SD_BLOB_HASH])
return True
def is_blob_request(self, request_dict):
if BLOB_HASH not in request_dict or BLOB_SIZE not in request_dict:
return False
if not is_valid_blobhash(request_dict[BLOB_HASH]):
raise InvalidBlobHashError(request_dict[BLOB_HASH])
return True
2016-08-04 05:03:14 +02:00
def handle_request(self, request_dict):
if self.need_handshake():
2016-08-04 05:03:14 +02:00
return self.handle_handshake(request_dict)
if self.is_descriptor_request(request_dict):
return self.handle_descriptor_request(request_dict)
if self.is_blob_request(request_dict):
return self.handle_blob_request(request_dict)
raise ReflectorRequestError("Invalid request")
2016-08-04 05:03:14 +02:00
def handle_handshake(self, request_dict):
"""
Upon connecting, the client sends a version handshake:
{
'version': int,
}
The server replies with the same version if it is supported
{
'version': int,
}
"""
if VERSION not in request_dict:
raise ReflectorRequestError("Client should send version")
if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]:
raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION]))
self.peer_version = int(request_dict[VERSION])
2017-03-08 21:32:00 +01:00
log.debug('Handling handshake for client version %i', self.peer_version)
self.received_handshake = True
return self.send_handshake_response()
def send_handshake_response(self):
d = defer.succeed({VERSION: self.peer_version})
d.addCallback(self.send_response)
return d
def handle_descriptor_request(self, request_dict):
"""
If the client is reflecting a whole stream, they send a stream descriptor request:
{
'sd_blob_hash': str,
'sd_blob_size': int
}
The server indicates if it's aware of this stream already by requesting (or not requesting)
the stream descriptor blob. If the server has a validated copy of the sd blob, it will
include the needed_blobs field (a list of blob hashes missing from reflector) in the
response. If the server does not have the sd blob the needed_blobs field will not be
included, as the server does not know what blobs it is missing - so the client should send
all of the blobs in the stream.
{
'send_sd_blob': bool
'needed_blobs': list, conditional
}
The client may begin the file transfer of the sd blob if send_sd_blob was True.
If the client sends the blob, after receiving it the server indicates if the
transfer was successful:
{
'received_sd_blob': bool
}
"""
sd_blob_hash = request_dict[SD_BLOB_HASH]
sd_blob_size = request_dict[SD_BLOB_SIZE]
if self.blob_writer is None:
2017-02-14 20:18:42 +01:00
d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size)
d.addCallback(self.get_descriptor_response)
d.addCallback(self.send_response)
else:
self.receiving_blob = True
d = self.blob_finished_d
return d
@defer.inlineCallbacks
def get_descriptor_response(self, sd_blob):
if sd_blob.get_is_verified():
# if we already have the sd blob being offered, make sure we have it and the head blob
# marked as such for announcement now that we know it's an sd blob that we have.
yield self.check_sd_blob_announce(sd_blob.blob_hash)
try:
stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash(
sd_blob.blob_hash)
except NoSuchSDHash:
sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info()
stream_hash = sd_info['stream_hash']
yield save_sd_info(self.stream_info_manager, sd_info)
yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash,
sd_blob.blob_hash)
yield self.check_head_blob_announce(stream_hash)
response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob)
else:
self.incoming_blob = sd_blob
self.receiving_blob = True
self.handle_incoming_blob(RECEIVED_SD_BLOB)
response = {SEND_SD_BLOB: True}
defer.returnValue(response)
def request_needed_blobs(self, response, sd_blob):
def _add_needed_blobs_to_response(needed_blobs):
response.update({NEEDED_BLOBS: needed_blobs})
return response
d = self.determine_missing_blobs(sd_blob)
d.addCallback(_add_needed_blobs_to_response)
return d
def determine_missing_blobs(self, sd_blob):
reader = sd_blob.open_for_reading()
sd_blob_data = reader.read()
reader.close()
decoded_sd_blob = json.loads(sd_blob_data)
return self.get_unvalidated_blobs_in_stream(decoded_sd_blob)
def get_unvalidated_blobs_in_stream(self, sd_blob):
dl = defer.DeferredList(list(self._iter_unvalidated_blobs_in_stream(sd_blob)),
consumeErrors=True)
dl.addCallback(lambda needed: [blob[1] for blob in needed if blob[1]])
return dl
def _iter_unvalidated_blobs_in_stream(self, sd_blob):
for blob in sd_blob['blobs']:
if 'blob_hash' in blob and 'length' in blob:
blob_hash, blob_len = blob['blob_hash'], blob['length']
2017-02-14 20:18:42 +01:00
d = self.blob_manager.get_blob(blob_hash, blob_len)
d.addCallback(lambda blob: blob_hash if not blob.get_is_verified() else None)
yield d
def handle_blob_request(self, request_dict):
"""
A client queries if the server will accept a blob
{
'blob_hash': str,
'blob_size': int
}
The server replies, send_blob will be False if the server has a validated copy of the blob:
{
'send_blob': bool
}
The client may begin the raw blob file transfer if the server replied True.
If the client sends the blob, the server replies:
{
'received_blob': bool
}
"""
blob_hash = request_dict[BLOB_HASH]
blob_size = request_dict[BLOB_SIZE]
2016-08-04 05:03:14 +02:00
if self.blob_writer is None:
log.debug('Received info for blob: %s', blob_hash[:16])
d = self.blob_manager.get_blob(blob_hash, blob_size)
d.addCallback(self.get_blob_response)
d.addCallback(self.send_response)
2017-02-14 18:35:06 +01:00
else:
2016-08-11 07:25:45 +02:00
log.debug('blob is already open')
self.receiving_blob = True
d = self.blob_finished_d
return d
2016-08-04 05:03:14 +02:00
def get_blob_response(self, blob):
if blob.get_is_verified():
return defer.succeed({SEND_BLOB: False})
else:
self.incoming_blob = blob
self.receiving_blob = True
self.handle_incoming_blob(RECEIVED_BLOB)
d = defer.succeed({SEND_BLOB: True})
return d
2016-08-04 05:03:14 +02:00
class ReflectorServerFactory(ServerFactory):
protocol = ReflectorServer
def __init__(self, peer_manager, blob_manager, stream_info_manager, lbry_file_manager):
self.peer_manager = peer_manager
2016-08-11 07:25:45 +02:00
self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager
self.lbry_file_manager = lbry_file_manager
self.protocol_version = REFLECTOR_V2
2016-08-11 07:25:45 +02:00
def buildProtocol(self, addr):
log.debug('Creating a protocol for %s', addr)
return ServerFactory.buildProtocol(self, addr)