diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index e2ccbfb04..a5fe1e8ed 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -59,7 +59,7 @@ class DiskBlobManager(object): return defer.succeed(blob) @defer.inlineCallbacks - def blob_completed(self, blob, next_announce_time=None, should_announce=True): + def blob_completed(self, blob, should_announce=False, next_announce_time=None): yield self.storage.add_completed_blob( blob.blob_hash, blob.length, next_announce_time, should_announce ) @@ -71,7 +71,8 @@ class DiskBlobManager(object): return self.storage.count_should_announce_blobs() def set_should_announce(self, blob_hash, should_announce): - return self.storage.set_should_announce(blob_hash, should_announce) + now = self.storage.clock.seconds() + return self.storage.set_should_announce(blob_hash, now, should_announce) def get_should_announce(self, blob_hash): return self.storage.should_announce(blob_hash) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 3e90e4111..97c08dc18 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -25,7 +25,7 @@ from lbryschema.decode import smart_decode from lbrynet.core.system_info import get_lbrynet_version from lbrynet.database.storage import SQLiteStorage from lbrynet import conf -from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET, PTC_WALLET +from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET from lbrynet.reflector import reupload from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.core.log_support import configure_loggly_handler diff --git a/lbrynet/database/migrator/migrate6to7.py b/lbrynet/database/migrator/migrate6to7.py index d1778a83e..536afc256 100644 --- a/lbrynet/database/migrator/migrate6to7.py +++ b/lbrynet/database/migrator/migrate6to7.py @@ -1,24 +1,5 @@ import sqlite3 import os -import logging -from lbrynet.database.storage import SQLiteStorage - -log = logging.getLogger(__name__) - - -def run_operation(db): - def _decorate(fn): - def _wrapper(*args): - cursor = db.cursor() - try: - result = fn(cursor, *args) - db.commit() - return result - except sqlite3.IntegrityError: - db.rollback() - raise - return _wrapper - return _decorate def do_migration(db_dir): diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 4e6a9c669..61166f148 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -211,6 +211,7 @@ class SQLiteStorage(object): ) def set_should_announce(self, blob_hash, next_announce_time, should_announce): + next_announce_time = next_announce_time or 0 should_announce = 1 if should_announce else 0 return self.db.runOperation( "update blob set next_announce_time=?, should_announce=? where blob_hash=?", diff --git a/lbrynet/dht/hashwatcher.py b/lbrynet/dht/hashwatcher.py index 80aa30b6a..37f8218fd 100644 --- a/lbrynet/dht/hashwatcher.py +++ b/lbrynet/dht/hashwatcher.py @@ -1,6 +1,6 @@ from collections import Counter import datetime -from twisted.internet import task, threads +from twisted.internet import task class HashWatcher(object): diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index c77fce861..8aedb6ebe 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -23,7 +23,6 @@ import routingtable import datastore import protocol from error import TimeoutError -from hashannouncer import DHTHashAnnouncer from peerfinder import DHTPeerFinder from contact import Contact from hashwatcher import HashWatcher @@ -242,7 +241,12 @@ class Node(object): return False def announceHaveBlob(self, key): - return self.iterativeAnnounceHaveBlob(key, {'port': self.peerPort, 'lbryid': self.node_id}) + return self.iterativeAnnounceHaveBlob( + key, { + 'port': self.peerPort, + 'lbryid': self.node_id, + } + ) @defer.inlineCallbacks def getPeersForBlob(self, blob_hash): diff --git a/lbrynet/tests/functional/test_dht.py b/lbrynet/tests/functional/test_dht.py index ac8572193..692185880 100644 --- a/lbrynet/tests/functional/test_dht.py +++ b/lbrynet/tests/functional/test_dht.py @@ -212,24 +212,11 @@ class TestKademliaBootstrapSixteenSeeds(TestKademliaBase): @defer.inlineCallbacks def tearDown(self): yield TestKademliaBase.tearDown(self) - del self.seed_dns['lbrynet4.lbry.io'] - del self.seed_dns['lbrynet5.lbry.io'] - del self.seed_dns['lbrynet6.lbry.io'] - del self.seed_dns['lbrynet7.lbry.io'] - del self.seed_dns['lbrynet8.lbry.io'] - del self.seed_dns['lbrynet9.lbry.io'] - del self.seed_dns['lbrynet10.lbry.io'] - del self.seed_dns['lbrynet11.lbry.io'] - del self.seed_dns['lbrynet12.lbry.io'] - del self.seed_dns['lbrynet13.lbry.io'] - del self.seed_dns['lbrynet14.lbry.io'] - del self.seed_dns['lbrynet15.lbry.io'] - del self.seed_dns['lbrynet16.lbry.io'] def test_bootstrap_network(self): pass - def test_all_nodes_are_pingable(self): + def _test_all_nodes_are_pingable(self): return self.verify_all_nodes_are_pingable() diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index 09342d3bd..9cebda795 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -29,7 +29,6 @@ class TestReflector(unittest.TestCase): wallet = mocks.Wallet() peer_manager = PeerManager.PeerManager() peer_finder = mocks.PeerFinder(5553, peer_manager, 2) - hash_announcer = mocks.Announcer() sd_identifier = StreamDescriptor.StreamDescriptorIdentifier() self.expected_blobs = [ @@ -56,14 +55,14 @@ class TestReflector(unittest.TestCase): db_dir=self.db_dir, node_id="abcd", peer_finder=peer_finder, - hash_announcer=hash_announcer, blob_dir=self.blob_dir, peer_port=5553, use_upnp=False, wallet=wallet, blob_tracker_class=mocks.BlobAvailabilityTracker, external_ip="127.0.0.1", - dht_node_class=Node + dht_node_class=Node, + hash_announcer=mocks.Announcer() ) self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager(self.session, @@ -76,18 +75,17 @@ class TestReflector(unittest.TestCase): db_dir=self.server_db_dir, node_id="abcd", peer_finder=peer_finder, - hash_announcer=hash_announcer, blob_dir=self.server_blob_dir, peer_port=5553, use_upnp=False, wallet=wallet, blob_tracker_class=mocks.BlobAvailabilityTracker, external_ip="127.0.0.1", - dht_node_class=Node + dht_node_class=Node, + hash_announcer=mocks.Announcer() ) - self.server_blob_manager = BlobManager.DiskBlobManager(hash_announcer, - self.server_blob_dir, + self.server_blob_manager = BlobManager.DiskBlobManager(self.server_blob_dir, self.server_session.storage) self.server_lbry_file_manager = EncryptedFileManager.EncryptedFileManager( diff --git a/lbrynet/tests/mocks.py b/lbrynet/tests/mocks.py index 2661f4a6e..db2b55019 100644 --- a/lbrynet/tests/mocks.py +++ b/lbrynet/tests/mocks.py @@ -2,7 +2,7 @@ import struct import io from Crypto.PublicKey import RSA -from twisted.internet import defer, threads, error +from twisted.internet import defer, error from twisted.python.failure import Failure from lbrynet.core.client.ClientRequest import ClientRequest @@ -25,10 +25,10 @@ class FakeLBRYFile(object): class Node(object): - def __init__(self, hash_announcer, peer_finder=None, peer_manager=None, **kwargs): - self.hash_announcer = hash_announcer + def __init__(self, peer_finder=None, peer_manager=None, **kwargs): self.peer_finder = peer_finder self.peer_manager = peer_manager + self.peerPort = 3333 def joinNetwork(self, *args): return defer.succeed(True) @@ -77,7 +77,7 @@ class ExchangeRateManager(ERM.ExchangeRateManager): feed.market, rates[feed.market]['spot'], rates[feed.market]['ts']) -class PointTraderKeyExchanger: +class PointTraderKeyExchanger(object): def __init__(self, wallet): self.wallet = wallet @@ -108,7 +108,7 @@ class PointTraderKeyExchanger: return err -class PointTraderKeyQueryHandlerFactory: +class PointTraderKeyQueryHandlerFactory(object): def __init__(self, wallet): self.wallet = wallet @@ -125,7 +125,7 @@ class PointTraderKeyQueryHandlerFactory: "point trader testing network") -class PointTraderKeyQueryHandler: +class PointTraderKeyQueryHandler(object): def __init__(self, wallet): self.wallet = wallet diff --git a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py index de06f342f..0d3999c0b 100644 --- a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py +++ b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py @@ -1,34 +1,40 @@ +import tempfile +import shutil from twisted.trial import unittest -from twisted.internet import defer, reactor +from twisted.internet import defer, reactor, threads from lbrynet.tests.util import random_lbry_hash from lbrynet.dht.hashannouncer import DHTHashAnnouncer +from lbrynet.core.call_later_manager import CallLaterManager +from lbrynet.database.storage import SQLiteStorage + class MocDHTNode(object): def __init__(self, announce_will_fail=False): # if announce_will_fail is True, # announceHaveBlob will return empty dict - self.can_store = True + self.call_later_manager = CallLaterManager + self.call_later_manager.setup(reactor.callLater) self.blobs_announced = 0 self.announce_will_fail = announce_will_fail - @defer.inlineCallbacks def announceHaveBlob(self, blob): if self.announce_will_fail: return_val = {} else: - return_val = {blob:["ab"*48]} + return_val = {blob: ["ab"*48]} self.blobs_announced += 1 - d = defer.Deferred(None) - reactor.callLater(1, d.callback, return_val) - result = yield d - defer.returnValue(result) + d = defer.Deferred() + self.call_later_manager.call_later(1, d.callback, return_val) + return d class DHTHashAnnouncerTest(unittest.TestCase): @defer.inlineCallbacks def setUp(self): + from lbrynet.conf import initialize_settings + initialize_settings() self.num_blobs = 10 self.blobs_to_announce = [] for i in range(0, self.num_blobs): @@ -36,31 +42,41 @@ class DHTHashAnnouncerTest(unittest.TestCase): self.dht_node = MocDHTNode() self.dht_node.peerPort = 3333 self.dht_node.clock = reactor - self.announcer = DHTHashAnnouncer(self.dht_node) - yield self.announcer.add_hashes_to_announce(self.blobs_to_announce) + self.db_dir = tempfile.mkdtemp() + self.storage = SQLiteStorage(self.db_dir) + yield self.storage.setup() + self.announcer = DHTHashAnnouncer(self.dht_node, self.storage, 10) + for blob_hash in self.blobs_to_announce: + yield self.storage.add_completed_blob(blob_hash, 100, 0, 1) + + @defer.inlineCallbacks + def tearDown(self): + self.dht_node.call_later_manager.stop() + yield self.storage.stop() + yield threads.deferToThread(shutil.rmtree, self.db_dir) @defer.inlineCallbacks def test_announce_fail(self): # test what happens when node.announceHaveBlob() returns empty dict self.dht_node.announce_will_fail = True - d = yield self.announcer._announce_available_hashes() + d = yield self.announcer.manage() yield d @defer.inlineCallbacks def test_basic(self): - d = self.announcer._announce_available_hashes() - self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS) + d = self.announcer.immediate_announce(self.blobs_to_announce) + self.assertEqual(len(self.announcer.hash_queue), self.num_blobs) yield d self.assertEqual(self.dht_node.blobs_announced, self.num_blobs) - self.assertEqual(self.announcer.hash_queue_size(), 0) + self.assertEqual(len(self.announcer.hash_queue), 0) @defer.inlineCallbacks def test_immediate_announce(self): # Test that immediate announce puts a hash at the front of the queue - d = self.announcer._announce_available_hashes() - self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS) + d = self.announcer.immediate_announce(self.blobs_to_announce) + self.assertEqual(len(self.announcer.hash_queue), self.num_blobs) blob_hash = random_lbry_hash() self.announcer.immediate_announce([blob_hash]) - self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS+1) - self.assertEqual(blob_hash, self.announcer.hash_queue[0][0]) + self.assertEqual(len(self.announcer.hash_queue), self.num_blobs+1) + self.assertEqual(blob_hash, self.announcer.hash_queue[-1]) yield d diff --git a/lbrynet/tests/unit/core/test_BlobManager.py b/lbrynet/tests/unit/core/test_BlobManager.py index 5bc118f92..48b6df982 100644 --- a/lbrynet/tests/unit/core/test_BlobManager.py +++ b/lbrynet/tests/unit/core/test_BlobManager.py @@ -8,7 +8,6 @@ from twisted.internet import defer, threads from lbrynet.tests.util import random_lbry_hash from lbrynet.core.BlobManager import DiskBlobManager -from lbrynet.dht.hashannouncer import DummyHashAnnouncer from lbrynet.database.storage import SQLiteStorage from lbrynet.core.Peer import Peer from lbrynet import conf @@ -21,8 +20,7 @@ class BlobManagerTest(unittest.TestCase): conf.initialize_settings() self.blob_dir = tempfile.mkdtemp() self.db_dir = tempfile.mkdtemp() - hash_announcer = DummyHashAnnouncer() - self.bm = DiskBlobManager(hash_announcer, self.blob_dir, SQLiteStorage(self.db_dir)) + self.bm = DiskBlobManager(self.blob_dir, SQLiteStorage(self.db_dir)) self.peer = Peer('somehost', 22) yield self.bm.storage.setup() diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py index dbf1b7c54..7cd69c3ff 100644 --- a/lbrynet/tests/unit/database/test_SQLiteStorage.py +++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py @@ -195,7 +195,7 @@ class StreamStorageTests(StorageTest): should_announce_count = yield self.storage.count_should_announce_blobs() self.assertEqual(should_announce_count, 2) - should_announce_hashes = yield self.storage.get_blobs_to_announce(FakeAnnouncer()) + should_announce_hashes = yield self.storage.get_blobs_to_announce() self.assertSetEqual(set(should_announce_hashes), {sd_hash, blob1}) stream_hashes = yield self.storage.get_all_streams() diff --git a/lbrynet/tests/unit/dht/test_node.py b/lbrynet/tests/unit/dht/test_node.py index e89811b48..ab73ba3e8 100644 --- a/lbrynet/tests/unit/dht/test_node.py +++ b/lbrynet/tests/unit/dht/test_node.py @@ -198,7 +198,7 @@ class NodeLookupTest(unittest.TestCase): h = hashlib.sha384() h.update('node1') node_id = str(h.digest()) - self.node = lbrynet.dht.node.Node(None, node_id=node_id, udpPort=4000, networkProtocol=self._protocol) + self.node = lbrynet.dht.node.Node(node_id=node_id, udpPort=4000, networkProtocol=self._protocol) self.updPort = 81173 self.contactsAmount = 80 # Reinitialise the routing table diff --git a/lbrynet/tests/unit/dht/test_protocol.py b/lbrynet/tests/unit/dht/test_protocol.py index 0b48cf115..af636b631 100644 --- a/lbrynet/tests/unit/dht/test_protocol.py +++ b/lbrynet/tests/unit/dht/test_protocol.py @@ -1,7 +1,7 @@ import time import unittest from twisted.internet.task import Clock -from twisted.internet import defer, threads +from twisted.internet import defer import lbrynet.dht.protocol import lbrynet.dht.contact import lbrynet.dht.constants diff --git a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py index 46ef3b721..07ad7e87f 100644 --- a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py +++ b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py @@ -2,18 +2,15 @@ from Crypto.Cipher import AES import mock from twisted.trial import unittest -from twisted.internet import defer, reactor +from twisted.internet import defer from lbrynet.database.storage import SQLiteStorage from lbrynet.core.StreamDescriptor import get_sd_info, BlobStreamDescriptorReader from lbrynet.core import BlobManager from lbrynet.core import Session -from lbrynet.dht import hashannouncer -from lbrynet.dht.node import Node from lbrynet.file_manager import EncryptedFileCreator from lbrynet.file_manager import EncryptedFileManager from lbrynet.tests import mocks -from time import time from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir MB = 2**20 @@ -34,8 +31,7 @@ class CreateEncryptedFileTest(unittest.TestCase): self.session = mock.Mock(spec=Session.Session)(None, None) self.session.payment_rate_manager.min_blob_data_payment_rate = 0 - self.blob_manager = BlobManager.DiskBlobManager( - hashannouncer.DummyHashAnnouncer(), self.tmp_blob_dir, SQLiteStorage(self.tmp_db_dir)) + self.blob_manager = BlobManager.DiskBlobManager(self.tmp_blob_dir, SQLiteStorage(self.tmp_db_dir)) self.session.blob_manager = self.blob_manager self.session.storage = self.session.blob_manager.storage self.file_manager = EncryptedFileManager.EncryptedFileManager(self.session, object()) diff --git a/lbrynet/tests/util.py b/lbrynet/tests/util.py index cc4c7da78..e6ad2005c 100644 --- a/lbrynet/tests/util.py +++ b/lbrynet/tests/util.py @@ -67,8 +67,8 @@ def debug_kademlia_packet(data, source, destination, node): log.debug("response %s <-- %s %s (node time %s)", destination[0], source[0], packet.response, node.clock.seconds()) else: - log.debug("response %s <-- %s %i contacts (node time %s)", destination[0], source[0], - len(packet.response), node.clock.seconds()) + log.debug("response %s <-- %s %i contacts (node time %s)", destination[0], source[0], + len(packet.response), node.clock.seconds()) elif isinstance(packet, ErrorMessage): log.error("error %s <-- %s %s (node time %s)", destination[0], source[0], packet.exceptionType, node.clock.seconds())