removed RequestCounter
This commit is contained in:
parent
2040748c62
commit
633b49da92
1 changed files with 5 additions and 39 deletions
|
@ -9,24 +9,6 @@ from errors import RemoteServiceException, ProtocolException, ServiceException
|
||||||
log = logging.getLogger()
|
log = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
class RequestCounter(object):
|
|
||||||
def __init__(self):
|
|
||||||
self.on_finish = defer.Deferred()
|
|
||||||
self.counter = 0
|
|
||||||
|
|
||||||
def set_count(self, cnt):
|
|
||||||
self.counter = cnt
|
|
||||||
|
|
||||||
def decrease(self):
|
|
||||||
self.counter -= 1
|
|
||||||
if self.counter <= 0:
|
|
||||||
self.finish()
|
|
||||||
|
|
||||||
def finish(self):
|
|
||||||
if not self.on_finish.called:
|
|
||||||
self.on_finish.callback(True)
|
|
||||||
|
|
||||||
|
|
||||||
class StratumClientProtocol(LineOnlyReceiver):
|
class StratumClientProtocol(LineOnlyReceiver):
|
||||||
delimiter = '\n'
|
delimiter = '\n'
|
||||||
|
|
||||||
|
@ -60,8 +42,6 @@ class StratumClientProtocol(LineOnlyReceiver):
|
||||||
|
|
||||||
self.request_id = 0
|
self.request_id = 0
|
||||||
self.lookup_table = {}
|
self.lookup_table = {}
|
||||||
self.on_finish = None # Will point to defer which is called
|
|
||||||
# once all client requests are processed
|
|
||||||
|
|
||||||
self._connected.callback(True)
|
self._connected.callback(True)
|
||||||
|
|
||||||
|
@ -99,11 +79,10 @@ class StratumClientProtocol(LineOnlyReceiver):
|
||||||
log.error(message)
|
log.error(message)
|
||||||
return self.writeJsonError(code, message, None, None)
|
return self.writeJsonError(code, message, None, None)
|
||||||
|
|
||||||
def process_response(self, data, message_id, request_counter):
|
def process_response(self, data, message_id):
|
||||||
self.writeJsonResponse(data.result, message_id)
|
self.writeJsonResponse(data.result, message_id)
|
||||||
request_counter.decrease()
|
|
||||||
|
|
||||||
def process_failure(self, failure, message_id, request_counter):
|
def process_failure(self, failure, message_id):
|
||||||
if not isinstance(failure.value, ServiceException):
|
if not isinstance(failure.value, ServiceException):
|
||||||
# All handled exceptions should inherit from ServiceException class.
|
# All handled exceptions should inherit from ServiceException class.
|
||||||
# Throwing other exception class means that it is unhandled error
|
# Throwing other exception class means that it is unhandled error
|
||||||
|
@ -113,9 +92,8 @@ class StratumClientProtocol(LineOnlyReceiver):
|
||||||
if message_id != None:
|
if message_id != None:
|
||||||
tb = failure.getBriefTraceback()
|
tb = failure.getBriefTraceback()
|
||||||
self.writeJsonError(code, failure.getErrorMessage(), tb, message_id)
|
self.writeJsonError(code, failure.getErrorMessage(), tb, message_id)
|
||||||
request_counter.decrease()
|
|
||||||
|
|
||||||
def dataReceived(self, data, request_counter=None):
|
def dataReceived(self, data):
|
||||||
'''Original code from Twisted, hacked for request_counter proxying.
|
'''Original code from Twisted, hacked for request_counter proxying.
|
||||||
request_counter is hack for HTTP transport, didn't found cleaner solution how
|
request_counter is hack for HTTP transport, didn't found cleaner solution how
|
||||||
to indicate end of request processing in asynchronous manner.
|
to indicate end of request processing in asynchronous manner.
|
||||||
|
@ -123,40 +101,30 @@ class StratumClientProtocol(LineOnlyReceiver):
|
||||||
TODO: This would deserve some unit test to be sure that future twisted versions
|
TODO: This would deserve some unit test to be sure that future twisted versions
|
||||||
will work nicely with this.'''
|
will work nicely with this.'''
|
||||||
|
|
||||||
if request_counter == None:
|
|
||||||
request_counter = RequestCounter()
|
|
||||||
|
|
||||||
lines = (self._buffer + data).split(self.delimiter)
|
lines = (self._buffer + data).split(self.delimiter)
|
||||||
self._buffer = lines.pop(-1)
|
self._buffer = lines.pop(-1)
|
||||||
request_counter.set_count(len(lines))
|
|
||||||
self.on_finish = request_counter.on_finish
|
|
||||||
|
|
||||||
for line in lines:
|
for line in lines:
|
||||||
if self.transport.disconnecting:
|
if self.transport.disconnecting:
|
||||||
request_counter.finish()
|
|
||||||
return
|
return
|
||||||
if len(line) > self.MAX_LENGTH:
|
if len(line) > self.MAX_LENGTH:
|
||||||
request_counter.finish()
|
|
||||||
return self.lineLengthExceeded(line)
|
return self.lineLengthExceeded(line)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
self.lineReceived(line, request_counter)
|
self.lineReceived(line)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
request_counter.finish()
|
|
||||||
# log.exception("Processing of message failed")
|
# log.exception("Processing of message failed")
|
||||||
log.warning("Failed message: %s from %s" % (str(exc), self._get_ip()))
|
log.warning("Failed message: %s from %s" % (str(exc), self._get_ip()))
|
||||||
return error.ConnectionLost('Processing of message failed')
|
return error.ConnectionLost('Processing of message failed')
|
||||||
|
|
||||||
if len(self._buffer) > self.MAX_LENGTH:
|
if len(self._buffer) > self.MAX_LENGTH:
|
||||||
request_counter.finish()
|
|
||||||
return self.lineLengthExceeded(self._buffer)
|
return self.lineLengthExceeded(self._buffer)
|
||||||
|
|
||||||
def lineReceived(self, line, request_counter):
|
def lineReceived(self, line):
|
||||||
try:
|
try:
|
||||||
message = json.loads(line)
|
message = json.loads(line)
|
||||||
except (ValueError, TypeError):
|
except (ValueError, TypeError):
|
||||||
# self.writeGeneralError("Cannot decode message '%s'" % line)
|
# self.writeGeneralError("Cannot decode message '%s'" % line)
|
||||||
request_counter.finish()
|
|
||||||
raise ProtocolException("Cannot decode message '%s'" % line.strip())
|
raise ProtocolException("Cannot decode message '%s'" % line.strip())
|
||||||
msg_id = message.get('id', 0)
|
msg_id = message.get('id', 0)
|
||||||
msg_result = message.get('result')
|
msg_result = message.get('result')
|
||||||
|
@ -164,7 +132,6 @@ class StratumClientProtocol(LineOnlyReceiver):
|
||||||
if msg_id:
|
if msg_id:
|
||||||
# It's a RPC response
|
# It's a RPC response
|
||||||
# Perform lookup to the table of waiting requests.
|
# Perform lookup to the table of waiting requests.
|
||||||
request_counter.decrease()
|
|
||||||
try:
|
try:
|
||||||
meta = self.lookup_table[msg_id]
|
meta = self.lookup_table[msg_id]
|
||||||
del self.lookup_table[msg_id]
|
del self.lookup_table[msg_id]
|
||||||
|
@ -181,7 +148,6 @@ class StratumClientProtocol(LineOnlyReceiver):
|
||||||
else:
|
else:
|
||||||
meta['defer'].callback(msg_result)
|
meta['defer'].callback(msg_result)
|
||||||
else:
|
else:
|
||||||
request_counter.decrease()
|
|
||||||
raise ProtocolException("Cannot handle message '%s'" % line)
|
raise ProtocolException("Cannot handle message '%s'" % line)
|
||||||
|
|
||||||
def rpc(self, method, params, is_notification=False):
|
def rpc(self, method, params, is_notification=False):
|
||||||
|
|
Loading…
Reference in a new issue