Merge pull request #242 from lbryio/shutdown-issues

Shutdown issues
This commit is contained in:
Job Evers‐Meltzer 2016-11-09 08:17:13 -06:00 committed by GitHub
commit 66745b0820
11 changed files with 158 additions and 86 deletions

View file

@ -25,11 +25,12 @@ class BlobAvailabilityTracker(object):
self._check_mine = LoopingCall(self._update_mine)
def start(self):
log.info("Starting blob tracker")
log.info("Starting %s", self)
self._check_popular.start(30)
self._check_mine.start(120)
def stop(self):
log.info("Stopping %s", self)
if self._check_popular.running:
self._check_popular.stop()
if self._check_mine.running:

View file

@ -1,8 +1,13 @@
import logging
from zope.interface import implements
from lbrynet.interfaces import IRateLimiter
from twisted.internet import task
log = logging.getLogger(__name__)
class DummyRateLimiter(object):
def __init__(self):
self.dl_bytes_this_second = 0
@ -64,6 +69,7 @@ class RateLimiter(object):
self.protocols = []
def start(self):
log.info("Starting %s", self)
self.tick_call = task.LoopingCall(self.tick)
self.tick_call.start(self.tick_interval)
@ -74,6 +80,7 @@ class RateLimiter(object):
self.unthrottle_ul()
def stop(self):
log.info("Stopping %s", self)
if self.tick_call is not None:
self.tick_call.stop()
self.tick_call = None
@ -144,4 +151,4 @@ class RateLimiter(object):
def unregister_protocol(self, protocol):
if protocol in self.protocols:
self.protocols.remove(protocol)
self.protocols.remove(protocol)

View file

@ -18,60 +18,87 @@ log = logging.getLogger(__name__)
class Session(object):
"""This class manages all important services common to any application that uses the network:
the hash announcer, which informs other peers that this peer is associated with some hash. Usually,
this means this peer has a blob identified by the hash in question, but it can be used for other
purposes.
the peer finder, which finds peers that are associated with some hash.
the blob manager, which keeps track of which blobs have been downloaded and provides access to them,
the rate limiter, which attempts to ensure download and upload rates stay below a set maximum,
and upnp, which opens holes in compatible firewalls so that remote peers can connect to this peer."""
def __init__(self, blob_data_payment_rate, db_dir=None, lbryid=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True):
"""
@param blob_data_payment_rate: The default payment rate for blob data
"""This class manages all important services common to any application that uses the network.
the hash announcer, which informs other peers that this peer is
associated with some hash. Usually, this means this peer has a
blob identified by the hash in question, but it can be used for
other purposes.
the peer finder, which finds peers that are associated with some
hash.
the blob manager, which keeps track of which blobs have been
downloaded and provides access to them,
the rate limiter, which attempts to ensure download and upload
rates stay below a set maximum
upnp, which opens holes in compatible firewalls so that remote
peers can connect to this peer.
"""
def __init__(self, blob_data_payment_rate, db_dir=None,
lbryid=None, peer_manager=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None,
hash_announcer=None, blob_dir=None,
blob_manager=None, peer_port=None, use_upnp=True,
rate_limiter=None, wallet=None,
dht_node_class=node.Node, blob_tracker_class=None,
payment_rate_manager_class=None, is_generous=True):
"""@param blob_data_payment_rate: The default payment rate for blob data
@param db_dir: The directory in which levelDB files should be stored
@param lbryid: The unique ID of this node
@param peer_manager: An object which keeps track of all known peers. If None, a PeerManager will be created
@param peer_manager: An object which keeps track of all known
peers. If None, a PeerManager will be created
@param dht_node_port: The port on which the dht node should listen for incoming connections
@param dht_node_port: The port on which the dht node should
listen for incoming connections
@param known_dht_nodes: A list of nodes which the dht node should use to bootstrap into the dht
@param known_dht_nodes: A list of nodes which the dht node
should use to bootstrap into the dht
@param peer_finder: An object which is used to look up peers that are associated with some hash. If None,
a DHTPeerFinder will be used, which looks for peers in the distributed hash table.
@param peer_finder: An object which is used to look up peers
that are associated with some hash. If None, a
DHTPeerFinder will be used, which looks for peers in the
distributed hash table.
@param hash_announcer: An object which announces to other peers that this peer is associated with some hash.
If None, and peer_port is not None, a DHTHashAnnouncer will be used. If None and
peer_port is None, a DummyHashAnnouncer will be used, which will not actually announce
anything.
@param hash_announcer: An object which announces to other
peers that this peer is associated with some hash. If
None, and peer_port is not None, a DHTHashAnnouncer will
be used. If None and peer_port is None, a
DummyHashAnnouncer will be used, which will not actually
announce anything.
@param blob_dir: The directory in which blobs will be stored. If None and blob_manager is None, blobs will
be stored in memory only.
@param blob_dir: The directory in which blobs will be
stored. If None and blob_manager is None, blobs will be
stored in memory only.
@param blob_manager: An object which keeps track of downloaded blobs and provides access to them. If None,
and blob_dir is not None, a DiskBlobManager will be used, with the given blob_dir.
If None and blob_dir is None, a TempBlobManager will be used, which stores blobs in
memory only.
@param blob_manager: An object which keeps track of downloaded
blobs and provides access to them. If None, and blob_dir
is not None, a DiskBlobManager will be used, with the
given blob_dir. If None and blob_dir is None, a
TempBlobManager will be used, which stores blobs in memory
only.
@param peer_port: The port on which other peers should connect to this peer
@param peer_port: The port on which other peers should connect
to this peer
@param use_upnp: Whether or not to try to open a hole in the firewall so that outside peers can connect to
this peer's peer_port and dht_node_port
@param use_upnp: Whether or not to try to open a hole in the
firewall so that outside peers can connect to this peer's
peer_port and dht_node_port
@param rate_limiter: An object which keeps track of the amount of data transferred to and from this peer,
and can limit that rate if desired
@param rate_limiter: An object which keeps track of the amount
of data transferred to and from this peer, and can limit
that rate if desired
@param wallet: An object which will be used to keep track of expected payments and which will pay peers.
If None, a wallet which uses the Point Trader system will be used, which is meant for testing
only
@param wallet: An object which will be used to keep track of
expected payments and which will pay peers. If None, a
wallet which uses the Point Trader system will be used,
which is meant for testing only
@return:
"""
self.db_dir = db_dir
@ -142,6 +169,7 @@ class Session(object):
def shut_down(self):
"""Stop all services"""
log.info('Shutting down %s', self)
ds = []
if self.blob_manager is not None:
ds.append(defer.maybeDeferred(self.blob_tracker.stop))
@ -178,7 +206,9 @@ class Session(object):
self.external_ip = external_ip
if self.peer_port is not None:
if u.getspecificportmapping(self.peer_port, 'TCP') is None:
u.addportmapping(self.peer_port, 'TCP', u.lanaddr, self.peer_port, 'LBRY peer port', '')
u.addportmapping(
self.peer_port, 'TCP', u.lanaddr, self.peer_port,
'LBRY peer port', '')
self.upnp_redirects.append((self.peer_port, 'TCP'))
log.info("Set UPnP redirect for TCP port %d", self.peer_port)
else:
@ -187,16 +217,23 @@ class Session(object):
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port is not None:
if u.getspecificportmapping(self.dht_node_port, 'UDP') is None:
u.addportmapping(self.dht_node_port, 'UDP', u.lanaddr, self.dht_node_port, 'LBRY DHT port', '')
u.addportmapping(
self.dht_node_port, 'UDP', u.lanaddr, self.dht_node_port,
'LBRY DHT port', '')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
log.info("Set UPnP redirect for UPD port %d", self.dht_node_port)
else:
# TODO: check that the existing redirect was put up by an old lbrynet session before grabbing it
# if such a disconnected redirect exists, then upnp won't work unless the redirect is appended
# or is torn down and set back up. a bad shutdown of lbrynet could leave such a redirect up
# and cause problems on the next start.
# this could be problematic if a previous lbrynet session didn't make the redirect, and it was
# made by another application
# TODO: check that the existing redirect was
# put up by an old lbrynet session before
# grabbing it if such a disconnected redirect
# exists, then upnp won't work unless the
# redirect is appended or is torn down and set
# back up. a bad shutdown of lbrynet could
# leave such a redirect up and cause problems
# on the next start. this could be
# problematic if a previous lbrynet session
# didn't make the redirect, and it was made by
# another application
log.warning("UPnP redirect already set for UDP port %d", self.dht_node_port)
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
return True
@ -271,9 +308,10 @@ class Session(object):
self.peer_finder,
self.dht_node)
if self.payment_rate_manager is None:
self.payment_rate_manager = self.payment_rate_manager_class(self.base_payment_rate_manager,
self.blob_tracker,
self.is_generous)
self.payment_rate_manager = self.payment_rate_manager_class(
self.base_payment_rate_manager,
self.blob_tracker,
self.is_generous)
self.rate_limiter.start()
d1 = self.blob_manager.setup()
@ -286,7 +324,7 @@ class Session(object):
return dl
def _unset_upnp(self):
log.info("Unsetting upnp for %s", self)
def threaded_unset_upnp():
u = miniupnpc.UPnP()
num_devices_found = u.discover()
@ -294,7 +332,9 @@ class Session(object):
u.selectigd()
for port, protocol in self.upnp_redirects:
if u.getspecificportmapping(port, protocol) is None:
log.warning("UPnP redirect for %s %d was removed by something else.", protocol, port)
log.warning(
"UPnP redirect for %s %d was removed by something else.",
protocol, port)
else:
u.deleteportmapping(port, protocol)
log.info("Removed UPnP redirect for %s %d.", protocol, port)
@ -307,5 +347,3 @@ class Session(object):
def _subfailure(self, err):
log.error(err.getTraceback())
return err.value

View file

@ -97,9 +97,8 @@ class Wallet(object):
log.error("An error occurred stopping the wallet: %s", err.getTraceback())
def stop(self):
log.info("Stopping %s", self)
self.stopped = True
# If self.next_manage_call is None, then manage is currently running or else
# start has not been called, so set stopped and do nothing else.
if self.next_manage_call is not None:

View file

@ -1,8 +1,13 @@
import binascii
import logging
from zope.interface import implements
from lbrynet.interfaces import IPeerFinder
log = logging.getLogger(__name__)
class DHTPeerFinder(object):
"""This class finds peers which have announced to the DHT that they have certain blobs"""
implements(IPeerFinder)
@ -21,6 +26,7 @@ class DHTPeerFinder(object):
self.next_manage_call = reactor.callLater(60, self.run_manage_loop)
def stop(self):
log.info("Stopping %s", self)
if self.next_manage_call is not None and self.next_manage_call.active():
self.next_manage_call.cancel()
self.next_manage_call = None
@ -45,4 +51,4 @@ class DHTPeerFinder(object):
return d
def get_most_popular_hashes(self, num_to_return):
return self.dht_node.get_most_popular_hashes(num_to_return)
return self.dht_node.get_most_popular_hashes(num_to_return)

View file

@ -1,6 +1,11 @@
import binascii
from twisted.internet import defer, reactor
import collections
import logging
from twisted.internet import defer, reactor
log = logging.getLogger(__name__)
class DHTHashAnnouncer(object):
@ -22,6 +27,7 @@ class DHTHashAnnouncer(object):
self.next_manage_call = reactor.callLater(60, self.run_manage_loop)
def stop(self):
log.info("Stopping %s", self)
if self.next_manage_call is not None:
self.next_manage_call.cancel()
self.next_manage_call = None

View file

@ -21,6 +21,7 @@ import msgtypes
import msgformat
from contact import Contact
reactor = twisted.internet.reactor
log = logging.getLogger(__name__)
@ -124,10 +125,10 @@ class KademliaProtocol(protocol.DatagramProtocol):
except IndexError:
log.warning("Couldn't decode dht datagram from %s", address)
return
message = self._translator.fromPrimitive(msgPrimitive)
remoteContact = Contact(message.nodeID, address[0], address[1], self)
# Refresh the remote node's details in the local node's k-buckets
self._node.addContact(remoteContact)
@ -143,7 +144,9 @@ class KademliaProtocol(protocol.DatagramProtocol):
del self._sentMessages[message.id]
if hasattr(df, '_rpcRawResponse'):
# The RPC requested that the raw response message and originating address be returned; do not interpret it
# The RPC requested that the raw response message
# and originating address be returned; do not
# interpret it
df.callback((message, address))
elif isinstance(message, msgtypes.ErrorMessage):
# The RPC request raised a remote exception; raise it locally
@ -175,7 +178,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
def _send(self, data, rpcID, address):
""" Transmit the specified data over UDP, breaking it up into several
packets if necessary
If the data is spread over multiple UDP datagrams, the packets have the
following structure::
| | | | | |||||||||||| 0x00 |
@ -183,7 +186,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
| type ID | of packets |of this packet | | indicator|
| (1 byte) | (2 bytes) | (2 bytes) |(20 bytes)| (1 byte) |
| | | | | |||||||||||| |
@note: The header used for breaking up large data segments will
possibly be moved out of the KademliaProtocol class in the
future, into something similar to a message translator/encoder
@ -301,19 +304,19 @@ class KademliaProtocol(protocol.DatagramProtocol):
df.errback(failure.Failure(TimeoutError(remoteContactID)))
else:
# This should never be reached
print "ERROR: deferred timed out, but is not present in sent messages list!"
log.error("deferred timed out, but is not present in sent messages list!")
def stopProtocol(self):
""" Called when the transport is disconnected.
Will only be called once, after all ports are disconnected.
"""
log.info('Stopping dht')
for key in self._callLaterList.keys():
try:
if key > time.time():
log.info('Cancelling %s', self._callLaterList[key])
self._callLaterList[key].cancel()
except Exception, e:
print e
log.exception('Failed to cancel %s', self._callLaterList[key])
del self._callLaterList[key]
#TODO: test: do we really need the reactor.iterate() call?
reactor.iterate()

View file

@ -160,7 +160,7 @@ class EncryptedFileManager(object):
return defer.fail(Failure(ValueError("Could not find that LBRY file")))
def stop(self):
log.info('Stopping %s', self)
ds = []
def wait_for_finished(lbry_file, count=2):
@ -243,4 +243,4 @@ class EncryptedFileManager(object):
@rerun_if_locked
def _get_count_for_stream_hash(self, stream_hash):
return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
(stream_hash,))
(stream_hash,))

View file

@ -560,6 +560,7 @@ class Daemon(AuthJSONRPCServer):
try:
if self.lbry_server_port is not None:
self.lbry_server_port, p = None, self.lbry_server_port
log.info('Stop listening to %s', p)
return defer.maybeDeferred(p.stopListening)
else:
return defer.succeed(True)
@ -650,13 +651,14 @@ class Daemon(AuthJSONRPCServer):
log.warn('Failed to upload log', exc_info=True)
d = defer.succeed(None)
d.addCallback(lambda _: self._stop_server())
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
d.addCallback(lambda _: self._stop_reflector())
d.addErrback(lambda err: True)
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
d.addCallback(lambda _: self.lbry_file_manager.stop())
d.addErrback(lambda err: True)
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
if self.session is not None:
d.addCallback(lambda _: self.session.shut_down())
d.addErrback(lambda err: True)
d.addErrback(log_support.failure, log, 'Failure while shutting down: %s')
return d
def _update_settings(self, settings):
@ -770,14 +772,11 @@ class Daemon(AuthJSONRPCServer):
d = self.lbry_file_metadata_manager.setup()
def set_lbry_file_manager():
self.lbry_file_manager = EncryptedFileManager(self.session,
self.lbry_file_metadata_manager,
self.sd_identifier,
download_directory=self.download_directory)
self.lbry_file_manager = EncryptedFileManager(
self.session, self.lbry_file_metadata_manager,
self.sd_identifier, download_directory=self.download_directory)
return self.lbry_file_manager.setup()
d.addCallback(lambda _: set_lbry_file_manager())
return d
def _get_analytics(self):

View file

@ -29,6 +29,7 @@ from lbrynet.core import utils
if platform.mac_ver()[0] >= "10.10":
from LBRYNotify import LBRYNotify
log = logging.getLogger(__name__)
@ -46,9 +47,11 @@ class LBRYDaemonApp(AppKit.NSApplication):
self.icon.setSize_((20, 20))
self.statusitem.setImage_(self.icon)
self.menubarMenu = AppKit.NSMenu.alloc().init()
self.open = AppKit.NSMenuItem.alloc().initWithTitle_action_keyEquivalent_("Open", "openui:", "")
self.open = AppKit.NSMenuItem.alloc().initWithTitle_action_keyEquivalent_(
"Open", "openui:", "")
self.menubarMenu.addItem_(self.open)
self.quit = AppKit.NSMenuItem.alloc().initWithTitle_action_keyEquivalent_("Quit", "replyToApplicationShouldTerminate:", "")
self.quit = AppKit.NSMenuItem.alloc().initWithTitle_action_keyEquivalent_(
"Quit", "applicationShouldTerminate:", "")
self.menubarMenu.addItem_(self.quit)
self.statusitem.setMenu_(self.menubarMenu)
self.statusitem.setToolTip_(settings.APP_NAME)
@ -64,9 +67,14 @@ class LBRYDaemonApp(AppKit.NSApplication):
def openui_(self, sender):
webbrowser.open(settings.UI_ADDRESS)
def replyToApplicationShouldTerminate_(self, shouldTerminate):
notify("Goodbye!")
reactor.stop()
# this code is from the example https://pythonhosted.org/pyobjc/examples/Cocoa/Twisted/WebServicesTool/index.html
def applicationShouldTerminate_(self, sender):
if reactor.running:
log.info('Stopping twisted event loop')
notify("Goodbye!")
reactor.stop()
return False
return True
def notify(msg):

View file

@ -18,9 +18,14 @@ def main():
log_file = conf.settings.get_log_filename()
log_support.configure_logging(log_file, console=True)
app = LBRYDaemonApp.sharedApplication()
reactor.addSystemEventTrigger("after", "shutdown", AppHelper.stopEventLoop)
reactor.addSystemEventTrigger("after", "shutdown", shutdown)
reactor.run()
def shutdown():
log.info('Stopping event loop')
AppHelper.stopEventLoop()
if __name__ == "__main__":
main()