fix error that occurred when one connection's download was stopped due to another download finishing

This commit is contained in:
Jimmy Kiselak 2016-01-22 15:50:18 -05:00
parent 4b9552fdf5
commit 25e1427a84
11 changed files with 319 additions and 156 deletions

View file

@ -59,6 +59,9 @@ class HashBlobWriter(object):
" %s to %s" % (str(self.len_so_far), " %s to %s" % (str(self.len_so_far),
str(self.length_getter()))))) str(self.length_getter())))))
else: else:
if self.write_handle is None:
log.debug("Tried to write to a write_handle that was None.")
return
self.write_handle.write(data) self.write_handle.write(data)
if self.len_so_far == self.length_getter(): if self.len_so_far == self.length_getter():
self.finished_cb(self) self.finished_cb(self)

View file

@ -612,7 +612,7 @@ class LBRYcrdAddressRequester(object):
if not err.check(RequestCanceledError): if not err.check(RequestCanceledError):
log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s", log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s",
err.getErrorMessage(), str(peer)) err.getErrorMessage(), str(peer))
#return err return err
class LBRYcrdAddressQueryHandlerFactory(object): class LBRYcrdAddressQueryHandlerFactory(object):

View file

@ -266,7 +266,7 @@ class PointTraderKeyExchanger(object):
if not err.check(RequestCanceledError): if not err.check(RequestCanceledError):
log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s", log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s",
err.getErrorMessage(), str(peer)) err.getErrorMessage(), str(peer))
#return err return err
class PointTraderKeyQueryHandlerFactory(object): class PointTraderKeyQueryHandlerFactory(object):

View file

@ -1,5 +1,6 @@
from zope.interface import implements from zope.interface import implements
from lbrynet.interfaces import IRateLimiter from lbrynet.interfaces import IRateLimiter
from twisted.internet import task
class DummyRateLimiter(object): class DummyRateLimiter(object):
@ -10,22 +11,20 @@ class DummyRateLimiter(object):
self.total_ul_bytes = 0 self.total_ul_bytes = 0
self.target_dl = 0 self.target_dl = 0
self.target_ul = 0 self.target_ul = 0
self.ul_delay = 0.00 self.tick_call = None
self.dl_delay = 0.00
self.next_tick = None def start(self):
self.tick_call = task.LoopingCall(self.tick)
self.tick_call.start(1)
def tick(self): def tick(self):
from twisted.internet import reactor
self.dl_bytes_this_second = 0 self.dl_bytes_this_second = 0
self.ul_bytes_this_second = 0 self.ul_bytes_this_second = 0
self.next_tick = reactor.callLater(1.0, self.tick)
def stop(self): def stop(self):
if self.next_tick is not None: if self.tick_call is not None:
self.next_tick.cancel() self.tick_call.stop()
self.next_tick = None self.tick_call = None
def set_dl_limit(self, limit): def set_dl_limit(self, limit):
pass pass
@ -33,12 +32,6 @@ class DummyRateLimiter(object):
def set_ul_limit(self, limit): def set_ul_limit(self, limit):
pass pass
def ul_wait_time(self):
return self.ul_delay
def dl_wait_time(self):
return self.dl_delay
def report_dl_bytes(self, num_bytes): def report_dl_bytes(self, num_bytes):
self.dl_bytes_this_second += num_bytes self.dl_bytes_this_second += num_bytes
self.total_dl_bytes += num_bytes self.total_dl_bytes += num_bytes
@ -58,66 +51,32 @@ class RateLimiter(object):
def __init__(self, max_dl_bytes=None, max_ul_bytes=None): def __init__(self, max_dl_bytes=None, max_ul_bytes=None):
self.max_dl_bytes = max_dl_bytes self.max_dl_bytes = max_dl_bytes
self.max_ul_bytes = max_ul_bytes self.max_ul_bytes = max_ul_bytes
self.dl_bytes_this_second = 0 self.dl_bytes_this_interval = 0
self.ul_bytes_this_second = 0 self.ul_bytes_this_interval = 0
self.total_dl_bytes = 0 self.total_dl_bytes = 0
self.total_ul_bytes = 0 self.total_ul_bytes = 0
self.next_tick = None self.tick_call = None
self.next_unthrottle_dl = None self.tick_interval = 0.1
self.next_unthrottle_ul = None
self.next_dl_check = None
self.next_ul_check = None
self.dl_check_interval = 1.0
self.ul_check_interval = 1.0
self.dl_throttled = False self.dl_throttled = False
self.ul_throttled = False self.ul_throttled = False
self.protocols = [] self.protocols = []
def start(self):
self.tick_call = task.LoopingCall(self.tick)
self.tick_call.start(self.tick_interval)
def tick(self): def tick(self):
self.dl_bytes_this_interval = 0
from twisted.internet import reactor self.ul_bytes_this_interval = 0
# happens once per second
if self.next_dl_check is not None:
self.next_dl_check.cancel()
self.next_dl_check = None
if self.next_ul_check is not None:
self.next_ul_check.cancel()
self.next_ul_check = None
if self.max_dl_bytes is not None:
if self.dl_bytes_this_second == 0:
self.dl_check_interval = 1.0
else:
self.dl_check_interval = min(1.0, self.dl_check_interval *
self.max_dl_bytes / self.dl_bytes_this_second)
self.next_dl_check = reactor.callLater(self.dl_check_interval, self.check_dl)
if self.max_ul_bytes is not None:
if self.ul_bytes_this_second == 0:
self.ul_check_interval = 1.0
else:
self.ul_check_interval = min(1.0, self.ul_check_interval *
self.max_ul_bytes / self.ul_bytes_this_second)
self.next_ul_check = reactor.callLater(self.ul_check_interval, self.check_ul)
self.dl_bytes_this_second = 0
self.ul_bytes_this_second = 0
self.unthrottle_dl() self.unthrottle_dl()
self.unthrottle_ul() self.unthrottle_ul()
self.next_tick = reactor.callLater(1.0, self.tick)
def stop(self): def stop(self):
if self.next_tick is not None: if self.tick_call is not None:
self.next_tick.cancel() self.tick_call.stop()
self.next_tick = None self.tick_call = None
if self.next_dl_check is not None:
self.next_dl_check.cancel()
self.next_dl_check = None
if self.next_ul_check is not None:
self.next_ul_check.cancel()
self.next_ul_check = None
def set_dl_limit(self, limit): def set_dl_limit(self, limit):
self.max_dl_bytes = limit self.max_dl_bytes = limit
@ -129,27 +88,15 @@ class RateLimiter(object):
def check_dl(self): def check_dl(self):
from twisted.internet import reactor if self.max_dl_bytes is not None and self.dl_bytes_this_interval > self.max_dl_bytes * self.tick_interval:
from twisted.internet import reactor
self.next_dl_check = None reactor.callLater(0, self.throttle_dl)
if self.dl_bytes_this_second > self.max_dl_bytes:
self.throttle_dl()
else:
self.next_dl_check = reactor.callLater(self.dl_check_interval, self.check_dl)
self.dl_check_interval = min(self.dl_check_interval * 2, 1.0)
def check_ul(self): def check_ul(self):
from twisted.internet import reactor if self.max_ul_bytes is not None and self.ul_bytes_this_interval > self.max_ul_bytes * self.tick_interval:
from twisted.internet import reactor
self.next_ul_check = None reactor.callLater(0, self.throttle_ul)
if self.ul_bytes_this_second > self.max_ul_bytes:
self.throttle_ul()
else:
self.next_ul_check = reactor.callLater(self.ul_check_interval, self.check_ul)
self.ul_check_interval = min(self.ul_check_interval * 2, 1.0)
def throttle_dl(self): def throttle_dl(self):
if self.dl_throttled is False: if self.dl_throttled is False:
@ -175,23 +122,17 @@ class RateLimiter(object):
protocol.unthrottle_upload() protocol.unthrottle_upload()
self.ul_throttled = False self.ul_throttled = False
#deprecated
def ul_wait_time(self):
return 0
def dl_wait_time(self):
return 0
#called by protocols #called by protocols
def report_dl_bytes(self, num_bytes): def report_dl_bytes(self, num_bytes):
self.dl_bytes_this_second += num_bytes self.dl_bytes_this_interval += num_bytes
self.total_dl_bytes += num_bytes self.total_dl_bytes += num_bytes
self.check_dl()
def report_ul_bytes(self, num_bytes): def report_ul_bytes(self, num_bytes):
self.ul_bytes_this_second += num_bytes self.ul_bytes_this_interval += num_bytes
self.total_ul_bytes += num_bytes self.total_ul_bytes += num_bytes
self.check_ul()
def register_protocol(self, protocol): def register_protocol(self, protocol):
if protocol not in self.protocols: if protocol not in self.protocols:

View file

@ -243,7 +243,7 @@ class LBRYSession(object):
else: else:
self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir) self.blob_manager = DiskBlobManager(self.hash_announcer, self.blob_dir, self.db_dir)
self.rate_limiter.tick() self.rate_limiter.start()
d1 = self.blob_manager.setup() d1 = self.blob_manager.setup()
d2 = self.wallet.start() d2 = self.wallet.start()

View file

@ -307,16 +307,13 @@ class BlobRequester(object):
return return
if reason.check(NoResponseError): if reason.check(NoResponseError):
self._incompatible_peers.append(peer) self._incompatible_peers.append(peer)
return
log.warning("Blob requester: a request of type '%s' failed. Reason: %s, Error type: %s", log.warning("Blob requester: a request of type '%s' failed. Reason: %s, Error type: %s",
str(request_type), reason.getErrorMessage(), reason.type) str(request_type), reason.getErrorMessage(), reason.type)
self._update_local_score(peer, -10.0) self._update_local_score(peer, -10.0)
if isinstance(reason, InvalidResponseError): if isinstance(reason, InvalidResponseError) or isinstance(reason, NoResponseError):
peer.update_score(-10.0) peer.update_score(-10.0)
else: else:
peer.update_score(-2.0) peer.update_score(-2.0)
if reason.check(ConnectionClosedBeforeResponseError): if reason.check(ConnectionClosedBeforeResponseError):
return return
# Only unexpected errors should be returned, as they are indicative of real problems
# and may be shown to the user.
return reason return reason

View file

@ -1,6 +1,6 @@
import json import json
import logging import logging
from twisted.internet import error, defer, reactor from twisted.internet import error, defer
from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet.protocol import Protocol, ClientFactory
from twisted.python import failure from twisted.python import failure
from lbrynet.conf import MAX_RESPONSE_INFO_SIZE as MAX_RESPONSE_SIZE from lbrynet.conf import MAX_RESPONSE_INFO_SIZE as MAX_RESPONSE_SIZE
@ -111,6 +111,8 @@ class ClientProtocol(Protocol):
def _ask_for_request(self): def _ask_for_request(self):
log.debug("In _ask_for_request")
if self.connection_closed is True or self.connection_closing is True: if self.connection_closed is True or self.connection_closing is True:
return return
@ -158,8 +160,9 @@ class ClientProtocol(Protocol):
RequestCanceledError): RequestCanceledError):
log.error("The connection to %s is closing due to an unexpected error: %s", str(self.peer), log.error("The connection to %s is closing due to an unexpected error: %s", str(self.peer),
err.getErrorMessage()) err.getErrorMessage())
if not err.check(RequestCanceledError): if not err.check(RequestCanceledError): # The connection manager is closing the connection, so
self.transport.loseConnection() # there's no need to do it here.
return err
def _handle_response(self, response): def _handle_response(self, response):
ds = [] ds = []
@ -181,9 +184,24 @@ class ClientProtocol(Protocol):
d.addErrback(self._handle_response_error) d.addErrback(self._handle_response_error)
ds.append(d) ds.append(d)
dl = defer.DeferredList(ds) dl = defer.DeferredList(ds, consumeErrors=True)
dl.addCallback(lambda _: self._ask_for_request()) def get_next_request(results):
failed = False
for success, result in results:
if success is False:
failed = True
log.info("The connection is closing due to an error: %s", str(result.getTraceback()))
if failed is False:
log.debug("Asking for another request.")
from twisted.internet import reactor
reactor.callLater(0, self._ask_for_request)
#self._ask_for_request()
else:
log.debug("Not asking for another request.")
self.transport.loseConnection()
dl.addCallback(get_next_request)
def _downloading_finished(self, arg): def _downloading_finished(self, arg):
log.debug("The blob has finished downloading") log.debug("The blob has finished downloading")

View file

@ -154,6 +154,6 @@ class BlobRequestHandler(object):
self.currently_uploading = None self.currently_uploading = None
self.file_sender = None self.file_sender = None
if reason is not None and isinstance(reason, Failure): if reason is not None and isinstance(reason, Failure):
log.warning("Upload has failed. Reason: %s", reason.getErrorMessage()) log.info("Upload has failed. Reason: %s", reason.getErrorMessage())
return _send_file() return _send_file()

View file

@ -338,12 +338,9 @@ class LiveStreamMetadataHandler(object):
return return
if reason.check(NoResponseError): if reason.check(NoResponseError):
self._incompatible_peers.append(peer) self._incompatible_peers.append(peer)
return
log.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage()) log.warning("Crypt stream info finder: a request failed. Reason: %s", reason.getErrorMessage())
self._update_local_score(peer, -5.0) self._update_local_score(peer, -5.0)
peer.update_score(-10.0) peer.update_score(-10.0)
if reason.check(ConnectionClosedBeforeResponseError): if reason.check(ConnectionClosedBeforeResponseError):
return return
# Only unexpected errors should be returned, as they are indicative of real problems
# and may be shown to the user.
return reason return reason

View file

@ -290,15 +290,12 @@ class BlindMetadataHandler(object):
return return
if reason.check(NoResponseError): if reason.check(NoResponseError):
self._incompatible_peers.append(peer) self._incompatible_peers.append(peer)
return
log.warning("Valuable blob info requester: a request of type %s has failed. Reason: %s", log.warning("Valuable blob info requester: a request of type %s has failed. Reason: %s",
str(request_type), str(reason.getErrorMessage())) str(request_type), str(reason.getErrorMessage()))
self._update_local_score(peer, -10.0) self._update_local_score(peer, -10.0)
peer.update_score(-5.0) peer.update_score(-5.0)
if reason.check(ConnectionClosedBeforeResponseError): if reason.check(ConnectionClosedBeforeResponseError):
return return
# Only unexpected errors should be returned, as they are indicative of real problems
# and may be shown to the user.
return reason return reason
def _search_for_peers(self): def _search_for_peers(self):

View file

@ -92,30 +92,17 @@ class FakeWallet(object):
class FakePeerFinder(object): class FakePeerFinder(object):
def __init__(self, port, peer_manager): def __init__(self, start_port, peer_manager, num_peers):
self.peer_manager = peer_manager self.start_port = start_port
def find_peers_for_blob(self, *args):
return defer.succeed([self.peer_manager.get_peer("127.0.0.1", 5553)])
def run_manage_loop(self):
pass
def stop(self):
pass
class FakeTwoPeerFinder(object):
def __init__(self, port, peer_manager):
self.peer_manager = peer_manager self.peer_manager = peer_manager
self.num_peers = num_peers
self.count = 0 self.count = 0
def find_peers_for_blob(self, *args): def find_peers_for_blob(self, *args):
if self.count % 2 == 0: peer_port = self.start_port + self.count
peer_port = 5553
else:
peer_port = 5554
self.count += 1 self.count += 1
if self.count >= self.num_peers:
self.count = 0
return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)]) return defer.succeed([self.peer_manager.get_peer("127.0.0.1", peer_port)])
def run_manage_loop(self): def run_manage_loop(self):
@ -208,7 +195,7 @@ test_create_stream_sd_file = {
'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'} 'stream_hash': '6d27fbe10c86d81aacfb897c7a426d0a2214f5a299455a6d315c0f998c4b3545c2dc60906122d94653c23b1898229e3f'}
def start_lbry_uploader(sd_hash_queue, kill_event, dead_event): def start_lbry_uploader(sd_hash_queue, kill_event, dead_event, file_size, ul_rate_limit=None):
sys.modules = sys.modules.copy() sys.modules = sys.modules.copy()
@ -231,9 +218,9 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = RateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
db_dir = "server" db_dir = "server"
@ -247,6 +234,9 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event):
lbry_file_manager = LBRYFileManager(session, stream_info_manager, sd_identifier) lbry_file_manager = LBRYFileManager(session, stream_info_manager, sd_identifier)
if ul_rate_limit is not None:
session.rate_limiter.set_ul_limit(ul_rate_limit)
def start_all(): def start_all():
d = session.setup() d = session.setup()
@ -302,7 +292,7 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event):
return True return True
def create_stream(): def create_stream():
test_file = GenFile(5209343, b''.join([chr(i) for i in xrange(0, 64, 6)])) test_file = GenFile(file_size, b''.join([chr(i) for i in xrange(0, 64, 6)]))
d = create_lbry_file(session, lbry_file_manager, "test_file", test_file) d = create_lbry_file(session, lbry_file_manager, "test_file", test_file)
return d return d
@ -319,6 +309,123 @@ def start_lbry_uploader(sd_hash_queue, kill_event, dead_event):
reactor.run() reactor.run()
def start_lbry_reuploader(sd_hash, kill_event, dead_event, ready_event, n, ul_rate_limit=None):
sys.modules = sys.modules.copy()
del sys.modules['twisted.internet.reactor']
import twisted.internet
twisted.internet.reactor = twisted.internet.epollreactor.EPollReactor()
sys.modules['twisted.internet.reactor'] = twisted.internet.reactor
from twisted.internet import reactor
logging.debug("Starting the uploader")
Random.atfork()
r = random.Random()
r.seed("start_lbry_uploader")
wallet = FakeWallet()
peer_port = 5553 + n
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer()
rate_limiter = RateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "server_" + str(n)
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd" + str(n),
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
stream_info_manager = TempLBRYFileMetadataManager()
lbry_file_manager = LBRYFileManager(session, stream_info_manager, sd_identifier)
if ul_rate_limit is not None:
session.rate_limiter.set_ul_limit(ul_rate_limit)
def make_downloader(metadata, prm):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file():
prm = PaymentRateManager(session.base_payment_rate_manager)
d = download_sd_blob(session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm)
d.addCallback(lambda downloader: downloader.start())
return d
def start_transfer():
logging.debug("Starting the transfer")
d = session.setup()
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: lbry_file_manager.setup())
d.addCallback(lambda _: download_file())
return d
def start_server():
server_port = None
query_handler_factories = {
BlobAvailabilityHandlerFactory(session.blob_manager): True,
BlobRequestHandlerFactory(session.blob_manager, session.wallet,
PaymentRateManager(session.base_payment_rate_manager)): True,
session.wallet.get_wallet_info_query_handler_factory(): True,
}
server_factory = ServerProtocolFactory(session.rate_limiter,
query_handler_factories,
session.peer_manager)
server_port = reactor.listenTCP(peer_port, server_factory)
logging.debug("Started listening")
def kill_server():
ds = []
ds.append(session.shut_down())
ds.append(lbry_file_manager.stop())
if server_port:
ds.append(server_port.stopListening())
kill_check.stop()
dead_event.set()
dl = defer.DeferredList(ds)
dl.addCallback(lambda _: reactor.stop())
return dl
def check_for_kill():
if kill_event.is_set():
kill_server()
kill_check = task.LoopingCall(check_for_kill)
kill_check.start(1.0)
ready_event.set()
logging.debug("set the ready event")
d = task.deferLater(reactor, 1.0, start_transfer)
d.addCallback(lambda _: start_server())
reactor.run()
def start_live_server(sd_hash_queue, kill_event, dead_event): def start_live_server(sd_hash_queue, kill_event, dead_event):
sys.modules = sys.modules.copy() sys.modules = sys.modules.copy()
@ -342,7 +449,7 @@ def start_live_server(sd_hash_queue, kill_event, dead_event):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
@ -481,7 +588,7 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakePeerFinder(5554, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = RateLimiter() rate_limiter = RateLimiter()
@ -601,29 +708,29 @@ class TestTransfer(TestCase):
return d return d
@staticmethod @staticmethod
def wait_for_dead_event(dead_event): def wait_for_event(event, timeout):
from twisted.internet import reactor from twisted.internet import reactor
d = defer.Deferred() d = defer.Deferred()
def stop(): def stop():
dead_check.stop() set_check.stop()
if stop_call.active(): if stop_call.active():
stop_call.cancel() stop_call.cancel()
d.callback(True) d.callback(True)
def check_if_dead_event_set(): def check_if_event_set():
if dead_event.is_set(): if event.is_set():
logging.debug("Dead event has been found set") logging.debug("Dead event has been found set")
stop() stop()
def done_waiting(): def done_waiting():
logging.warning("Dead event has not been found set and timeout has expired") logging.warning("Event has not been found set and timeout has expired")
stop() stop()
dead_check = task.LoopingCall(check_if_dead_event_set) set_check = task.LoopingCall(check_if_event_set)
dead_check.start(.1) set_check.start(.1)
stop_call = reactor.callLater(15, done_waiting) stop_call = reactor.callLater(timeout, done_waiting)
return d return d
@staticmethod @staticmethod
@ -650,7 +757,7 @@ class TestTransfer(TestCase):
sd_hash_queue = Queue() sd_hash_queue = Queue()
kill_event = Event() kill_event = Event()
dead_event = Event() dead_event = Event()
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event)) uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343))
uploader.start() uploader.start()
self.server_processes.append(uploader) self.server_processes.append(uploader)
@ -658,7 +765,7 @@ class TestTransfer(TestCase):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
@ -717,7 +824,7 @@ class TestTransfer(TestCase):
logging.debug("Client is stopping normally.") logging.debug("Client is stopping normally.")
kill_event.set() kill_event.set()
logging.debug("Set the kill event") logging.debug("Set the kill event")
d = self.wait_for_dead_event(dead_event) d = self.wait_for_event(dead_event, 15)
def print_shutting_down(): def print_shutting_down():
logging.info("Client is shutting down") logging.info("Client is shutting down")
@ -744,7 +851,7 @@ class TestTransfer(TestCase):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
@ -815,7 +922,7 @@ class TestTransfer(TestCase):
logging.debug("Client is stopping normally.") logging.debug("Client is stopping normally.")
kill_event.set() kill_event.set()
logging.debug("Set the kill event") logging.debug("Set the kill event")
d = self.wait_for_dead_event(dead_event) d = self.wait_for_event(dead_event, 15)
def print_shutting_down(): def print_shutting_down():
logging.info("Client is shutting down") logging.info("Client is shutting down")
@ -847,7 +954,7 @@ class TestTransfer(TestCase):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakeTwoPeerFinder(5553, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
@ -896,8 +1003,8 @@ class TestTransfer(TestCase):
logging.debug("Client is stopping normally.") logging.debug("Client is stopping normally.")
kill_event.set() kill_event.set()
logging.debug("Set the kill event") logging.debug("Set the kill event")
d1 = self.wait_for_dead_event(dead_event_1) d1 = self.wait_for_event(dead_event_1, 15)
d2 = self.wait_for_dead_event(dead_event_2) d2 = self.wait_for_event(dead_event_2, 15)
dl = defer.DeferredList([d1, d2]) dl = defer.DeferredList([d1, d2])
def print_shutting_down(): def print_shutting_down():
@ -916,7 +1023,7 @@ class TestTransfer(TestCase):
sd_hash_queue = Queue() sd_hash_queue = Queue()
kill_event = Event() kill_event = Event()
dead_event = Event() dead_event = Event()
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event)) uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_event, 5209343))
uploader.start() uploader.start()
self.server_processes.append(uploader) self.server_processes.append(uploader)
@ -924,7 +1031,7 @@ class TestTransfer(TestCase):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 1)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
@ -1012,7 +1119,7 @@ class TestTransfer(TestCase):
logging.debug("Client is stopping normally.") logging.debug("Client is stopping normally.")
kill_event.set() kill_event.set()
logging.debug("Set the kill event") logging.debug("Set the kill event")
d = self.wait_for_dead_event(dead_event) d = self.wait_for_event(dead_event, 15)
def print_shutting_down(): def print_shutting_down():
logging.info("Client is shutting down") logging.info("Client is shutting down")
@ -1026,6 +1133,109 @@ class TestTransfer(TestCase):
d.addBoth(stop) d.addBoth(stop)
return d return d
def test_multiple_uploaders(self):
sd_hash_queue = Queue()
num_uploaders = 3
kill_event = Event()
dead_events = [Event() for _ in range(num_uploaders)]
ready_events = [Event() for _ in range(1, num_uploaders)]
uploader = Process(target=start_lbry_uploader, args=(sd_hash_queue, kill_event, dead_events[0],
9373419, 2**22))
uploader.start()
self.server_processes.append(uploader)
logging.debug("Testing multiple uploaders")
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, num_uploaders)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
db_dir = "client"
blob_dir = os.path.join(db_dir, "blobfiles")
os.mkdir(db_dir)
os.mkdir(blob_dir)
self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=None, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet)
self.stream_info_manager = TempLBRYFileMetadataManager()
self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier)
def start_additional_uploaders(sd_hash):
for i in range(1, num_uploaders):
uploader = Process(target=start_lbry_reuploader,
args=(sd_hash, kill_event, dead_events[i], ready_events[i-1], i, 2**10))
uploader.start()
self.server_processes.append(uploader)
return defer.succeed(True)
def wait_for_ready_events():
return defer.DeferredList([self.wait_for_event(ready_event, 60) for ready_event in ready_events])
def make_downloader(metadata, prm):
info_validator = metadata.validator
options = metadata.options
factories = metadata.factories
chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)]
return factories[0].make_downloader(metadata, chosen_options, prm)
def download_file(sd_hash):
prm = PaymentRateManager(self.session.base_payment_rate_manager)
d = download_sd_blob(self.session, sd_hash, prm)
d.addCallback(sd_identifier.get_metadata_for_sd_blob)
d.addCallback(make_downloader, prm)
d.addCallback(lambda downloader: downloader.start())
return d
def check_md5_sum():
f = open('test_file')
hashsum = MD5.new()
hashsum.update(f.read())
self.assertEqual(hashsum.hexdigest(), "e5941d615f53312fd66638239c1f90d5")
def start_transfer(sd_hash):
logging.debug("Starting the transfer")
d = start_additional_uploaders(sd_hash)
d.addCallback(lambda _: wait_for_ready_events())
d.addCallback(lambda _: self.session.setup())
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: download_file(sd_hash))
d.addCallback(lambda _: check_md5_sum())
return d
def stop(arg):
if isinstance(arg, Failure):
logging.debug("Client is stopping due to an error. Error: %s", arg.getTraceback())
else:
logging.debug("Client is stopping normally.")
kill_event.set()
logging.debug("Set the kill event")
d = defer.DeferredList([self.wait_for_event(dead_event, 15) for dead_event in dead_events])
def print_shutting_down():
logging.info("Client is shutting down")
d.addCallback(lambda _: print_shutting_down())
d.addCallback(lambda _: arg)
return d
d = self.wait_for_hash_from_queue(sd_hash_queue)
d.addCallback(start_transfer)
d.addBoth(stop)
return d
class TestStreamify(TestCase): class TestStreamify(TestCase):
@ -1057,7 +1267,7 @@ class TestStreamify(TestCase):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakeTwoPeerFinder(5553, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()
@ -1109,7 +1319,7 @@ class TestStreamify(TestCase):
wallet = FakeWallet() wallet = FakeWallet()
peer_manager = PeerManager() peer_manager = PeerManager()
peer_finder = FakeTwoPeerFinder(5553, peer_manager) peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer() hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter() rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier() sd_identifier = StreamDescriptorIdentifier()