forked from LBRYCommunity/lbry-sdk
fix and enable test_blob_announcer
parent a76a0ac8c4
commit 441cc950aa
2 changed files with 78 additions and 61 deletions

@@ -1,60 +1,70 @@
 import contextlib
+import logging
 import typing
 import binascii
 import socket
 import asyncio
 
 from lbry.testcase import AsyncioTestCase
 from tests import dht_mocks
+from lbry.dht.protocol.distance import Distance
 from lbry.conf import Config
 from lbry.dht import constants
 from lbry.dht.node import Node
 from lbry.dht.peer import PeerManager, make_kademlia_peer
 from lbry.dht.blob_announcer import BlobAnnouncer
 from lbry.extras.daemon.storage import SQLiteStorage
-from unittest import skip
 
 
 class TestBlobAnnouncer(AsyncioTestCase):
+    TIMEOUT = 20.0 # lower than default
+
     async def setup_node(self, peer_addresses, address, node_id):
         self.nodes: typing.Dict[int, Node] = {}
-        self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time())
+        self.advance = dht_mocks.get_time_accelerator(self.loop)
+        self.instant_advance = dht_mocks.get_time_accelerator(self.loop)
         self.conf = Config()
-        self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
-        await self.storage.open()
         self.peer_manager = PeerManager(self.loop)
         self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
         await self.node.start_listening(address)
-        self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
-        for node_id, address in peer_addresses:
-            await self.add_peer(node_id, address)
+        await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses])
+        for first_peer in self.nodes.values():
+            for second_peer in self.nodes.values():
+                if first_peer == second_peer:
+                    continue
+                self.add_peer_to_routing_table(first_peer, second_peer)
+                self.add_peer_to_routing_table(second_peer, first_peer)
+        await self.advance(0.1) # just to make pings go through
         self.node.joined.set()
        self.node._refresh_task = self.loop.create_task(self.node.refresh_node())
+        self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
+        await self.storage.open()
+        self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
 
     async def add_peer(self, node_id, address, add_to_routing_table=True):
+        #print('add', node_id.hex()[:8], address)
         n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
         await n.start_listening(address)
         self.nodes.update({len(self.nodes): n})
         if add_to_routing_table:
-            self.node.protocol.add_peer(
-                make_kademlia_peer(
-                    n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
-                )
-            )
+            self.add_peer_to_routing_table(self.node, n)
+
+    def add_peer_to_routing_table(self, adder, being_added):
+        adder.protocol.add_peer(
+            make_kademlia_peer(
+                being_added.protocol.node_id, being_added.protocol.external_ip, being_added.protocol.udp_port
+            )
+        )
 
     @contextlib.asynccontextmanager
-    async def _test_network_context(self, peer_addresses=None):
-        self.peer_addresses = peer_addresses or [
-            (constants.generate_id(2), '1.2.3.2'),
-            (constants.generate_id(3), '1.2.3.3'),
-            (constants.generate_id(4), '1.2.3.4'),
-            (constants.generate_id(5), '1.2.3.5'),
-            (constants.generate_id(6), '1.2.3.6'),
-            (constants.generate_id(7), '1.2.3.7'),
-            (constants.generate_id(8), '1.2.3.8'),
-            (constants.generate_id(9), '1.2.3.9'),
+    async def _test_network_context(self, peer_count=200):
+        self.peer_addresses = [
+            (constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
+            for i in range(1, peer_count + 1)
         ]
         try:
             with dht_mocks.mock_network_loop(self.loop):
-                await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1))
+                await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000))
                 yield
         finally:
             self.blob_announcer.stop()
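
Note: the rewritten _test_network_context derives each peer's IPv4 address from its index by packing i + 0x01000001 into four big-endian bytes and rendering it with socket.inet_ntoa, which avoids colliding with the test node at 1.2.3.1 for the peer counts used here. A minimal standalone sketch of that mapping (the helper name index_to_ip is illustrative, not part of the test):

import socket

def index_to_ip(i: int) -> str:
    # pack i + 0x01000001 into 4 big-endian bytes, then render as a dotted quad
    return socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big'))

# i=1 -> '1.0.0.2', i=200 -> '1.0.0.201', i=254 -> '1.0.0.255', i=255 -> '1.0.1.0'
print([index_to_ip(i) for i in (1, 200, 254, 255)])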

@@ -73,43 +83,58 @@ class TestBlobAnnouncer(AsyncioTestCase):
             )
         )
         await peer.ping()
-        return peer
+        return last_node
 
-    @skip("Something from a previous test is leaking into this test and causing it to fail intermittently")
     async def test_announce_blobs(self):
         blob1 = binascii.hexlify(b'1' * 48).decode()
         blob2 = binascii.hexlify(b'2' * 48).decode()
 
-        async with self._test_network_context():
-            await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True)
-            await self.storage.db.execute(
-                "update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)",
-                (blob1, blob2)
-            )
+        async with self._test_network_context(peer_count=100):
+            await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True)
+            await self.storage.add_blobs(
+                *((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)),
+                finished=True)
+            await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1")
             to_announce = await self.storage.get_blobs_to_announce()
-            self.assertEqual(2, len(to_announce))
-            self.blob_announcer.start(batch_size=1) # so it covers batching logic
+            self.assertEqual(92, len(to_announce))
+            self.blob_announcer.start(batch_size=10) # so it covers batching logic
             # takes 60 seconds to start, but we advance 120 to ensure it processed all batches
-            await self.advance(60.0 * 2)
+            ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait())
+            await self.instant_advance(60.0)
+            await ongoing_announcements
             to_announce = await self.storage.get_blobs_to_announce()
             self.assertEqual(0, len(to_announce))
             self.blob_announcer.stop()
 
+            # as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI
+            tolerance = 0.8 # at least 80% of the announcements are within the top K
+            for blob in await self.storage.get_all_blob_hashes():
+                distance = Distance(bytes.fromhex(blob))
+                candidates = list(self.nodes.values())
+                candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id))
+                has_it = 0
+                for index, node in enumerate(candidates[:constants.K], start=1):
+                    if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)):
+                        has_it += 1
+                    else:
+                        logging.warning("blob %s wasnt found between the best K (%s)", blob[:8], node.protocol.node_id.hex()[:8])
+                self.assertGreaterEqual(has_it, int(tolerance * constants.K))
+
             # test that we can route from a poorly connected peer all the way to the announced blob
 
-            await self.chain_peer(constants.generate_id(10), '1.2.3.10')
-            await self.chain_peer(constants.generate_id(11), '1.2.3.11')
-            await self.chain_peer(constants.generate_id(12), '1.2.3.12')
-            await self.chain_peer(constants.generate_id(13), '1.2.3.13')
-            await self.chain_peer(constants.generate_id(14), '1.2.3.14')
-            await self.advance(61.0)
+            current = len(self.nodes)
+            await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10')
+            await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11')
+            await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12')
+            await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13')
+            last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14')
 
-            last = self.nodes[len(self.nodes) - 1]
             search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
             search_q.put_nowait(blob1)
 
             _, task = last.accumulate_peers(search_q, peer_q)
-            found_peers = await peer_q.get()
+            found_peers = await asyncio.wait_for(peer_q.get(), 1.0)
             task.cancel()
 
             self.assertEqual(1, len(found_peers))
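
Note: the new tolerance check sorts every simulated node by XOR distance to the announced blob hash and requires that at least 80% of the closest K nodes actually stored it. Distance (lbry.dht.protocol.distance) implements the Kademlia XOR metric; a minimal sketch of the same ordering with a stand-in helper (xor_distance is hypothetical, not the lbry API):

import os

def xor_distance(key: bytes, node_id: bytes) -> int:
    # Kademlia distance: bytewise XOR of the two ids, read as a big-endian integer
    return int.from_bytes(key, 'big') ^ int.from_bytes(node_id, 'big')

K = 8  # assumed bucket size; the test uses constants.K
blob_hash = os.urandom(48)                      # 48-byte hash, matching the ids used in the test
node_ids = [os.urandom(48) for _ in range(20)]

# the K node ids closest to the blob by XOR distance; the tolerance assertion
# then only requires ~80% of these to be holding the announced blob
closest = sorted(node_ids, key=lambda nid: xor_distance(blob_hash, nid))[:K]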

@@ -119,21 +144,13 @@ class TestBlobAnnouncer(AsyncioTestCase):
 
     async def test_popular_blob(self):
         peer_count = 150
-        addresses = [
-            (constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
-            for i in range(peer_count)
-        ]
-        blob_hash = b'1' * 48
+        blob_hash = constants.generate_id(99999)
 
-        async with self._test_network_context(peer_addresses=addresses):
+        async with self._test_network_context(peer_count=peer_count):
             total_seen = set()
-            announced_to = self.nodes[0]
-            for i in range(1, peer_count):
-                node = self.nodes[i]
-                kad_peer = make_kademlia_peer(
-                    node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port
-                )
-                await announced_to.protocol._add_peer(kad_peer)
+            announced_to = self.nodes.pop(0)
+            for i, node in enumerate(self.nodes.values()):
+                self.add_peer_to_routing_table(announced_to, node)
                 peer = node.protocol.get_rpc_peer(
                     make_kademlia_peer(
                         announced_to.protocol.node_id,

@@ -144,15 +161,15 @@ class TestBlobAnnouncer(AsyncioTestCase):
                 response = await peer.store(blob_hash)
                 self.assertEqual(response, b'OK')
                 peers_for_blob = await peer.find_value(blob_hash, 0)
-                if i == 1:
+                if i == 0:
                     self.assertNotIn(blob_hash, peers_for_blob)
                     self.assertEqual(peers_for_blob[b'p'], 0)
                 else:
-                    self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K))
-                    self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i)
+                    self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K))
+                    self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1)
                 if i - 1 > constants.K:
                     self.assertEqual(len(peers_for_blob[b'contacts']), constants.K)
-                    self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1)
+                    self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1)
                     seen = set(peers_for_blob[blob_hash])
                     self.assertEqual(len(seen), constants.K)
                     self.assertEqual(len(peers_for_blob[blob_hash]), len(seen))

@@ -167,5 +184,5 @@ class TestBlobAnnouncer(AsyncioTestCase):
                         seen.intersection_update(page_x_set)
                         total_seen.update(page_x_set)
                 else:
-                    self.assertEqual(len(peers_for_blob[b'contacts']), i - 1)
+                    self.assertEqual(len(peers_for_blob[b'contacts']), 8) # we always add 8 on first page
             self.assertEqual(len(total_seen), peer_count - 2)
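
Note: the reworked assertions in test_popular_blob follow from the 0-based loop: after iteration i, announced_to has stored i + 1 peers for the blob, a find_value page returns at most constants.K of them, and the advertised page count is (i // (constants.K + 1)) + 1. A small worked check of that arithmetic (K = 8 assumed, matching constants.K; a sketch, not the protocol code):

K = 8  # assumed value of constants.K

def expected_page_count(i: int) -> int:
    # page count the test asserts after the i-th (0-based) announcing peer
    return (i // (K + 1)) + 1

# i = 0..8 -> 1 page, i = 9..17 -> 2 pages, i = 18..26 -> 3 pages, ...
assert [expected_page_count(i) for i in (0, 8, 9, 17, 18)] == [1, 1, 2, 2, 3]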

@@ -29,7 +29,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
             (constants.generate_id(9), '1.2.3.9'),
         ]
         with dht_mocks.mock_network_loop(loop):
-            advance = dht_mocks.get_time_accelerator(loop, loop.time())
+            advance = dht_mocks.get_time_accelerator(loop)
             # start the nodes
             nodes: typing.Dict[int, Node] = {
                 i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address)

@@ -131,7 +131,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase):
         await asyncio.gather(*[n.joined.wait() for n in nodes])
 
         node = nodes[-1]
-        advance = dht_mocks.get_time_accelerator(loop, loop.time())
+        advance = dht_mocks.get_time_accelerator(loop)
         await advance(500)
 
         # Join the network, assert that at least the known peers are in RT
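
Note: both test files now call dht_mocks.get_time_accelerator(loop) without passing loop.time(), so the mock presumably snapshots the loop clock itself. As an illustration of the general technique only (not the dht_mocks implementation), an accelerator that freezes the loop clock and fast-forwards it on demand could look like this:

import asyncio

def get_time_accelerator(loop: asyncio.AbstractEventLoop):
    # hypothetical sketch: freeze the loop clock and let the test advance it manually
    _now = loop.time()
    loop.time = lambda: _now  # shadow the method so scheduled callbacks compare against the fake time

    async def advance(seconds: float):
        nonlocal _now
        _now += seconds
        for _ in range(10):     # yield so callbacks that just became due get a chance to run
            await asyncio.sleep(0)

    return advance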