Compare commits

...
Sign in to create a new pull request.

16 commits

Author SHA1 Message Date
Jack Robison
1298e0e725
reproduce sync error 2020-05-29 14:22:32 -04:00
Jack Robison
5d5cd3499a
prefer confirmed but allow spending unconfirmed with sqlite coin chooser 2020-05-27 12:27:04 -04:00
Jack Robison
c7e0e9f085
lint 2020-05-27 01:51:34 -04:00
Jack Robison
70464a23d2
logging 2020-05-27 00:59:24 -04:00
Jack Robison
4217bbbea6
don't block notifying sessions on the notifications going through 2020-05-27 00:59:24 -04:00
Jack Robison
8a06b1b5fa
fix history cache 2020-05-27 00:59:24 -04:00
Jack Robison
00c89a4f0b
debug 2020-05-27 00:59:24 -04:00
Jack Robison
ad7441ca47
fix prometheus 2020-05-27 00:59:24 -04:00
Jack Robison
f461fc6ef9
non blocking save_claims 2020-05-27 00:59:24 -04:00
Jack Robison
1c4c18dec9
debug 2020-05-27 00:59:24 -04:00
Jack Robison
597a101030
update sqlite coin chooser to only select verified transactions 2020-05-27 00:59:24 -04:00
Jack Robison
adbbb11dbf
fix 'timed out subscribing to addresses' logging error 2020-05-27 00:59:24 -04:00
Jack Robison
b66281eabc
leave reservations for rejected transactions 2020-05-27 00:58:35 -04:00
Jack Robison
be5f6491bc
add sqlite coin chooser test 2020-05-27 00:58:35 -04:00
Jack Robison
9d34e027f3
update tests 2020-05-27 00:58:35 -04:00
Jack Robison
6dbea6f4ab
add sqlite coin chooser, set it as the default coin selection strategy 2020-05-27 00:58:35 -04:00
15 changed files with 331 additions and 60 deletions

View file

@ -634,7 +634,7 @@ class Config(CLIConfig):
coin_selection_strategy = StringChoice( coin_selection_strategy = StringChoice(
"Strategy to use when selecting UTXOs for a transaction", "Strategy to use when selecting UTXOs for a transaction",
STRATEGIES, "standard") STRATEGIES, "sqlite")
transaction_cache_size = Integer("Transaction cache size", 100_000) transaction_cache_size = Integer("Transaction cache size", 100_000)
save_resolved_claims = Toggle( save_resolved_claims = Toggle(

View file

@ -2569,9 +2569,9 @@ class Daemon(metaclass=JSONRPCServerType):
account.add_channel_private_key(txo.private_key) account.add_channel_private_key(txo.private_key)
wallet.save() wallet.save()
await self.broadcast_or_release(tx, blocking) await self.broadcast_or_release(tx, blocking)
await self.storage.save_claims([self._old_get_temp_claim_info( self.component_manager.loop.create_task(self.storage.save_claims([self._old_get_temp_claim_info(
tx, txo, claim_address, claim, name, dewies_to_lbc(amount) tx, txo, claim_address, claim, name, dewies_to_lbc(amount)
)]) )]))
self.component_manager.loop.create_task(self.analytics_manager.send_new_channel()) self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
else: else:
await account.ledger.release_tx(tx) await account.ledger.release_tx(tx)
@ -2725,9 +2725,9 @@ class Daemon(metaclass=JSONRPCServerType):
account.add_channel_private_key(new_txo.private_key) account.add_channel_private_key(new_txo.private_key)
wallet.save() wallet.save()
await self.broadcast_or_release(tx, blocking) await self.broadcast_or_release(tx, blocking)
await self.storage.save_claims([self._old_get_temp_claim_info( self.component_manager.loop.create_task(self.storage.save_claims([self._old_get_temp_claim_info(
tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount) tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
)]) )]))
self.component_manager.loop.create_task(self.analytics_manager.send_new_channel()) self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
else: else:
await account.ledger.release_tx(tx) await account.ledger.release_tx(tx)
@ -3262,14 +3262,18 @@ class Daemon(metaclass=JSONRPCServerType):
if not preview: if not preview:
await self.broadcast_or_release(tx, blocking) await self.broadcast_or_release(tx, blocking)
await self.storage.save_claims([self._old_get_temp_claim_info(
tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount) async def save_claims():
)]) await self.storage.save_claims([self._old_get_temp_claim_info(
await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id) tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount)
)])
await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id)
self.component_manager.loop.create_task(save_claims())
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
else: else:
await account.ledger.release_tx(tx) await account.ledger.release_tx(tx)
log.info("successful publish %s", tx.id)
return tx return tx
@requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
@ -3480,11 +3484,15 @@ class Daemon(metaclass=JSONRPCServerType):
if not preview: if not preview:
await self.broadcast_or_release(tx, blocking) await self.broadcast_or_release(tx, blocking)
await self.storage.save_claims([self._old_get_temp_claim_info(
tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount) async def save_claims():
)]) await self.storage.save_claims([self._old_get_temp_claim_info(
if stream_hash: tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
await self.storage.save_content_claim(stream_hash, new_txo.id) )])
if stream_hash:
await self.storage.save_content_claim(stream_hash, new_txo.id)
self.component_manager.loop.create_task(save_claims())
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
else: else:
await account.ledger.release_tx(tx) await account.ledger.release_tx(tx)

View file

@ -5,7 +5,7 @@ from lbry.wallet.transaction import OutputEffectiveAmountEstimator
MAXIMUM_TRIES = 100000 MAXIMUM_TRIES = 100000
STRATEGIES = [] STRATEGIES = ['sqlite'] # sqlite coin chooser is in database.py
def strategy(method): def strategy(method):

View file

@ -4,6 +4,7 @@ import asyncio
import sqlite3 import sqlite3
import platform import platform
from binascii import hexlify from binascii import hexlify
from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from contextvars import ContextVar from contextvars import ContextVar
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
@ -14,7 +15,7 @@ from prometheus_client import Gauge, Counter, Histogram
from lbry.utils import LockWithMetrics from lbry.utils import LockWithMetrics
from .bip32 import PubKey from .bip32 import PubKey
from .transaction import Transaction, Output, OutputScript, TXRefImmutable from .transaction import Transaction, Output, OutputScript, TXRefImmutable, Input
from .constants import TXO_TYPES, CLAIM_TYPES from .constants import TXO_TYPES, CLAIM_TYPES
from .util import date_to_julian_day from .util import date_to_julian_day
@ -466,6 +467,70 @@ def dict_row_factory(cursor, row):
return d return d
def get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, reserve_amount: int, floor: int,
fee_per_byte: int):
txs = defaultdict(list)
decoded_transactions = {}
accumulated = 0
multiplier = 10
gap_count = 0
accounts_fmt = ",".join(["?"] * len(accounts))
txo_query = f"""
SELECT tx.txid, txo.txoid, tx.raw, tx.height, txo.position as nout, tx.is_verified, txo.amount FROM txo
INNER JOIN account_address USING (address)
LEFT JOIN txi USING (txoid)
INNER JOIN tx USING (txid)
WHERE txo.txo_type=0 AND txi.txoid IS NULL AND tx.txid IS NOT NULL AND tx.is_verified AND NOT txo.is_reserved
AND txo.amount BETWEEN ? AND ?
"""
if accounts:
txo_query += f"""
AND account_address.account {'= ?' if len(accounts_fmt) == 1 else 'IN (' + accounts_fmt + ')'}
"""
reserved = []
while accumulated < reserve_amount:
found_txs = False
# prefer confirmed, but save unconfirmed utxos from this selection in case they are needed
unconfirmed = []
for row in transaction.execute(txo_query, (floor, floor * multiplier, *accounts)):
(txid, txoid, raw, height, nout, verified, amount) = row.values()
found_txs = True
if txid not in decoded_transactions:
decoded_transactions[txid] = Transaction(raw)
decoded_tx = decoded_transactions[txid]
if not verified:
unconfirmed.append((txid, txoid, raw, height, nout, verified, amount))
continue
accumulated += amount
accumulated -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
txs[(raw, height, verified)].append(nout)
reserved.append(txoid)
if accumulated >= reserve_amount:
break
unconfirmed.reverse()
while unconfirmed:
(txid, txoid, raw, height, nout, verified, amount) = unconfirmed.pop()
decoded_tx = decoded_transactions[txid]
accumulated += amount
accumulated -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
txs[(raw, height, verified)].append(nout)
reserved.append(txoid)
if accumulated >= reserve_amount:
break
if not found_txs:
gap_count += 1
if gap_count == 5:
break
floor *= multiplier
# reserve the accumulated txos if enough were found
if accumulated >= reserve_amount:
transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?",
[(True, txoid) for txoid in reserved]).fetchall()
return txs
class Database(SQLiteMixin): class Database(SQLiteMixin):
SCHEMA_VERSION = "1.3" SCHEMA_VERSION = "1.3"
@ -666,6 +731,19 @@ class Database(SQLiteMixin):
# 2. update address histories removing deleted TXs # 2. update address histories removing deleted TXs
return True return True
async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000,
fee_per_byte: int = 50) -> List:
to_spend = await self.db.run(
get_spendable_utxos, tuple(account.id for account in accounts), reserve_amount, min_amount,
fee_per_byte
)
txos = []
for (raw, height, verified), positions in to_spend.items():
tx = Transaction(raw, height=height, is_verified=verified)
for nout in positions:
txos.append(tx.outputs[nout].get_estimator(ledger))
return txos
async def select_transactions(self, cols, accounts=None, read_only=False, **constraints): async def select_transactions(self, cols, accounts=None, read_only=False, **constraints):
if not {'txid', 'txid__in'}.intersection(constraints): if not {'txid', 'txid__in'}.intersection(constraints):
assert accounts, "'accounts' argument required when no 'txid' constraint is present" assert accounts, "'accounts' argument required when no 'txid' constraint is present"

View file

@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry):
self._on_transaction_controller = StreamController() self._on_transaction_controller = StreamController()
self.on_transaction = self._on_transaction_controller.stream self.on_transaction = self._on_transaction_controller.stream
self.on_transaction.listen( self.on_transaction.listen(
lambda e: log.info( lambda e: log.debug(
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s', '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
) )
@ -167,6 +167,7 @@ class Ledger(metaclass=LedgerRegistry):
self.coin_selection_strategy = None self.coin_selection_strategy = None
self._known_addresses_out_of_sync = set() self._known_addresses_out_of_sync = set()
self.went_out_of_sync = asyncio.Queue()
self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char) self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
self._balance_cache = pylru.lrucache(100000) self._balance_cache = pylru.lrucache(100000)
@ -244,11 +245,16 @@ class Ledger(metaclass=LedgerRegistry):
def get_address_count(self, **constraints): def get_address_count(self, **constraints):
return self.db.get_address_count(**constraints) return self.db.get_address_count(**constraints)
async def get_spendable_utxos(self, amount: int, funding_accounts): async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']],
min_amount=100000):
min_amount = min(amount // 10, min_amount)
fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
selector = CoinSelector(amount, fee)
async with self._utxo_reservation_lock: async with self._utxo_reservation_lock:
if self.coin_selection_strategy == 'sqlite':
return await self.db.get_spendable_utxos(self, amount + fee, funding_accounts, min_amount=min_amount,
fee_per_byte=self.fee_per_byte)
txos = await self.get_effective_amount_estimators(funding_accounts) txos = await self.get_effective_amount_estimators(funding_accounts)
fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
selector = CoinSelector(amount, fee)
spendables = selector.select(txos, self.coin_selection_strategy) spendables = selector.select(txos, self.coin_selection_strategy)
if spendables: if spendables:
await self.reserve_outputs(s.txo for s in spendables) await self.reserve_outputs(s.txo for s in spendables)
@ -492,12 +498,16 @@ class Ledger(metaclass=LedgerRegistry):
local_status, local_history = await self.get_local_status_and_history(address) local_status, local_history = await self.get_local_status_and_history(address)
if local_status == remote_status: if local_status == remote_status:
log.info("no new txs needed for %s, remote matches local", address)
return True return True
remote_history = await self.network.retriable_call(self.network.get_history, address) remote_history = await self.network.retriable_call(self.network.get_history, address)
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history)) remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
we_need = set(remote_history) - set(local_history) we_need = set(remote_history) - set(local_history)
if not we_need: if not we_need:
log.info(
"no new txs needed for %s, local status %s, remote status %s", address, local_status, remote_status
)
return True return True
cache_tasks: List[asyncio.Task[Transaction]] = [] cache_tasks: List[asyncio.Task[Transaction]] = []
@ -563,16 +573,35 @@ class Ledger(metaclass=LedgerRegistry):
await self.get_local_status_and_history(address, synced_history.getvalue()) await self.get_local_status_and_history(address, synced_history.getvalue())
if local_status != remote_status: if local_status != remote_status:
if local_history == remote_history: if local_history == remote_history:
log.warning(
"%s has a synced history but a mismatched status", address
)
return True return True
remote_set = set(remote_history)
local_set = set(local_history)
log.warning( log.warning(
"Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items", "%s is out of sync after syncing.\n"
remote_status, len(remote_history), local_status, len(local_history) "Remote: %s with %d items (%i unique), local: %s with %d items (%i unique).\n"
"Histories are mismatched on %i items.\n"
"Local is missing\n"
"%s\n"
"Remote is missing\n"
"%s\n"
"******",
address, remote_status, len(remote_history), len(remote_set),
local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)),
"\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]),
"\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)])
) )
log.warning("local: %s", local_history) # log.warning("local: %s", local_history)
log.warning("remote: %s", remote_history) # log.warning("remote: %s", remote_history)
self.went_out_of_sync.put_nowait(address)
self._known_addresses_out_of_sync.add(address) self._known_addresses_out_of_sync.add(address)
return False return False
else: else:
log.warning(
"%s is synced", address
)
return True return True
async def cache_transaction(self, txid, remote_height, check_local=True): async def cache_transaction(self, txid, remote_height, check_local=True):

View file

@ -302,6 +302,9 @@ class WalletManager:
await self.ledger.broadcast(tx) await self.ledger.broadcast(tx)
if blocking: if blocking:
await self.ledger.wait(tx, timeout=None) await self.ledger.wait(tx, timeout=None)
except CodeMessageError as err:
log.warning("transaction rejected, leaving reserved")
raise
except: except:
await self.ledger.release_tx(tx) await self.ledger.release_tx(tx)
raise raise

View file

@ -1,6 +1,7 @@
import logging import logging
import asyncio import asyncio
import json import json
import time
from time import perf_counter from time import perf_counter
from operator import itemgetter from operator import itemgetter
from typing import Dict, Optional, Tuple from typing import Dict, Optional, Tuple
@ -54,8 +55,8 @@ class ClientSession(BaseClientSession):
return result return result
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
log.info("%i in flight, send %s%s to %s:%i ", self.pending_amount, method, tuple(args), *self.server)
self.pending_amount += 1 self.pending_amount += 1
log.debug("send %s%s to %s:%i", method, tuple(args), *self.server)
try: try:
if method == 'server.version': if method == 'server.version':
return await self.send_timed_server_version_request(args, self.timeout) return await self.send_timed_server_version_request(args, self.timeout)
@ -63,11 +64,11 @@ class ClientSession(BaseClientSession):
while not request.done(): while not request.done():
done, pending = await asyncio.wait([request], timeout=self.timeout) done, pending = await asyncio.wait([request], timeout=self.timeout)
if pending: if pending:
log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received) log.info("Time since last packet: %s", perf_counter() - self.last_packet_received)
if (perf_counter() - self.last_packet_received) < self.timeout: if (perf_counter() - self.last_packet_received) < self.timeout:
continue continue
log.info("timeout sending %s to %s:%i", method, *self.server) log.warning("timeout sending %s(%s) to %s:%i", method, str(args), *self.server)
raise asyncio.TimeoutError raise asyncio.TimeoutError()
if done: if done:
try: try:
return request.result() return request.result()
@ -91,6 +92,7 @@ class ClientSession(BaseClientSession):
raise raise
finally: finally:
self.pending_amount -= 1 self.pending_amount -= 1
log.info("%i in flight, finished %s%s ", self.pending_amount, method, tuple(args))
async def ensure_session(self): async def ensure_session(self):
# Handles reconnecting and maintaining a session alive # Handles reconnecting and maintaining a session alive
@ -144,12 +146,12 @@ class ClientSession(BaseClientSession):
controller.add(request.args) controller.add(request.args)
def connection_lost(self, exc): def connection_lost(self, exc):
log.debug("Connection lost: %s:%d", *self.server) log.warning("Connection lost: %s:%d", *self.server)
super().connection_lost(exc) super().connection_lost(exc)
self.response_time = None self.response_time = None
self.connection_latency = None self.connection_latency = None
self._response_samples = 0 self._response_samples = 0
self.pending_amount = 0 # self.pending_amount = 0
self._on_disconnect_controller.add(True) self._on_disconnect_controller.add(True)
@ -274,8 +276,13 @@ class Network:
return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted) return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted)
# --- Subscribes, history and broadcasts are always aimed towards the master client directly # --- Subscribes, history and broadcasts are always aimed towards the master client directly
def get_history(self, address): async def get_history(self, address):
return self.rpc('blockchain.address.get_history', [address], True) log.info("get history %s", address)
start = time.perf_counter()
try:
return await self.rpc('blockchain.address.get_history', [address], True)
finally:
log.info("%s history took %s", address, time.perf_counter() - start)
def broadcast(self, raw_transaction): def broadcast(self, raw_transaction):
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True) return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)
@ -285,16 +292,18 @@ class Network:
async def subscribe_address(self, address, *addresses): async def subscribe_address(self, address, *addresses):
addresses = list((address, ) + addresses) addresses = list((address, ) + addresses)
server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None
try: try:
return await self.rpc('blockchain.address.subscribe', addresses, True) return await self.rpc('blockchain.address.subscribe', addresses, True)
except asyncio.TimeoutError: except asyncio.TimeoutError:
log.warning( log.warning(
"timed out subscribing to addresses from %s:%i", "timed out subscribing to addresses from %s:%i",
*self.client.server_address_and_port *server_addr_and_port
) )
# abort and cancel, we can't lose a subscription, it will happen again on reconnect # abort and cancel, we can't lose a subscription, it will happen again on reconnect
if self.client: if self.client:
self.client.abort() self.client.abort()
log.warning("raise cancelled")
raise asyncio.CancelledError() raise asyncio.CancelledError()
def unsubscribe_address(self, address): def unsubscribe_address(self, address):

View file

@ -480,6 +480,7 @@ class RPCSession(SessionBase):
await event.wait() await event.wait()
result = event.result result = event.result
if isinstance(result, Exception): if isinstance(result, Exception):
self.logger.error("%s sending %s(%s)", result, method, "" if not args else ",".join([str(a) for a in args]))
raise result raise result
return result return result

View file

@ -183,6 +183,7 @@ class BlockProcessor:
self.state_lock = asyncio.Lock() self.state_lock = asyncio.Lock()
self.search_cache = {} self.search_cache = {}
self.history_cache = {}
async def run_in_thread_with_lock(self, func, *args): async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that # Run in a thread to prevent blocking. Shielded so that
@ -213,6 +214,7 @@ class BlockProcessor:
await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self.run_in_thread_with_lock(self.advance_blocks, blocks)
for cache in self.search_cache.values(): for cache in self.search_cache.values():
cache.clear() cache.clear()
self.history_cache.clear()
await self._maybe_flush() await self._maybe_flush()
processed_time = time.perf_counter() - start processed_time = time.perf_counter() - start
self.block_count_metric.set(self.height) self.block_count_metric.set(self.height)
@ -754,6 +756,8 @@ class LBRYBlockProcessor(BlockProcessor):
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}") self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
self.sql: SQLDB = self.db.sql self.sql: SQLDB = self.db.sql
self.timer = Timer('BlockProcessor') self.timer = Timer('BlockProcessor')
self.block_notify = asyncio.Event()
self.block_notify.set()
def advance_blocks(self, blocks): def advance_blocks(self, blocks):
self.sql.begin() self.sql.begin()

View file

@ -213,10 +213,18 @@ class MemPool:
continue continue
hashes = {hex_str_to_hash(hh) for hh in hex_hashes} hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
async with self.lock: async with self.lock:
start = time.perf_counter()
touched = await self._process_mempool(hashes) touched = await self._process_mempool(hashes)
duration = time.perf_counter() - start
if duration >= 1:
self.logger.info("processed mempool in %s", duration)
synchronized_event.set() synchronized_event.set()
synchronized_event.clear() synchronized_event.clear()
start = time.perf_counter()
await self.api.on_mempool(touched, height) await self.api.on_mempool(touched, height)
duration = time.perf_counter() - start
if duration >= 1:
self.logger.info("sent history notifications in %s", duration)
try: try:
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event) # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)

View file

@ -135,7 +135,7 @@ class SessionManager:
"docker_tag": DOCKER_TAG, "docker_tag": DOCKER_TAG,
'version': lbry.__version__, 'version': lbry.__version__,
"min_version": util.version_string(VERSION.PROTOCOL_MIN), "min_version": util.version_string(VERSION.PROTOCOL_MIN),
"cpu_count": os.cpu_count() "cpu_count": str(os.cpu_count())
}) })
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE, session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
labelnames=("version",)) labelnames=("version",))
@ -177,7 +177,7 @@ class SessionManager:
self.cur_group = SessionGroup(0) self.cur_group = SessionGroup(0)
self.txs_sent = 0 self.txs_sent = 0
self.start_time = time.time() self.start_time = time.time()
self.history_cache = pylru.lrucache(256) self.history_cache = self.bp.history_cache
self.notified_height: typing.Optional[int] = None self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
@ -608,26 +608,20 @@ class SessionManager:
async def limited_history(self, hashX): async def limited_history(self, hashX):
"""A caching layer.""" """A caching layer."""
hc = self.history_cache if hashX not in self.history_cache:
if hashX not in hc:
# History DoS limit. Each element of history is about 99 # History DoS limit. Each element of history is about 99
# bytes when encoded as JSON. This limits resource usage # bytes when encoded as JSON. This limits resource usage
# on bloated history requests, and uses a smaller divisor # on bloated history requests, and uses a smaller divisor
# so large requests are logged before refusing them. # so large requests are logged before refusing them.
limit = self.env.max_send // 97 limit = self.env.max_send // 97
hc[hashX] = await self.db.limited_history(hashX, limit=limit) self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
return hc[hashX] return self.history_cache[hashX]
async def _notify_sessions(self, height, touched): async def _notify_sessions(self, height, touched):
"""Notify sessions about height changes and touched addresses.""" """Notify sessions about height changes and touched addresses."""
height_changed = height != self.notified_height height_changed = height != self.notified_height
if height_changed: if height_changed:
await self._refresh_hsub_results(height) await self._refresh_hsub_results(height)
# Invalidate our history cache for touched hashXs
hc = self.history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
if self.sessions: if self.sessions:
await asyncio.wait([ await asyncio.wait([
session.notify(touched, height_changed) for session in self.sessions session.notify(touched, height_changed) for session in self.sessions
@ -921,10 +915,15 @@ class LBRYElectrumX(SessionBase):
def sub_count(self): def sub_count(self):
return len(self.hashX_subs) return len(self.hashX_subs)
UGLY_COUNT = 0
async def notify(self, touched, height_changed): async def notify(self, touched, height_changed):
"""Notify the client about changes to touched addresses (from mempool """Notify the client about changes to touched addresses (from mempool
updates or new blocks) and height. updates or new blocks) and height.
""" """
self.UGLY_COUNT += 1
if height_changed and self.subscribe_headers: if height_changed and self.subscribe_headers:
args = (await self.subscribe_headers_result(), ) args = (await self.subscribe_headers_result(), )
try: try:
@ -940,6 +939,11 @@ class LBRYElectrumX(SessionBase):
for hashX in touched: for hashX in touched:
alias = self.hashX_subs[hashX] alias = self.hashX_subs[hashX]
if self.UGLY_COUNT == 25:
print('sleeping for ', hashX)
if not self.bp.block_notify.is_set():
await self.bp.block_notify.wait()
await asyncio.sleep(3)
status = await self.address_status(hashX) status = await self.address_status(hashX)
changed[alias] = status changed[alias] = status
@ -960,13 +964,11 @@ class LBRYElectrumX(SessionBase):
method = 'blockchain.scripthash.subscribe' method = 'blockchain.scripthash.subscribe'
else: else:
method = 'blockchain.address.subscribe' method = 'blockchain.address.subscribe'
start = time.perf_counter()
try: if not self.bp.block_notify.is_set():
await self.send_notification(method, (alias, status)) await self.bp.block_notify.wait()
except asyncio.TimeoutError: t = asyncio.create_task(self.send_notification(method, (alias, status)))
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True)) t.add_done_callback(lambda _: self.logger.warning("sent notification to %s in %s", alias, time.perf_counter() - start))
self.abort()
return
if changed: if changed:
es = '' if len(changed) == 1 else 'es' es = '' if len(changed) == 1 else 'es'
@ -1180,6 +1182,7 @@ class LBRYElectrumX(SessionBase):
""" """
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
db_history = await self.session_mgr.limited_history(hashX) db_history = await self.session_mgr.limited_history(hashX)
mempool = await self.mempool.transaction_summaries(hashX) mempool = await self.mempool.transaction_summaries(hashX)

View file

@ -40,6 +40,8 @@ class ClaimTestCase(CommandTestCase):
class ClaimSearchCommand(ClaimTestCase): class ClaimSearchCommand(ClaimTestCase):
VERBOSITY = logging.WARNING
async def create_channel(self): async def create_channel(self):
self.channel = await self.channel_create('@abc', '1.0') self.channel = await self.channel_create('@abc', '1.0')
self.channel_id = self.get_claim_id(self.channel) self.channel_id = self.get_claim_id(self.channel)
@ -157,6 +159,60 @@ class ClaimSearchCommand(ClaimTestCase):
await self.stream_abandon(txid=signed2['txid'], nout=0) await self.stream_abandon(txid=signed2['txid'], nout=0)
await self.assertFindsClaims([], channel_ids=[channel_id2]) await self.assertFindsClaims([], channel_ids=[channel_id2])
async def test_break_it(self):
await self.generate(5)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
await self.generate(7)
async def _doit(n):
try:
await self.daemon.jsonrpc_channel_create(
name=f'@arena{n}', bid='0.1', blocking=True
)
except InsufficientFundsError:
pass
def doit(n):
asyncio.create_task(_doit(n))
async def break_it():
count = 0
for _ in range(4):
for _ in range(10):
count += 1
if not count % 7:
asyncio.create_task(self.generate(1))
doit(count)
if self.ledger._known_addresses_out_of_sync:
print('out of sync', self.ledger._known_addresses_out_of_sync)
await asyncio.sleep(1)
await self.generate(1)
bp = self.conductor.spv_node.server.bp
break_task = asyncio.create_task(break_it())
address = await self.ledger.went_out_of_sync.get()
bp.block_notify.clear()
print('%s is out of sync' % address)
with self.assertRaises(InsufficientFundsError):
await self.daemon.jsonrpc_channel_create(
name=f'@derp', bid='0.1', blocking=True
)
self.assertTrue(False)
print("woohoo")
if not break_task.done():
break_task.cancel()
async def test_pagination(self): async def test_pagination(self):
await self.create_channel() await self.create_channel()
await self.create_lots_of_streams() await self.create_lots_of_streams()

View file

@ -28,9 +28,10 @@ def mock_config():
class BlobExchangeTestBase(AsyncioTestCase): class BlobExchangeTestBase(AsyncioTestCase):
async def asyncSetUp(self): async def asyncSetUp(self):
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.client_wallet_dir = tempfile.mkdtemp()
self.client_dir = tempfile.mkdtemp() self.client_dir = tempfile.mkdtemp()
self.server_dir = tempfile.mkdtemp() self.server_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.client_wallet_dir)
self.addCleanup(shutil.rmtree, self.client_dir) self.addCleanup(shutil.rmtree, self.client_dir)
self.addCleanup(shutil.rmtree, self.server_dir) self.addCleanup(shutil.rmtree, self.server_dir)
self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir, self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
@ -39,8 +40,8 @@ class BlobExchangeTestBase(AsyncioTestCase):
self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config) self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config)
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP') self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir, self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir,
fixed_peers=[]) wallet=self.client_wallet_dir, fixed_peers=[])
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite")) self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config) self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config)
self.client_peer_manager = PeerManager(self.loop) self.client_peer_manager = PeerManager(self.loop)

View file

@ -65,7 +65,7 @@ def get_claim_transaction(claim_name, claim=b''):
) )
async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None): async def get_mock_wallet(sd_hash, storage, wallet_dir, balance=10.0, fee=None):
claim = Claim() claim = Claim()
if fee: if fee:
if fee['currency'] == 'LBC': if fee['currency'] == 'LBC':
@ -97,7 +97,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
wallet = Wallet() wallet = Wallet()
ledger = Ledger({ ledger = Ledger({
'db': Database(':memory:'), 'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
'headers': FakeHeaders(514082) 'headers': FakeHeaders(514082)
}) })
await ledger.db.open() await ledger.db.open()
@ -136,7 +136,8 @@ class TestStreamManager(BlobExchangeTestBase):
self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
) )
self.sd_hash = descriptor.sd_hash self.sd_hash = descriptor.sd_hash
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, self.client_wallet_dir,
balance, fee)
analytics_manager = AnalyticsManager( analytics_manager = AnalyticsManager(
self.client_config, self.client_config,
binascii.hexlify(generate_id()).decode(), binascii.hexlify(generate_id()).decode(),

View file

@ -1,4 +1,7 @@
import os
import unittest import unittest
import tempfile
import shutil
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from itertools import cycle from itertools import cycle
@ -302,9 +305,11 @@ class TestTransactionSigning(AsyncioTestCase):
class TransactionIOBalancing(AsyncioTestCase): class TransactionIOBalancing(AsyncioTestCase):
async def asyncSetUp(self): async def asyncSetUp(self):
wallet_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, wallet_dir)
self.ledger = Ledger({ self.ledger = Ledger({
'db': Database(':memory:'), 'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
'headers': Headers(':memory:') 'headers': Headers(':memory:'),
}) })
await self.ledger.db.open() await self.ledger.db.open()
self.account = Account.from_dict( self.account = Account.from_dict(
@ -419,3 +424,68 @@ class TransactionIOBalancing(AsyncioTestCase):
self.assertListEqual([0.01, 1], self.inputs(tx)) self.assertListEqual([0.01, 1], self.inputs(tx))
# change is now needed to consume extra input # change is now needed to consume extra input
self.assertListEqual([0.97], self.outputs(tx)) self.assertListEqual([0.97], self.outputs(tx))
async def test_basic_use_cases_sqlite(self):
self.ledger.coin_selection_strategy = 'sqlite'
self.ledger.fee_per_byte = int(.01*CENT)
# available UTXOs for filling missing inputs
utxos = await self.create_utxos([
1, 1, 3, 5, 10
])
# pay 3 coins (3.07 w/ fees)
tx = await self.tx(
[], # inputs
[self.txo(3)] # outputs
)
# there are 4x 1.0 utxos available
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 1.0, 1.0])
# a change of 0.93 is added to reach balance
self.assertListEqual(self.outputs(tx), [3, 0.93])
await self.ledger.release_outputs(utxos)
# pay 6.917 coins (7.00 w/ fees)
tx = await self.tx(
[], # inputs
[self.txo(6.917)] # outputs
)
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 1.0, 1.0, 3.0])
self.assertListEqual(self.outputs(tx), [6.92])
await self.ledger.release_outputs(utxos)
# supplied input and output, but input is not enough to cover output
tx = await self.tx(
[self.txi(self.txo(10))], # inputs
[self.txo(11)] # outputs
)
# additional input is chosen (UTXO 1)
self.assertListEqual([10, 1.0, 1.0], self.inputs(tx))
# change is now needed to consume extra input
self.assertListEqual([11, 0.95], self.outputs(tx))
await self.ledger.release_outputs(utxos)
# liquidating a UTXO
tx = await self.tx(
[self.txi(self.txo(10))], # inputs
[] # outputs
)
self.assertListEqual([10], self.inputs(tx))
# missing change added to consume the amount
self.assertListEqual([9.98], self.outputs(tx))
await self.ledger.release_outputs(utxos)
# liquidating at a loss, requires adding extra inputs
tx = await self.tx(
[self.txi(self.txo(0.01))], # inputs
[] # outputs
)
# UTXO 1 is added to cover some of the fee
self.assertListEqual([0.01, 1], self.inputs(tx))
# change is now needed to consume extra input
self.assertListEqual([0.97], self.outputs(tx))