Compare commits

16 commits:

- 1298e0e725
- 5d5cd3499a
- c7e0e9f085
- 70464a23d2
- 4217bbbea6
- 8a06b1b5fa
- 00c89a4f0b
- ad7441ca47
- f461fc6ef9
- 1c4c18dec9
- 597a101030
- adbbb11dbf
- b66281eabc
- be5f6491bc
- 9d34e027f3
- 6dbea6f4ab

15 changed files with 331 additions and 60 deletions

```diff
@@ -634,7 +634,7 @@ class Config(CLIConfig):

     coin_selection_strategy = StringChoice(
         "Strategy to use when selecting UTXOs for a transaction",
-        STRATEGIES, "standard")
+        STRATEGIES, "sqlite")

     transaction_cache_size = Integer("Transaction cache size", 100_000)
     save_resolved_claims = Toggle(
```
```diff
@@ -2569,9 +2569,9 @@ class Daemon(metaclass=JSONRPCServerType):
             account.add_channel_private_key(txo.private_key)
             wallet.save()
             await self.broadcast_or_release(tx, blocking)
-            await self.storage.save_claims([self._old_get_temp_claim_info(
+            self.component_manager.loop.create_task(self.storage.save_claims([self._old_get_temp_claim_info(
                 tx, txo, claim_address, claim, name, dewies_to_lbc(amount)
-            )])
+            )]))
             self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
         else:
             await account.ledger.release_tx(tx)
```
```diff
@@ -2725,9 +2725,9 @@ class Daemon(metaclass=JSONRPCServerType):
             account.add_channel_private_key(new_txo.private_key)
             wallet.save()
             await self.broadcast_or_release(tx, blocking)
-            await self.storage.save_claims([self._old_get_temp_claim_info(
+            self.component_manager.loop.create_task(self.storage.save_claims([self._old_get_temp_claim_info(
                 tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
-            )])
+            )]))
             self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
         else:
             await account.ledger.release_tx(tx)
```
```diff
@@ -3262,14 +3262,18 @@ class Daemon(metaclass=JSONRPCServerType):

         if not preview:
             await self.broadcast_or_release(tx, blocking)
-            await self.storage.save_claims([self._old_get_temp_claim_info(
-                tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount)
-            )])
-            await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id)
+
+            async def save_claims():
+                await self.storage.save_claims([self._old_get_temp_claim_info(
+                    tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount)
+                )])
+                await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id)
+
+            self.component_manager.loop.create_task(save_claims())
             self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
         else:
             await account.ledger.release_tx(tx)
+        log.info("successful publish %s", tx.id)
         return tx

     @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
```
```diff
@@ -3480,11 +3484,15 @@ class Daemon(metaclass=JSONRPCServerType):

         if not preview:
             await self.broadcast_or_release(tx, blocking)
-            await self.storage.save_claims([self._old_get_temp_claim_info(
-                tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
-            )])
-            if stream_hash:
-                await self.storage.save_content_claim(stream_hash, new_txo.id)
+
+            async def save_claims():
+                await self.storage.save_claims([self._old_get_temp_claim_info(
+                    tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
+                )])
+                if stream_hash:
+                    await self.storage.save_content_claim(stream_hash, new_txo.id)
+
+            self.component_manager.loop.create_task(save_claims())
             self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
         else:
             await account.ledger.release_tx(tx)
```
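A note on the pattern in the daemon hunks above: moving `storage.save_claims` (and `save_content_claim`) into `loop.create_task(...)` takes the SQLite writes off the RPC response path, but a bare fire-and-forget task only reports its exception when it is garbage collected. A minimal, self-contained sketch of the pattern with a done callback to surface failures (the helper name is illustrative, not part of this diff):

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


def fire_and_forget(coro):
    # Schedule `coro` without awaiting it. Without the done callback,
    # an exception would surface only when the task is garbage collected.
    task = asyncio.get_running_loop().create_task(coro)

    def _report(t: asyncio.Task):
        if not t.cancelled() and t.exception() is not None:
            log.error("background task failed", exc_info=t.exception())

    task.add_done_callback(_report)
    return task


async def main():
    async def save_claims():  # stand-in for the storage writes above
        raise ValueError("simulated storage failure")

    fire_and_forget(save_claims())
    await asyncio.sleep(0.1)  # the RPC returns immediately; the write finishes here


asyncio.run(main())
```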
```diff
@@ -5,7 +5,7 @@ from lbry.wallet.transaction import OutputEffectiveAmountEstimator

 MAXIMUM_TRIES = 100000

-STRATEGIES = []
+STRATEGIES = ['sqlite']  # sqlite coin chooser is in database.py


 def strategy(method):
```
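For context: the names in `STRATEGIES` normally come from this module's `strategy` decorator as `CoinSelector` methods are defined; 'sqlite' is listed by hand because, as the new comment says, that chooser lives in database.py. A rough sketch of the registry pattern (illustrative, not the exact lbry code):

```python
STRATEGIES = ['sqlite']  # choosers implemented outside this module are listed by hand


def strategy(method):
    # register the decorated selection method under its own name
    STRATEGIES.append(method.__name__)
    return method


class CoinSelector:
    @strategy
    def prefer_confirmed(self, txos):
        return [txo for txo in txos if txo.tx.is_verified] or txos


print(STRATEGIES)  # ['sqlite', 'prefer_confirmed']
```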
```diff
@@ -4,6 +4,7 @@ import asyncio
 import sqlite3
 import platform
 from binascii import hexlify
+from collections import defaultdict
 from dataclasses import dataclass
 from contextvars import ContextVar
 from concurrent.futures.thread import ThreadPoolExecutor
```
```diff
@@ -14,7 +15,7 @@ from prometheus_client import Gauge, Counter, Histogram
 from lbry.utils import LockWithMetrics

 from .bip32 import PubKey
-from .transaction import Transaction, Output, OutputScript, TXRefImmutable
+from .transaction import Transaction, Output, OutputScript, TXRefImmutable, Input
 from .constants import TXO_TYPES, CLAIM_TYPES
 from .util import date_to_julian_day
```
```diff
@@ -466,6 +467,70 @@ def dict_row_factory(cursor, row):
     return d


+def get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, reserve_amount: int, floor: int,
+                        fee_per_byte: int):
+    txs = defaultdict(list)
+    decoded_transactions = {}
+
+    accumulated = 0
+    multiplier = 10
+    gap_count = 0
+    accounts_fmt = ",".join(["?"] * len(accounts))
+    txo_query = f"""
+        SELECT tx.txid, txo.txoid, tx.raw, tx.height, txo.position as nout, tx.is_verified, txo.amount FROM txo
+        INNER JOIN account_address USING (address)
+        LEFT JOIN txi USING (txoid)
+        INNER JOIN tx USING (txid)
+        WHERE txo.txo_type=0 AND txi.txoid IS NULL AND tx.txid IS NOT NULL AND tx.is_verified AND NOT txo.is_reserved
+        AND txo.amount BETWEEN ? AND ?
+        """
+    if accounts:
+        txo_query += f"""
+        AND account_address.account {'= ?' if len(accounts_fmt) == 1 else 'IN (' + accounts_fmt + ')'}
+        """
+    reserved = []
+    while accumulated < reserve_amount:
+        found_txs = False
+        # prefer confirmed, but save unconfirmed utxos from this selection in case they are needed
+        unconfirmed = []
+        for row in transaction.execute(txo_query, (floor, floor * multiplier, *accounts)):
+            (txid, txoid, raw, height, nout, verified, amount) = row.values()
+            found_txs = True
+            if txid not in decoded_transactions:
+                decoded_transactions[txid] = Transaction(raw)
+            decoded_tx = decoded_transactions[txid]
+            if not verified:
+                unconfirmed.append((txid, txoid, raw, height, nout, verified, amount))
+                continue
+            accumulated += amount
+            accumulated -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
+            txs[(raw, height, verified)].append(nout)
+            reserved.append(txoid)
+            if accumulated >= reserve_amount:
+                break
+        unconfirmed.reverse()
+        while unconfirmed:
+            (txid, txoid, raw, height, nout, verified, amount) = unconfirmed.pop()
+            decoded_tx = decoded_transactions[txid]
+            accumulated += amount
+            accumulated -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
+            txs[(raw, height, verified)].append(nout)
+            reserved.append(txoid)
+            if accumulated >= reserve_amount:
+                break
+        if not found_txs:
+            gap_count += 1
+            if gap_count == 5:
+                break
+        floor *= multiplier
+    # reserve the accumulated txos if enough were found
+    if accumulated >= reserve_amount:
+        transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?",
+                                [(True, txoid) for txoid in reserved]).fetchall()
+
+    return txs
+
+
 class Database(SQLiteMixin):

     SCHEMA_VERSION = "1.3"
```
```diff
@@ -666,6 +731,19 @@ class Database(SQLiteMixin):
         # 2. update address histories removing deleted TXs
         return True

+    async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000,
+                                  fee_per_byte: int = 50) -> List:
+        to_spend = await self.db.run(
+            get_spendable_utxos, tuple(account.id for account in accounts), reserve_amount, min_amount,
+            fee_per_byte
+        )
+        txos = []
+        for (raw, height, verified), positions in to_spend.items():
+            tx = Transaction(raw, height=height, is_verified=verified)
+            for nout in positions:
+                txos.append(tx.outputs[nout].get_estimator(ledger))
+        return txos
+
     async def select_transactions(self, cols, accounts=None, read_only=False, **constraints):
         if not {'txid', 'txid__in'}.intersection(constraints):
             assert accounts, "'accounts' argument required when no 'txid' constraint is present"
```
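The chooser added above works in amount bands: it scans spendable UTXOs whose value lies between `floor` and `floor * 10`, widens the band tenfold per pass, counts a gap whenever a pass finds no rows, and gives up after five gaps. Confirmed outputs are preferred; unconfirmed ones from the same band are kept in reserve. A self-contained sketch of just the band-escalation logic over plain integers (fees and confirmation ignored):

```python
def select_by_bands(amounts, reserve, floor=100_000, multiplier=10, max_gaps=5):
    # Scan ascending amount bands [floor, floor * multiplier], widening the
    # band tenfold per pass; stop after `max_gaps` passes that find nothing.
    remaining = sorted(amounts)
    picked, accumulated, gaps = [], 0, 0
    while accumulated < reserve and gaps < max_gaps:
        band = [a for a in remaining if floor <= a <= floor * multiplier]
        if not band:
            gaps += 1
        for amount in band:
            remaining.remove(amount)
            picked.append(amount)
            accumulated += amount
            if accumulated >= reserve:
                return picked  # enough was found; the caller would reserve these
        floor *= multiplier
    return []  # not enough spendable value: reserve nothing


# small outputs are consumed before the 0.5-coin output is touched (amounts in dewies)
print(select_by_bands([200_000, 400_000, 50_000_000], reserve=1_000_000))
```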
```diff
@@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry):
         self._on_transaction_controller = StreamController()
         self.on_transaction = self._on_transaction_controller.stream
         self.on_transaction.listen(
-            lambda e: log.info(
+            lambda e: log.debug(
                 '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
                 self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
             )
```
```diff
@@ -167,6 +167,7 @@ class Ledger(metaclass=LedgerRegistry):

         self.coin_selection_strategy = None
         self._known_addresses_out_of_sync = set()
+        self.went_out_of_sync = asyncio.Queue()

         self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
         self._balance_cache = pylru.lrucache(100000)
```
```diff
@@ -244,11 +245,16 @@ class Ledger(metaclass=LedgerRegistry):
     def get_address_count(self, **constraints):
         return self.db.get_address_count(**constraints)

-    async def get_spendable_utxos(self, amount: int, funding_accounts):
+    async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']],
+                                  min_amount=100000):
+        min_amount = min(amount // 10, min_amount)
+        fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
+        selector = CoinSelector(amount, fee)
         async with self._utxo_reservation_lock:
+            if self.coin_selection_strategy == 'sqlite':
+                return await self.db.get_spendable_utxos(self, amount + fee, funding_accounts, min_amount=min_amount,
+                                                         fee_per_byte=self.fee_per_byte)
             txos = await self.get_effective_amount_estimators(funding_accounts)
-            fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
-            selector = CoinSelector(amount, fee)
             spendables = selector.select(txos, self.coin_selection_strategy)
             if spendables:
                 await self.reserve_outputs(s.txo for s in spendables)
```
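Two details worth calling out in this hunk: the 'sqlite' strategy short-circuits past the in-memory `CoinSelector` and reserves outputs inside the database call, and `min_amount = min(amount // 10, min_amount)` lowers the band floor for small spends so they are not starved by the 100000-dewie default. A runnable sketch of the routing and the floor heuristic, with stand-in strings for the two chooser paths:

```python
import asyncio

DEFAULT_MIN_AMOUNT = 100000  # dewies, mirroring the signature above


async def get_spendable_utxos(amount: int, strategy: str, min_amount: int = DEFAULT_MIN_AMOUNT):
    min_amount = min(amount // 10, min_amount)  # small spends lower the band floor
    if strategy == 'sqlite':
        # stands in for Database.get_spendable_utxos(..., min_amount=min_amount)
        return f"db-side chooser, floor={min_amount}"
    # stands in for CoinSelector.select() over in-memory estimators
    return "in-memory selector"


async def main():
    print(await get_spendable_utxos(50_000, 'sqlite'))      # floor drops to 5000
    print(await get_spendable_utxos(10_000_000, 'sqlite'))  # floor capped at 100000
    print(await get_spendable_utxos(10_000_000, 'standard'))


asyncio.run(main())
```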
```diff
@@ -492,12 +498,16 @@ class Ledger(metaclass=LedgerRegistry):
         local_status, local_history = await self.get_local_status_and_history(address)

         if local_status == remote_status:
+            log.info("no new txs needed for %s, remote matches local", address)
             return True

         remote_history = await self.network.retriable_call(self.network.get_history, address)
         remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
         we_need = set(remote_history) - set(local_history)
         if not we_need:
+            log.info(
+                "no new txs needed for %s, local status %s, remote status %s", address, local_status, remote_status
+            )
             return True

         cache_tasks: List[asyncio.Task[Transaction]] = []
```
```diff
@@ -563,16 +573,35 @@ class Ledger(metaclass=LedgerRegistry):
             await self.get_local_status_and_history(address, synced_history.getvalue())
         if local_status != remote_status:
             if local_history == remote_history:
+                log.warning(
+                    "%s has a synced history but a mismatched status", address
+                )
                 return True
+            remote_set = set(remote_history)
+            local_set = set(local_history)
             log.warning(
-                "Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items",
-                remote_status, len(remote_history), local_status, len(local_history)
+                "%s is out of sync after syncing.\n"
+                "Remote: %s with %d items (%i unique), local: %s with %d items (%i unique).\n"
+                "Histories are mismatched on %i items.\n"
+                "Local is missing\n"
+                "%s\n"
+                "Remote is missing\n"
+                "%s\n"
+                "******",
+                address, remote_status, len(remote_history), len(remote_set),
+                local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)),
+                "\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]),
+                "\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)])
             )
-            log.warning("local: %s", local_history)
-            log.warning("remote: %s", remote_history)
+            # log.warning("local: %s", local_history)
+            # log.warning("remote: %s", remote_history)
+            self.went_out_of_sync.put_nowait(address)
             self._known_addresses_out_of_sync.add(address)
             return False
         else:
+            log.warning(
+                "%s is synced", address
+            )
             return True

     async def cache_transaction(self, txid, remote_height, check_local=True):
```
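`went_out_of_sync` turns a condition that was previously visible only in logs into a push-style signal; `test_break_it` later in this diff blocks on it with `await self.ledger.went_out_of_sync.get()`. A minimal sketch of that producer/consumer handoff:

```python
import asyncio


async def main():
    went_out_of_sync = asyncio.Queue()

    async def update_history(address):
        # stand-in for the status comparison above noticing a mismatch
        await asyncio.sleep(0.1)
        went_out_of_sync.put_nowait(address)

    checker = asyncio.create_task(update_history("bXYZ..."))
    # a test or watchdog blocks here until the first address desyncs
    address = await went_out_of_sync.get()
    print(f"{address} is out of sync")
    await checker


asyncio.run(main())
```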
```diff
@@ -302,6 +302,9 @@ class WalletManager:
             await self.ledger.broadcast(tx)
             if blocking:
                 await self.ledger.wait(tx, timeout=None)
+        except CodeMessageError as err:
+            log.warning("transaction rejected, leaving reserved")
+            raise
         except:
             await self.ledger.release_tx(tx)
             raise
```
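The new branch separates an explicit server rejection from a transport failure: a rejected transaction keeps its inputs reserved so the wallet does not immediately respend them, while any other error still releases them. A compressed sketch of that ordering, with a stand-in for the RPC error type:

```python
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


class CodeMessageError(Exception):
    """Stand-in for the RPC error raised when the server rejects a tx."""


async def broadcast_or_release(ledger, tx, blocking=False):
    try:
        await ledger.broadcast(tx)
        if blocking:
            await ledger.wait(tx, timeout=None)
    except CodeMessageError:
        # the server saw and rejected the tx: keep its inputs reserved
        log.warning("transaction rejected, leaving reserved")
        raise
    except Exception:
        # anything else (timeout, disconnect): free the inputs for reuse
        await ledger.release_tx(tx)
        raise


class FakeLedger:
    async def broadcast(self, tx):
        raise CodeMessageError("txn-mempool-conflict")

    async def release_tx(self, tx):
        print("released", tx)


async def main():
    try:
        await broadcast_or_release(FakeLedger(), "tx1")
    except CodeMessageError as e:
        print("rejected, still reserved:", e)


asyncio.run(main())
```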
```diff
@@ -1,6 +1,7 @@
 import logging
 import asyncio
 import json
+import time
 from time import perf_counter
 from operator import itemgetter
 from typing import Dict, Optional, Tuple
```
```diff
@@ -54,8 +55,8 @@ class ClientSession(BaseClientSession):
         return result

     async def send_request(self, method, args=()):
+        log.info("%i in flight, send %s%s to %s:%i ", self.pending_amount, method, tuple(args), *self.server)
         self.pending_amount += 1
-        log.debug("send %s%s to %s:%i", method, tuple(args), *self.server)
         try:
             if method == 'server.version':
                 return await self.send_timed_server_version_request(args, self.timeout)
```
```diff
@@ -63,11 +64,11 @@ class ClientSession(BaseClientSession):
         while not request.done():
             done, pending = await asyncio.wait([request], timeout=self.timeout)
             if pending:
-                log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received)
+                log.info("Time since last packet: %s", perf_counter() - self.last_packet_received)
                 if (perf_counter() - self.last_packet_received) < self.timeout:
                     continue
-                log.info("timeout sending %s to %s:%i", method, *self.server)
-                raise asyncio.TimeoutError
+                log.warning("timeout sending %s(%s) to %s:%i", method, str(args), *self.server)
+                raise asyncio.TimeoutError()
             if done:
                 try:
                     return request.result()
```
```diff
@@ -91,6 +92,7 @@ class ClientSession(BaseClientSession):
             raise
         finally:
             self.pending_amount -= 1
+            log.info("%i in flight, finished %s%s ", self.pending_amount, method, tuple(args))

     async def ensure_session(self):
         # Handles reconnecting and maintaining a session alive
```
```diff
@@ -144,12 +146,12 @@ class ClientSession(BaseClientSession):
             controller.add(request.args)

     def connection_lost(self, exc):
-        log.debug("Connection lost: %s:%d", *self.server)
+        log.warning("Connection lost: %s:%d", *self.server)
         super().connection_lost(exc)
         self.response_time = None
         self.connection_latency = None
         self._response_samples = 0
-        self.pending_amount = 0
+        # self.pending_amount = 0
         self._on_disconnect_controller.add(True)

```
```diff
@@ -274,8 +276,13 @@ class Network:
         return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted)

     #  --- Subscribes, history and broadcasts are always aimed towards the master client directly
-    def get_history(self, address):
-        return self.rpc('blockchain.address.get_history', [address], True)
+    async def get_history(self, address):
+        log.info("get history %s", address)
+        start = time.perf_counter()
+        try:
+            return await self.rpc('blockchain.address.get_history', [address], True)
+        finally:
+            log.info("%s history took %s", address, time.perf_counter() - start)

     def broadcast(self, raw_transaction):
         return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)
```
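`get_history` now wraps the RPC in a start/finally timing log; the mempool hunk further down repeats the same pattern with a one-second threshold. A reusable sketch as a context manager (a hypothetical helper, not something this diff adds):

```python
import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


@contextmanager
def timed(label: str, threshold: float = 0.0):
    # log how long the block took; stay quiet below `threshold` seconds,
    # like the ">= 1" guard used in the mempool hunk below
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        if duration >= threshold:
            log.info("%s took %s", label, duration)


with timed("bXYZ... history"):
    sum(range(1_000_000))  # stand-in for awaiting the RPC
```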
```diff
@@ -285,16 +292,18 @@ class Network:

     async def subscribe_address(self, address, *addresses):
         addresses = list((address, ) + addresses)
+        server_addr_and_port = self.client.server_address_and_port  # on disconnect client will be None
         try:
             return await self.rpc('blockchain.address.subscribe', addresses, True)
         except asyncio.TimeoutError:
             log.warning(
                 "timed out subscribing to addresses from %s:%i",
-                *self.client.server_address_and_port
+                *server_addr_and_port
             )
             # abort and cancel, we can't lose a subscription, it will happen again on reconnect
             if self.client:
                 self.client.abort()
+            log.warning("raise cancelled")
             raise asyncio.CancelledError()

     def unsubscribe_address(self, address):
```
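The snapshot taken before the `try` block fixes the race named in the inline comment: while `subscribe_address` is suspended on the RPC, a disconnect can set `self.client` to None, so building the timeout warning from `self.client.server_address_and_port` could itself raise `AttributeError`. A distilled sketch of the capture-before-await pattern:

```python
import asyncio


class Session:
    def __init__(self):
        self.client = ("spv1.example.com", 50001)  # cleared on disconnect

    async def subscribe(self):
        # snapshot what error reporting needs *before* suspending, because
        # self.client may become None while this coroutine is parked
        server = self.client
        try:
            await asyncio.sleep(0.1)  # stand-in for the subscribe RPC
            raise asyncio.TimeoutError
        except asyncio.TimeoutError:
            self.client = None  # simulate the disconnect race
            print("timed out subscribing to %s:%i" % server)  # safe: uses the snapshot
            raise asyncio.CancelledError()


async def main():
    try:
        await Session().subscribe()
    except asyncio.CancelledError:
        print("subscription aborted; it will be retried on reconnect")


asyncio.run(main())
```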
```diff
@@ -480,6 +480,7 @@ class RPCSession(SessionBase):
         await event.wait()
         result = event.result
         if isinstance(result, Exception):
+            self.logger.error("%s sending %s(%s)", result, method, "" if not args else ",".join([str(a) for a in args]))
             raise result
         return result

```
```diff
@@ -183,6 +183,7 @@ class BlockProcessor:
         self.state_lock = asyncio.Lock()

         self.search_cache = {}
+        self.history_cache = {}

     async def run_in_thread_with_lock(self, func, *args):
         # Run in a thread to prevent blocking. Shielded so that
```
```diff
@@ -213,6 +214,7 @@ class BlockProcessor:
             await self.run_in_thread_with_lock(self.advance_blocks, blocks)
             for cache in self.search_cache.values():
                 cache.clear()
+            self.history_cache.clear()
             await self._maybe_flush()
             processed_time = time.perf_counter() - start
             self.block_count_metric.set(self.height)
```
```diff
@@ -754,6 +756,8 @@ class LBRYBlockProcessor(BlockProcessor):
         self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
         self.sql: SQLDB = self.db.sql
         self.timer = Timer('BlockProcessor')
+        self.block_notify = asyncio.Event()
+        self.block_notify.set()

     def advance_blocks(self, blocks):
         self.sql.begin()
```
```diff
@@ -213,10 +213,18 @@ class MemPool:
                 continue
             hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
             async with self.lock:
+                start = time.perf_counter()
                 touched = await self._process_mempool(hashes)
+                duration = time.perf_counter() - start
+                if duration >= 1:
+                    self.logger.info("processed mempool in %s", duration)
             synchronized_event.set()
             synchronized_event.clear()
+            start = time.perf_counter()
             await self.api.on_mempool(touched, height)
+            duration = time.perf_counter() - start
+            if duration >= 1:
+                self.logger.info("sent history notifications in %s", duration)
             try:
                 # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
                 await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
```
```diff
@@ -135,7 +135,7 @@ class SessionManager:
         "docker_tag": DOCKER_TAG,
         'version': lbry.__version__,
         "min_version": util.version_string(VERSION.PROTOCOL_MIN),
-        "cpu_count": os.cpu_count()
+        "cpu_count": str(os.cpu_count())
     })
     session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
                                  labelnames=("version",))
```
```diff
@@ -177,7 +177,7 @@ class SessionManager:
         self.cur_group = SessionGroup(0)
         self.txs_sent = 0
         self.start_time = time.time()
-        self.history_cache = pylru.lrucache(256)
+        self.history_cache = self.bp.history_cache
         self.notified_height: typing.Optional[int] = None
         # Cache some idea of room to avoid recounting on each subscription
         self.subs_room = 0
```
```diff
@@ -608,26 +608,20 @@ class SessionManager:

     async def limited_history(self, hashX):
         """A caching layer."""
-        hc = self.history_cache
-        if hashX not in hc:
+        if hashX not in self.history_cache:
             # History DoS limit. Each element of history is about 99
             # bytes when encoded as JSON. This limits resource usage
             # on bloated history requests, and uses a smaller divisor
             # so large requests are logged before refusing them.
             limit = self.env.max_send // 97
-            hc[hashX] = await self.db.limited_history(hashX, limit=limit)
-        return hc[hashX]
+            self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
+        return self.history_cache[hashX]

     async def _notify_sessions(self, height, touched):
         """Notify sessions about height changes and touched addresses."""
         height_changed = height != self.notified_height
         if height_changed:
             await self._refresh_hsub_results(height)
-            # Invalidate our history cache for touched hashXs
-            hc = self.history_cache
-            for hashX in set(hc).intersection(touched):
-                del hc[hashX]

         if self.sessions:
             await asyncio.wait([
                 session.notify(touched, height_changed) for session in self.sessions
```
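With the cache handed over to the block processor, invalidation changes shape: instead of deleting touched hashXs in `_notify_sessions`, `BlockProcessor` clears the whole dict when a block advances (the `history_cache.clear()` hunk above), and the unbounded dict replaces the old 256-entry `pylru` cache. A toy sketch of the shared-ownership arrangement:

```python
class BlockProcessor:
    def __init__(self):
        self.history_cache = {}  # single owner of the cached histories

    def advance_block(self):
        # a new block can change any address's history, so drop everything
        self.history_cache.clear()


class SessionManager:
    def __init__(self, bp: BlockProcessor):
        self.history_cache = bp.history_cache  # the same dict, not a copy

    def limited_history(self, hashX):
        if hashX not in self.history_cache:
            self.history_cache[hashX] = f"db lookup for {hashX}"
        return self.history_cache[hashX]


bp = BlockProcessor()
sm = SessionManager(bp)
sm.limited_history("ab" * 16)
bp.advance_block()
print(len(sm.history_cache))  # 0 -- the session manager sees the clear
```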
```diff
@@ -921,10 +915,15 @@ class LBRYElectrumX(SessionBase):
     def sub_count(self):
         return len(self.hashX_subs)

+    UGLY_COUNT = 0
+
     async def notify(self, touched, height_changed):
         """Notify the client about changes to touched addresses (from mempool
         updates or new blocks) and height.
         """
+
+        self.UGLY_COUNT += 1
+
         if height_changed and self.subscribe_headers:
             args = (await self.subscribe_headers_result(), )
             try:
```
```diff
@@ -940,6 +939,11 @@ class LBRYElectrumX(SessionBase):

             for hashX in touched:
                 alias = self.hashX_subs[hashX]
+                if self.UGLY_COUNT == 25:
+                    print('sleeping for ', hashX)
+                    if not self.bp.block_notify.is_set():
+                        await self.bp.block_notify.wait()
+                    await asyncio.sleep(3)
                 status = await self.address_status(hashX)
                 changed[alias] = status
```
```diff
@@ -960,13 +964,11 @@ class LBRYElectrumX(SessionBase):
                 method = 'blockchain.scripthash.subscribe'
             else:
                 method = 'blockchain.address.subscribe'
-            try:
-                await self.send_notification(method, (alias, status))
-            except asyncio.TimeoutError:
-                self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
-                self.abort()
-                return
+            start = time.perf_counter()
+            if not self.bp.block_notify.is_set():
+                await self.bp.block_notify.wait()
+            t = asyncio.create_task(self.send_notification(method, (alias, status)))
+            t.add_done_callback(lambda _: self.logger.warning("sent notification to %s in %s", alias, time.perf_counter() - start))

         if changed:
             es = '' if len(changed) == 1 else 'es'
```
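Notification sends now go through the `block_notify` event added to the block processor earlier in this diff: the event starts set, can be cleared to hold notifications while a block (or a test) is in flight, and `notify` waits on it before creating the send task. A minimal sketch of pause/resume with `asyncio.Event`:

```python
import asyncio


async def main():
    block_notify = asyncio.Event()
    block_notify.set()  # notifications flow by default

    async def notify(alias):
        if not block_notify.is_set():
            await block_notify.wait()  # park until the block finishes
        print("notified", alias)

    block_notify.clear()  # block processing starts: hold notifications
    task = asyncio.create_task(notify("bXYZ..."))
    await asyncio.sleep(0.1)
    assert not task.done()  # still parked on the event
    block_notify.set()      # block done: release the queued notification
    await task


asyncio.run(main())
```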
```diff
@@ -1180,6 +1182,7 @@ class LBRYElectrumX(SessionBase):
         """
         # Note history is ordered and mempool unordered in electrum-server
         # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
+
         db_history = await self.session_mgr.limited_history(hashX)
         mempool = await self.mempool.transaction_summaries(hashX)

```
```diff
@@ -40,6 +40,8 @@ class ClaimTestCase(CommandTestCase):

 class ClaimSearchCommand(ClaimTestCase):

+    VERBOSITY = logging.WARNING
+
     async def create_channel(self):
         self.channel = await self.channel_create('@abc', '1.0')
         self.channel_id = self.get_claim_id(self.channel)
```
```diff
@@ -157,6 +159,60 @@ class ClaimSearchCommand(ClaimTestCase):
         await self.stream_abandon(txid=signed2['txid'], nout=0)
         await self.assertFindsClaims([], channel_ids=[channel_id2])

+    async def test_break_it(self):
+        await self.generate(5)
+        address = await self.account.receiving.get_or_create_usable_address()
+        sendtxid = await self.blockchain.send_to_address(address, 1)
+        await self.confirm_tx(sendtxid)
+        address = await self.account.receiving.get_or_create_usable_address()
+        sendtxid = await self.blockchain.send_to_address(address, 1)
+        await self.confirm_tx(sendtxid)
+        address = await self.account.receiving.get_or_create_usable_address()
+        sendtxid = await self.blockchain.send_to_address(address, 1)
+        await self.confirm_tx(sendtxid)
+        address = await self.account.receiving.get_or_create_usable_address()
+        sendtxid = await self.blockchain.send_to_address(address, 1)
+        await self.confirm_tx(sendtxid)
+        await self.generate(7)
+
+        async def _doit(n):
+            try:
+                await self.daemon.jsonrpc_channel_create(
+                    name=f'@arena{n}', bid='0.1', blocking=True
+                )
+            except InsufficientFundsError:
+                pass
+
+        def doit(n):
+            asyncio.create_task(_doit(n))
+
+        async def break_it():
+            count = 0
+            for _ in range(4):
+                for _ in range(10):
+                    count += 1
+                    if not count % 7:
+                        asyncio.create_task(self.generate(1))
+                    doit(count)
+                if self.ledger._known_addresses_out_of_sync:
+                    print('out of sync', self.ledger._known_addresses_out_of_sync)
+                await asyncio.sleep(1)
+            await self.generate(1)
+
+        bp = self.conductor.spv_node.server.bp
+        break_task = asyncio.create_task(break_it())
+        address = await self.ledger.went_out_of_sync.get()
+        bp.block_notify.clear()
+        print('%s is out of sync' % address)
+        with self.assertRaises(InsufficientFundsError):
+            await self.daemon.jsonrpc_channel_create(
+                name=f'@derp', bid='0.1', blocking=True
+            )
+            self.assertTrue(False)
+        print("woohoo")
+        if not break_task.done():
+            break_task.cancel()
+
     async def test_pagination(self):
         await self.create_channel()
         await self.create_lots_of_streams()
```
```diff
@@ -28,9 +28,10 @@ def mock_config():
 class BlobExchangeTestBase(AsyncioTestCase):
     async def asyncSetUp(self):
         self.loop = asyncio.get_event_loop()
+        self.client_wallet_dir = tempfile.mkdtemp()
         self.client_dir = tempfile.mkdtemp()
         self.server_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.client_wallet_dir)
         self.addCleanup(shutil.rmtree, self.client_dir)
         self.addCleanup(shutil.rmtree, self.server_dir)
         self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
```
|
||||||
self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config)
|
self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config)
|
||||||
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
|
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
|
||||||
|
|
||||||
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir,
|
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir,
|
||||||
fixed_peers=[])
|
wallet=self.client_wallet_dir, fixed_peers=[])
|
||||||
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
|
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
|
||||||
self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config)
|
self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config)
|
||||||
self.client_peer_manager = PeerManager(self.loop)
|
self.client_peer_manager = PeerManager(self.loop)
|
||||||
|
|
|
```diff
@@ -65,7 +65,7 @@ def get_claim_transaction(claim_name, claim=b''):
     )


-async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
+async def get_mock_wallet(sd_hash, storage, wallet_dir, balance=10.0, fee=None):
     claim = Claim()
     if fee:
         if fee['currency'] == 'LBC':
```
|
@ -97,7 +97,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
|
||||||
|
|
||||||
wallet = Wallet()
|
wallet = Wallet()
|
||||||
ledger = Ledger({
|
ledger = Ledger({
|
||||||
'db': Database(':memory:'),
|
'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
|
||||||
'headers': FakeHeaders(514082)
|
'headers': FakeHeaders(514082)
|
||||||
})
|
})
|
||||||
await ledger.db.open()
|
await ledger.db.open()
|
||||||
|
```diff
@@ -136,7 +136,8 @@ class TestStreamManager(BlobExchangeTestBase):
             self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
         )
         self.sd_hash = descriptor.sd_hash
-        self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
+        self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, self.client_wallet_dir,
+                                                           balance, fee)
         analytics_manager = AnalyticsManager(
             self.client_config,
             binascii.hexlify(generate_id()).decode(),
```
```diff
@@ -1,4 +1,7 @@
+import os
 import unittest
+import tempfile
+import shutil
 from binascii import hexlify, unhexlify
 from itertools import cycle

```
|
@ -302,9 +305,11 @@ class TestTransactionSigning(AsyncioTestCase):
|
||||||
class TransactionIOBalancing(AsyncioTestCase):
|
class TransactionIOBalancing(AsyncioTestCase):
|
||||||
|
|
||||||
async def asyncSetUp(self):
|
async def asyncSetUp(self):
|
||||||
|
wallet_dir = tempfile.mkdtemp()
|
||||||
|
self.addCleanup(shutil.rmtree, wallet_dir)
|
||||||
self.ledger = Ledger({
|
self.ledger = Ledger({
|
||||||
'db': Database(':memory:'),
|
'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
|
||||||
'headers': Headers(':memory:')
|
'headers': Headers(':memory:'),
|
||||||
})
|
})
|
||||||
await self.ledger.db.open()
|
await self.ledger.db.open()
|
||||||
self.account = Account.from_dict(
|
self.account = Account.from_dict(
|
||||||
|
@ -419,3 +424,68 @@ class TransactionIOBalancing(AsyncioTestCase):
|
||||||
self.assertListEqual([0.01, 1], self.inputs(tx))
|
self.assertListEqual([0.01, 1], self.inputs(tx))
|
||||||
# change is now needed to consume extra input
|
# change is now needed to consume extra input
|
||||||
self.assertListEqual([0.97], self.outputs(tx))
|
self.assertListEqual([0.97], self.outputs(tx))
|
||||||
|
|
||||||
|
async def test_basic_use_cases_sqlite(self):
|
||||||
|
self.ledger.coin_selection_strategy = 'sqlite'
|
||||||
|
self.ledger.fee_per_byte = int(.01*CENT)
|
||||||
|
|
||||||
|
# available UTXOs for filling missing inputs
|
||||||
|
utxos = await self.create_utxos([
|
||||||
|
1, 1, 3, 5, 10
|
||||||
|
])
|
||||||
|
# pay 3 coins (3.07 w/ fees)
|
||||||
|
tx = await self.tx(
|
||||||
|
[], # inputs
|
||||||
|
[self.txo(3)] # outputs
|
||||||
|
)
|
||||||
|
# there are 4x 1.0 utxos available
|
||||||
|
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 1.0, 1.0])
|
||||||
|
|
||||||
|
# a change of 0.93 is added to reach balance
|
||||||
|
self.assertListEqual(self.outputs(tx), [3, 0.93])
|
||||||
|
|
||||||
|
await self.ledger.release_outputs(utxos)
|
||||||
|
|
||||||
|
# pay 6.917 coins (7.00 w/ fees)
|
||||||
|
tx = await self.tx(
|
||||||
|
[], # inputs
|
||||||
|
[self.txo(6.917)] # outputs
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 1.0, 1.0, 3.0])
|
||||||
|
self.assertListEqual(self.outputs(tx), [6.92])
|
||||||
|
|
||||||
|
await self.ledger.release_outputs(utxos)
|
||||||
|
|
||||||
|
# supplied input and output, but input is not enough to cover output
|
||||||
|
tx = await self.tx(
|
||||||
|
[self.txi(self.txo(10))], # inputs
|
||||||
|
[self.txo(11)] # outputs
|
||||||
|
)
|
||||||
|
# additional input is chosen (UTXO 1)
|
||||||
|
self.assertListEqual([10, 1.0, 1.0], self.inputs(tx))
|
||||||
|
# change is now needed to consume extra input
|
||||||
|
self.assertListEqual([11, 0.95], self.outputs(tx))
|
||||||
|
|
||||||
|
await self.ledger.release_outputs(utxos)
|
||||||
|
|
||||||
|
# liquidating a UTXO
|
||||||
|
tx = await self.tx(
|
||||||
|
[self.txi(self.txo(10))], # inputs
|
||||||
|
[] # outputs
|
||||||
|
)
|
||||||
|
self.assertListEqual([10], self.inputs(tx))
|
||||||
|
# missing change added to consume the amount
|
||||||
|
self.assertListEqual([9.98], self.outputs(tx))
|
||||||
|
|
||||||
|
await self.ledger.release_outputs(utxos)
|
||||||
|
|
||||||
|
# liquidating at a loss, requires adding extra inputs
|
||||||
|
tx = await self.tx(
|
||||||
|
[self.txi(self.txo(0.01))], # inputs
|
||||||
|
[] # outputs
|
||||||
|
)
|
||||||
|
# UTXO 1 is added to cover some of the fee
|
||||||
|
self.assertListEqual([0.01, 1], self.inputs(tx))
|
||||||
|
# change is now needed to consume extra input
|
||||||
|
self.assertListEqual([0.97], self.outputs(tx))
|
||||||
|
|