forked from LBRYCommunity/lbry-sdk
skeleton for reflector protocol
This commit is contained in:
parent
f3ab38ba8f
commit
191f661b35
5 changed files with 226 additions and 0 deletions
0
lbrynet/reflector/__init__.py
Normal file
0
lbrynet/reflector/__init__.py
Normal file
0
lbrynet/reflector/client/__init__.py
Normal file
0
lbrynet/reflector/client/__init__.py
Normal file
140
lbrynet/reflector/client/client.py
Normal file
140
lbrynet/reflector/client/client.py
Normal file
|
@ -0,0 +1,140 @@
|
||||||
|
"""
|
||||||
|
The reflector protocol (all dicts encoded in json):
|
||||||
|
|
||||||
|
Client Handshake (sent once per connection, at the start of the connection):
|
||||||
|
|
||||||
|
{
|
||||||
|
'version': 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Server Handshake (sent once per connection, after receiving the client handshake):
|
||||||
|
|
||||||
|
{
|
||||||
|
'version': 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Client Info Request:
|
||||||
|
|
||||||
|
{
|
||||||
|
'blob_hash': "<blob_hash>",
|
||||||
|
'blob_size': <blob_size>
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Server Info Response (sent in response to Client Info Request):
|
||||||
|
|
||||||
|
{
|
||||||
|
'response': ['YES', 'NO']
|
||||||
|
}
|
||||||
|
|
||||||
|
If response is 'YES', client may send a Client Blob Request or a Client Info Request.
|
||||||
|
If response is 'NO', client may only send a Client Info Request
|
||||||
|
|
||||||
|
|
||||||
|
Client Blob Request:
|
||||||
|
|
||||||
|
{} # Yes, this is an empty dictionary, in case something needs to go here in the future
|
||||||
|
<raw blob_data> # this blob data must match the info sent in the most recent Client Info Request
|
||||||
|
|
||||||
|
|
||||||
|
Server Blob Response (sent in response to Client Blob Request):
|
||||||
|
{
|
||||||
|
'received': True
|
||||||
|
}
|
||||||
|
|
||||||
|
Client may now send another Client Info Request
|
||||||
|
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from twisted.internet.protocol import Protocol, ClientFactory
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class IncompleteResponseError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class LBRYFileReflectorClient(Protocol):
|
||||||
|
def connectionMade(self):
|
||||||
|
self.peer = self.factory.peer
|
||||||
|
self.response_buff = ''
|
||||||
|
self.outgoing_buff = ''
|
||||||
|
self.blob_hashes_to_send = []
|
||||||
|
self.next_blob_to_send = None
|
||||||
|
self.received_handshake_response = False
|
||||||
|
d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
|
||||||
|
d.addCallback(lambda _: self.send_handshake())
|
||||||
|
|
||||||
|
def dataReceived(self, data):
|
||||||
|
self.response_buff += data
|
||||||
|
try:
|
||||||
|
msg = self.parse_response(self.response_buff)
|
||||||
|
except IncompleteResponseError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
self.response_buff = ''
|
||||||
|
d = self.handle_response(msg)
|
||||||
|
d.addCallbacks(lambda _: self.send_next_request(), self.response_failure_handler)
|
||||||
|
|
||||||
|
def connectionLost(self, reason):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_blobs_to_send(self, stream_info_manager, stream_hash):
|
||||||
|
d = stream_info_manager.get_blobs_for_stream(stream_hash)
|
||||||
|
|
||||||
|
def set_blobs(blob_hashes):
|
||||||
|
for blob_hash, position, iv, length in blob_hashes:
|
||||||
|
self.blob_hashes_to_send.append(blob_hash)
|
||||||
|
|
||||||
|
d.addCallback(set_blobs)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def parse_response(self, buff):
|
||||||
|
try:
|
||||||
|
return json.loads(buff)
|
||||||
|
except ValueError:
|
||||||
|
raise IncompleteResponseError()
|
||||||
|
|
||||||
|
def handle_response(self, response_dict):
|
||||||
|
if self.received_handshake_response is False:
|
||||||
|
self.handle_handshake_response(response_dict)
|
||||||
|
else:
|
||||||
|
self.handle_normal_response(response_dict)
|
||||||
|
|
||||||
|
def handle_handshake_response(self, response_dict):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def handle_normal_response(self, response_dict):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def send_next_request(self):
|
||||||
|
if self.next_blob_to_send is not None:
|
||||||
|
# send the blob
|
||||||
|
pass
|
||||||
|
elif self.blobs_to_send:
|
||||||
|
# send the server the next blob hash + length
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# close connection
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class LBRYFileReflectorClientFactory(ClientFactory):
|
||||||
|
protocol = LBRYFileReflectorClient
|
||||||
|
|
||||||
|
def __init__(self, stream_info_manager, peer, stream_hash):
|
||||||
|
self.peer = peer
|
||||||
|
self.stream_info_manager = stream_info_manager
|
||||||
|
self.stream_hash = stream_hash
|
||||||
|
self.p = None
|
||||||
|
|
||||||
|
def buildProtocol(self, addr):
|
||||||
|
p = self.protocol()
|
||||||
|
p.factory = self
|
||||||
|
self.p = p
|
||||||
|
return p
|
0
lbrynet/reflector/server/__init__.py
Normal file
0
lbrynet/reflector/server/__init__.py
Normal file
86
lbrynet/reflector/server/server.py
Normal file
86
lbrynet/reflector/server/server.py
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
import logging
|
||||||
|
from twisted.python import failure
|
||||||
|
from twisted.internet import error
|
||||||
|
from twisted.internet.protocol import Protocol, ServerFactory
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class IncompleteMessageError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ReflectorServer(Protocol):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
|
||||||
|
def connectionMade(self):
|
||||||
|
peer_info = self.transport.getPeer()
|
||||||
|
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
|
||||||
|
self.received_handshake = False
|
||||||
|
self.peer_version = None
|
||||||
|
self.receiving_blob = False
|
||||||
|
self.blob_write = None
|
||||||
|
self.blob_finished_d = None
|
||||||
|
self.request_buff = ""
|
||||||
|
|
||||||
|
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def dataReceived(self, data):
|
||||||
|
if self.receiving_blob is False:
|
||||||
|
self.request_buff += data
|
||||||
|
try:
|
||||||
|
msg = self.parse_request(self.request_buff)
|
||||||
|
except IncompleteMessageError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
self.request_buff = ''
|
||||||
|
d = self.handle_request(msg)
|
||||||
|
d.addCallbacks(self.send_response, self.handle_error)
|
||||||
|
else:
|
||||||
|
self.blob_write(data)
|
||||||
|
|
||||||
|
def parse_request(self, buff):
|
||||||
|
try:
|
||||||
|
return json.loads(buff)
|
||||||
|
except ValueError:
|
||||||
|
raise IncompleteMessageError()
|
||||||
|
|
||||||
|
def handle_request(self, request_dict):
|
||||||
|
if self.received_handshake is False:
|
||||||
|
return self.handle_handshake(request_dict)
|
||||||
|
else:
|
||||||
|
return self.handle_normal_request(request_dict)
|
||||||
|
|
||||||
|
def handle_handshake(self, request_dict):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def handle_normal_request(self, request_dict):
|
||||||
|
if self.blob_write is None:
|
||||||
|
# we haven't opened a blob yet, meaning we must be waiting for the
|
||||||
|
# next message containing a blob hash and a length. this message
|
||||||
|
# should be it. if it's one we want, open the blob for writing, and
|
||||||
|
# return a nice response dict (in a Deferred) saying go ahead
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# we have a blob open already, so this message should have nothing
|
||||||
|
# important in it. to the deferred that fires when the blob is done,
|
||||||
|
# add a callback which returns a nice response dict saying to keep
|
||||||
|
# sending, and then return that deferred
|
||||||
|
pass
|
||||||
|
|
||||||
|
def send_response(self, response_dict):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def handle_error(self, err):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ReflectorServerFactory(ServerFactory):
|
||||||
|
protocol = ReflectorServer
|
||||||
|
|
||||||
|
def __init__(self, peer_manager):
|
||||||
|
self.peer_manager = peer_manager
|
Loading…
Reference in a new issue