Merge pull request #475 from lbryio/call_later

Adding reactor.callLater as a utils function
This commit is contained in:
Umpei Kay Kurokawa 2017-02-20 19:15:12 -05:00 committed by GitHub
commit 07841a7803
6 changed files with 23 additions and 16 deletions

View file

@ -12,6 +12,7 @@ at anytime.
\#\# [0.8.6rc0] - 2017-02-19 \#\# [0.8.6rc0] - 2017-02-19
### Changed ### Changed
* Add `file_get` by stream hash * Add `file_get` by stream hash
* Add utils.call_later to replace reactor.callLater
### ###
* Fix unhandled error in `get` * Fix unhandled error in `get`
* Fix sd blob timeout handling in `get_availability`, return 0.0 * Fix sd blob timeout handling in `get_availability`, return 0.0

View file

@ -5,7 +5,7 @@ from lbrynet import interfaces
from lbrynet import conf from lbrynet import conf
from lbrynet.core.client.ClientProtocol import ClientProtocolFactory from lbrynet.core.client.ClientProtocol import ClientProtocolFactory
from lbrynet.core.Error import InsufficientFundsError from lbrynet.core.Error import InsufficientFundsError
from lbrynet.core import utils
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -19,7 +19,6 @@ class PeerConnectionHandler(object):
class ConnectionManager(object): class ConnectionManager(object):
implements(interfaces.IConnectionManager) implements(interfaces.IConnectionManager)
callLater = reactor.callLater
MANAGE_CALL_INTERVAL_SEC = 1 MANAGE_CALL_INTERVAL_SEC = 1
def __init__(self, downloader, rate_limiter, def __init__(self, downloader, rate_limiter,
@ -54,7 +53,7 @@ class ConnectionManager(object):
def start(self): def start(self):
log.debug("%s starting", self._get_log_name()) log.debug("%s starting", self._get_log_name())
self._start() self._start()
self._next_manage_call = self.callLater(0, self.manage) self._next_manage_call = utils.call_later(0, self.manage)
return defer.succeed(True) return defer.succeed(True)
@ -156,7 +155,7 @@ class ConnectionManager(object):
self._manage_deferred.callback(None) self._manage_deferred.callback(None)
self._manage_deferred = None self._manage_deferred = None
if not self.stopped and schedule_next_call: if not self.stopped and schedule_next_call:
self._next_manage_call = self.callLater(self.MANAGE_CALL_INTERVAL_SEC, self.manage) self._next_manage_call = utils.call_later(self.MANAGE_CALL_INTERVAL_SEC, self.manage)
def _rank_request_creator_connections(self): def _rank_request_creator_connections(self):
"""Returns an ordered list of our request creators, ranked according """Returns an ordered list of our request creators, ranked according

View file

@ -3,14 +3,13 @@ import collections
import logging import logging
import time import time
from twisted.internet import defer, reactor from twisted.internet import defer
from lbrynet.core import utils
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class DHTHashAnnouncer(object): class DHTHashAnnouncer(object):
callLater = reactor.callLater
ANNOUNCE_CHECK_INTERVAL = 60 ANNOUNCE_CHECK_INTERVAL = 60
CONCURRENT_ANNOUNCERS = 5 CONCURRENT_ANNOUNCERS = 5
@ -26,7 +25,7 @@ class DHTHashAnnouncer(object):
def run_manage_loop(self): def run_manage_loop(self):
if self.peer_port is not None: if self.peer_port is not None:
self._announce_available_hashes() self._announce_available_hashes()
self.next_manage_call = self.callLater(self.ANNOUNCE_CHECK_INTERVAL, self.run_manage_loop) self.next_manage_call = utils.call_later(self.ANNOUNCE_CHECK_INTERVAL, self.run_manage_loop)
def stop(self): def stop(self):
log.info("Stopping %s", self) log.info("Stopping %s", self)
@ -79,7 +78,7 @@ class DHTHashAnnouncer(object):
log.debug('Announcing blob %s to dht', h) log.debug('Announcing blob %s to dht', h)
d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port) d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port)
d.chainDeferred(announce_deferred) d.chainDeferred(announce_deferred)
d.addBoth(lambda _: self.callLater(0, announce)) d.addBoth(lambda _: utils.call_later(0, announce))
else: else:
self._concurrent_announcers -= 1 self._concurrent_announcers -= 1

View file

@ -45,6 +45,13 @@ def datetime_obj(*args, **kwargs):
return datetime.datetime(*args, **kwargs) return datetime.datetime(*args, **kwargs)
def call_later(delay, func, *args, **kwargs):
# Import here to ensure that it gets called after installing a reator
# see: http://twistedmatrix.com/documents/current/core/howto/choosing-reactor.html
from twisted.internet import reactor
return reactor.callLater(delay, func, *args, **kwargs)
def generate_id(num=None): def generate_id(num=None):
h = get_lbry_hash_obj() h = get_lbry_hash_obj()
if num is not None: if num is not None:

View file

@ -3,7 +3,7 @@ import time
import logging import logging
from lbrynet.core import log_support from lbrynet.core import log_support
from lbrynet.core.client.ConnectionManager import ConnectionManager #from lbrynet.core.client.ConnectionManager import ConnectionManager
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.server.ServerProtocol import ServerProtocol from lbrynet.core.server.ServerProtocol import ServerProtocol
from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.RateLimiter import RateLimiter
@ -16,6 +16,7 @@ from twisted.internet import defer, reactor, task
from twisted.internet.task import deferLater from twisted.internet.task import deferLater
from twisted.internet.protocol import Protocol, ServerFactory from twisted.internet.protocol import Protocol, ServerFactory
from lbrynet import conf from lbrynet import conf
from lbrynet.core import utils
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IRequestCreator from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IRequestCreator
from zope.interface import implements from zope.interface import implements
@ -122,11 +123,12 @@ class TestIntegrationConnectionManager(unittest.TestCase):
self.downloader = MocDownloader() self.downloader = MocDownloader()
self.rate_limiter = RateLimiter() self.rate_limiter = RateLimiter()
self.primary_request_creator = MocRequestCreator([self.TEST_PEER]) self.primary_request_creator = MocRequestCreator([self.TEST_PEER])
self.clock = task.Clock()
utils.call_later = self.clock.callLater
from lbrynet.core.client.ConnectionManager import ConnectionManager
self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter, self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter,
[self.primary_request_creator], []) [self.primary_request_creator], [])
self.clock = task.Clock()
self.connection_manager.callLater = self.clock.callLater
self.connection_manager._start() self.connection_manager._start()
self.server_port = None self.server_port = None

View file

@ -2,9 +2,7 @@ import os
import binascii import binascii
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer,task from twisted.internet import defer,task
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer,DHTHashSupplier from lbrynet.core import log_support, utils
from lbrynet.core.utils import random_string
from lbrynet.core import log_support
class MocDHTNode(object): class MocDHTNode(object):
@ -35,8 +33,9 @@ class DHTHashAnnouncerTest(unittest.TestCase):
self.blobs_to_announce.append(binascii.b2a_hex(os.urandom(32))) self.blobs_to_announce.append(binascii.b2a_hex(os.urandom(32)))
self.clock = task.Clock() self.clock = task.Clock()
self.dht_node = MocDHTNode() self.dht_node = MocDHTNode()
utils.call_later = self.clock.callLater
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer,DHTHashSupplier
self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333) self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333)
self.announcer.callLater = self.clock.callLater
self.supplier = MocSupplier(self.blobs_to_announce) self.supplier = MocSupplier(self.blobs_to_announce)
self.announcer.add_supplier(self.supplier) self.announcer.add_supplier(self.supplier)