lbry-sdk/lbrynet/core/server/ServerRequestHandler.py
2016-12-15 18:58:16 -06:00

187 lines
5.7 KiB
Python

import json
import logging
from twisted.internet import interfaces, defer
from zope.interface import implements
from lbrynet.interfaces import IRequestHandler
log = logging.getLogger(__name__)
class ServerRequestHandler(object):
"""This class handles requests from clients. It can upload blobs and
return request for information about more blobs that are
associated with streams.
"""
implements(interfaces.IPushProducer, interfaces.IConsumer, IRequestHandler)
def __init__(self, consumer):
self.consumer = consumer
self.production_paused = False
self.request_buff = ''
self.response_buff = ''
self.producer = None
self.request_received = False
self.CHUNK_SIZE = 2**14
self.query_handlers = {} # {IQueryHandler: [query_identifiers]}
self.blob_sender = None
self.consumer.registerProducer(self, True)
#IPushProducer stuff
def pauseProducing(self):
self.production_paused = True
def stopProducing(self):
if self.producer is not None:
self.producer.stopProducing()
self.producer = None
self.production_paused = True
self.consumer.unregisterProducer()
def resumeProducing(self):
from twisted.internet import reactor
self.production_paused = False
self._produce_more()
if self.producer is not None:
reactor.callLater(0, self.producer.resumeProducing)
def _produce_more(self):
from twisted.internet import reactor
if self.production_paused:
return
chunk = self.response_buff[:self.CHUNK_SIZE]
self.response_buff = self.response_buff[self.CHUNK_SIZE:]
if chunk == '':
return
log.trace("writing %s bytes to the client", len(chunk))
self.consumer.write(chunk)
reactor.callLater(0, self._produce_more)
#IConsumer stuff
def registerProducer(self, producer, streaming):
self.producer = producer
assert streaming is False
producer.resumeProducing()
def unregisterProducer(self):
self.producer = None
def write(self, data):
from twisted.internet import reactor
self.response_buff = self.response_buff + data
self._produce_more()
def get_more_data():
if self.producer is not None:
log.trace("Requesting more data from the producer")
self.producer.resumeProducing()
reactor.callLater(0, get_more_data)
#From Protocol
def data_received(self, data):
log.debug("Received data")
log.debug("%s", str(data))
if self.request_received is False:
return self._parse_data_and_maybe_send_blob(data)
else:
log.warning(
"The client sent data when we were uploading a file. This should not happen")
def _parse_data_and_maybe_send_blob(self, data):
self.request_buff = self.request_buff + data
msg = self.try_to_parse_request(self.request_buff)
if msg:
self.request_buff = ''
self._process_msg(msg)
else:
log.debug("Request buff not a valid json message")
log.debug("Request buff: %s", self.request_buff)
def _process_msg(self, msg):
d = self.handle_request(msg)
if self.blob_sender:
d.addCallback(lambda _: self.blob_sender.send_blob_if_requested(self))
d.addCallbacks(lambda _: self.finished_response(), self.request_failure_handler)
######### IRequestHandler #########
def register_query_handler(self, query_handler, query_identifiers):
self.query_handlers[query_handler] = query_identifiers
def register_blob_sender(self, blob_sender):
self.blob_sender = blob_sender
#response handling
def request_failure_handler(self, err):
log.warning("An error occurred handling a request. Error: %s", err.getErrorMessage())
self.stopProducing()
return err
def finished_response(self):
self.request_received = False
self._produce_more()
def send_response(self, msg):
m = json.dumps(msg)
log.debug("Sending a response of length %s", str(len(m)))
log.debug("Response: %s", str(m))
self.response_buff = self.response_buff + m
self._produce_more()
return True
def handle_request(self, msg):
log.debug("Handling a request")
log.debug(str(msg))
def create_response_message(results):
response = {}
for success, result in results:
if success is True:
response.update(result)
else:
# result is a Failure
return result
log.debug("Finished making the response message. Response: %s", str(response))
return response
def log_errors(err):
log.warning(
"An error occurred handling a client request. Error message: %s",
err.getErrorMessage())
return err
def send_response(response):
self.send_response(response)
return True
ds = []
for query_handler, query_identifiers in self.query_handlers.iteritems():
queries = {q_i: msg[q_i] for q_i in query_identifiers if q_i in msg}
d = query_handler.handle_queries(queries)
d.addErrback(log_errors)
ds.append(d)
dl = defer.DeferredList(ds)
dl.addCallback(create_response_message)
dl.addCallback(send_response)
return dl
def try_to_parse_request(self, request_buff):
try:
msg = json.loads(request_buff)
return msg
except ValueError:
return None