diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 8a1e9a0ad..62acedf10 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -15,6 +15,7 @@ import zmq.asyncio from lbry.conf import Config from lbry.event import EventController +from lbry.error import LbrycrdEventSubscriptionError from .database import BlockchainDB from .ledger import Ledger, RegTestLedger @@ -57,10 +58,13 @@ class Process(asyncio.SubprocessProtocol): self.ready.set() +ZMQ_BLOCK_EVENT = 'pubhashblock' + + class Lbrycrd: def __init__(self, ledger: Ledger): - self.ledger = ledger + self.ledger, self.conf = ledger, ledger.conf self.data_dir = self.actual_data_dir = ledger.conf.lbrycrd_dir if self.is_regtest: self.actual_data_dir = os.path.join(self.data_dir, 'regtest') @@ -70,14 +74,8 @@ class Lbrycrd: self.cli_bin = os.path.join(self.bin_dir, 'lbrycrd-cli') self.protocol = None self.transport = None - self.hostname = 'localhost' - self.peerport = 9246 + 2 # avoid conflict with default peer port - self.rpcport = 9245 + 2 # avoid conflict with default rpc port - self.rpcuser = 'rpcuser' - self.rpcpassword = 'rpcpassword' self.subscribed = False self.subscription: Optional[asyncio.Task] = None - self.subscription_url = 'tcp://127.0.0.1:29000' self.default_generate_address = None self._on_block_controller = EventController() self.on_block = self._on_block_controller.stream @@ -88,7 +86,13 @@ class Lbrycrd: @classmethod def temp_regtest(cls): - return cls(RegTestLedger(Config.with_same_dir(tempfile.mkdtemp()))) + return cls(RegTestLedger( + Config.with_same_dir(tempfile.mkdtemp()).set( + lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port + lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port + lbrycrd_zmq_blocks="tcp://127.0.0.1:29000" + ) + )) @staticmethod def get_block_file_name(block_file_number): @@ -106,7 +110,10 @@ class Lbrycrd: @property def rpc_url(self): - return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/' + return ( + f'http://{self.conf.lbrycrd_rpc_user}:{self.conf.lbrycrd_rpc_pass}' + f'@{self.conf.lbrycrd_rpc_host}:{self.conf.lbrycrd_rpc_port}/' + ) @property def exists(self): @@ -153,14 +160,15 @@ class Lbrycrd: def get_start_command(self, *args): if self.is_regtest: args += ('-regtest',) + if self.conf.lbrycrd_zmq_blocks: + args += (f'-zmqpubhashblock={self.conf.lbrycrd_zmq_blocks}',) return ( self.daemon_bin, f'-datadir={self.data_dir}', - f'-port={self.peerport}', - f'-rpcport={self.rpcport}', - f'-rpcuser={self.rpcuser}', - f'-rpcpassword={self.rpcpassword}', - f'-zmqpubhashblock={self.subscription_url}', + f'-port={self.conf.lbrycrd_peer_port}', + f'-rpcport={self.conf.lbrycrd_rpc_port}', + f'-rpcuser={self.conf.lbrycrd_rpc_user}', + f'-rpcpassword={self.conf.lbrycrd_rpc_pass}', '-server', '-printtoconsole', *args ) @@ -197,12 +205,20 @@ class Lbrycrd: None, shutil.rmtree, self.data_dir, True ) - def subscribe(self): + async def ensure_subscribable(self): + zmq_notifications = await self.get_zmq_notifications() + subs = {e['type']: e['address'] for e in zmq_notifications} + if ZMQ_BLOCK_EVENT not in subs: + raise LbrycrdEventSubscriptionError(ZMQ_BLOCK_EVENT) + if not self.conf.lbrycrd_zmq_blocks: + self.conf.lbrycrd_zmq_blocks = subs[ZMQ_BLOCK_EVENT] + + async def subscribe(self): if not self.subscribed: self.subscribed = True ctx = zmq.asyncio.Context.instance() sock = ctx.socket(zmq.SUB) # pylint: disable=no-member - sock.connect(self.subscription_url) + sock.connect(self.conf.lbrycrd_zmq_blocks) sock.subscribe("hashblock") self.subscription = asyncio.create_task(self.subscription_handler(sock)) @@ -242,6 +258,9 @@ class Lbrycrd: result['error'].update(method=method, params=params) raise Exception(result['error']) + async def get_zmq_notifications(self): + return await self.rpc("getzmqnotifications") + async def generate(self, blocks): if self.default_generate_address is None: self.default_generate_address = await self.get_new_address() diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index d628116ce..64d169a74 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -48,9 +48,10 @@ class BlockchainSync(Sync): async def start(self): self.db.stop_event.clear() + await self.chain.ensure_subscribable() self.advance_loop_task = asyncio.create_task(self.advance()) await self.advance_loop_task - self.chain.subscribe() + await self.chain.subscribe() self.advance_loop_task = asyncio.create_task(self.advance_loop()) self.on_block_subscription = self.chain.on_block.listen( lambda e: self.advance_loop_event.set() diff --git a/lbry/conf.py b/lbry/conf.py index 80d3904c5..389ad1e10 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -505,8 +505,8 @@ class CLIConfig(TranscodeConfig): class Config(CLIConfig): db_url = String("Database connection URL, uses a local file based SQLite by default.") - processes = Integer( - "Multiprocessing, specify number of processes lbrynet can start (including main process)." + workers = Integer( + "Multiprocessing, specify number of worker processes lbrynet can start (including main process)." " (-1: threads only, 0: equal to number of CPUs, >1: specific number of processes)", -1 ) console = StringChoice( @@ -615,6 +615,12 @@ class Config(CLIConfig): comment_server = String("Comment server API URL", "https://comments.lbry.com/api") # blockchain + lbrycrd_rpc_user = String("Username for connecting to lbrycrd.", "rpcuser") + lbrycrd_rpc_pass = String("Password for connecting to lbrycrd.", "rpcpassword") + lbrycrd_rpc_host = String("Hostname for connecting to lbrycrd.", "localhost") + lbrycrd_rpc_port = Integer("Port for connecting to lbrycrd.", 9245) + lbrycrd_peer_port = Integer("Port for connecting to lbrycrd.", 9246) + lbrycrd_zmq_blocks = String("ZMQ block events address.") lbrycrd_dir = Path("Directory containing lbrycrd data.", metavar='DIR') blockchain_name = String("Blockchain name - lbrycrd_main, lbrycrd_regtest, or lbrycrd_testnet", 'lbrycrd_main') spv_address_filters = Toggle( diff --git a/lbry/console.py b/lbry/console.py index 3695b49d4..4e775444c 100644 --- a/lbry/console.py +++ b/lbry/console.py @@ -88,13 +88,12 @@ class Basic(Console): s.append('Full Node') elif isinstance(self.service, LightClient): s.append('Light Client') - if conf.processes in (-1, 99): + if conf.workers == -1: s.append('Threads Only') - elif conf.processes == 0: - s.append(f'{os.cpu_count()} Process(es)') else: - s.append(f'{conf.processes} Process(es)') - s.append(f'({os.cpu_count()} CPU(s) available)') + workers = os.cpu_count() if conf.workers == 0 else conf.workers + s.append(f'{workers} Worker' if workers == 1 else f'{workers} Workers') + s.append(f'({os.cpu_count()} CPUs available)') print(' '.join(s)) def stopping(self): diff --git a/lbry/db/database.py b/lbry/db/database.py index 14e5c6c0a..ce74005b4 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -81,7 +81,7 @@ class Database: def __init__(self, ledger: 'Ledger'): self.url = ledger.conf.db_url_or_default self.ledger = ledger - self.processes = self._normalize_processes(ledger.conf.processes) + self.workers = self._normalize_worker_processes(ledger.conf.workers) self.executor: Optional[Executor] = None self.message_queue = mp.Queue() self.stop_event = mp.Event() @@ -92,11 +92,11 @@ class Database: ) @staticmethod - def _normalize_processes(processes): - if processes == 0: + def _normalize_worker_processes(workers): + if workers == 0: return os.cpu_count() - elif processes > 0: - return processes + elif workers > 0: + return workers return 1 @classmethod @@ -162,8 +162,8 @@ class Database: self.message_queue, self.stop_event ) } - if self.processes > 1 and self.processes != 99: - self.executor = ProcessPoolExecutor(max_workers=self.processes, **kwargs) + if self.workers > 1: + self.executor = ProcessPoolExecutor(max_workers=self.workers, **kwargs) else: self.executor = ThreadPoolExecutor(max_workers=1, **kwargs) return await self.run(q.check_version_and_create_tables) diff --git a/lbry/error/README.md b/lbry/error/README.md index 4af499334..791b63309 100644 --- a/lbry/error/README.md +++ b/lbry/error/README.md @@ -81,3 +81,5 @@ Code | Name | Message 701 | InvalidExchangeRateResponse | Failed to get exchange rate from {source}: {reason} 702 | CurrencyConversion | {message} 703 | InvalidCurrency | Invalid currency: {currency} is not a supported currency. +**8xx** | Lbrycrd | **Lbrycrd** +811 | LbrycrdEventSubscription | Lbrycrd is not publishing '{event}' events. diff --git a/lbry/error/__init__.py b/lbry/error/__init__.py index 8827f5e17..66fa6c0a7 100644 --- a/lbry/error/__init__.py +++ b/lbry/error/__init__.py @@ -398,3 +398,16 @@ class InvalidCurrencyError(CurrencyExchangeError): def __init__(self, currency): self.currency = currency super().__init__(f"Invalid currency: {currency} is not a supported currency.") + + +class LbrycrdError(BaseError): + """ + **Lbrycrd** + """ + + +class LbrycrdEventSubscriptionError(LbrycrdError): + + def __init__(self, event): + self.event = event + super().__init__(f"Lbrycrd is not publishing '{event}' events.") diff --git a/lbry/testcase.py b/lbry/testcase.py index 18011baf5..995fb611e 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -742,3 +742,211 @@ class CommandTestCase(IntegrationTestCase): @staticmethod def get_address(tx): return tx['outputs'][0]['address'] + + +class EventGenerator: + + def __init__( + self, initial_sync=False, start=None, end=None, block_files=None, claims=None, + takeovers=None, stakes=0, supports=None + ): + self.initial_sync = initial_sync + self.block_files = block_files or [] + self.claims = claims or [] + self.takeovers = takeovers or [] + self.stakes = stakes + self.supports = supports or [] + self.start_height = start + self.end_height = end + + @property + def events(self): + yield from self.blocks_init() + if self.block_files: + yield from self.blocks_main_start() + for block_file in self.block_files: + yield from self.blocks_file(*block_file) + yield from self.blocks_main_finish() + + yield from self.spends_steps() + + if self.claims: + if not self.initial_sync: + yield from self.claims_init() + yield from self.claims_main_start() + yield from self.claims_insert(self.claims) + if self.initial_sync: + yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (8,), (1,)) + else: + yield from self.claims_takeovers(self.takeovers) + yield from self.claims_stakes() + yield from self.claims_vacuum() + yield from self.claims_main_finish() + + if self.supports: + if not self.initial_sync: + yield from self.supports_init() + yield from self.supports_main_start() + yield from self.supports_insert(self.supports) + if self.initial_sync: + yield from self.generate("blockchain.sync.supports.indexes", ("steps",), 0, None, (3,), (1,)) + else: + yield from self.supports_vacuum() + yield from self.supports_main_finish() + + def blocks_init(self): + yield from self.generate("blockchain.sync.blocks.init", ("steps",), 0, None, (3,), (1,)) + + def blocks_main_start(self): + files = len(self.block_files) + blocks = sum([bf[1] for bf in self.block_files]) + txs = sum([bf[2] for bf in self.block_files]) + claims = sum([c[2] for c in self.claims]) + supports = sum([c[2] for c in self.supports]) + yield { + "event": "blockchain.sync.blocks.main", + "data": { + "id": 0, "done": (0, 0), "total": (blocks, txs), "units": ("blocks", "txs"), + "starting_height": self.start_height, "ending_height": self.end_height, + "files": files, "claims": claims, "supports": supports + } + } + + def blocks_main_finish(self): + yield { + "event": "blockchain.sync.blocks.main", + "data": {"id": 0, "done": (-1, -1)} + } + + def blocks_files(self, files): + for file in files: + yield from self.blocks_file(*file) + + def blocks_file(self, file, blocks, txs, steps): + for i, step in enumerate(steps): + if i == 0: + yield { + "event": "blockchain.sync.blocks.file", + "data": { + "id": file, + "done": (0, 0), + "total": (blocks, txs), + "units": ("blocks", "txs"), + "label": f"blk0000{file}.dat", + } + } + yield { + "event": "blockchain.sync.blocks.file", + "data": {"id": file, "done": step} + } + + def spends_steps(self): + yield from self.generate( + "blockchain.sync.spends.main", ("steps",), 0, None, + (15 if self.initial_sync else 5,), + (1,) + ) + + def claims_init(self): + yield from self.generate("blockchain.sync.claims.init", ("steps",), 0, None, (4,), (1,)) + + def claims_main_start(self): + total = ( + sum([c[2] for c in self.claims]) + + sum([c[2] for c in self.takeovers]) + + self.stakes + ) + yield { + "event": "blockchain.sync.claims.main", + "data": { + "id": 0, "done": (0,), + "total": (total,), + "units": ("claims",)} + } + + def claims_main_finish(self): + yield { + "event": "blockchain.sync.claims.main", + "data": {"id": 0, "done": (-1,)} + } + + def claims_insert(self, heights): + for start, end, total, count in heights: + yield from self.generate( + "blockchain.sync.claims.insert", ("claims",), start, + f"add claims {start}- {end}", (total,), (count,) + ) + + def claims_takeovers(self, heights): + for start, end, total, count in heights: + yield from self.generate( + "blockchain.sync.claims.takeovers", ("claims",), 0, + f"mod winner {start}- {end}", (total,), (count,) + ) + + def claims_stakes(self): + yield from self.generate( + "blockchain.sync.claims.stakes", ("claims",), 0, None, (self.stakes,), (1,) + ) + + def claims_vacuum(self): + yield from self.generate( + "blockchain.sync.claims.vacuum", ("steps",), 0, None, (2,), (1,) + ) + + def supports_init(self): + yield from self.generate("blockchain.sync.supports.init", ("steps",), 0, None, (2,), (1,)) + + def supports_main_start(self): + yield { + "event": "blockchain.sync.supports.main", + "data": { + "id": 0, "done": (0,), + "total": (sum([c[2] for c in self.supports]),), + "units": ("supports",) + } + } + + def supports_main_finish(self): + yield { + "event": "blockchain.sync.supports.main", + "data": {"id": 0, "done": (-1,)} + } + + def supports_insert(self, heights): + for start, end, total, count in heights: + yield from self.generate( + "blockchain.sync.supports.insert", ("supports",), start, + f"add supprt {start}" if start==end else f"add supprt {start}- {end}", + (total,), (count,) + ) + + def supports_vacuum(self): + yield from self.generate( + "blockchain.sync.supports.vacuum", ("steps",), 0, None, (1,), (1,) + ) + + def generate(self, name, units, eid, label, total, steps): + done = (0,)*len(total) + while not all(d >= t for d, t in zip(done, total)): + if done[0] == 0: + first_event = { + "event": name, + "data": { + "id": eid, + "done": done, + "total": total, + "units": units, + } + } + if label is not None: + first_event["data"]["label"] = label + yield first_event + done = tuple(min(d+s, t) for d, s, t in zip(done, steps, total)) + yield { + "event": name, + "data": { + "id": eid, + "done": done, + } + } diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 1511dde77..5ee6e88f6 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -4,19 +4,19 @@ import asyncio import tempfile from unittest import skip from binascii import hexlify, unhexlify -from typing import List, Optional +from typing import List, Optional, Iterable from distutils.dir_util import copy_tree, remove_tree from lbry import Config, Database, RegTestLedger, Transaction, Output, Input from lbry.crypto.base58 import Base58 from lbry.schema.claim import Stream, Channel from lbry.schema.support import Support -from lbry.blockchain.script import OutputScript +from lbry.error import LbrycrdEventSubscriptionError from lbry.blockchain.lbrycrd import Lbrycrd from lbry.blockchain.sync import BlockchainSync from lbry.blockchain.dewies import dewies_to_lbc, lbc_to_dewies from lbry.constants import CENT, COIN -from lbry.testcase import AsyncioTestCase +from lbry.testcase import AsyncioTestCase, EventGenerator #logging.getLogger('lbry.blockchain').setLevel(logging.DEBUG) @@ -284,26 +284,58 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase): self.assertEqual(accepted or [], await self.get_accepted()) -class TestLbrycrdEvents(BasicBlockchainTestCase): +class TestLbrycrdEvents(AsyncioTestCase): + + async def test_zmq(self): + chain = Lbrycrd.temp_regtest() + chain.ledger.conf.set(lbrycrd_zmq_blocks='') + await chain.ensure() + self.addCleanup(chain.stop) + + # lbrycrdr started without zmq + await chain.start() + with self.assertRaises(LbrycrdEventSubscriptionError): + await chain.ensure_subscribable() + await chain.stop() + + # lbrycrdr started with zmq, ensure_subscribable updates lbrycrd_zmq_blocks config + await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29000') + self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '') + await chain.ensure_subscribable() + self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://127.0.0.1:29000') + await chain.stop() + + # lbrycrdr started with zmq, ensure_subscribable does not override lbrycrd_zmq_blocks config + chain.ledger.conf.set(lbrycrd_zmq_blocks='') + await chain.start('-zmqpubhashblock=tcp://127.0.0.1:29000') + self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, '') + chain.ledger.conf.set(lbrycrd_zmq_blocks='tcp://external-ip:29000') + await chain.ensure_subscribable() + self.assertEqual(chain.ledger.conf.lbrycrd_zmq_blocks, 'tcp://external-ip:29000') async def test_block_event(self): + chain = Lbrycrd.temp_regtest() + await chain.ensure() + self.addCleanup(chain.stop) + await chain.start() + msgs = [] - self.chain.subscribe() - self.chain.on_block.listen(lambda e: msgs.append(e['msg'])) - res = await self.chain.generate(5) - await self.chain.on_block.where(lambda e: e['msg'] == 4) + await chain.subscribe() + chain.on_block.listen(lambda e: msgs.append(e['msg'])) + res = await chain.generate(5) + await chain.on_block.where(lambda e: e['msg'] == 4) self.assertEqual([0, 1, 2, 3, 4], msgs) self.assertEqual(5, len(res)) - self.chain.unsubscribe() - res = await self.chain.generate(2) + chain.unsubscribe() + res = await chain.generate(2) self.assertEqual(2, len(res)) await asyncio.sleep(0.1) # give some time to "miss" the new block events - self.chain.subscribe() - res = await self.chain.generate(3) - await self.chain.on_block.where(lambda e: e['msg'] == 9) + await chain.subscribe() + res = await chain.generate(3) + await chain.on_block.where(lambda e: e['msg'] == 9) self.assertEqual(3, len(res)) self.assertEqual([ 0, 1, 2, 3, 4, @@ -381,32 +413,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): await self.chain.start(*self.LBRYCRD_ARGS) @staticmethod - def extract_block_events(name, events): - return sorted([[ - p['data']['block_file'], - p['data']['step'], - p['data']['total'], - p['data']['txs_done'], - p['data']['txs_total'], - ] for p in events if p['event'] == name]) - - @staticmethod - def extract_events(name, events): - return sorted([ - [p['data']['step'], p['data']['total']] - for p in events if p['event'] == name - ]) - - def assertEventsAlmostEqual(self, actual, expected): - # this is needed because the sample tx data created - # by lbrycrd does not have deterministic number of TXIs, - # which throws off the progress reporting steps. - # adjust the 'actual' to match 'expected' if it's only off by 1: - for e, a in zip(expected, actual): - if a[1] != e[1] and abs(a[1]-e[1]) <= 1: - a[1] = e[1] - self.assertEqual(expected, actual) - async def test_lbrycrd_database_queries(self): db = self.chain.db @@ -461,148 +467,65 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): for c in await db.get_support_metadata(0, 500)] ) - def assertConsumingEvents(self, events: list, name, units, expectation_generator): - expected_count = 0 - for expectation in expectation_generator: - expected_count += len(expectation[2:]) - self.assertGreaterEqual(len(events), expected_count) - extracted = [] - for _ in range(expected_count): - extracted.append(events.pop(0)) - actual = sorted(extracted, key=lambda e: (e["event"], e["data"]["id"], e["data"]["done"])) - expected = [] - for expectation in expectation_generator: - for i, done in enumerate(expectation[2:]): - if i == 0: - first_event = { - "event": name, - "data": { - "id": expectation[0], - "done": (0,) * len(units), - "total": expectation[2], - "units": units, - } - } - if expectation[1] is not None: - first_event["data"]["label"] = expectation[1] - expected.append(first_event) - else: - expected.append({ - "event": name, - "data": { - "id": expectation[0], - "done": done, - } - }) - self.assertEqual(expected, actual) + @staticmethod + def sorted_events(events): + sorted_events = [] + buffer = [] + sort_key = lambda e: (e["event"], e["data"]["id"], e["data"]["done"]) + for event in events: + if buffer and event['event'] != buffer[-1]['event']: + buffer.sort(key=sort_key) + sorted_events.extend(buffer) + buffer.clear() + buffer.append(event) + buffer.sort(key=sort_key) + sorted_events.extend(buffer) + return sorted_events async def test_multi_block_file_sync(self): + events = [] self.sync.on_progress.listen(events.append) # initial sync await self.sync.advance() await asyncio.sleep(1) # give it time to collect events - self.assertConsumingEvents( - events, "blockchain.sync.blocks.init", ("steps",), [ - (0, None, (3,), (1,), (2,), (3,)) - ] - ) self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.blocks.main", - "data": { - "id": 0, "done": (0, 0), "total": (353, 544), "units": ("blocks", "txs"), - "starting_height": 0, "ending_height": 352, - "files": 3, "claims": 3610, "supports": 2 - } - } + self.sorted_events(events), + list(EventGenerator( + initial_sync=True, + start=0, end=352, + block_files=[ + (0, 191, 280, ((100, 0), (191, 280))), + (1, 89, 178, ((89, 178),)), + (2, 73, 86, ((73, 86),)), + ], + claims=[ + (102, 120, 361, 361), + (121, 139, 361, 361), + (140, 158, 361, 361), + (159, 177, 361, 361), + (178, 196, 361, 361), + (197, 215, 361, 361), + (216, 234, 361, 361), + (235, 253, 361, 361), + (254, 272, 361, 361), + (273, 291, 361, 361), + ], + supports=[ + (352, 352, 2, 2), + ] + ).events) ) - self.assertConsumingEvents( - events, "blockchain.sync.blocks.file", ("blocks", "txs"), [ - (0, "blk00000.dat", (191, 280), (100, 0), (191, 280)), - (1, "blk00001.dat", (89, 178), (89, 178)), - (2, "blk00002.dat", (73, 86), (73, 86)), - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.blocks.main", - "data": {"id": 0, "done": (-1, -1)} - } - ) - self.assertConsumingEvents( - events, "blockchain.sync.spends.main", ("steps",), [ - (0, None, (15,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), - (8,), (9,), (10,), (11,), (12,), (13,), (14,), (15,)) - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.claims.main", - "data": {"id": 0, "done": (0,), "total": (3610,), "units": ("claims",)} - } - ) - self.assertConsumingEvents( - events, "blockchain.sync.claims.insert", ("claims",), [ - (102, "add claims 102- 120", (361,), (361,)), - (121, "add claims 121- 139", (361,), (361,)), - (140, "add claims 140- 158", (361,), (361,)), - (159, "add claims 159- 177", (361,), (361,)), - (178, "add claims 178- 196", (361,), (361,)), - (197, "add claims 197- 215", (361,), (361,)), - (216, "add claims 216- 234", (361,), (361,)), - (235, "add claims 235- 253", (361,), (361,)), - (254, "add claims 254- 272", (361,), (361,)), - (273, "add claims 273- 291", (361,), (361,)), - ] - ) - self.assertConsumingEvents( - events, "blockchain.sync.claims.indexes", ("steps",), [ - (0, None, (8,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,)) - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.claims.main", - "data": {"id": 0, "done": (-1,)} - } - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.supports.main", - "data": {"id": 0, "done": (0,), "total": (2,), "units": ("supports",)} - } - ) - self.assertConsumingEvents( - events, "blockchain.sync.supports.insert", ("supports",), [ - (352, "add supprt 352", (2,), (2,)), - ] - ) - self.assertConsumingEvents( - events, "blockchain.sync.supports.indexes", ("steps",), [ - (0, None, (3,), (1,), (2,), (3,)) - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.supports.main", - "data": {"id": 0, "done": (-1,)} - } - ) - self.assertEqual(events, []) # initial_sync = False & no new blocks + events.clear() await self.sync.advance() # should be no-op await asyncio.sleep(1) # give it time to collect events - self.assertConsumingEvents( - events, "blockchain.sync.blocks.init", ("steps",), [ - (0, None, (3,), (1,), (2,), (3,)) - ] - ) - self.assertEqual(events, []) + self.assertEqual(self.sorted_events(events), list(EventGenerator().events)) # initial_sync = False + events.clear() txid = await self.chain.claim_name('foo', 'beef', '0.01') await self.chain.generate(1) tx = Transaction(unhexlify(await self.chain.get_raw_transaction(txid))) @@ -611,102 +534,26 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): await self.chain.generate(1) await self.sync.advance() await asyncio.sleep(1) # give it time to collect events - self.assertConsumingEvents( - events, "blockchain.sync.blocks.init", ("steps",), [ - (0, None, (3,), (1,), (2,), (3,)) - ] - ) self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.blocks.main", - "data": { - "id": 0, "done": (0, 0), "total": (2, 4), "units": ("blocks", "txs"), - "starting_height": 353, "ending_height": 354, - "files": 1, "claims": 1, "supports": 1 - } - } + self.sorted_events(events), + list(EventGenerator( + initial_sync=False, + start=353, end=354, + block_files=[ + (2, 2, 4, ((2, 4),)), + ], + claims=[ + (353, 354, 1, 1), + ], + takeovers=[ + (353, 354, 1, 1), + ], + stakes=1, + supports=[ + (353, 354, 1, 1), + ] + ).events) ) - self.assertConsumingEvents( - events, "blockchain.sync.blocks.file", ("blocks", "txs"), [ - (2, "blk00002.dat", (2, 4), (2, 4)), - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.blocks.main", - "data": {"id": 0, "done": (-1, -1)} - } - ) - self.assertConsumingEvents( - events, "blockchain.sync.spends.main", ("steps",), [ - (0, None, (5,), (1,), (2,), (3,), (4,), (5,)) - ] - ) - self.assertConsumingEvents( - events, "blockchain.sync.claims.init", ("steps",), [ - (0, None, (4,), (1,), (2,), (3,), (4,)) - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.claims.main", - "data": {"id": 0, "done": (0,), "total": (3,), "units": ("claims",)} - } - ) - self.assertConsumingEvents( - events, "blockchain.sync.claims.insert", ("claims",), [ - (353, "add claims 353- 354", (1,), (1,)), - ] - ) - self.assertConsumingEvents( - events, "blockchain.sync.claims.takeovers", ("claims",), [ - (0, "mod winner 353- 354", (1,), (1,)), - ] - ) - self.assertConsumingEvents( - events, "blockchain.sync.claims.stakes", ("claims",), [ - (0, None, (1,), (1,)), - ] - ) - self.assertConsumingEvents( - events, "blockchain.sync.claims.vacuum", ("steps",), [ - (0, None, (2,), (1,), (2,)) - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.claims.main", - "data": {"id": 0, "done": (-1,)} - } - ) - self.assertConsumingEvents( - events, "blockchain.sync.supports.init", ("steps",), [ - (0, None, (2,), (1,), (2,)) - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.supports.main", - "data": {"id": 0, "done": (0,), "total": (1,), "units": ("supports",)} - } - ) - self.assertConsumingEvents( - events, "blockchain.sync.supports.insert", ("supports",), [ - (353, "add supprt 353- 354", (1,), (1,)), - ] - ) - self.assertConsumingEvents( - events, "blockchain.sync.supports.vacuum", ("steps",), [ - (0, None, (1,), (1,)) - ] - ) - self.assertEqual( - events.pop(0), { - "event": "blockchain.sync.supports.main", - "data": {"id": 0, "done": (-1,)} - } - ) - self.assertEqual(events, []) class TestGeneralBlockchainSync(SyncingBlockchainTestCase):