From fa85558d719e783a5845413e880e1daaa2de91c2 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Wed, 16 Sep 2020 10:37:49 -0400 Subject: [PATCH] wip --- lbry/blockchain/lbrycrd.py | 68 +++++++++++++++++------- lbry/blockchain/sync/blocks.py | 63 +++++++++++++++++----- lbry/blockchain/sync/context.py | 8 +++ lbry/blockchain/sync/synchronizer.py | 51 +++++++++++++----- lbry/conf.py | 4 +- lbry/db/database.py | 25 ++++++--- lbry/service/light_client.py | 4 +- lbry/testcase.py | 78 +++++++++++++++++++++++++--- 8 files changed, 240 insertions(+), 61 deletions(-) diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 6374f9193..02920d91d 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -77,12 +77,21 @@ class Lbrycrd: self.subscribed = False self.subscription: Optional[asyncio.Task] = None self.default_generate_address = None - self._on_block_controller = EventController() - self.on_block = self._on_block_controller.stream - self.on_block.listen(lambda e: log.info('%s %s', hexlify(e['hash']), e['msg'])) + self._on_block_hash_controller = EventController() + self.on_block_hash = self._on_block_hash_controller.stream + self.on_block_hash.listen(lambda e: log.info('%s %s', hexlify(e['hash']), e['msg'])) + self._on_tx_hash_controller = EventController() + self.on_tx_hash = self._on_tx_hash_controller.stream self.db = BlockchainDB(self.actual_data_dir) - self.session: Optional[aiohttp.ClientSession] = None + self._session: Optional[aiohttp.ClientSession] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + + @property + def session(self) -> aiohttp.ClientSession: + if self._session is None: + self._session = aiohttp.ClientSession() + return self._session @classmethod def temp_regtest(cls): @@ -91,7 +100,7 @@ class Lbrycrd: blockchain="regtest", lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port - lbrycrd_zmq_blocks="tcp://127.0.0.1:29002" # avoid conflict with default port + lbrycrd_zmq="tcp://127.0.0.1:29002" ) )) @@ -161,8 +170,11 @@ class Lbrycrd: def get_start_command(self, *args): if self.is_regtest: args += ('-regtest',) - if self.conf.lbrycrd_zmq_blocks: - args += (f'-zmqpubhashblock={self.conf.lbrycrd_zmq_blocks}',) + if self.conf.lbrycrd_zmq: + args += ( + f'-zmqpubhashblock={self.conf.lbrycrd_zmq}', + f'-zmqpubhashtx={self.conf.lbrycrd_zmq}', + ) return ( self.daemon_bin, f'-datadir={self.data_dir}', @@ -175,13 +187,15 @@ class Lbrycrd: ) async def open(self): - self.session = aiohttp.ClientSession() await self.db.open() async def close(self): await self.db.close() - if self.session is not None: - await self.session.close() + await self.close_session() + + async def close_session(self): + if self._session is not None: + await self._session.close() async def start(self, *args): loop = asyncio.get_running_loop() @@ -213,26 +227,33 @@ class Lbrycrd: subs = {e['type']: e['address'] for e in zmq_notifications} if ZMQ_BLOCK_EVENT not in subs: raise LbrycrdEventSubscriptionError(ZMQ_BLOCK_EVENT) - if not self.conf.lbrycrd_zmq_blocks: - self.conf.lbrycrd_zmq_blocks = subs[ZMQ_BLOCK_EVENT] + if not self.conf.lbrycrd_zmq: + self.conf.lbrycrd_zmq = subs[ZMQ_BLOCK_EVENT] async def subscribe(self): if not self.subscribed: self.subscribed = True ctx = zmq.asyncio.Context.instance() sock = ctx.socket(zmq.SUB) # pylint: disable=no-member - sock.connect(self.conf.lbrycrd_zmq_blocks) + sock.connect(self.conf.lbrycrd_zmq) sock.subscribe("hashblock") + sock.subscribe("hashtx") self.subscription = asyncio.create_task(self.subscription_handler(sock)) async def subscription_handler(self, sock): try: while self.subscribed: msg = await sock.recv_multipart() - await self._on_block_controller.add({ - 'hash': msg[1], - 'msg': struct.unpack('= 0: + return col >= height + return col <= height + deletes = [ - BlockTable.delete().where(BlockTable.c.height >= height), - TXI.delete().where(TXI.c.height >= height), - TXO.delete().where(TXO.c.height >= height), - TX.delete().where(TX.c.height >= height), + BlockTable.delete().where(constrain(BlockTable.c.height)), + TXI.delete().where(constrain(TXI.c.height)), + TXO.delete().where(constrain(TXO.c.height)), + TX.delete().where(constrain(TX.c.height)), Tag.delete().where( Tag.c.claim_hash.in_( - select(Claim.c.claim_hash).where(Claim.c.height >= height) + select(Claim.c.claim_hash).where(constrain(Claim.c.height)) ) ), - Claim.delete().where(Claim.c.height >= height), - Support.delete().where(Support.c.height >= height), - BlockFilter.delete().where(BlockFilter.c.height >= height), - # TODO: group and tx filters need where() clauses (below actually breaks things) - BlockGroupFilter.delete(), - TXFilter.delete(), - MempoolFilter.delete() + Claim.delete().where(constrain(Claim.c.height)), + Support.delete().where(constrain(Support.c.height)), + MempoolFilter.delete(), ] + if height > 0: + deletes.extend([ + BlockFilter.delete().where(BlockFilter.c.height >= height), + # TODO: group and tx filters need where() clauses (below actually breaks things) + BlockGroupFilter.delete(), + TXFilter.delete(), + ]) for delete in p.iter(deletes): p.ctx.execute(delete) diff --git a/lbry/blockchain/sync/context.py b/lbry/blockchain/sync/context.py index 88e2fc913..d2d758ca8 100644 --- a/lbry/blockchain/sync/context.py +++ b/lbry/blockchain/sync/context.py @@ -15,3 +15,11 @@ def get_or_initialize_lbrycrd(ctx=None) -> Lbrycrd: chain.db.sync_open() _chain.set(chain) return chain + + +def uninitialize(): + chain = _chain.get(None) + if chain is not None: + chain.db.sync_close() + chain.sync_run(chain.close_session()) + _chain.set(None) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index ee55d23c5..c748cae79 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -2,17 +2,19 @@ import os import asyncio import logging from typing import Optional, Tuple, Set, List, Coroutine +from concurrent.futures import ThreadPoolExecutor from lbry.db import Database from lbry.db import queries as q from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.db.query_context import Event, Progress -from lbry.event import BroadcastSubscription +from lbry.event import BroadcastSubscription, EventController from lbry.service.base import Sync, BlockEvent from lbry.blockchain.lbrycrd import Lbrycrd from lbry.error import LbrycrdEventSubscriptionError from . import blocks as block_phase, claims as claim_phase, supports as support_phase +from .context import uninitialize from .filter_builder import split_range_into_10k_batches log = logging.getLogger(__name__) @@ -40,9 +42,13 @@ class BlockchainSync(Sync): super().__init__(chain.ledger, db) self.chain = chain self.pid = os.getpid() - self.on_block_subscription: Optional[BroadcastSubscription] = None + self.on_block_hash_subscription: Optional[BroadcastSubscription] = None + self.on_tx_hash_subscription: Optional[BroadcastSubscription] = None self.advance_loop_task: Optional[asyncio.Task] = None - self.advance_loop_event = asyncio.Event() + self.block_hash_event = asyncio.Event() + self.tx_hash_event = asyncio.Event() + self._on_mempool_controller = EventController() + self.on_mempool = self._on_mempool_controller.stream async def wait_for_chain_ready(self): while True: @@ -67,17 +73,25 @@ class BlockchainSync(Sync): await self.advance_loop_task await self.chain.subscribe() self.advance_loop_task = asyncio.create_task(self.advance_loop()) - self.on_block_subscription = self.chain.on_block.listen( - lambda e: self.advance_loop_event.set() + self.on_block_hash_subscription = self.chain.on_block_hash.listen( + lambda e: self.block_hash_event.set() + ) + self.on_tx_hash_subscription = self.chain.on_tx_hash.listen( + lambda e: self.tx_hash_event.set() ) async def stop(self): self.chain.unsubscribe() - if self.on_block_subscription is not None: - self.on_block_subscription.cancel() self.db.stop_event.set() - if self.advance_loop_task is not None: - self.advance_loop_task.cancel() + for subscription in ( + self.on_block_hash_subscription, + self.on_tx_hash_subscription, + self.advance_loop_task + ): + if subscription is not None: + subscription.cancel() + if isinstance(self.db.executor, ThreadPoolExecutor): + await self.db.run(uninitialize) async def run_tasks(self, tasks: List[Coroutine]) -> Optional[Set[asyncio.Future]]: done, pending = await asyncio.wait( @@ -337,12 +351,25 @@ class BlockchainSync(Sync): if blocks_added: await self._on_block_controller.add(BlockEvent(blocks_added[-1])) + async def sync_mempool(self): + await self.db.run(block_phase.sync_mempool) + await self.sync_spends([-1]) + await self.db.run(claim_phase.claims_insert, [-2, 0], True, self.CLAIM_FLUSH_SIZE) + await self.db.run(claim_phase.claims_vacuum) + async def advance_loop(self): while True: - await self.advance_loop_event.wait() - self.advance_loop_event.clear() try: - await self.advance() + await asyncio.wait([ + self.tx_hash_event.wait(), + self.block_hash_event.wait(), + ], return_when=asyncio.FIRST_COMPLETED) + if self.block_hash_event.is_set(): + self.block_hash_event.clear() + await self.db.run(block_phase.clear_mempool) + await self.advance() + self.tx_hash_event.clear() + await self.sync_mempool() except asyncio.CancelledError: return except Exception as e: diff --git a/lbry/conf.py b/lbry/conf.py index a260cd07f..90deaaccb 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -594,7 +594,7 @@ class Config(CLIConfig): reflector_servers = Servers("Reflector re-hosting servers", [ ('reflector.lbry.com', 5566) ]) - lbryum_servers = Servers("SPV wallet servers", [ + known_full_nodes = Servers("Full blockchain nodes", [ ('spv11.lbry.com', 50001), ('spv12.lbry.com', 50001), ('spv13.lbry.com', 50001), @@ -621,7 +621,7 @@ class Config(CLIConfig): lbrycrd_rpc_host = String("Hostname for connecting to lbrycrd.", "localhost") lbrycrd_rpc_port = Integer("Port for connecting to lbrycrd.", 9245) lbrycrd_peer_port = Integer("Peer port for lbrycrd.", 9246) - lbrycrd_zmq_blocks = String("ZMQ block events address.") + lbrycrd_zmq = String("ZMQ events address.") lbrycrd_dir = Path("Directory containing lbrycrd data.", metavar='DIR') spv_address_filters = Toggle( "Generate Golomb-Rice coding filters for blocks and transactions. Enables " diff --git a/lbry/db/database.py b/lbry/db/database.py index 5b72fd239..af6efeac2 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -100,22 +100,31 @@ class Database: return 1 @classmethod - def temp_from_url_regtest(cls, db_url, lbrycrd_dir=None): + def temp_from_url_regtest(cls, db_url, lbrycrd_config=None): from lbry import Config, RegTestLedger # pylint: disable=import-outside-toplevel directory = tempfile.mkdtemp() - conf = Config.with_same_dir(directory).set(db_url=db_url) - if lbrycrd_dir is not None: - conf.lbrycrd_dir = lbrycrd_dir + if lbrycrd_config: + conf = lbrycrd_config + conf.data_dir = directory + conf.download_dir = directory + conf.wallet_dir = directory + else: + conf = Config.with_same_dir(directory) + conf.set(blockchain="regtest", db_url=db_url) ledger = RegTestLedger(conf) return cls(ledger) @classmethod - def temp_sqlite_regtest(cls, lbrycrd_dir=None): + def temp_sqlite_regtest(cls, lbrycrd_config=None): from lbry import Config, RegTestLedger # pylint: disable=import-outside-toplevel directory = tempfile.mkdtemp() - conf = Config.with_same_dir(directory).set(blockchain="regtest") - if lbrycrd_dir is not None: - conf.lbrycrd_dir = lbrycrd_dir + if lbrycrd_config: + conf = lbrycrd_config + conf.data_dir = directory + conf.download_dir = directory + conf.wallet_dir = directory + else: + conf = Config.with_same_dir(directory).set(blockchain="regtest") ledger = RegTestLedger(conf) return cls(ledger) diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index 34e32aa97..7d64b4a46 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -19,7 +19,9 @@ class LightClient(Service): def __init__(self, ledger: Ledger): super().__init__(ledger) - self.client = Client(Config().api_connection_url) + self.client = Client( + f"http://{ledger.conf.full_nodes[0][0]}:{ledger.conf.full_nodes[0][1]}/api" + ) self.sync = SPVSync(self) async def search_transactions(self, txids): diff --git a/lbry/testcase.py b/lbry/testcase.py index 64468c557..5a2c62cd0 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -16,6 +16,7 @@ from typing import Optional, List, Union from binascii import unhexlify, hexlify import ecdsa +from distutils.dir_util import remove_tree from lbry.db import Database from lbry.blockchain import ( @@ -26,7 +27,7 @@ from lbry.blockchain.bcd_data_stream import BCDataStream from lbry.blockchain.lbrycrd import Lbrycrd from lbry.blockchain.dewies import lbc_to_dewies from lbry.constants import COIN, CENT, NULL_HASH32 -from lbry.service import Daemon, FullNode, jsonrpc_dumps_pretty +from lbry.service import Daemon, FullNode, LightClient, jsonrpc_dumps_pretty from lbry.conf import Config from lbry.console import Console from lbry.wallet import Wallet, Account @@ -400,6 +401,7 @@ class UnitDBTestCase(AsyncioTestCase): class IntegrationTestCase(AsyncioTestCase): SEED = None + LBRYCRD_ARGS = '-rpcworkqueue=128', def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -411,6 +413,52 @@ class IntegrationTestCase(AsyncioTestCase): self.wallet: Optional[Wallet] = None self.account: Optional[Account] = None + async def asyncSetUp(self): + await super().asyncSetUp() + self.chain = self.make_chain() + await self.chain.ensure() + self.addCleanup(self.chain.stop) + await self.chain.start(*self.LBRYCRD_ARGS) + + @staticmethod + def make_chain(): + return Lbrycrd.temp_regtest() + + async def make_db(self, chain): + db_driver = os.environ.get('TEST_DB', 'sqlite') + if db_driver == 'sqlite': + db = Database.temp_sqlite_regtest(chain.ledger.conf) + elif db_driver.startswith('postgres') or db_driver.startswith('psycopg'): + db_driver = 'postgresql' + db_name = f'lbry_test_chain' + db_connection = 'postgres:postgres@localhost:5432' + meta_db = Database.from_url(f'postgresql://{db_connection}/postgres') + await meta_db.drop(db_name) + await meta_db.create(db_name) + db = Database.temp_from_url_regtest( + f'postgresql://{db_connection}/{db_name}', + chain.ledger.conf + ) + else: + raise RuntimeError(f"Unsupported database driver: {db_driver}") + self.addCleanup(remove_tree, db.ledger.conf.data_dir) + await db.open() + self.addCleanup(db.close) + self.db_driver = db_driver + return db + + @staticmethod + def find_claim_txo(tx) -> Optional[Output]: + for txo in tx.outputs: + if txo.is_claim: + return txo + + @staticmethod + def find_support_txo(tx) -> Optional[Output]: + for txo in tx.outputs: + if txo.is_support: + return txo + async def assertBalance(self, account, expected_balance: str): # pylint: disable=C0103 balance = await account.get_balance() self.assertEqual(dewies_to_lbc(balance), expected_balance) @@ -487,14 +535,13 @@ class CommandTestCase(IntegrationTestCase): self.reflector = None async def asyncSetUp(self): - self.chain = Lbrycrd.temp_regtest() - await self.chain.ensure() - self.addCleanup(self.chain.stop) - await self.chain.start('-rpcworkqueue=128') - + await super().asyncSetUp() await self.generate(200, wait=False) - self.daemon = await self.add_daemon() + self.full_node = self.daemon = await self.add_full_node() + if os.environ.get('TEST_MODE', 'full-node') == 'client': + self.daemon = await self.add_light_client(self.full_node) + self.service = self.daemon.service self.ledger = self.service.ledger self.api = self.daemon.api @@ -509,7 +556,7 @@ class CommandTestCase(IntegrationTestCase): await self.chain.send_to_address(addresses[0], '10.0') await self.generate(5) - async def add_daemon(self): + async def add_full_node(self): self.daemon_port += 1 path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, path, True) @@ -528,6 +575,21 @@ class CommandTestCase(IntegrationTestCase): await daemon.start() return daemon + async def add_light_client(self, full_node): + self.daemon_port += 1 + path = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, path, True) + ledger = RegTestLedger(Config.with_same_dir(path).set( + api=f'localhost:{self.daemon_port}', + full_nodes=[(full_node.conf.api_host, full_node.conf.api_port)] + )) + service = LightClient(ledger) + console = Console(service) + daemon = Daemon(service, console) + self.addCleanup(daemon.stop) + await daemon.start() + return daemon + async def asyncTearDown(self): await super().asyncTearDown() for wallet_node in self.extra_wallet_nodes: