From 6fae07d29e4925f1a884d071b5c104cea0cb96f9 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Mon, 6 Feb 2017 13:18:41 -0500
Subject: [PATCH] reflector v2
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

-add {‘sd_blob_hash’: …, ‘sd_blob_size ‘: …} query type with
{‘send_sd_blob’: True/False, ‘needed_blobs’: []} response

this allows the reflector client to know how much of a stream reflector
already has covered, as to minimize the number of subsequent requests
and prevent streams from being partially reflected

-remove empty {} request
---
 lbrynet/reflector/client/blob.py   |  14 +-
 lbrynet/reflector/client/client.py | 235 +++++++++++++----------
 lbrynet/reflector/common.py        |  27 ++-
 lbrynet/reflector/reupload.py      |  16 +-
 lbrynet/reflector/server/server.py | 298 +++++++++++++++++++++++------
 5 files changed, 409 insertions(+), 181 deletions(-)

diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py
index 62bb49938..2aba7716f 100644
--- a/lbrynet/reflector/client/blob.py
+++ b/lbrynet/reflector/client/blob.py
@@ -5,7 +5,7 @@ from twisted.protocols.basic import FileSender
 from twisted.internet.protocol import Protocol, ClientFactory
 from twisted.internet import defer, error
 
-from lbrynet.reflector.common import IncompleteResponse
+from lbrynet.reflector.common import IncompleteResponse, REFLECTOR_V2
 
 
 log = logging.getLogger(__name__)
@@ -22,7 +22,7 @@ class BlobReflectorClient(Protocol):
         self.next_blob_to_send = None
         self.blob_read_handle = None
         self.received_handshake_response = False
-        self.protocol_version = None
+        self.protocol_version = self.factory.protocol_version
         self.file_sender = None
         self.producer = None
         self.streaming = False
@@ -32,7 +32,7 @@ class BlobReflectorClient(Protocol):
             lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
 
     def dataReceived(self, data):
-        log.debug('Recieved %s', data)
+        log.debug('Received %s', data)
         self.response_buff += data
         try:
             msg = self.parse_response(self.response_buff)
@@ -74,7 +74,7 @@ class BlobReflectorClient(Protocol):
 
     def send_handshake(self):
         log.debug('Sending handshake')
-        self.write(json.dumps({'version': 0}))
+        self.write(json.dumps({'version': self.protocol_version}))
         return defer.succeed(None)
 
     def parse_response(self, buff):
@@ -102,7 +102,6 @@ class BlobReflectorClient(Protocol):
 
     def start_transfer(self):
         self.sent_blobs = True
-        self.write(json.dumps({}))
         assert self.read_handle is not None, \
             "self.read_handle was None when trying to start the transfer"
         d = self.file_sender.beginFileTransfer(self.read_handle, self)
@@ -111,8 +110,8 @@ class BlobReflectorClient(Protocol):
     def handle_handshake_response(self, response_dict):
         if 'version' not in response_dict:
             raise ValueError("Need protocol version number!")
-        self.protocol_version = int(response_dict['version'])
-        if self.protocol_version != 0:
+        server_version = int(response_dict['version'])
+        if self.protocol_version != server_version:
             raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
         self.received_handshake_response = True
         return defer.succeed(True)
@@ -184,6 +183,7 @@ class BlobReflectorClientFactory(ClientFactory):
     protocol = BlobReflectorClient
 
     def __init__(self, blob_manager, blobs):
+        self.protocol_version = REFLECTOR_V2
         self.blob_manager = blob_manager
         self.blobs = blobs
         self.p = None
diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py
index e6c3e328e..31eda8d31 100644
--- a/lbrynet/reflector/client/client.py
+++ b/lbrynet/reflector/client/client.py
@@ -1,61 +1,13 @@
-"""
-The reflector protocol (all dicts encoded in json):
-
-Client Handshake (sent once per connection, at the start of the connection):
-
-{
-    'version': 0,
-}
-
-
-Server Handshake (sent once per connection, after receiving the client handshake):
-
-{
-    'version': 0,
-}
-
-
-Client Info Request:
-
-{
-    'blob_hash': "<blob_hash>",
-    'blob_size': <blob_size>
-}
-
-
-Server Info Response (sent in response to Client Info Request):
-
-{
-    'send_blob': True|False
-}
-
-If response is 'YES', client may send a Client Blob Request or a Client Info Request.
-If response is 'NO', client may only send a Client Info Request
-
-
-Client Blob Request:
-
-{}  # Yes, this is an empty dictionary, in case something needs to go here in the future
-<raw blob_data>  # this blob data must match the info sent in the most recent Client Info Request
-
-
-Server Blob Response (sent in response to Client Blob Request):
-{
-    'received_blob': True
-}
-
-Client may now send another Client Info Request
-
-"""
 import json
 import logging
 
+from twisted.internet.error import ConnectionRefusedError
 from twisted.protocols.basic import FileSender
 from twisted.internet.protocol import Protocol, ClientFactory
 from twisted.internet import defer, error
 
-from lbrynet.reflector.common import IncompleteResponse
-
+from lbrynet.reflector.common import IncompleteResponse, ReflectorRequestError
+from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
 
 log = logging.getLogger(__name__)
 
@@ -63,24 +15,31 @@ log = logging.getLogger(__name__)
 class EncryptedFileReflectorClient(Protocol):
     #  Protocol stuff
     def connectionMade(self):
+        log.debug("Connected to reflector")
         self.blob_manager = self.factory.blob_manager
         self.response_buff = ''
         self.outgoing_buff = ''
         self.blob_hashes_to_send = []
         self.next_blob_to_send = None
-        self.blob_read_handle = None
-        self.received_handshake_response = False
-        self.protocol_version = None
+        self.read_handle = None
+        self.sent_stream_info = False
+        self.received_descriptor_response = False
+        self.protocol_version = self.factory.protocol_version
+        self.received_server_version = False
+        self.server_version = None
+        self.stream_descriptor = None
+        self.descriptor_needed = None
+        self.needed_blobs = []
+        self.reflected_blobs = []
         self.file_sender = None
         self.producer = None
         self.streaming = False
-        d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
+        d = self.load_descriptor()
         d.addCallback(lambda _: self.send_handshake())
         d.addErrback(
             lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
 
     def dataReceived(self, data):
-        log.debug('Recieved %s', data)
         self.response_buff += data
         try:
             msg = self.parse_response(self.response_buff)
@@ -94,8 +53,8 @@ class EncryptedFileReflectorClient(Protocol):
 
     def connectionLost(self, reason):
         if reason.check(error.ConnectionDone):
-            log.info('Finished sending data via reflector')
-            self.factory.finished_deferred.callback(True)
+            log.debug('Finished sending data via reflector')
+            self.factory.finished_deferred.callback(self.reflected_blobs)
         else:
             log.debug('Reflector finished: %s', reason)
             self.factory.finished_deferred.callback(reason)
@@ -118,31 +77,54 @@ class EncryptedFileReflectorClient(Protocol):
             from twisted.internet import reactor
             reactor.callLater(0, self.producer.resumeProducing)
 
-    def get_blobs_to_send(self, stream_info_manager, stream_hash):
-        log.debug('Getting blobs from stream hash: %s', stream_hash)
-        d = stream_info_manager.get_blobs_for_stream(stream_hash)
+    def get_validated_blobs(self, blobs_in_stream):
+        def get_blobs(blobs):
+            for (blob, _, _, blob_len) in blobs:
+                if blob:
+                    yield self.blob_manager.get_blob(blob, True, blob_len)
 
-        def set_blobs(blob_hashes):
-            for blob_hash, position, iv, length in blob_hashes:
-                if blob_hash is not None:
-                    self.blob_hashes_to_send.append(blob_hash)
-            log.debug("Preparing to send %i blobs", len(self.blob_hashes_to_send))
+        dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True)
+        dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()])
+        return dl
 
-        d.addCallback(set_blobs)
+    def set_blobs_to_send(self, blobs_to_send):
+        for blob in blobs_to_send:
+            if blob not in self.blob_hashes_to_send:
+                self.blob_hashes_to_send.append(blob)
 
-        d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash))
+    def get_blobs_to_send(self):
+        def _show_missing_blobs(filtered):
+            if filtered:
+                needs_desc = "" if not self.descriptor_needed else "descriptor and "
+                log.info("Reflector needs %s%i blobs for %s",
+                         needs_desc,
+                         len(filtered),
+                         str(self.stream_descriptor)[:16])
+            return filtered
 
-        def set_sd_blobs(sd_blob_hashes):
-            for sd_blob_hash in sd_blob_hashes:
-                self.blob_hashes_to_send.append(sd_blob_hash)
-            log.debug("Preparing to send %i sd blobs", len(sd_blob_hashes))
-
-        d.addCallback(set_sd_blobs)
+        d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash)
+        d.addCallback(self.get_validated_blobs)
+        if not self.descriptor_needed:
+            d.addCallback(lambda filtered:
+                          [blob for blob in filtered if blob.blob_hash in self.needed_blobs])
+        d.addCallback(_show_missing_blobs)
+        d.addCallback(self.set_blobs_to_send)
         return d
 
+    def send_request(self, request_dict):
+        self.write(json.dumps(request_dict))
+
     def send_handshake(self):
-        log.debug('Sending handshake')
-        self.write(json.dumps({'version': 0}))
+        self.send_request({'version': self.protocol_version})
+
+    def load_descriptor(self):
+        def _save_descriptor_blob(sd_blob):
+            self.stream_descriptor = sd_blob
+
+        d = self.factory.stream_info_manager.get_sd_blob_hashes_for_stream(self.factory.stream_hash)
+        d.addCallback(lambda sd: self.factory.blob_manager.get_blob(sd[0], True))
+        d.addCallback(_save_descriptor_blob)
+        return d
 
     def parse_response(self, buff):
         try:
@@ -154,8 +136,10 @@ class EncryptedFileReflectorClient(Protocol):
         log.warning("An error occurred handling the response: %s", err.getTraceback())
 
     def handle_response(self, response_dict):
-        if self.received_handshake_response is False:
+        if not self.received_server_version:
             return self.handle_handshake_response(response_dict)
+        elif not self.received_descriptor_response and self.server_version == REFLECTOR_V2:
+            return self.handle_descriptor_response(response_dict)
         else:
             return self.handle_normal_response(response_dict)
 
@@ -168,7 +152,6 @@ class EncryptedFileReflectorClient(Protocol):
         return defer.succeed(None)
 
     def start_transfer(self):
-        self.write(json.dumps({}))
         assert self.read_handle is not None, \
             "self.read_handle was None when trying to start the transfer"
         d = self.file_sender.beginFileTransfer(self.read_handle, self)
@@ -177,12 +160,37 @@ class EncryptedFileReflectorClient(Protocol):
     def handle_handshake_response(self, response_dict):
         if 'version' not in response_dict:
             raise ValueError("Need protocol version number!")
-        self.protocol_version = int(response_dict['version'])
-        if self.protocol_version != 0:
-            raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
-        self.received_handshake_response = True
+        self.server_version = int(response_dict['version'])
+        if self.server_version not in [REFLECTOR_V1, REFLECTOR_V2]:
+            raise ValueError("I can't handle protocol version {}!".format(self.server_version))
+        self.received_server_version = True
         return defer.succeed(True)
 
+    def handle_descriptor_response(self, response_dict):
+        if self.file_sender is None:  # Expecting Server Info Response
+            if 'send_sd_blob' not in response_dict:
+                raise ReflectorRequestError("I don't know whether to send the sd blob or not!")
+            if response_dict['send_sd_blob'] is True:
+                self.file_sender = FileSender()
+            else:
+                self.received_descriptor_response = True
+            self.descriptor_needed = response_dict['send_sd_blob']
+            self.needed_blobs = response_dict.get('needed_blobs', [])
+            return self.get_blobs_to_send()
+        else:  # Expecting Server Blob Response
+            if 'received_sd_blob' not in response_dict:
+                raise ValueError("I don't know if the sd blob made it to the intended destination!")
+            else:
+                self.received_descriptor_response = True
+                if response_dict['received_sd_blob']:
+                    self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
+                    log.info("Sent reflector descriptor %s", self.next_blob_to_send.blob_hash[:16])
+                else:
+                    log.warning("Reflector failed to receive descriptor %s, trying again later",
+                                self.next_blob_to_send.blob_hash[:16])
+                    self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
+                return self.set_not_uploading()
+
     def handle_normal_response(self, response_dict):
         if self.file_sender is None:  # Expecting Server Info Response
             if 'send_blob' not in response_dict:
@@ -191,13 +199,19 @@ class EncryptedFileReflectorClient(Protocol):
                 self.file_sender = FileSender()
                 return defer.succeed(True)
             else:
-                log.debug("Reflector already has %s", str(self.next_blob_to_send.blob_hash)[:16])
+                log.warning("Reflector already has %s", self.next_blob_to_send.blob_hash[:16])
                 return self.set_not_uploading()
         else:  # Expecting Server Blob Response
             if 'received_blob' not in response_dict:
                 raise ValueError("I don't know if the blob made it to the intended destination!")
             else:
-                log.info("Reflector received %s", str(self.next_blob_to_send.blob_hash)[:16])
+                if response_dict['received_blob']:
+                    self.reflected_blobs.append(self.next_blob_to_send.blob_hash)
+                    log.info("Sent reflector blob %s", self.next_blob_to_send.blob_hash[:16])
+                else:
+                    log.warning("Reflector failed to receive blob %s, trying again later",
+                                self.next_blob_to_send.blob_hash[:16])
+                    self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash)
                 return self.set_not_uploading()
 
     def open_blob_for_reading(self, blob):
@@ -207,50 +221,59 @@ class EncryptedFileReflectorClient(Protocol):
                 log.debug('Getting ready to send %s', blob.blob_hash)
                 self.next_blob_to_send = blob
                 self.read_handle = read_handle
-                return None
-        else:
-            log.warning("Can't reflect blob %s", str(blob.blob_hash)[:16])
-            return defer.fail(ValueError(
+                return defer.succeed(None)
+        return defer.fail(ValueError(
             "Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)))
 
     def send_blob_info(self):
         assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
-        log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash)
-        self.write(json.dumps({
+        r = {
             'blob_hash': self.next_blob_to_send.blob_hash,
             'blob_size': self.next_blob_to_send.length
-        }))
+        }
+        self.send_request(r)
+
+    def send_descriptor_info(self):
+        assert self.stream_descriptor is not None, "need to have a sd blob to send at this point"
+        r = {
+            'sd_blob_hash': self.stream_descriptor.blob_hash,
+            'sd_blob_size': self.stream_descriptor.length
+        }
+        self.sent_stream_info = True
+        self.send_request(r)
 
     def skip_missing_blob(self, err, blob_hash):
+        log.warning("Can't reflect blob %s", str(blob_hash)[:16])
         err.trap(ValueError)
         return self.send_next_request()
 
     def send_next_request(self):
         if self.file_sender is not None:
             # send the blob
-            log.debug('Sending %s to reflector', str(self.next_blob_to_send.blob_hash)[:16])
             return self.start_transfer()
+        elif not self.sent_stream_info:
+            # open the sd blob to send
+            blob = self.stream_descriptor
+            d = self.open_blob_for_reading(blob)
+            d.addCallback(lambda _: self.send_descriptor_info())
+            return d
         elif self.blob_hashes_to_send:
             # open the next blob to send
-            blob_hash = self.blob_hashes_to_send[0]
-            log.debug('No current blob, sending the next one: %s', blob_hash)
+            blob = self.blob_hashes_to_send[0]
             self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
-            d = self.blob_manager.get_blob(blob_hash, True)
-            d.addCallback(self.open_blob_for_reading)
-            # send the server the next blob hash + length
+            d = self.open_blob_for_reading(blob)
             d.addCallbacks(lambda _: self.send_blob_info(),
-                           lambda err: self.skip_missing_blob(err, blob_hash))
+                           lambda err: self.skip_missing_blob(err, blob.blob_hash))
             return d
-        else:
-            # close connection
-            log.debug('No more blob hashes, closing connection')
-            self.transport.loseConnection()
+        # close connection
+        self.transport.loseConnection()
 
 
 class EncryptedFileReflectorClientFactory(ClientFactory):
     protocol = EncryptedFileReflectorClient
 
     def __init__(self, blob_manager, stream_info_manager, stream_hash):
+        self.protocol_version = REFLECTOR_V2
         self.blob_manager = blob_manager
         self.stream_info_manager = stream_info_manager
         self.stream_hash = stream_hash
@@ -268,11 +291,13 @@ class EncryptedFileReflectorClientFactory(ClientFactory):
         ClientFactory.startFactory(self)
 
     def startedConnecting(self, connector):
-        log.debug('Started connecting')
+        log.debug('Connecting to reflector')
 
     def clientConnectionLost(self, connector, reason):
         """If we get disconnected, reconnect to server."""
-        log.debug("connection lost: %s", reason)
 
     def clientConnectionFailed(self, connector, reason):
-        log.debug("connection failed: %s", reason)
+        if reason.check(ConnectionRefusedError):
+            log.warning("Could not connect to reflector server")
+        else:
+            log.error("Reflector connection failed: %s", reason)
diff --git a/lbrynet/reflector/common.py b/lbrynet/reflector/common.py
index f505167bf..5e41f2ede 100644
--- a/lbrynet/reflector/common.py
+++ b/lbrynet/reflector/common.py
@@ -1,2 +1,27 @@
+REFLECTOR_V1 = 0
+REFLECTOR_V2 = 1
+
+
+class ReflectorClientVersionError(Exception):
+    """
+    Raised by reflector server if client sends an incompatible or unknown version
+    """
+
+
+class ReflectorRequestError(Exception):
+    """
+    Raised by reflector server if client sends a message without the required fields
+    """
+
+
+class ReflectorRequestDecodeError(Exception):
+    """
+    Raised by reflector server if client sends an invalid json request
+    """
+
+
 class IncompleteResponse(Exception):
-    pass
+    """
+    Raised by reflector server when client sends a portion of a json request,
+    used buffering the incoming request
+    """
diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py
index 96a4b3d9f..9008e113f 100644
--- a/lbrynet/reflector/reupload.py
+++ b/lbrynet/reflector/reupload.py
@@ -1,5 +1,6 @@
 import logging
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
+from twisted.internet.error import ConnectionLost, ConnectionDone
 from lbrynet.reflector import BlobClientFactory, ClientFactory
 
 log = logging.getLogger(__name__)
@@ -27,19 +28,13 @@ def _reflect_stream(lbry_file, reflector_server):
     )
     d = reactor.resolve(reflector_address)
     d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory))
-    d.addCallback(lambda _: log.info("Connected to %s", reflector_address))
     d.addCallback(lambda _: factory.finished_deferred)
+    d.addCallback(lambda reflected_blobs: log.info("Reflected %i blobs for lbry://%s",
+                                                   len(reflected_blobs),
+                                                   lbry_file.uri))
     return d
 
 
-def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server):
-    if reflector_has_stream:
-        log.info("lbry://%s is available", lbry_file.uri)
-        return defer.succeed(False)
-    log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri)
-    return _reflect_stream(lbry_file, reflector_server)
-
-
 def _catch_error(err, uri):
     msg = "An error occurred while checking availability for lbry://%s: %s"
     log.error(msg, uri, err.getTraceback())
@@ -47,5 +42,6 @@ def _catch_error(err, uri):
 
 def check_and_restore_availability(lbry_file, reflector_server):
     d = _reflect_stream(lbry_file, reflector_server)
+    d.addErrback(lambda err: err.trap(ConnectionDone, ConnectionLost))
     d.addErrback(_catch_error, lbry_file.uri)
     return d
diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py
index 03f278867..5340ba248 100644
--- a/lbrynet/reflector/server/server.py
+++ b/lbrynet/reflector/server/server.py
@@ -1,21 +1,36 @@
 import logging
+import json
 from twisted.python import failure
 from twisted.internet import error, defer
 from twisted.internet.protocol import Protocol, ServerFactory
-import json
 from lbrynet.core.utils import is_valid_blobhash
+from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError
+from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2
+from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError
 
 
 log = logging.getLogger(__name__)
 
+MAXIMUM_QUERY_SIZE = 200
+SEND_SD_BLOB = 'send_sd_blob'
+SEND_BLOB = 'send_blob'
+RECEIVED_SD_BLOB = 'received_sd_blob'
+RECEIVED_BLOB = 'received_blob'
+NEEDED_BLOBS = 'needed_blobs'
+VERSION = 'version'
+BLOB_SIZE = 'blob_size'
+BLOB_HASH = 'blob_hash'
+SD_BLOB_SIZE = 'sd_blob_size'
+SD_BLOB_HASH = 'sd_blob_hash'
+
 
 class ReflectorServer(Protocol):
-
     def connectionMade(self):
         peer_info = self.transport.getPeer()
         log.debug('Connection made to %s', peer_info)
         self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
         self.blob_manager = self.factory.blob_manager
+        self.protocol_version = self.factory.protocol_version
         self.received_handshake = False
         self.peer_version = None
         self.receiving_blob = False
@@ -28,6 +43,60 @@ class ReflectorServer(Protocol):
     def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
         log.info("Reflector upload from %s finished" % self.peer.host)
 
+    def handle_error(self, err):
+        log.error(err.getTraceback())
+        self.transport.loseConnection()
+
+    def send_response(self, response_dict):
+        self.transport.write(json.dumps(response_dict))
+
+    ############################
+    # Incoming blob file stuff #
+    ############################
+
+    def clean_up_failed_upload(self, err, blob):
+        log.warning("Failed to receive %s", blob)
+        if err.check(DownloadCanceledError):
+            self.blob_manager.delete_blobs([blob.blob_hash])
+        else:
+            log.exception(err)
+
+    @defer.inlineCallbacks
+    def _on_completed_blob(self, blob, response_key):
+        yield self.blob_manager.blob_completed(blob)
+        yield self.close_blob()
+        log.info("Received %s", blob)
+        yield self.send_response({response_key: True})
+
+    @defer.inlineCallbacks
+    def _on_failed_blob(self, err, response_key):
+        yield self.clean_up_failed_upload(err, self.incoming_blob)
+        yield self.send_response({response_key: False})
+
+    def handle_incoming_blob(self, response_key):
+        """
+        Open blob for writing and send a response indicating if the transfer was
+        successful when finished.
+
+        response_key will either be received_blob or received_sd_blob
+        """
+
+        blob = self.incoming_blob
+        self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
+        self.blob_finished_d.addCallback(self._on_completed_blob, response_key)
+        self.blob_finished_d.addErrback(self._on_failed_blob, response_key)
+
+    def close_blob(self):
+        self.blob_finished_d = None
+        self.blob_write = None
+        self.cancel_write = None
+        self.incoming_blob = None
+        self.receiving_blob = False
+
+    ####################
+    # Request handling #
+    ####################
+
     def dataReceived(self, data):
         if self.receiving_blob:
             self.blob_write(data)
@@ -38,7 +107,7 @@ class ReflectorServer(Protocol):
             if msg is not None:
                 self.request_buff = ''
                 d = self.handle_request(msg)
-                d.addCallbacks(self.send_response, self.handle_error)
+                d.addErrback(self.handle_error)
                 if self.receiving_blob and extra_data:
                     log.debug('Writing extra data to blob')
                     self.blob_write(extra_data)
@@ -47,15 +116,15 @@ class ReflectorServer(Protocol):
         extra_data = None
         response = None
         curr_pos = 0
-        while True:
+        while not self.receiving_blob:
             next_close_paren = response_msg.find('}', curr_pos)
             if next_close_paren != -1:
                 curr_pos = next_close_paren + 1
                 try:
                     response = json.loads(response_msg[:curr_pos])
                 except ValueError:
-                    if curr_pos > 100:
-                        raise Exception("error decoding response")
+                    if curr_pos > MAXIMUM_QUERY_SIZE:
+                        raise ValueError("Error decoding response: %s" % str(response_msg))
                     else:
                         pass
                 else:
@@ -65,73 +134,185 @@ class ReflectorServer(Protocol):
                 break
         return response, extra_data
 
+    def need_handshake(self):
+        return self.received_handshake is False
+
+    def is_descriptor_request(self, request_dict):
+        if SD_BLOB_HASH not in request_dict or SD_BLOB_SIZE not in request_dict:
+            return False
+        if not is_valid_blobhash(request_dict[SD_BLOB_HASH]):
+            raise InvalidBlobHashError(request_dict[SD_BLOB_HASH])
+        return True
+
+    def is_blob_request(self, request_dict):
+        if BLOB_HASH not in request_dict or BLOB_SIZE not in request_dict:
+            return False
+        if not is_valid_blobhash(request_dict[BLOB_HASH]):
+            raise InvalidBlobHashError(request_dict[BLOB_HASH])
+        return True
+
     def handle_request(self, request_dict):
-        if self.received_handshake is False:
+        if self.need_handshake():
             return self.handle_handshake(request_dict)
-        else:
-            return self.handle_normal_request(request_dict)
+        if self.is_descriptor_request(request_dict):
+            return self.handle_descriptor_request(request_dict)
+        if self.is_blob_request(request_dict):
+            return self.handle_blob_request(request_dict)
+        raise ReflectorRequestError("Invalid request")
 
     def handle_handshake(self, request_dict):
-        log.debug('Handling handshake')
-        if 'version' not in request_dict:
-            raise ValueError("Client should send version")
-        self.peer_version = int(request_dict['version'])
-        if self.peer_version != 0:
-            raise ValueError("I don't know that version!")
+        """
+        Upon connecting, the client sends a version handshake:
+        {
+            'version': int,
+        }
+
+        The server replies with the same version if it is supported
+        {
+            'version': int,
+        }
+        """
+
+        if VERSION not in request_dict:
+            raise ReflectorRequestError("Client should send version")
+
+        if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]:
+            raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION]))
+
+        log.debug('Handling handshake for client version %i', self.peer_version)
+
+        self.peer_version = int(request_dict[VERSION])
         self.received_handshake = True
-        return defer.succeed({'version': 0})
+        return self.send_handshake_response()
 
-    def determine_blob_needed(self, blob):
-        if blob.is_validated():
-            return {'send_blob': False}
-        else:
-            self.incoming_blob = blob
-            self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) # pylint: disable=line-too-long
-            self.blob_finished_d.addCallback(lambda _: self.blob_manager.blob_completed(blob))
-            return {'send_blob': True}
+    def send_handshake_response(self):
+        d = defer.succeed({VERSION: self.peer_version})
+        d.addCallback(self.send_response)
+        return d
 
-    def close_blob(self):
-        self.blob_finished_d = None
-        self.blob_write = None
-        self.cancel_write = None
-        self.incoming_blob = None
-        self.receiving_blob = False
+    def handle_descriptor_request(self, request_dict):
+        """
+        If the client is reflecting a whole stream, they send a stream descriptor request:
+        {
+            'sd_blob_hash': str,
+            'sd_blob_size': int
+        }
+
+        The server indicates if it's aware of this stream already by requesting (or not requesting)
+        the stream descriptor blob. If the server has a validated copy of the sd blob, it will
+        include the needed_blobs field (a list of blob hashes missing from reflector) in the
+        response. If the server does not have the sd blob the needed_blobs field will not be
+        included, as the server does not know what blobs it is missing - so the client should send
+        all of the blobs in the stream.
+        {
+            'send_sd_blob': bool
+            'needed_blobs': list, conditional
+        }
+
+
+        The client may begin the file transfer of the sd blob if send_sd_blob was True.
+        If the client sends the blob, after receiving it the server indicates if the
+        transfer was successful:
+        {
+            'received_sd_blob': bool
+        }
+        """
+
+        sd_blob_hash = request_dict[SD_BLOB_HASH]
+        sd_blob_size = request_dict[SD_BLOB_SIZE]
 
-    def handle_normal_request(self, request_dict):
         if self.blob_write is None:
-            #  we haven't opened a blob yet, meaning we must be waiting for the
-            #  next message containing a blob hash and a length. this message
-            #  should be it. if it's one we want, open the blob for writing, and
-            #  return a nice response dict (in a Deferred) saying go ahead
-            if not 'blob_hash' in request_dict or not 'blob_size' in request_dict:
-                raise ValueError("Expected a blob hash and a blob size")
-            if not is_valid_blobhash(request_dict['blob_hash']):
-                raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash']))
-            log.debug('Recieved info for blob: %s', request_dict['blob_hash'])
-            d = self.blob_manager.get_blob(
-                request_dict['blob_hash'],
-                True,
-                int(request_dict['blob_size'])
-            )
-            d.addCallback(self.determine_blob_needed)
+            d = self.blob_manager.get_blob(sd_blob_hash, True, sd_blob_size)
+            d.addCallback(self.get_descriptor_response)
+            d.addCallback(self.send_response)
+        else:
+            self.receiving_blob = True
+            d = self.blob_finished_d
+        return d
+
+    def get_descriptor_response(self, sd_blob):
+        if sd_blob.is_validated():
+            d = defer.succeed({SEND_SD_BLOB: False})
+            d.addCallback(self.request_needed_blobs, sd_blob)
+        else:
+            self.incoming_blob = sd_blob
+            self.receiving_blob = True
+            self.handle_incoming_blob(RECEIVED_SD_BLOB)
+            d = defer.succeed({SEND_SD_BLOB: True})
+        return d
+
+    def request_needed_blobs(self, response, sd_blob):
+        def _add_needed_blobs_to_response(needed_blobs):
+            response.update({NEEDED_BLOBS: needed_blobs})
+            return response
+
+        d = self.determine_missing_blobs(sd_blob)
+        d.addCallback(_add_needed_blobs_to_response)
+        return d
+
+    def determine_missing_blobs(self, sd_blob):
+        with sd_blob.open_for_reading() as sd_file:
+            sd_blob_data = sd_file.read()
+        decoded_sd_blob = json.loads(sd_blob_data)
+        return self.get_unvalidated_blobs_in_stream(decoded_sd_blob)
+
+    def get_unvalidated_blobs_in_stream(self, sd_blob):
+        dl = defer.DeferredList(list(self._iter_unvalidated_blobs_in_stream(sd_blob)),
+                                consumeErrors=True)
+        dl.addCallback(lambda needed: [blob[1] for blob in needed if blob[1]])
+        return dl
+
+    def _iter_unvalidated_blobs_in_stream(self, sd_blob):
+        for blob in sd_blob['blobs']:
+            if 'blob_hash' in blob and 'length' in blob:
+                blob_hash, blob_len = blob['blob_hash'], blob['length']
+                d = self.blob_manager.get_blob(blob_hash, True, blob_len)
+                d.addCallback(lambda blob: blob_hash if not blob.is_validated() else None)
+                yield d
+
+    def handle_blob_request(self, request_dict):
+        """
+        A client queries if the server will accept a blob
+        {
+            'blob_hash': str,
+            'blob_size': int
+        }
+
+        The server replies, send_blob will be False if the server has a validated copy of the blob:
+        {
+            'send_blob': bool
+        }
+
+        The client may begin the raw blob file transfer if the server replied True.
+        If the client sends the blob, the server replies:
+        {
+            'received_blob': bool
+        }
+        """
+
+        blob_hash = request_dict[BLOB_HASH]
+        blob_size = request_dict[BLOB_SIZE]
+
+        if self.blob_write is None:
+            log.debug('Received info for blob: %s', blob_hash[:16])
+            d = self.blob_manager.get_blob(blob_hash, True, blob_size)
+            d.addCallback(self.get_blob_response)
+            d.addCallback(self.send_response)
         else:
-            #  we have a blob open already, so this message should have nothing
-            #  important in it. to the deferred that fires when the blob is done,
-            #  add a callback which returns a nice response dict saying to keep
-            #  sending, and then return that deferred
             log.debug('blob is already open')
             self.receiving_blob = True
             d = self.blob_finished_d
-            d.addCallback(lambda _: self.close_blob())
-            d.addCallback(lambda _: {'received_blob': True})
         return d
 
-    def send_response(self, response_dict):
-        self.transport.write(json.dumps(response_dict))
-
-    def handle_error(self, err):
-        log.error(err.getTraceback())
-        self.transport.loseConnection()
+    def get_blob_response(self, blob):
+        if blob.is_validated():
+            return defer.succeed({SEND_BLOB: False})
+        else:
+            self.incoming_blob = blob
+            self.receiving_blob = True
+            self.handle_incoming_blob(RECEIVED_BLOB)
+            d = defer.succeed({SEND_BLOB: True})
+        return d
 
 
 class ReflectorServerFactory(ServerFactory):
@@ -140,6 +321,7 @@ class ReflectorServerFactory(ServerFactory):
     def __init__(self, peer_manager, blob_manager):
         self.peer_manager = peer_manager
         self.blob_manager = blob_manager
+        self.protocol_version = REFLECTOR_V2
 
     def buildProtocol(self, addr):
         log.debug('Creating a protocol for %s', addr)