From 9df659b647877cc02728bb8f1385e120edad048c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 26 Aug 2019 18:23:43 -0300 Subject: [PATCH 01/10] no chdir --- lbry/lbry/wallet/server/db/writer.py | 4 +++- lbry/lbry/wallet/server/session.py | 3 ++- torba/torba/server/db.py | 20 +++++++++++--------- torba/torba/server/storage.py | 15 +++++++++------ 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/lbry/lbry/wallet/server/db/writer.py b/lbry/lbry/wallet/server/db/writer.py index 10eb6b136..e2bec94c2 100644 --- a/lbry/lbry/wallet/server/db/writer.py +++ b/lbry/lbry/wallet/server/db/writer.py @@ -1,3 +1,4 @@ +import os import sqlite3 from typing import Union, Tuple, Set, List from itertools import chain @@ -705,7 +706,8 @@ class LBRYDB(DB): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.sql = SQLDB(self, 'claims.db') + path = os.path.join(self.env.db_dir, 'claims.db') + self.sql = SQLDB(self, path) def close(self): super().close() diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py index aca6ac8e7..be7cfd0c0 100644 --- a/lbry/lbry/wallet/server/session.py +++ b/lbry/lbry/wallet/server/session.py @@ -64,9 +64,10 @@ class LBRYSessionManager(SessionManager): async def start_other(self): self.running = True + path = os.path.join(self.env.db_dir, 'claims.db') args = dict( initializer=reader.initializer, - initargs=(self.logger, 'claims.db', self.env.coin.NET, self.env.database_query_timeout, + initargs=(self.logger, path, self.env.coin.NET, self.env.database_query_timeout, self.env.track_metrics) ) if self.env.max_query_workers is not None and self.env.max_query_workers == 0: diff --git a/torba/torba/server/db.py b/torba/torba/server/db.py index 8fff9dadf..8ca9835b7 100644 --- a/torba/torba/server/db.py +++ b/torba/torba/server/db.py @@ -17,6 +17,7 @@ import time from asyncio import sleep from bisect import bisect_right from collections import namedtuple +from functools import partial from glob import glob from struct import pack, unpack @@ -72,9 +73,8 @@ class DB: self.header_len = self.dynamic_header_len self.logger.info(f'switching current directory to {env.db_dir}') - os.chdir(env.db_dir) - self.db_class = db_class(self.env.db_engine) + self.db_class = db_class(env.db_dir, self.env.db_engine) self.history = History() self.utxo_db = None self.tx_counts = None @@ -86,12 +86,13 @@ class DB: self.merkle = Merkle() self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) - self.headers_file = util.LogicalFile('meta/headers', 2, 16000000) - self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000) - self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000) + path = partial(os.path.join, self.env.db_dir) + self.headers_file = util.LogicalFile(path('meta/headers'), 2, 16000000) + self.tx_counts_file = util.LogicalFile(path('meta/txcounts'), 2, 2000000) + self.hashes_file = util.LogicalFile(path('meta/hashes'), 4, 16000000) if not self.coin.STATIC_BLOCK_HEADERS: self.headers_offsets_file = util.LogicalFile( - 'meta/headers_offsets', 2, 16000000) + path('meta/headers_offsets'), 2, 16000000) async def _read_tx_counts(self): if self.tx_counts is not None: @@ -115,8 +116,9 @@ class DB: if self.utxo_db.is_new: self.logger.info('created new database') self.logger.info('creating metadata directory') - os.mkdir('meta') - with util.open_file('COIN', create=True) as f: + os.mkdir(os.path.join(self.env.db_dir, 'meta')) + coin_path = os.path.join(self.env.db_dir, 'meta', 'COIN') + with util.open_file(coin_path, create=True) as f: f.write(f'ElectrumX databases and metadata for ' f'{self.coin.NAME} {self.coin.NET}'.encode()) if not self.coin.STATIC_BLOCK_HEADERS: @@ -474,7 +476,7 @@ class DB: return 'meta/block' def raw_block_path(self, height): - return f'{self.raw_block_prefix()}{height:d}' + return os.path.join(self.env.db_dir, f'{self.raw_block_prefix()}{height:d}') def read_raw_block(self, height): """Returns a raw block read from disk. Raises FileNotFoundError diff --git a/torba/torba/server/storage.py b/torba/torba/server/storage.py index d52d2eba7..4ada09e07 100644 --- a/torba/torba/server/storage.py +++ b/torba/torba/server/storage.py @@ -13,20 +13,21 @@ from functools import partial from torba.server import util -def db_class(name): +def db_class(db_dir, name): """Returns a DB engine class.""" for db_class in util.subclasses(Storage): if db_class.__name__.lower() == name.lower(): db_class.import_module() - return db_class + return partial(db_class, db_dir) raise RuntimeError('unrecognised DB engine "{}"'.format(name)) class Storage: """Abstract base class of the DB backend abstraction.""" - def __init__(self, name, for_sync): - self.is_new = not os.path.exists(name) + def __init__(self, db_dir, name, for_sync): + self.db_dir = db_dir + self.is_new = not os.path.exists(os.path.join(db_dir, name)) self.for_sync = for_sync or self.is_new self.open(name, create=self.is_new) @@ -78,8 +79,9 @@ class LevelDB(Storage): def open(self, name, create): mof = 512 if self.for_sync else 128 + path = os.path.join(self.db_dir, name) # Use snappy compression (the default) - self.db = self.module.DB(name, create_if_missing=create, + self.db = self.module.DB(path, create_if_missing=create, max_open_files=mof) self.close = self.db.close self.get = self.db.get @@ -99,12 +101,13 @@ class RocksDB(Storage): def open(self, name, create): mof = 512 if self.for_sync else 128 + path = os.path.join(self.db_dir, name) # Use snappy compression (the default) options = self.module.Options(create_if_missing=create, use_fsync=True, target_file_size_base=33554432, max_open_files=mof) - self.db = self.module.DB(name, options) + self.db = self.module.DB(path, options) self.get = self.db.get self.put = self.db.put From 96800de0524f5a801a0f4e6ef0db0a34c8eb80e9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 29 Aug 2019 01:12:49 -0300 Subject: [PATCH 02/10] switch tests and fixes + support for multiple servers --- .../test_wallet_server_sessions.py | 3 +- .../client_tests/integration/test_network.py | 24 ++++++++- torba/torba/client/basenetwork.py | 54 +++++++++---------- torba/torba/orchstr8/node.py | 6 ++- 4 files changed, 53 insertions(+), 34 deletions(-) diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py index 618d71b57..041a072fd 100644 --- a/lbry/tests/integration/test_wallet_server_sessions.py +++ b/lbry/tests/integration/test_wallet_server_sessions.py @@ -16,11 +16,12 @@ class TestSessionBloat(IntegrationTestCase): LEDGER = lbry.wallet async def test_session_bloat_from_socket_timeout(self): + client = self.ledger.network.client await self.conductor.stop_spv() await self.ledger.stop() self.conductor.spv_node.session_timeout = 1 await self.conductor.start_spv() - session = ClientSession(network=None, server=self.ledger.network.client.server, timeout=0.2) + session = ClientSession(network=None, server=client.server, timeout=0.2) await session.create_connection() await session.send_request('server.banner', ()) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index 8d0faed2a..75a28a28a 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -3,6 +3,7 @@ import asyncio from unittest.mock import Mock from torba.client.basenetwork import BaseNetwork +from torba.orchstr8.node import SPVNode from torba.rpc import RPCSession from torba.testcase import IntegrationTestCase, AsyncioTestCase @@ -20,6 +21,26 @@ class ReconnectTests(IntegrationTestCase): VERBOSITY = logging.WARN + async def test_multiple_servers(self): + # we have a secondary node that connects later, so + node2 = SPVNode(self.conductor.spv_module, node_number=2) + self.ledger.network.config['default_servers'].append((node2.hostname, node2.port)) + await asyncio.wait_for(self.ledger.stop(), timeout=1) + await asyncio.wait_for(self.ledger.start(), timeout=1) + self.ledger.network.session_pool.new_connection_event.clear() + await node2.start(self.blockchain) + # this is only to speed up the test as retrying would take 4+ seconds + for session in self.ledger.network.session_pool.sessions: + session.trigger_urgent_reconnect.set() + await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1) + self.assertEqual(2, len(self.ledger.network.session_pool.available_sessions)) + self.assertTrue(self.ledger.network.is_connected) + switch_event = self.ledger.network.on_connected.first + await node2.stop(True) + # secondary down, but primary is ok, do not switch! (switches trigger new on_connected events) + with self.assertRaises(asyncio.TimeoutError): + await asyncio.wait_for(switch_event, timeout=1) + async def test_connection_drop_still_receives_events_after_reconnected(self): address1 = await self.account.receiving.get_or_create_usable_address() # disconnect and send a new tx, should reconnect and get it @@ -35,10 +56,11 @@ class ReconnectTests(IntegrationTestCase): # is it real? are we rich!? let me see this tx... d = self.ledger.network.get_transaction(sendtxid) # what's that smoke on my ethernet cable? oh no! + master_client = self.ledger.network.client self.ledger.network.client.connection_lost(Exception()) with self.assertRaises(asyncio.TimeoutError): await d - self.assertIsNone(self.ledger.network.client.response_time) # response time unknown as it failed + self.assertIsNone(master_client.response_time) # response time unknown as it failed # rich but offline? no way, no water, let's retry with self.assertRaisesRegex(ConnectionError, 'connection is not available'): await self.ledger.network.get_transaction(sendtxid) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 1d6883fd6..19dbd58b7 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -144,6 +144,7 @@ class BaseNetwork: self.config = ledger.config self.session_pool = SessionPool(network=self, timeout=self.config.get('connect_timeout', 6)) self.client: Optional[ClientSession] = None + self._switch_task: Optional[asyncio.Task] = None self.running = False self.remote_height: int = 0 @@ -161,42 +162,34 @@ class BaseNetwork: 'blockchain.address.subscribe': self._on_status_controller, } - async def switch_to_fastest(self): - try: - client = await asyncio.wait_for(self.session_pool.wait_for_fastest_session(), 30) - except asyncio.TimeoutError: - if self.client: - await self.client.close() - self.client = None - for session in self.session_pool.sessions: - session.synchronous_close() - log.warning("not connected to any wallet servers") - return - current_client = self.client - self.client = client - log.info("Switching to SPV wallet server: %s:%d", *self.client.server) - self._on_connected_controller.add(True) - try: - self._update_remote_height((await self.subscribe_headers(),)) - log.info("Subscribed to headers: %s:%d", *self.client.server) - except asyncio.TimeoutError: - if self.client: - await self.client.close() - self.client = current_client - return - self.session_pool.new_connection_event.clear() - return await self.session_pool.new_connection_event.wait() + async def switch_forever(self): + while self.running: + if self.is_connected: + await self.client.on_disconnected.first + self.client = None + continue + self.client = await self.session_pool.wait_for_fastest_session() + log.info("Switching to SPV wallet server: %s:%d", *self.client.server) + self._on_connected_controller.add(True) + try: + self._update_remote_height((await self.subscribe_headers(),)) + log.info("Subscribed to headers: %s:%d", *self.client.server) + except asyncio.TimeoutError: + log.info("Switching to %s:%d timed out, closing and retrying.") + self.client.synchronous_close() + self.client = None async def start(self): self.running = True + self._switch_task = asyncio.ensure_future(self.switch_forever()) self.session_pool.start(self.config['default_servers']) self.on_header.listen(self._update_remote_height) - while self.running: - await self.switch_to_fastest() async def stop(self): - self.running = False - self.session_pool.stop() + if self.running: + self.running = False + self._switch_task.cancel() + self.session_pool.stop() @property def is_connected(self): @@ -329,8 +322,9 @@ class SessionPool: self._connect_session(server) def stop(self): - for task in self.sessions.values(): + for session, task in self.sessions.items(): task.cancel() + session.synchronous_close() self.sessions.clear() def ensure_connections(self): diff --git a/torba/torba/orchstr8/node.py b/torba/torba/orchstr8/node.py index b89899684..453016bfa 100644 --- a/torba/torba/orchstr8/node.py +++ b/torba/torba/orchstr8/node.py @@ -190,14 +190,15 @@ class WalletNode: class SPVNode: - def __init__(self, coin_class): + def __init__(self, coin_class, node_number=1): self.coin_class = coin_class self.controller = None self.data_path = None self.server = None self.hostname = 'localhost' - self.port = 50001 + 1 # avoid conflict with default daemon + self.port = 50001 + node_number # avoid conflict with default daemon self.session_timeout = 600 + self.rpc_port = '0' # disabled by default async def start(self, blockchain_node: 'BlockchainNode'): self.data_path = tempfile.mkdtemp() @@ -210,6 +211,7 @@ class SPVNode: 'SESSION_TIMEOUT': str(self.session_timeout), 'MAX_QUERY_WORKERS': '0', 'INDIVIDUAL_TAG_INDEXES': '', + 'RPC_PORT': self.rpc_port } # TODO: don't use os.environ os.environ.update(conf) From 925eb618de9bb88decd433fdd964d6d5f1f5c5e7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 30 Aug 2019 00:52:08 -0300 Subject: [PATCH 03/10] failing test for unordered mempool --- .../integration/test_transactions.py | 39 +++++++++++++++++++ torba/torba/client/baseledger.py | 8 +++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/torba/tests/client_tests/integration/test_transactions.py b/torba/tests/client_tests/integration/test_transactions.py index bdbd51dfa..4fb67f11b 100644 --- a/torba/tests/client_tests/integration/test_transactions.py +++ b/torba/tests/client_tests/integration/test_transactions.py @@ -1,6 +1,9 @@ import logging import asyncio +import random from itertools import chain +from random import shuffle + from torba.testcase import IntegrationTestCase from torba.client.util import satoshis_to_coins, coins_to_satoshis @@ -129,3 +132,39 @@ class BasicTransactionTests(IntegrationTestCase): self.assertEqual(tx.outputs[0].get_address(self.ledger), address2) self.assertEqual(tx.outputs[0].is_change, False) self.assertEqual(tx.outputs[1].is_change, True) + + async def test_history_edge_cases(self): + await self.assertBalance(self.account, '0.0') + address = await self.account.receiving.get_or_create_usable_address() + # evil trick: mempool is unsorted on real life, but same order between python instances. reproduce it + original_summary = self.conductor.spv_node.server.mempool.transaction_summaries + async def random_summary(*args, **kwargs): + summary = await original_summary(*args, **kwargs) + if summary and len(summary) > 2: + ordered = summary.copy() + while summary == ordered: + random.shuffle(summary) + return summary + self.conductor.spv_node.server.mempool.transaction_summaries = random_summary + # 10 unconfirmed txs, all from blockchain wallet + sends = list(self.blockchain.send_to_address(address, 10) for _ in range(10)) + # use batching to reduce issues with send_to_address on cli + for batch in range(0, len(sends), 10): + txids = await asyncio.gather(*sends[batch:batch + 10]) + await asyncio.wait([self.on_transaction_id(txid) for txid in txids]) + remote_status = await self.ledger.network.subscribe_address(address) + self.assertTrue(await self.ledger.update_history(address, remote_status)) + # 20 unconfirmed txs, 10 from blockchain, 10 from local to local + utxos = await self.account.get_utxos() + txs = [] + for utxo in utxos: + tx = await self.ledger.transaction_class.create( + [self.ledger.transaction_class.input_class.spend(utxo)], + [], + [self.account], self.account + ) + await self.broadcast(tx) + txs.append(tx) + await asyncio.wait([self.on_transaction_address(tx, address) for tx in txs], timeout=1) + remote_status = await self.ledger.network.subscribe_address(address) + self.assertTrue(await self.ledger.update_history(address, remote_status)) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index d127243d9..704654669 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -415,7 +415,7 @@ class BaseLedger(metaclass=LedgerRegistry): local_status, local_history = await self.get_local_status_and_history(address) if local_status == remote_status: - return + return True remote_history = await self.network.retriable_call(self.network.get_history, address) @@ -472,14 +472,18 @@ class BaseLedger(metaclass=LedgerRegistry): local_status, local_history = await self.get_local_status_and_history(address) if local_status != remote_status: + remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) + if remote_history == local_history: + return True log.debug( "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", remote_status, len(remote_history), local_status, len(local_history) ) log.debug("local: %s", local_history) log.debug("remote: %s", remote_history) + return False else: - log.debug("Sync completed for: %s", address) + return True async def cache_transaction(self, txid, remote_height): cache_item = self._tx_cache.get(txid) From 10b7ccaa92509d0e2290e2756fae1103599d8fa9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 30 Aug 2019 02:50:21 -0300 Subject: [PATCH 04/10] fix unordered history inconsistency --- .../client_tests/integration/test_transactions.py | 6 ++++++ torba/torba/client/baseledger.py | 14 ++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/torba/tests/client_tests/integration/test_transactions.py b/torba/tests/client_tests/integration/test_transactions.py index 4fb67f11b..4ee4e2dfe 100644 --- a/torba/tests/client_tests/integration/test_transactions.py +++ b/torba/tests/client_tests/integration/test_transactions.py @@ -168,3 +168,9 @@ class BasicTransactionTests(IntegrationTestCase): await asyncio.wait([self.on_transaction_address(tx, address) for tx in txs], timeout=1) remote_status = await self.ledger.network.subscribe_address(address) self.assertTrue(await self.ledger.update_history(address, remote_status)) + # server history grows unordered + txid = await self.blockchain.send_to_address(address, 1) + await self.on_transaction_id(txid) + self.assertTrue(await self.ledger.update_history(address, remote_status)) + self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1])) + self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync)) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index 704654669..5c47cb79f 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -142,6 +142,7 @@ class BaseLedger(metaclass=LedgerRegistry): self._address_update_locks: Dict[str, asyncio.Lock] = {} self.coin_selection_strategy = None + self._known_addresses_out_of_sync = set() @classmethod def get_id(cls): @@ -411,6 +412,7 @@ class BaseLedger(metaclass=LedgerRegistry): address_manager: baseaccount.AddressManager = None): async with self._address_update_locks.setdefault(address, asyncio.Lock()): + self._known_addresses_out_of_sync.discard(address) local_status, local_history = await self.get_local_status_and_history(address) @@ -422,7 +424,7 @@ class BaseLedger(metaclass=LedgerRegistry): cache_tasks = [] synced_history = StringIO() for i, (txid, remote_height) in enumerate(map(itemgetter('tx_hash', 'height'), remote_history)): - if i < len(local_history) and local_history[i] == (txid, remote_height): + if i < len(local_history) and local_history[i] == (txid, remote_height) and not cache_tasks: synced_history.write(f'{txid}:{remote_height}:') else: cache_tasks.append(asyncio.ensure_future( @@ -472,15 +474,15 @@ class BaseLedger(metaclass=LedgerRegistry): local_status, local_history = await self.get_local_status_and_history(address) if local_status != remote_status: - remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) - if remote_history == local_history: + if local_history == list(map(itemgetter('tx_hash', 'height'), remote_history)): return True - log.debug( + log.warning( "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", remote_status, len(remote_history), local_status, len(local_history) ) - log.debug("local: %s", local_history) - log.debug("remote: %s", remote_history) + log.warning("local: %s", local_history) + log.warning("remote: %s", remote_history) + self._known_addresses_out_of_sync.add(address) return False else: return True From d2cd0ece5ffb7aa06028b4c2365f12519fc6a73e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 30 Aug 2019 19:53:51 -0300 Subject: [PATCH 05/10] restore multi server, improve sync concurrency --- .../client_tests/integration/test_network.py | 4 +- torba/torba/client/basedatabase.py | 77 +++++++++++-------- torba/torba/client/baseledger.py | 46 ++++++----- torba/torba/client/basenetwork.py | 35 +++++---- 4 files changed, 93 insertions(+), 69 deletions(-) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index 75a28a28a..ee7ed4882 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -33,7 +33,7 @@ class ReconnectTests(IntegrationTestCase): for session in self.ledger.network.session_pool.sessions: session.trigger_urgent_reconnect.set() await asyncio.wait_for(self.ledger.network.session_pool.new_connection_event.wait(), timeout=1) - self.assertEqual(2, len(self.ledger.network.session_pool.available_sessions)) + self.assertEqual(2, len(list(self.ledger.network.session_pool.available_sessions))) self.assertTrue(self.ledger.network.is_connected) switch_event = self.ledger.network.on_connected.first await node2.stop(True) @@ -126,4 +126,4 @@ class ServerPickingTestCase(AsyncioTestCase): self.assertTrue(all([not session.is_closing() for session in network.session_pool.available_sessions])) # ensure we are connected to all of them after a while await asyncio.sleep(1) - self.assertEqual(len(network.session_pool.available_sessions), 3) + self.assertEqual(len(list(network.session_pool.available_sessions)), 3) diff --git a/torba/torba/client/basedatabase.py b/torba/torba/client/basedatabase.py index d0f37ccaf..a9b802dbf 100644 --- a/torba/torba/client/basedatabase.py +++ b/torba/torba/client/basedatabase.py @@ -227,16 +227,19 @@ class SQLiteMixin: await self.db.close() @staticmethod - def _insert_sql(table: str, data: dict, ignore_duplicate: bool = False) -> Tuple[str, List]: + def _insert_sql(table: str, data: dict, ignore_duplicate: bool = False, + replace: bool = False) -> Tuple[str, List]: columns, values = [], [] for column, value in data.items(): columns.append(column) values.append(value) - or_ignore = "" + policy = "" if ignore_duplicate: - or_ignore = " OR IGNORE" + policy = " OR IGNORE" + if replace: + policy = " OR REPLACE" sql = "INSERT{} INTO {} ({}) VALUES ({})".format( - or_ignore, table, ', '.join(columns), ', '.join(['?'] * len(values)) + policy, table, ', '.join(columns), ', '.join(['?'] * len(values)) ) return sql, values @@ -348,35 +351,47 @@ class BaseDatabase(SQLiteMixin): 'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified }, 'txid = ?', (tx.id,))) + def _transaction_io(self, conn: sqlite3.Connection, tx: BaseTransaction, address, txhash, history): + conn.execute(*self._insert_sql('tx', { + 'txid': tx.id, + 'raw': sqlite3.Binary(tx.raw), + 'height': tx.height, + 'position': tx.position, + 'is_verified': tx.is_verified + }, replace=True)) + + for txo in tx.outputs: + if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash: + conn.execute(*self._insert_sql( + "txo", self.txo_to_row(tx, address, txo), ignore_duplicate=True + )) + elif txo.script.is_pay_script_hash: + # TODO: implement script hash payments + log.warning('Database.save_transaction_io: pay script hash is not implemented!') + + for txi in tx.inputs: + if txi.txo_ref.txo is not None: + txo = txi.txo_ref.txo + if txo.get_address(self.ledger) == address: + conn.execute(*self._insert_sql("txi", { + 'txid': tx.id, + 'txoid': txo.id, + 'address': address, + }, ignore_duplicate=True)) + + conn.execute( + "UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?", + (history, history.count(':') // 2, address) + ) + def save_transaction_io(self, tx: BaseTransaction, address, txhash, history): + return self.db.run(self._transaction_io, tx, address, txhash, history) - def _transaction(conn: sqlite3.Connection, tx: BaseTransaction, address, txhash, history): - - for txo in tx.outputs: - if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash: - conn.execute(*self._insert_sql( - "txo", self.txo_to_row(tx, address, txo), ignore_duplicate=True - )) - elif txo.script.is_pay_script_hash: - # TODO: implement script hash payments - log.warning('Database.save_transaction_io: pay script hash is not implemented!') - - for txi in tx.inputs: - if txi.txo_ref.txo is not None: - txo = txi.txo_ref.txo - if txo.get_address(self.ledger) == address: - conn.execute(*self._insert_sql("txi", { - 'txid': tx.id, - 'txoid': txo.id, - 'address': address, - }, ignore_duplicate=True)) - - conn.execute( - "UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?", - (history, history.count(':')//2, address) - ) - - return self.db.run(_transaction, tx, address, txhash, history) + def save_transaction_io_batch(self, txs: Iterable[BaseTransaction], address, txhash, history): + def __many(conn): + for tx in txs: + self._transaction_io(conn, tx, address, txhash, history) + return self.db.run(__many) async def reserve_outputs(self, txos, is_reserved=True): txoids = ((is_reserved, txo.id) for txo in txos) diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index 5c47cb79f..c31612aee 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -10,6 +10,7 @@ from operator import itemgetter from collections import namedtuple import pylru +from torba.client.basetransaction import BaseTransaction from torba.tasks import TaskGroup from torba.client import baseaccount, basenetwork, basetransaction from torba.client.basedatabase import BaseDatabase @@ -251,9 +252,10 @@ class BaseLedger(metaclass=LedgerRegistry): self.constraint_account_or_all(constraints) return self.db.get_transaction_count(**constraints) - async def get_local_status_and_history(self, address): - address_details = await self.db.get_address(address=address) - history = address_details['history'] or '' + async def get_local_status_and_history(self, address, history=None): + if not history: + address_details = await self.db.get_address(address=address) + history = address_details['history'] or '' parts = history.split(':')[:-1] return ( hexlify(sha256(history.encode())).decode() if history else None, @@ -420,17 +422,23 @@ class BaseLedger(metaclass=LedgerRegistry): return True remote_history = await self.network.retriable_call(self.network.get_history, address) + remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) + we_need = set(remote_history) - set(local_history) + if not we_need: + return True - cache_tasks = [] + cache_tasks: List[asyncio.Future[BaseTransaction]] = [] synced_history = StringIO() - for i, (txid, remote_height) in enumerate(map(itemgetter('tx_hash', 'height'), remote_history)): + for i, (txid, remote_height) in enumerate(remote_history): if i < len(local_history) and local_history[i] == (txid, remote_height) and not cache_tasks: synced_history.write(f'{txid}:{remote_height}:') else: + check_local = (txid, remote_height) not in we_need cache_tasks.append(asyncio.ensure_future( - self.cache_transaction(txid, remote_height) + self.cache_transaction(txid, remote_height, check_local=check_local) )) + synced_txs = [] for task in cache_tasks: tx = await task @@ -459,11 +467,13 @@ class BaseLedger(metaclass=LedgerRegistry): txi.txo_ref = referenced_txo.ref synced_history.write(f'{tx.id}:{tx.height}:') + synced_txs.append(tx) - await self.db.save_transaction_io( - tx, address, self.address_to_hash160(address), synced_history.getvalue() - ) + await self.db.save_transaction_io_batch( + synced_txs, address, self.address_to_hash160(address), synced_history.getvalue() + ) + for tx in synced_txs: await self._on_transaction_controller.add(TransactionEvent(address, tx)) if address_manager is None: @@ -472,9 +482,10 @@ class BaseLedger(metaclass=LedgerRegistry): if address_manager is not None: await address_manager.ensure_address_gap() - local_status, local_history = await self.get_local_status_and_history(address) + local_status, local_history = \ + await self.get_local_status_and_history(address, synced_history.getvalue()) if local_status != remote_status: - if local_history == list(map(itemgetter('tx_hash', 'height'), remote_history)): + if local_history == remote_history: return True log.warning( "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", @@ -487,7 +498,7 @@ class BaseLedger(metaclass=LedgerRegistry): else: return True - async def cache_transaction(self, txid, remote_height): + async def cache_transaction(self, txid, remote_height, check_local=True): cache_item = self._tx_cache.get(txid) if cache_item is None: cache_item = self._tx_cache[txid] = TransactionCacheItem() @@ -500,7 +511,7 @@ class BaseLedger(metaclass=LedgerRegistry): tx = cache_item.tx - if tx is None: + if tx is None and check_local: # check local db tx = cache_item.tx = await self.db.get_transaction(txid=txid) @@ -509,19 +520,12 @@ class BaseLedger(metaclass=LedgerRegistry): _raw = await self.network.retriable_call(self.network.get_transaction, txid) if _raw: tx = self.transaction_class(unhexlify(_raw)) - await self.maybe_verify_transaction(tx, remote_height) - await self.db.insert_transaction(tx) cache_item.tx = tx # make sure it's saved before caching it - return tx if tx is None: raise ValueError(f'Transaction {txid} was not in database and not on network.') - if remote_height > 0 and not tx.is_verified: - # tx from cache / db is not up-to-date - await self.maybe_verify_transaction(tx, remote_height) - await self.db.update_transaction(tx) - + await self.maybe_verify_transaction(tx, remote_height) return tx async def maybe_verify_transaction(self, tx, remote_height): diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 19dbd58b7..121c3f3d9 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -30,11 +30,11 @@ class ClientSession(BaseClientSession): self._on_connect_cb = on_connect_callback or (lambda: None) self.trigger_urgent_reconnect = asyncio.Event() # one request per second of timeout, conservative default - self._semaphore = asyncio.Semaphore(self.timeout) + self._semaphore = asyncio.Semaphore(self.timeout * 2) @property def available(self): - return not self.is_closing() and self._can_send.is_set() and self.response_time is not None + return not self.is_closing() and self.response_time is not None @property def server_address_and_port(self) -> Optional[Tuple[str, int]]: @@ -195,10 +195,8 @@ class BaseNetwork: def is_connected(self): return self.client and not self.client.is_closing() - def rpc(self, list_or_method, args, session=None): - # fixme: use fastest unloaded session, but for now it causes issues with wallet sync - # session = session or self.session_pool.fastest_session - session = self.client + def rpc(self, list_or_method, args, restricted=False): + session = self.client if restricted else self.session_pool.fastest_session if session and not session.is_closing(): return session.send_request(list_or_method, args) else: @@ -225,28 +223,35 @@ class BaseNetwork: def get_transaction(self, tx_hash): return self.rpc('blockchain.transaction.get', [tx_hash]) - def get_transaction_height(self, tx_hash): - return self.rpc('blockchain.transaction.get_height', [tx_hash]) + def get_transaction_height(self, tx_hash, known_height=None): + restricted = True # by default, check master for consistency + if known_height: + if 0 < known_height < self.remote_height - 10: + restricted = False # we can get from any server, its old + return self.rpc('blockchain.transaction.get_height', [tx_hash], restricted) def get_merkle(self, tx_hash, height): - return self.rpc('blockchain.transaction.get_merkle', [tx_hash, height]) + restricted = True # by default, check master for consistency + if 0 < height < self.remote_height - 10: + restricted = False # we can get from any server, its old + return self.rpc('blockchain.transaction.get_merkle', [tx_hash, height], restricted) def get_headers(self, height, count=10000): return self.rpc('blockchain.block.headers', [height, count]) # --- Subscribes, history and broadcasts are always aimed towards the master client directly def get_history(self, address): - return self.rpc('blockchain.address.get_history', [address], session=self.client) + return self.rpc('blockchain.address.get_history', [address], True) def broadcast(self, raw_transaction): - return self.rpc('blockchain.transaction.broadcast', [raw_transaction], session=self.client) + return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True) def subscribe_headers(self): - return self.rpc('blockchain.headers.subscribe', [True], session=self.client) + return self.rpc('blockchain.headers.subscribe', [True], True) async def subscribe_address(self, address): try: - return await self.rpc('blockchain.address.subscribe', [address], session=self.client) + return await self.rpc('blockchain.address.subscribe', [address], True) except asyncio.TimeoutError: # abort and cancel, we cant lose a subscription, it will happen again on reconnect self.client.abort() @@ -267,11 +272,11 @@ class SessionPool: @property def available_sessions(self): - return [session for session in self.sessions if session.available] + return (session for session in self.sessions if session.available) @property def fastest_session(self): - if not self.available_sessions: + if not self.online: return None return min( [((session.response_time + session.connection_latency) * (session.pending_amount + 1), session) From 379144bcfeedd7d99e514d9ef904f2645649731c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 31 Aug 2019 06:37:17 -0300 Subject: [PATCH 06/10] fix restriction rules, minor refactor, translate error from transaction not found into returning none --- .../client_tests/integration/test_network.py | 2 +- .../integration/test_transactions.py | 3 +++ torba/tests/client_tests/unit/test_ledger.py | 2 +- torba/torba/client/basedatabase.py | 2 +- torba/torba/client/baseledger.py | 11 ++++++----- torba/torba/client/basenetwork.py | 19 ++++++++++--------- 6 files changed, 22 insertions(+), 17 deletions(-) diff --git a/torba/tests/client_tests/integration/test_network.py b/torba/tests/client_tests/integration/test_network.py index ee7ed4882..7f60441a2 100644 --- a/torba/tests/client_tests/integration/test_network.py +++ b/torba/tests/client_tests/integration/test_network.py @@ -59,7 +59,7 @@ class ReconnectTests(IntegrationTestCase): master_client = self.ledger.network.client self.ledger.network.client.connection_lost(Exception()) with self.assertRaises(asyncio.TimeoutError): - await d + await d self.assertIsNone(master_client.response_time) # response time unknown as it failed # rich but offline? no way, no water, let's retry with self.assertRaisesRegex(ConnectionError, 'connection is not available'): diff --git a/torba/tests/client_tests/integration/test_transactions.py b/torba/tests/client_tests/integration/test_transactions.py index 4ee4e2dfe..f7f7581a0 100644 --- a/torba/tests/client_tests/integration/test_transactions.py +++ b/torba/tests/client_tests/integration/test_transactions.py @@ -138,6 +138,7 @@ class BasicTransactionTests(IntegrationTestCase): address = await self.account.receiving.get_or_create_usable_address() # evil trick: mempool is unsorted on real life, but same order between python instances. reproduce it original_summary = self.conductor.spv_node.server.mempool.transaction_summaries + async def random_summary(*args, **kwargs): summary = await original_summary(*args, **kwargs) if summary and len(summary) > 2: @@ -174,3 +175,5 @@ class BasicTransactionTests(IntegrationTestCase): self.assertTrue(await self.ledger.update_history(address, remote_status)) self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1])) self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync)) + # should be another test, but it would be too much to setup just for that and it affects sync + self.assertIsNone(await self.ledger.network.get_transaction('1'*64)) diff --git a/torba/tests/client_tests/unit/test_ledger.py b/torba/tests/client_tests/unit/test_ledger.py index beb26a5d1..f94326f5d 100644 --- a/torba/tests/client_tests/unit/test_ledger.py +++ b/torba/tests/client_tests/unit/test_ledger.py @@ -29,7 +29,7 @@ class MockNetwork: async def get_merkle(self, txid, height): return {'merkle': ['abcd01'], 'pos': 1} - async def get_transaction(self, tx_hash): + async def get_transaction(self, tx_hash, _=None): self.get_transaction_called.append(tx_hash) return self.transaction[tx_hash] diff --git a/torba/torba/client/basedatabase.py b/torba/torba/client/basedatabase.py index a9b802dbf..f31e816df 100644 --- a/torba/torba/client/basedatabase.py +++ b/torba/torba/client/basedatabase.py @@ -220,7 +220,7 @@ class SQLiteMixin: async def open(self): log.info("connecting to database: %s", self._db_path) - self.db = await AIOSQLite.connect(self._db_path) + self.db = await AIOSQLite.connect(self._db_path, isolation_level=None) await self.db.executescript(self.CREATE_TABLES_QUERY) async def close(self): diff --git a/torba/torba/client/baseledger.py b/torba/torba/client/baseledger.py index c31612aee..cba9a8d9a 100644 --- a/torba/torba/client/baseledger.py +++ b/torba/torba/client/baseledger.py @@ -287,7 +287,7 @@ class BaseLedger(metaclass=LedgerRegistry): await self.join_network() self.network.on_connected.listen(self.join_network) - async def join_network(self, *args): + async def join_network(self, *_): log.info("Subscribing and updating accounts.") async with self._header_processing_lock: await self.update_headers() @@ -472,9 +472,10 @@ class BaseLedger(metaclass=LedgerRegistry): await self.db.save_transaction_io_batch( synced_txs, address, self.address_to_hash160(address), synced_history.getvalue() ) - - for tx in synced_txs: - await self._on_transaction_controller.add(TransactionEvent(address, tx)) + await asyncio.wait([ + self._on_transaction_controller.add(TransactionEvent(address, tx)) + for tx in synced_txs + ]) if address_manager is None: address_manager = await self.get_address_manager_for_address(address) @@ -517,7 +518,7 @@ class BaseLedger(metaclass=LedgerRegistry): if tx is None: # fetch from network - _raw = await self.network.retriable_call(self.network.get_transaction, txid) + _raw = await self.network.retriable_call(self.network.get_transaction, txid, remote_height) if _raw: tx = self.transaction_class(unhexlify(_raw)) cache_item.tx = tx # make sure it's saved before caching it diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 121c3f3d9..6bb789d3d 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -72,6 +72,10 @@ class ClientSession(BaseClientSession): log.debug("got reply for %s from %s:%i", method, *self.server) return reply except RPCError as e: + if str(e).find('.*no such .*transaction.*') and args: + # shouldnt the server return none instead? + log.warning("Requested transaction missing from server: %s", args[0]) + return None log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) raise e @@ -220,20 +224,17 @@ class BaseNetwork: def _update_remote_height(self, header_args): self.remote_height = header_args[0]["height"] - def get_transaction(self, tx_hash): - return self.rpc('blockchain.transaction.get', [tx_hash]) + def get_transaction(self, tx_hash, known_height=None): + # use any server if its old, otherwise restrict to who gave us the history + restricted = not known_height or 0 > known_height > self.remote_height - 10 + return self.rpc('blockchain.transaction.get', [tx_hash], restricted) def get_transaction_height(self, tx_hash, known_height=None): - restricted = True # by default, check master for consistency - if known_height: - if 0 < known_height < self.remote_height - 10: - restricted = False # we can get from any server, its old + restricted = not known_height or 0 > known_height > self.remote_height - 10 return self.rpc('blockchain.transaction.get_height', [tx_hash], restricted) def get_merkle(self, tx_hash, height): - restricted = True # by default, check master for consistency - if 0 < height < self.remote_height - 10: - restricted = False # we can get from any server, its old + restricted = 0 > height > self.remote_height - 10 return self.rpc('blockchain.transaction.get_merkle', [tx_hash, height], restricted) def get_headers(self, height, count=10000): From 5018fe90debff7b42db62f854aa7e6dfa67443ca Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 1 Sep 2019 00:02:07 -0300 Subject: [PATCH 07/10] can also be protocolerror --- torba/tests/client_tests/integration/test_transactions.py | 2 +- torba/torba/client/basenetwork.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/torba/tests/client_tests/integration/test_transactions.py b/torba/tests/client_tests/integration/test_transactions.py index f7f7581a0..0e1b4b898 100644 --- a/torba/tests/client_tests/integration/test_transactions.py +++ b/torba/tests/client_tests/integration/test_transactions.py @@ -176,4 +176,4 @@ class BasicTransactionTests(IntegrationTestCase): self.assertEqual(21, len((await self.ledger.get_local_status_and_history(address))[1])) self.assertEqual(0, len(self.ledger._known_addresses_out_of_sync)) # should be another test, but it would be too much to setup just for that and it affects sync - self.assertIsNone(await self.ledger.network.get_transaction('1'*64)) + self.assertIsNone(await self.ledger.network.retriable_call(self.ledger.network.get_transaction, '1'*64)) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index 6bb789d3d..c79a9274e 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -4,7 +4,7 @@ from operator import itemgetter from typing import Dict, Optional, Tuple from time import perf_counter -from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError +from torba.rpc import RPCSession as BaseClientSession, Connector, RPCError, ProtocolError from torba import __version__ from torba.stream import StreamController @@ -71,10 +71,9 @@ class ClientSession(BaseClientSession): ) log.debug("got reply for %s from %s:%i", method, *self.server) return reply - except RPCError as e: - if str(e).find('.*no such .*transaction.*') and args: + except (RPCError, ProtocolError) as e: + if str(e).find('.*no such .*transaction.*'): # shouldnt the server return none instead? - log.warning("Requested transaction missing from server: %s", args[0]) return None log.warning("Wallet server (%s:%i) returned an error. Code: %s Message: %s", *self.server, *e.args) From 4839ca631a282701e22d3b86fc2c2f65314e00ff Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 2 Sep 2019 15:10:16 -0300 Subject: [PATCH 08/10] make restricted default --- torba/torba/client/basenetwork.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torba/torba/client/basenetwork.py b/torba/torba/client/basenetwork.py index c79a9274e..56912f01e 100644 --- a/torba/torba/client/basenetwork.py +++ b/torba/torba/client/basenetwork.py @@ -198,7 +198,7 @@ class BaseNetwork: def is_connected(self): return self.client and not self.client.is_closing() - def rpc(self, list_or_method, args, restricted=False): + def rpc(self, list_or_method, args, restricted=True): session = self.client if restricted else self.session_pool.fastest_session if session and not session.is_closing(): return session.send_request(list_or_method, args) From 8edebbb6848d8424c29f2df411bc86c7e892b6fd Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 2 Sep 2019 15:43:33 -0300 Subject: [PATCH 09/10] fix tests for transaction show --- lbry/lbry/wallet/manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lbry/lbry/wallet/manager.py b/lbry/lbry/wallet/manager.py index 5ac58994b..6740edad6 100644 --- a/lbry/lbry/wallet/manager.py +++ b/lbry/lbry/wallet/manager.py @@ -199,6 +199,8 @@ class LbryWalletManager(BaseWalletManager): if not tx: try: raw = await self.ledger.network.get_transaction(txid) + if not raw: + return {'success': False, 'code': 404, 'message': 'transaction not found'} height = await self.ledger.network.get_transaction_height(txid) except CodeMessageError as e: return {'success': False, 'code': e.code, 'message': e.message} From 6c79f98d3cff215226c1626fc2f782ec51181bec Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 3 Sep 2019 11:28:30 -0300 Subject: [PATCH 10/10] connect directly on test server --- lbry/tests/integration/test_wallet_server_sessions.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py index 041a072fd..b19c77e29 100644 --- a/lbry/tests/integration/test_wallet_server_sessions.py +++ b/lbry/tests/integration/test_wallet_server_sessions.py @@ -16,12 +16,13 @@ class TestSessionBloat(IntegrationTestCase): LEDGER = lbry.wallet async def test_session_bloat_from_socket_timeout(self): - client = self.ledger.network.client await self.conductor.stop_spv() await self.ledger.stop() self.conductor.spv_node.session_timeout = 1 await self.conductor.start_spv() - session = ClientSession(network=None, server=client.server, timeout=0.2) + session = ClientSession( + network=None, server=(self.conductor.spv_node.hostname, self.conductor.spv_node.port), timeout=0.2 + ) await session.create_connection() await session.send_request('server.banner', ()) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 1)