Compare commits
16 commits
Author | SHA1 | Date | |
---|---|---|---|
|
1298e0e725 | ||
|
5d5cd3499a | ||
|
c7e0e9f085 | ||
|
70464a23d2 | ||
|
4217bbbea6 | ||
|
8a06b1b5fa | ||
|
00c89a4f0b | ||
|
ad7441ca47 | ||
|
f461fc6ef9 | ||
|
1c4c18dec9 | ||
|
597a101030 | ||
|
adbbb11dbf | ||
|
b66281eabc | ||
|
be5f6491bc | ||
|
9d34e027f3 | ||
|
6dbea6f4ab |
15 changed files with 331 additions and 60 deletions
|
@ -634,7 +634,7 @@ class Config(CLIConfig):
|
|||
|
||||
coin_selection_strategy = StringChoice(
|
||||
"Strategy to use when selecting UTXOs for a transaction",
|
||||
STRATEGIES, "standard")
|
||||
STRATEGIES, "sqlite")
|
||||
|
||||
transaction_cache_size = Integer("Transaction cache size", 100_000)
|
||||
save_resolved_claims = Toggle(
|
||||
|
|
|
@ -2569,9 +2569,9 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
account.add_channel_private_key(txo.private_key)
|
||||
wallet.save()
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
self.component_manager.loop.create_task(self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
tx, txo, claim_address, claim, name, dewies_to_lbc(amount)
|
||||
)])
|
||||
)]))
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
@ -2725,9 +2725,9 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
account.add_channel_private_key(new_txo.private_key)
|
||||
wallet.save()
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
self.component_manager.loop.create_task(self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
|
||||
)])
|
||||
)]))
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
@ -3262,14 +3262,18 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount)
|
||||
)])
|
||||
await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id)
|
||||
|
||||
async def save_claims():
|
||||
await self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount)
|
||||
)])
|
||||
await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id)
|
||||
|
||||
self.component_manager.loop.create_task(save_claims())
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
||||
log.info("successful publish %s", tx.id)
|
||||
return tx
|
||||
|
||||
@requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
|
||||
|
@ -3480,11 +3484,15 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
|
||||
if not preview:
|
||||
await self.broadcast_or_release(tx, blocking)
|
||||
await self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
|
||||
)])
|
||||
if stream_hash:
|
||||
await self.storage.save_content_claim(stream_hash, new_txo.id)
|
||||
|
||||
async def save_claims():
|
||||
await self.storage.save_claims([self._old_get_temp_claim_info(
|
||||
tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
|
||||
)])
|
||||
if stream_hash:
|
||||
await self.storage.save_content_claim(stream_hash, new_txo.id)
|
||||
|
||||
self.component_manager.loop.create_task(save_claims())
|
||||
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
|
||||
else:
|
||||
await account.ledger.release_tx(tx)
|
||||
|
|
|
@ -5,7 +5,7 @@ from lbry.wallet.transaction import OutputEffectiveAmountEstimator
|
|||
|
||||
MAXIMUM_TRIES = 100000
|
||||
|
||||
STRATEGIES = []
|
||||
STRATEGIES = ['sqlite'] # sqlite coin chooser is in database.py
|
||||
|
||||
|
||||
def strategy(method):
|
||||
|
|
|
@ -4,6 +4,7 @@ import asyncio
|
|||
import sqlite3
|
||||
import platform
|
||||
from binascii import hexlify
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from contextvars import ContextVar
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
|
@ -14,7 +15,7 @@ from prometheus_client import Gauge, Counter, Histogram
|
|||
from lbry.utils import LockWithMetrics
|
||||
|
||||
from .bip32 import PubKey
|
||||
from .transaction import Transaction, Output, OutputScript, TXRefImmutable
|
||||
from .transaction import Transaction, Output, OutputScript, TXRefImmutable, Input
|
||||
from .constants import TXO_TYPES, CLAIM_TYPES
|
||||
from .util import date_to_julian_day
|
||||
|
||||
|
@ -466,6 +467,70 @@ def dict_row_factory(cursor, row):
|
|||
return d
|
||||
|
||||
|
||||
def get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, reserve_amount: int, floor: int,
|
||||
fee_per_byte: int):
|
||||
txs = defaultdict(list)
|
||||
decoded_transactions = {}
|
||||
|
||||
accumulated = 0
|
||||
multiplier = 10
|
||||
gap_count = 0
|
||||
accounts_fmt = ",".join(["?"] * len(accounts))
|
||||
txo_query = f"""
|
||||
SELECT tx.txid, txo.txoid, tx.raw, tx.height, txo.position as nout, tx.is_verified, txo.amount FROM txo
|
||||
INNER JOIN account_address USING (address)
|
||||
LEFT JOIN txi USING (txoid)
|
||||
INNER JOIN tx USING (txid)
|
||||
WHERE txo.txo_type=0 AND txi.txoid IS NULL AND tx.txid IS NOT NULL AND tx.is_verified AND NOT txo.is_reserved
|
||||
AND txo.amount BETWEEN ? AND ?
|
||||
"""
|
||||
if accounts:
|
||||
txo_query += f"""
|
||||
AND account_address.account {'= ?' if len(accounts_fmt) == 1 else 'IN (' + accounts_fmt + ')'}
|
||||
"""
|
||||
reserved = []
|
||||
while accumulated < reserve_amount:
|
||||
found_txs = False
|
||||
# prefer confirmed, but save unconfirmed utxos from this selection in case they are needed
|
||||
unconfirmed = []
|
||||
for row in transaction.execute(txo_query, (floor, floor * multiplier, *accounts)):
|
||||
(txid, txoid, raw, height, nout, verified, amount) = row.values()
|
||||
found_txs = True
|
||||
if txid not in decoded_transactions:
|
||||
decoded_transactions[txid] = Transaction(raw)
|
||||
decoded_tx = decoded_transactions[txid]
|
||||
if not verified:
|
||||
unconfirmed.append((txid, txoid, raw, height, nout, verified, amount))
|
||||
continue
|
||||
accumulated += amount
|
||||
accumulated -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
|
||||
txs[(raw, height, verified)].append(nout)
|
||||
reserved.append(txoid)
|
||||
if accumulated >= reserve_amount:
|
||||
break
|
||||
unconfirmed.reverse()
|
||||
while unconfirmed:
|
||||
(txid, txoid, raw, height, nout, verified, amount) = unconfirmed.pop()
|
||||
decoded_tx = decoded_transactions[txid]
|
||||
accumulated += amount
|
||||
accumulated -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
|
||||
txs[(raw, height, verified)].append(nout)
|
||||
reserved.append(txoid)
|
||||
if accumulated >= reserve_amount:
|
||||
break
|
||||
if not found_txs:
|
||||
gap_count += 1
|
||||
if gap_count == 5:
|
||||
break
|
||||
floor *= multiplier
|
||||
# reserve the accumulated txos if enough were found
|
||||
if accumulated >= reserve_amount:
|
||||
transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?",
|
||||
[(True, txoid) for txoid in reserved]).fetchall()
|
||||
|
||||
return txs
|
||||
|
||||
|
||||
class Database(SQLiteMixin):
|
||||
|
||||
SCHEMA_VERSION = "1.3"
|
||||
|
@ -666,6 +731,19 @@ class Database(SQLiteMixin):
|
|||
# 2. update address histories removing deleted TXs
|
||||
return True
|
||||
|
||||
async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000,
|
||||
fee_per_byte: int = 50) -> List:
|
||||
to_spend = await self.db.run(
|
||||
get_spendable_utxos, tuple(account.id for account in accounts), reserve_amount, min_amount,
|
||||
fee_per_byte
|
||||
)
|
||||
txos = []
|
||||
for (raw, height, verified), positions in to_spend.items():
|
||||
tx = Transaction(raw, height=height, is_verified=verified)
|
||||
for nout in positions:
|
||||
txos.append(tx.outputs[nout].get_estimator(ledger))
|
||||
return txos
|
||||
|
||||
async def select_transactions(self, cols, accounts=None, read_only=False, **constraints):
|
||||
if not {'txid', 'txid__in'}.intersection(constraints):
|
||||
assert accounts, "'accounts' argument required when no 'txid' constraint is present"
|
||||
|
|
|
@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
self._on_transaction_controller = StreamController()
|
||||
self.on_transaction = self._on_transaction_controller.stream
|
||||
self.on_transaction.listen(
|
||||
lambda e: log.info(
|
||||
lambda e: log.debug(
|
||||
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
|
||||
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
|
||||
)
|
||||
|
@ -167,6 +167,7 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
|
||||
self.coin_selection_strategy = None
|
||||
self._known_addresses_out_of_sync = set()
|
||||
self.went_out_of_sync = asyncio.Queue()
|
||||
|
||||
self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
|
||||
self._balance_cache = pylru.lrucache(100000)
|
||||
|
@ -244,11 +245,16 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
def get_address_count(self, **constraints):
|
||||
return self.db.get_address_count(**constraints)
|
||||
|
||||
async def get_spendable_utxos(self, amount: int, funding_accounts):
|
||||
async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']],
|
||||
min_amount=100000):
|
||||
min_amount = min(amount // 10, min_amount)
|
||||
fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
|
||||
selector = CoinSelector(amount, fee)
|
||||
async with self._utxo_reservation_lock:
|
||||
if self.coin_selection_strategy == 'sqlite':
|
||||
return await self.db.get_spendable_utxos(self, amount + fee, funding_accounts, min_amount=min_amount,
|
||||
fee_per_byte=self.fee_per_byte)
|
||||
txos = await self.get_effective_amount_estimators(funding_accounts)
|
||||
fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
|
||||
selector = CoinSelector(amount, fee)
|
||||
spendables = selector.select(txos, self.coin_selection_strategy)
|
||||
if spendables:
|
||||
await self.reserve_outputs(s.txo for s in spendables)
|
||||
|
@ -492,12 +498,16 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
local_status, local_history = await self.get_local_status_and_history(address)
|
||||
|
||||
if local_status == remote_status:
|
||||
log.info("no new txs needed for %s, remote matches local", address)
|
||||
return True
|
||||
|
||||
remote_history = await self.network.retriable_call(self.network.get_history, address)
|
||||
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
|
||||
we_need = set(remote_history) - set(local_history)
|
||||
if not we_need:
|
||||
log.info(
|
||||
"no new txs needed for %s, local status %s, remote status %s", address, local_status, remote_status
|
||||
)
|
||||
return True
|
||||
|
||||
cache_tasks: List[asyncio.Task[Transaction]] = []
|
||||
|
@ -563,16 +573,35 @@ class Ledger(metaclass=LedgerRegistry):
|
|||
await self.get_local_status_and_history(address, synced_history.getvalue())
|
||||
if local_status != remote_status:
|
||||
if local_history == remote_history:
|
||||
log.warning(
|
||||
"%s has a synced history but a mismatched status", address
|
||||
)
|
||||
return True
|
||||
remote_set = set(remote_history)
|
||||
local_set = set(local_history)
|
||||
log.warning(
|
||||
"Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items",
|
||||
remote_status, len(remote_history), local_status, len(local_history)
|
||||
"%s is out of sync after syncing.\n"
|
||||
"Remote: %s with %d items (%i unique), local: %s with %d items (%i unique).\n"
|
||||
"Histories are mismatched on %i items.\n"
|
||||
"Local is missing\n"
|
||||
"%s\n"
|
||||
"Remote is missing\n"
|
||||
"%s\n"
|
||||
"******",
|
||||
address, remote_status, len(remote_history), len(remote_set),
|
||||
local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)),
|
||||
"\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]),
|
||||
"\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)])
|
||||
)
|
||||
log.warning("local: %s", local_history)
|
||||
log.warning("remote: %s", remote_history)
|
||||
# log.warning("local: %s", local_history)
|
||||
# log.warning("remote: %s", remote_history)
|
||||
self.went_out_of_sync.put_nowait(address)
|
||||
self._known_addresses_out_of_sync.add(address)
|
||||
return False
|
||||
else:
|
||||
log.warning(
|
||||
"%s is synced", address
|
||||
)
|
||||
return True
|
||||
|
||||
async def cache_transaction(self, txid, remote_height, check_local=True):
|
||||
|
|
|
@ -302,6 +302,9 @@ class WalletManager:
|
|||
await self.ledger.broadcast(tx)
|
||||
if blocking:
|
||||
await self.ledger.wait(tx, timeout=None)
|
||||
except CodeMessageError as err:
|
||||
log.warning("transaction rejected, leaving reserved")
|
||||
raise
|
||||
except:
|
||||
await self.ledger.release_tx(tx)
|
||||
raise
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import logging
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from time import perf_counter
|
||||
from operator import itemgetter
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
@ -54,8 +55,8 @@ class ClientSession(BaseClientSession):
|
|||
return result
|
||||
|
||||
async def send_request(self, method, args=()):
|
||||
log.info("%i in flight, send %s%s to %s:%i ", self.pending_amount, method, tuple(args), *self.server)
|
||||
self.pending_amount += 1
|
||||
log.debug("send %s%s to %s:%i", method, tuple(args), *self.server)
|
||||
try:
|
||||
if method == 'server.version':
|
||||
return await self.send_timed_server_version_request(args, self.timeout)
|
||||
|
@ -63,11 +64,11 @@ class ClientSession(BaseClientSession):
|
|||
while not request.done():
|
||||
done, pending = await asyncio.wait([request], timeout=self.timeout)
|
||||
if pending:
|
||||
log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received)
|
||||
log.info("Time since last packet: %s", perf_counter() - self.last_packet_received)
|
||||
if (perf_counter() - self.last_packet_received) < self.timeout:
|
||||
continue
|
||||
log.info("timeout sending %s to %s:%i", method, *self.server)
|
||||
raise asyncio.TimeoutError
|
||||
log.warning("timeout sending %s(%s) to %s:%i", method, str(args), *self.server)
|
||||
raise asyncio.TimeoutError()
|
||||
if done:
|
||||
try:
|
||||
return request.result()
|
||||
|
@ -91,6 +92,7 @@ class ClientSession(BaseClientSession):
|
|||
raise
|
||||
finally:
|
||||
self.pending_amount -= 1
|
||||
log.info("%i in flight, finished %s%s ", self.pending_amount, method, tuple(args))
|
||||
|
||||
async def ensure_session(self):
|
||||
# Handles reconnecting and maintaining a session alive
|
||||
|
@ -144,12 +146,12 @@ class ClientSession(BaseClientSession):
|
|||
controller.add(request.args)
|
||||
|
||||
def connection_lost(self, exc):
|
||||
log.debug("Connection lost: %s:%d", *self.server)
|
||||
log.warning("Connection lost: %s:%d", *self.server)
|
||||
super().connection_lost(exc)
|
||||
self.response_time = None
|
||||
self.connection_latency = None
|
||||
self._response_samples = 0
|
||||
self.pending_amount = 0
|
||||
# self.pending_amount = 0
|
||||
self._on_disconnect_controller.add(True)
|
||||
|
||||
|
||||
|
@ -274,8 +276,13 @@ class Network:
|
|||
return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted)
|
||||
|
||||
# --- Subscribes, history and broadcasts are always aimed towards the master client directly
|
||||
def get_history(self, address):
|
||||
return self.rpc('blockchain.address.get_history', [address], True)
|
||||
async def get_history(self, address):
|
||||
log.info("get history %s", address)
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
return await self.rpc('blockchain.address.get_history', [address], True)
|
||||
finally:
|
||||
log.info("%s history took %s", address, time.perf_counter() - start)
|
||||
|
||||
def broadcast(self, raw_transaction):
|
||||
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)
|
||||
|
@ -285,16 +292,18 @@ class Network:
|
|||
|
||||
async def subscribe_address(self, address, *addresses):
|
||||
addresses = list((address, ) + addresses)
|
||||
server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None
|
||||
try:
|
||||
return await self.rpc('blockchain.address.subscribe', addresses, True)
|
||||
except asyncio.TimeoutError:
|
||||
log.warning(
|
||||
"timed out subscribing to addresses from %s:%i",
|
||||
*self.client.server_address_and_port
|
||||
*server_addr_and_port
|
||||
)
|
||||
# abort and cancel, we can't lose a subscription, it will happen again on reconnect
|
||||
if self.client:
|
||||
self.client.abort()
|
||||
log.warning("raise cancelled")
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
def unsubscribe_address(self, address):
|
||||
|
|
|
@ -480,6 +480,7 @@ class RPCSession(SessionBase):
|
|||
await event.wait()
|
||||
result = event.result
|
||||
if isinstance(result, Exception):
|
||||
self.logger.error("%s sending %s(%s)", result, method, "" if not args else ",".join([str(a) for a in args]))
|
||||
raise result
|
||||
return result
|
||||
|
||||
|
|
|
@ -183,6 +183,7 @@ class BlockProcessor:
|
|||
self.state_lock = asyncio.Lock()
|
||||
|
||||
self.search_cache = {}
|
||||
self.history_cache = {}
|
||||
|
||||
async def run_in_thread_with_lock(self, func, *args):
|
||||
# Run in a thread to prevent blocking. Shielded so that
|
||||
|
@ -213,6 +214,7 @@ class BlockProcessor:
|
|||
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
||||
for cache in self.search_cache.values():
|
||||
cache.clear()
|
||||
self.history_cache.clear()
|
||||
await self._maybe_flush()
|
||||
processed_time = time.perf_counter() - start
|
||||
self.block_count_metric.set(self.height)
|
||||
|
@ -754,6 +756,8 @@ class LBRYBlockProcessor(BlockProcessor):
|
|||
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
|
||||
self.sql: SQLDB = self.db.sql
|
||||
self.timer = Timer('BlockProcessor')
|
||||
self.block_notify = asyncio.Event()
|
||||
self.block_notify.set()
|
||||
|
||||
def advance_blocks(self, blocks):
|
||||
self.sql.begin()
|
||||
|
|
|
@ -213,10 +213,18 @@ class MemPool:
|
|||
continue
|
||||
hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
|
||||
async with self.lock:
|
||||
start = time.perf_counter()
|
||||
touched = await self._process_mempool(hashes)
|
||||
duration = time.perf_counter() - start
|
||||
if duration >= 1:
|
||||
self.logger.info("processed mempool in %s", duration)
|
||||
synchronized_event.set()
|
||||
synchronized_event.clear()
|
||||
start = time.perf_counter()
|
||||
await self.api.on_mempool(touched, height)
|
||||
duration = time.perf_counter() - start
|
||||
if duration >= 1:
|
||||
self.logger.info("sent history notifications in %s", duration)
|
||||
try:
|
||||
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
|
||||
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
|
||||
|
|
|
@ -135,7 +135,7 @@ class SessionManager:
|
|||
"docker_tag": DOCKER_TAG,
|
||||
'version': lbry.__version__,
|
||||
"min_version": util.version_string(VERSION.PROTOCOL_MIN),
|
||||
"cpu_count": os.cpu_count()
|
||||
"cpu_count": str(os.cpu_count())
|
||||
})
|
||||
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
|
||||
labelnames=("version",))
|
||||
|
@ -177,7 +177,7 @@ class SessionManager:
|
|||
self.cur_group = SessionGroup(0)
|
||||
self.txs_sent = 0
|
||||
self.start_time = time.time()
|
||||
self.history_cache = pylru.lrucache(256)
|
||||
self.history_cache = self.bp.history_cache
|
||||
self.notified_height: typing.Optional[int] = None
|
||||
# Cache some idea of room to avoid recounting on each subscription
|
||||
self.subs_room = 0
|
||||
|
@ -608,26 +608,20 @@ class SessionManager:
|
|||
|
||||
async def limited_history(self, hashX):
|
||||
"""A caching layer."""
|
||||
hc = self.history_cache
|
||||
if hashX not in hc:
|
||||
if hashX not in self.history_cache:
|
||||
# History DoS limit. Each element of history is about 99
|
||||
# bytes when encoded as JSON. This limits resource usage
|
||||
# on bloated history requests, and uses a smaller divisor
|
||||
# so large requests are logged before refusing them.
|
||||
limit = self.env.max_send // 97
|
||||
hc[hashX] = await self.db.limited_history(hashX, limit=limit)
|
||||
return hc[hashX]
|
||||
self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
|
||||
return self.history_cache[hashX]
|
||||
|
||||
async def _notify_sessions(self, height, touched):
|
||||
"""Notify sessions about height changes and touched addresses."""
|
||||
height_changed = height != self.notified_height
|
||||
if height_changed:
|
||||
await self._refresh_hsub_results(height)
|
||||
# Invalidate our history cache for touched hashXs
|
||||
hc = self.history_cache
|
||||
for hashX in set(hc).intersection(touched):
|
||||
del hc[hashX]
|
||||
|
||||
if self.sessions:
|
||||
await asyncio.wait([
|
||||
session.notify(touched, height_changed) for session in self.sessions
|
||||
|
@ -921,10 +915,15 @@ class LBRYElectrumX(SessionBase):
|
|||
def sub_count(self):
|
||||
return len(self.hashX_subs)
|
||||
|
||||
UGLY_COUNT = 0
|
||||
|
||||
async def notify(self, touched, height_changed):
|
||||
"""Notify the client about changes to touched addresses (from mempool
|
||||
updates or new blocks) and height.
|
||||
"""
|
||||
|
||||
self.UGLY_COUNT += 1
|
||||
|
||||
if height_changed and self.subscribe_headers:
|
||||
args = (await self.subscribe_headers_result(), )
|
||||
try:
|
||||
|
@ -940,6 +939,11 @@ class LBRYElectrumX(SessionBase):
|
|||
|
||||
for hashX in touched:
|
||||
alias = self.hashX_subs[hashX]
|
||||
if self.UGLY_COUNT == 25:
|
||||
print('sleeping for ', hashX)
|
||||
if not self.bp.block_notify.is_set():
|
||||
await self.bp.block_notify.wait()
|
||||
await asyncio.sleep(3)
|
||||
status = await self.address_status(hashX)
|
||||
changed[alias] = status
|
||||
|
||||
|
@ -960,13 +964,11 @@ class LBRYElectrumX(SessionBase):
|
|||
method = 'blockchain.scripthash.subscribe'
|
||||
else:
|
||||
method = 'blockchain.address.subscribe'
|
||||
|
||||
try:
|
||||
await self.send_notification(method, (alias, status))
|
||||
except asyncio.TimeoutError:
|
||||
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
|
||||
self.abort()
|
||||
return
|
||||
start = time.perf_counter()
|
||||
if not self.bp.block_notify.is_set():
|
||||
await self.bp.block_notify.wait()
|
||||
t = asyncio.create_task(self.send_notification(method, (alias, status)))
|
||||
t.add_done_callback(lambda _: self.logger.warning("sent notification to %s in %s", alias, time.perf_counter() - start))
|
||||
|
||||
if changed:
|
||||
es = '' if len(changed) == 1 else 'es'
|
||||
|
@ -1180,6 +1182,7 @@ class LBRYElectrumX(SessionBase):
|
|||
"""
|
||||
# Note history is ordered and mempool unordered in electrum-server
|
||||
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
|
||||
|
||||
db_history = await self.session_mgr.limited_history(hashX)
|
||||
mempool = await self.mempool.transaction_summaries(hashX)
|
||||
|
||||
|
|
|
@ -40,6 +40,8 @@ class ClaimTestCase(CommandTestCase):
|
|||
|
||||
class ClaimSearchCommand(ClaimTestCase):
|
||||
|
||||
VERBOSITY = logging.WARNING
|
||||
|
||||
async def create_channel(self):
|
||||
self.channel = await self.channel_create('@abc', '1.0')
|
||||
self.channel_id = self.get_claim_id(self.channel)
|
||||
|
@ -157,6 +159,60 @@ class ClaimSearchCommand(ClaimTestCase):
|
|||
await self.stream_abandon(txid=signed2['txid'], nout=0)
|
||||
await self.assertFindsClaims([], channel_ids=[channel_id2])
|
||||
|
||||
async def test_break_it(self):
|
||||
await self.generate(5)
|
||||
address = await self.account.receiving.get_or_create_usable_address()
|
||||
sendtxid = await self.blockchain.send_to_address(address, 1)
|
||||
await self.confirm_tx(sendtxid)
|
||||
address = await self.account.receiving.get_or_create_usable_address()
|
||||
sendtxid = await self.blockchain.send_to_address(address, 1)
|
||||
await self.confirm_tx(sendtxid)
|
||||
address = await self.account.receiving.get_or_create_usable_address()
|
||||
sendtxid = await self.blockchain.send_to_address(address, 1)
|
||||
await self.confirm_tx(sendtxid)
|
||||
address = await self.account.receiving.get_or_create_usable_address()
|
||||
sendtxid = await self.blockchain.send_to_address(address, 1)
|
||||
await self.confirm_tx(sendtxid)
|
||||
await self.generate(7)
|
||||
|
||||
async def _doit(n):
|
||||
try:
|
||||
await self.daemon.jsonrpc_channel_create(
|
||||
name=f'@arena{n}', bid='0.1', blocking=True
|
||||
)
|
||||
except InsufficientFundsError:
|
||||
pass
|
||||
|
||||
def doit(n):
|
||||
asyncio.create_task(_doit(n))
|
||||
|
||||
async def break_it():
|
||||
count = 0
|
||||
for _ in range(4):
|
||||
for _ in range(10):
|
||||
count += 1
|
||||
if not count % 7:
|
||||
asyncio.create_task(self.generate(1))
|
||||
doit(count)
|
||||
if self.ledger._known_addresses_out_of_sync:
|
||||
print('out of sync', self.ledger._known_addresses_out_of_sync)
|
||||
await asyncio.sleep(1)
|
||||
await self.generate(1)
|
||||
|
||||
bp = self.conductor.spv_node.server.bp
|
||||
break_task = asyncio.create_task(break_it())
|
||||
address = await self.ledger.went_out_of_sync.get()
|
||||
bp.block_notify.clear()
|
||||
print('%s is out of sync' % address)
|
||||
with self.assertRaises(InsufficientFundsError):
|
||||
await self.daemon.jsonrpc_channel_create(
|
||||
name=f'@derp', bid='0.1', blocking=True
|
||||
)
|
||||
self.assertTrue(False)
|
||||
print("woohoo")
|
||||
if not break_task.done():
|
||||
break_task.cancel()
|
||||
|
||||
async def test_pagination(self):
|
||||
await self.create_channel()
|
||||
await self.create_lots_of_streams()
|
||||
|
|
|
@ -28,9 +28,10 @@ def mock_config():
|
|||
class BlobExchangeTestBase(AsyncioTestCase):
|
||||
async def asyncSetUp(self):
|
||||
self.loop = asyncio.get_event_loop()
|
||||
|
||||
self.client_wallet_dir = tempfile.mkdtemp()
|
||||
self.client_dir = tempfile.mkdtemp()
|
||||
self.server_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(shutil.rmtree, self.client_wallet_dir)
|
||||
self.addCleanup(shutil.rmtree, self.client_dir)
|
||||
self.addCleanup(shutil.rmtree, self.server_dir)
|
||||
self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
|
||||
|
@ -39,8 +40,8 @@ class BlobExchangeTestBase(AsyncioTestCase):
|
|||
self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config)
|
||||
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
|
||||
|
||||
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir,
|
||||
fixed_peers=[])
|
||||
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir,
|
||||
wallet=self.client_wallet_dir, fixed_peers=[])
|
||||
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
|
||||
self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config)
|
||||
self.client_peer_manager = PeerManager(self.loop)
|
||||
|
|
|
@ -65,7 +65,7 @@ def get_claim_transaction(claim_name, claim=b''):
|
|||
)
|
||||
|
||||
|
||||
async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
|
||||
async def get_mock_wallet(sd_hash, storage, wallet_dir, balance=10.0, fee=None):
|
||||
claim = Claim()
|
||||
if fee:
|
||||
if fee['currency'] == 'LBC':
|
||||
|
@ -97,7 +97,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
|
|||
|
||||
wallet = Wallet()
|
||||
ledger = Ledger({
|
||||
'db': Database(':memory:'),
|
||||
'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
|
||||
'headers': FakeHeaders(514082)
|
||||
})
|
||||
await ledger.db.open()
|
||||
|
@ -136,7 +136,8 @@ class TestStreamManager(BlobExchangeTestBase):
|
|||
self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
|
||||
)
|
||||
self.sd_hash = descriptor.sd_hash
|
||||
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
|
||||
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, self.client_wallet_dir,
|
||||
balance, fee)
|
||||
analytics_manager = AnalyticsManager(
|
||||
self.client_config,
|
||||
binascii.hexlify(generate_id()).decode(),
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
import os
|
||||
import unittest
|
||||
import tempfile
|
||||
import shutil
|
||||
from binascii import hexlify, unhexlify
|
||||
from itertools import cycle
|
||||
|
||||
|
@ -302,9 +305,11 @@ class TestTransactionSigning(AsyncioTestCase):
|
|||
class TransactionIOBalancing(AsyncioTestCase):
|
||||
|
||||
async def asyncSetUp(self):
|
||||
wallet_dir = tempfile.mkdtemp()
|
||||
self.addCleanup(shutil.rmtree, wallet_dir)
|
||||
self.ledger = Ledger({
|
||||
'db': Database(':memory:'),
|
||||
'headers': Headers(':memory:')
|
||||
'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
|
||||
'headers': Headers(':memory:'),
|
||||
})
|
||||
await self.ledger.db.open()
|
||||
self.account = Account.from_dict(
|
||||
|
@ -419,3 +424,68 @@ class TransactionIOBalancing(AsyncioTestCase):
|
|||
self.assertListEqual([0.01, 1], self.inputs(tx))
|
||||
# change is now needed to consume extra input
|
||||
self.assertListEqual([0.97], self.outputs(tx))
|
||||
|
||||
async def test_basic_use_cases_sqlite(self):
|
||||
self.ledger.coin_selection_strategy = 'sqlite'
|
||||
self.ledger.fee_per_byte = int(.01*CENT)
|
||||
|
||||
# available UTXOs for filling missing inputs
|
||||
utxos = await self.create_utxos([
|
||||
1, 1, 3, 5, 10
|
||||
])
|
||||
# pay 3 coins (3.07 w/ fees)
|
||||
tx = await self.tx(
|
||||
[], # inputs
|
||||
[self.txo(3)] # outputs
|
||||
)
|
||||
# there are 4x 1.0 utxos available
|
||||
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 1.0, 1.0])
|
||||
|
||||
# a change of 0.93 is added to reach balance
|
||||
self.assertListEqual(self.outputs(tx), [3, 0.93])
|
||||
|
||||
await self.ledger.release_outputs(utxos)
|
||||
|
||||
# pay 6.917 coins (7.00 w/ fees)
|
||||
tx = await self.tx(
|
||||
[], # inputs
|
||||
[self.txo(6.917)] # outputs
|
||||
)
|
||||
|
||||
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 1.0, 1.0, 3.0])
|
||||
self.assertListEqual(self.outputs(tx), [6.92])
|
||||
|
||||
await self.ledger.release_outputs(utxos)
|
||||
|
||||
# supplied input and output, but input is not enough to cover output
|
||||
tx = await self.tx(
|
||||
[self.txi(self.txo(10))], # inputs
|
||||
[self.txo(11)] # outputs
|
||||
)
|
||||
# additional input is chosen (UTXO 1)
|
||||
self.assertListEqual([10, 1.0, 1.0], self.inputs(tx))
|
||||
# change is now needed to consume extra input
|
||||
self.assertListEqual([11, 0.95], self.outputs(tx))
|
||||
|
||||
await self.ledger.release_outputs(utxos)
|
||||
|
||||
# liquidating a UTXO
|
||||
tx = await self.tx(
|
||||
[self.txi(self.txo(10))], # inputs
|
||||
[] # outputs
|
||||
)
|
||||
self.assertListEqual([10], self.inputs(tx))
|
||||
# missing change added to consume the amount
|
||||
self.assertListEqual([9.98], self.outputs(tx))
|
||||
|
||||
await self.ledger.release_outputs(utxos)
|
||||
|
||||
# liquidating at a loss, requires adding extra inputs
|
||||
tx = await self.tx(
|
||||
[self.txi(self.txo(0.01))], # inputs
|
||||
[] # outputs
|
||||
)
|
||||
# UTXO 1 is added to cover some of the fee
|
||||
self.assertListEqual([0.01, 1], self.inputs(tx))
|
||||
# change is now needed to consume extra input
|
||||
self.assertListEqual([0.97], self.outputs(tx))
|
||||
|
|
Loading…
Add table
Reference in a new issue