pass sd_hash to reflector client factory instead of looking it up
This commit is contained in:
parent
b9600101ae
commit
9a9b2f47ce
4 changed files with 18 additions and 14 deletions
|
@ -35,6 +35,7 @@ at anytime.
|
||||||
* raise the default number of concurrent blob announcers to 100
|
* raise the default number of concurrent blob announcers to 100
|
||||||
* dht logging to be more verbose with errors and warnings
|
* dht logging to be more verbose with errors and warnings
|
||||||
* added `single_announce` and `last_announced_time` columns to the `blob` table in sqlite
|
* added `single_announce` and `last_announced_time` columns to the `blob` table in sqlite
|
||||||
|
* pass the sd hash to reflector ClientFactory instead of looking it up
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
* virtual kademlia network and mock udp transport for dht integration tests
|
* virtual kademlia network and mock udp transport for dht integration tests
|
||||||
|
|
|
@ -37,6 +37,7 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
self.blob_manager = self.factory.blob_manager
|
self.blob_manager = self.factory.blob_manager
|
||||||
self.protocol_version = self.factory.protocol_version
|
self.protocol_version = self.factory.protocol_version
|
||||||
self.stream_hash = self.factory.stream_hash
|
self.stream_hash = self.factory.stream_hash
|
||||||
|
self.sd_hash = self.factory.sd_hash
|
||||||
|
|
||||||
d = self.load_descriptor()
|
d = self.load_descriptor()
|
||||||
d.addCallback(lambda _: self.send_handshake())
|
d.addCallback(lambda _: self.send_handshake())
|
||||||
|
@ -135,14 +136,12 @@ class EncryptedFileReflectorClient(Protocol):
|
||||||
def send_handshake(self):
|
def send_handshake(self):
|
||||||
self.send_request({'version': self.protocol_version})
|
self.send_request({'version': self.protocol_version})
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def load_descriptor(self):
|
def load_descriptor(self):
|
||||||
def _save_descriptor_blob(sd_blob):
|
if self.sd_hash:
|
||||||
self.stream_descriptor = sd_blob
|
self.stream_descriptor = yield self.factory.blob_manager.get_blob(self.sd_hash)
|
||||||
|
else:
|
||||||
d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.stream_hash)
|
raise ValueError("no sd hash for stream %s" % self.stream_hash)
|
||||||
d.addCallback(self.factory.blob_manager.get_blob)
|
|
||||||
d.addCallback(_save_descriptor_blob)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def parse_response(self, buff):
|
def parse_response(self, buff):
|
||||||
try:
|
try:
|
||||||
|
@ -305,9 +304,10 @@ class EncryptedFileReflectorClientFactory(ClientFactory):
|
||||||
protocol = EncryptedFileReflectorClient
|
protocol = EncryptedFileReflectorClient
|
||||||
protocol_version = REFLECTOR_V2
|
protocol_version = REFLECTOR_V2
|
||||||
|
|
||||||
def __init__(self, blob_manager, stream_hash):
|
def __init__(self, blob_manager, stream_hash, sd_hash):
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.stream_hash = stream_hash
|
self.stream_hash = stream_hash
|
||||||
|
self.sd_hash = sd_hash
|
||||||
self.p = None
|
self.p = None
|
||||||
self.finished_deferred = defer.Deferred()
|
self.finished_deferred = defer.Deferred()
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,9 @@ def resolve(host):
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _reflect_stream(blob_manager, stream_hash, reflector_server):
|
def _reflect_stream(blob_manager, stream_hash, sd_hash, reflector_server):
|
||||||
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
reflector_address, reflector_port = reflector_server[0], reflector_server[1]
|
||||||
factory = ClientFactory(blob_manager, stream_hash)
|
factory = ClientFactory(blob_manager, stream_hash, sd_hash)
|
||||||
ip = yield resolve(reflector_address)
|
ip = yield resolve(reflector_address)
|
||||||
yield reactor.connectTCP(ip, reflector_port, factory)
|
yield reactor.connectTCP(ip, reflector_port, factory)
|
||||||
result = yield factory.finished_deferred
|
result = yield factory.finished_deferred
|
||||||
|
@ -34,7 +34,7 @@ def _reflect_stream(blob_manager, stream_hash, reflector_server):
|
||||||
|
|
||||||
|
|
||||||
def _reflect_file(lbry_file, reflector_server):
|
def _reflect_file(lbry_file, reflector_server):
|
||||||
return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, reflector_server)
|
return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, lbry_file.sd_hash, reflector_server)
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -59,6 +59,7 @@ def reflect_file(lbry_file, reflector_server=None):
|
||||||
return _reflect_file(lbry_file, reflector_server)
|
return _reflect_file(lbry_file, reflector_server)
|
||||||
|
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def reflect_stream(blob_manager, stream_hash, reflector_server=None):
|
def reflect_stream(blob_manager, stream_hash, reflector_server=None):
|
||||||
if reflector_server:
|
if reflector_server:
|
||||||
if len(reflector_server.split(":")) == 2:
|
if len(reflector_server.split(":")) == 2:
|
||||||
|
@ -68,7 +69,9 @@ def reflect_stream(blob_manager, stream_hash, reflector_server=None):
|
||||||
reflector_server = reflector_server, 5566
|
reflector_server = reflector_server, 5566
|
||||||
else:
|
else:
|
||||||
reflector_server = random.choice(conf.settings['reflector_servers'])
|
reflector_server = random.choice(conf.settings['reflector_servers'])
|
||||||
return _reflect_stream(blob_manager, stream_hash, reflector_server)
|
sd_hash = yield blob_manager.storage.get_sd_blob_hash_for_stream(stream_hash)
|
||||||
|
result = yield _reflect_stream(blob_manager, stream_hash, sd_hash, reflector_server)
|
||||||
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
|
||||||
def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None):
|
def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None):
|
||||||
|
|
|
@ -213,7 +213,7 @@ class TestReflector(unittest.TestCase):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def send_to_server():
|
def send_to_server():
|
||||||
factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash)
|
factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash, self.sd_hash)
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
reactor.connectTCP('localhost', self.port, factory)
|
reactor.connectTCP('localhost', self.port, factory)
|
||||||
|
@ -346,7 +346,7 @@ class TestReflector(unittest.TestCase):
|
||||||
return factory.finished_deferred
|
return factory.finished_deferred
|
||||||
|
|
||||||
def send_to_server_as_stream(result):
|
def send_to_server_as_stream(result):
|
||||||
factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash)
|
factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash, self.sd_hash)
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
reactor.connectTCP('localhost', self.port, factory)
|
reactor.connectTCP('localhost', self.port, factory)
|
||||||
|
|
Loading…
Add table
Reference in a new issue