Compare commits

...
Sign in to create a new pull request.

16 commits

Author SHA1 Message Date
Jack Robison
1298e0e725
reproduce sync error 2020-05-29 14:22:32 -04:00
Jack Robison
5d5cd3499a
prefer confirmed but allow spending unconfirmed with sqlite coin chooser 2020-05-27 12:27:04 -04:00
Jack Robison
c7e0e9f085
lint 2020-05-27 01:51:34 -04:00
Jack Robison
70464a23d2
logging 2020-05-27 00:59:24 -04:00
Jack Robison
4217bbbea6
don't block notifying sessions on the notifications going through 2020-05-27 00:59:24 -04:00
Jack Robison
8a06b1b5fa
fix history cache 2020-05-27 00:59:24 -04:00
Jack Robison
00c89a4f0b
debug 2020-05-27 00:59:24 -04:00
Jack Robison
ad7441ca47
fix prometheus 2020-05-27 00:59:24 -04:00
Jack Robison
f461fc6ef9
non blocking save_claims 2020-05-27 00:59:24 -04:00
Jack Robison
1c4c18dec9
debug 2020-05-27 00:59:24 -04:00
Jack Robison
597a101030
update sqlite coin chooser to only select verified transactions 2020-05-27 00:59:24 -04:00
Jack Robison
adbbb11dbf
fix 'timed out subscribing to addresses' logging error 2020-05-27 00:59:24 -04:00
Jack Robison
b66281eabc
leave reservations for rejected transactions 2020-05-27 00:58:35 -04:00
Jack Robison
be5f6491bc
add sqlite coin chooser test 2020-05-27 00:58:35 -04:00
Jack Robison
9d34e027f3
update tests 2020-05-27 00:58:35 -04:00
Jack Robison
6dbea6f4ab
add sqlite coin chooser, set it as the default coin selection strategy 2020-05-27 00:58:35 -04:00
15 changed files with 331 additions and 60 deletions

View file

@ -634,7 +634,7 @@ class Config(CLIConfig):
coin_selection_strategy = StringChoice(
"Strategy to use when selecting UTXOs for a transaction",
STRATEGIES, "standard")
STRATEGIES, "sqlite")
transaction_cache_size = Integer("Transaction cache size", 100_000)
save_resolved_claims = Toggle(

View file

@ -2569,9 +2569,9 @@ class Daemon(metaclass=JSONRPCServerType):
account.add_channel_private_key(txo.private_key)
wallet.save()
await self.broadcast_or_release(tx, blocking)
await self.storage.save_claims([self._old_get_temp_claim_info(
self.component_manager.loop.create_task(self.storage.save_claims([self._old_get_temp_claim_info(
tx, txo, claim_address, claim, name, dewies_to_lbc(amount)
)])
)]))
self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
else:
await account.ledger.release_tx(tx)
@ -2725,9 +2725,9 @@ class Daemon(metaclass=JSONRPCServerType):
account.add_channel_private_key(new_txo.private_key)
wallet.save()
await self.broadcast_or_release(tx, blocking)
await self.storage.save_claims([self._old_get_temp_claim_info(
self.component_manager.loop.create_task(self.storage.save_claims([self._old_get_temp_claim_info(
tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
)])
)]))
self.component_manager.loop.create_task(self.analytics_manager.send_new_channel())
else:
await account.ledger.release_tx(tx)
@ -3262,14 +3262,18 @@ class Daemon(metaclass=JSONRPCServerType):
if not preview:
await self.broadcast_or_release(tx, blocking)
async def save_claims():
await self.storage.save_claims([self._old_get_temp_claim_info(
tx, new_txo, claim_address, claim, name, dewies_to_lbc(amount)
)])
await self.storage.save_content_claim(file_stream.stream_hash, new_txo.id)
self.component_manager.loop.create_task(save_claims())
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
else:
await account.ledger.release_tx(tx)
log.info("successful publish %s", tx.id)
return tx
@requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
@ -3480,11 +3484,15 @@ class Daemon(metaclass=JSONRPCServerType):
if not preview:
await self.broadcast_or_release(tx, blocking)
async def save_claims():
await self.storage.save_claims([self._old_get_temp_claim_info(
tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount)
)])
if stream_hash:
await self.storage.save_content_claim(stream_hash, new_txo.id)
self.component_manager.loop.create_task(save_claims())
self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish'))
else:
await account.ledger.release_tx(tx)

View file

@ -5,7 +5,7 @@ from lbry.wallet.transaction import OutputEffectiveAmountEstimator
MAXIMUM_TRIES = 100000
STRATEGIES = []
STRATEGIES = ['sqlite'] # sqlite coin chooser is in database.py
def strategy(method):

View file

@ -4,6 +4,7 @@ import asyncio
import sqlite3
import platform
from binascii import hexlify
from collections import defaultdict
from dataclasses import dataclass
from contextvars import ContextVar
from concurrent.futures.thread import ThreadPoolExecutor
@ -14,7 +15,7 @@ from prometheus_client import Gauge, Counter, Histogram
from lbry.utils import LockWithMetrics
from .bip32 import PubKey
from .transaction import Transaction, Output, OutputScript, TXRefImmutable
from .transaction import Transaction, Output, OutputScript, TXRefImmutable, Input
from .constants import TXO_TYPES, CLAIM_TYPES
from .util import date_to_julian_day
@ -466,6 +467,70 @@ def dict_row_factory(cursor, row):
return d
def get_spendable_utxos(transaction: sqlite3.Connection, accounts: List, reserve_amount: int, floor: int,
fee_per_byte: int):
txs = defaultdict(list)
decoded_transactions = {}
accumulated = 0
multiplier = 10
gap_count = 0
accounts_fmt = ",".join(["?"] * len(accounts))
txo_query = f"""
SELECT tx.txid, txo.txoid, tx.raw, tx.height, txo.position as nout, tx.is_verified, txo.amount FROM txo
INNER JOIN account_address USING (address)
LEFT JOIN txi USING (txoid)
INNER JOIN tx USING (txid)
WHERE txo.txo_type=0 AND txi.txoid IS NULL AND tx.txid IS NOT NULL AND tx.is_verified AND NOT txo.is_reserved
AND txo.amount BETWEEN ? AND ?
"""
if accounts:
txo_query += f"""
AND account_address.account {'= ?' if len(accounts_fmt) == 1 else 'IN (' + accounts_fmt + ')'}
"""
reserved = []
while accumulated < reserve_amount:
found_txs = False
# prefer confirmed, but save unconfirmed utxos from this selection in case they are needed
unconfirmed = []
for row in transaction.execute(txo_query, (floor, floor * multiplier, *accounts)):
(txid, txoid, raw, height, nout, verified, amount) = row.values()
found_txs = True
if txid not in decoded_transactions:
decoded_transactions[txid] = Transaction(raw)
decoded_tx = decoded_transactions[txid]
if not verified:
unconfirmed.append((txid, txoid, raw, height, nout, verified, amount))
continue
accumulated += amount
accumulated -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
txs[(raw, height, verified)].append(nout)
reserved.append(txoid)
if accumulated >= reserve_amount:
break
unconfirmed.reverse()
while unconfirmed:
(txid, txoid, raw, height, nout, verified, amount) = unconfirmed.pop()
decoded_tx = decoded_transactions[txid]
accumulated += amount
accumulated -= Input.spend(decoded_tx.outputs[nout]).size * fee_per_byte
txs[(raw, height, verified)].append(nout)
reserved.append(txoid)
if accumulated >= reserve_amount:
break
if not found_txs:
gap_count += 1
if gap_count == 5:
break
floor *= multiplier
# reserve the accumulated txos if enough were found
if accumulated >= reserve_amount:
transaction.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?",
[(True, txoid) for txoid in reserved]).fetchall()
return txs
class Database(SQLiteMixin):
SCHEMA_VERSION = "1.3"
@ -666,6 +731,19 @@ class Database(SQLiteMixin):
# 2. update address histories removing deleted TXs
return True
async def get_spendable_utxos(self, ledger, reserve_amount, accounts: Optional[Iterable], min_amount: int = 100000,
fee_per_byte: int = 50) -> List:
to_spend = await self.db.run(
get_spendable_utxos, tuple(account.id for account in accounts), reserve_amount, min_amount,
fee_per_byte
)
txos = []
for (raw, height, verified), positions in to_spend.items():
tx = Transaction(raw, height=height, is_verified=verified)
for nout in positions:
txos.append(tx.outputs[nout].get_estimator(ledger))
return txos
async def select_transactions(self, cols, accounts=None, read_only=False, **constraints):
if not {'txid', 'txid__in'}.intersection(constraints):
assert accounts, "'accounts' argument required when no 'txid' constraint is present"

View file

@ -133,7 +133,7 @@ class Ledger(metaclass=LedgerRegistry):
self._on_transaction_controller = StreamController()
self.on_transaction = self._on_transaction_controller.stream
self.on_transaction.listen(
lambda e: log.info(
lambda e: log.debug(
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
)
@ -167,6 +167,7 @@ class Ledger(metaclass=LedgerRegistry):
self.coin_selection_strategy = None
self._known_addresses_out_of_sync = set()
self.went_out_of_sync = asyncio.Queue()
self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
self._balance_cache = pylru.lrucache(100000)
@ -244,11 +245,16 @@ class Ledger(metaclass=LedgerRegistry):
def get_address_count(self, **constraints):
return self.db.get_address_count(**constraints)
async def get_spendable_utxos(self, amount: int, funding_accounts):
async with self._utxo_reservation_lock:
txos = await self.get_effective_amount_estimators(funding_accounts)
async def get_spendable_utxos(self, amount: int, funding_accounts: Optional[Iterable['Account']],
min_amount=100000):
min_amount = min(amount // 10, min_amount)
fee = Output.pay_pubkey_hash(COIN, NULL_HASH32).get_fee(self)
selector = CoinSelector(amount, fee)
async with self._utxo_reservation_lock:
if self.coin_selection_strategy == 'sqlite':
return await self.db.get_spendable_utxos(self, amount + fee, funding_accounts, min_amount=min_amount,
fee_per_byte=self.fee_per_byte)
txos = await self.get_effective_amount_estimators(funding_accounts)
spendables = selector.select(txos, self.coin_selection_strategy)
if spendables:
await self.reserve_outputs(s.txo for s in spendables)
@ -492,12 +498,16 @@ class Ledger(metaclass=LedgerRegistry):
local_status, local_history = await self.get_local_status_and_history(address)
if local_status == remote_status:
log.info("no new txs needed for %s, remote matches local", address)
return True
remote_history = await self.network.retriable_call(self.network.get_history, address)
remote_history = list(map(itemgetter('tx_hash', 'height'), remote_history))
we_need = set(remote_history) - set(local_history)
if not we_need:
log.info(
"no new txs needed for %s, local status %s, remote status %s", address, local_status, remote_status
)
return True
cache_tasks: List[asyncio.Task[Transaction]] = []
@ -563,16 +573,35 @@ class Ledger(metaclass=LedgerRegistry):
await self.get_local_status_and_history(address, synced_history.getvalue())
if local_status != remote_status:
if local_history == remote_history:
return True
log.warning(
"Wallet is out of sync after syncing. Remote: %s with %d items, local: %s with %d items",
remote_status, len(remote_history), local_status, len(local_history)
"%s has a synced history but a mismatched status", address
)
log.warning("local: %s", local_history)
log.warning("remote: %s", remote_history)
return True
remote_set = set(remote_history)
local_set = set(local_history)
log.warning(
"%s is out of sync after syncing.\n"
"Remote: %s with %d items (%i unique), local: %s with %d items (%i unique).\n"
"Histories are mismatched on %i items.\n"
"Local is missing\n"
"%s\n"
"Remote is missing\n"
"%s\n"
"******",
address, remote_status, len(remote_history), len(remote_set),
local_status, len(local_history), len(local_set), len(remote_set.symmetric_difference(local_set)),
"\n".join([f"{txid} - {height}" for txid, height in remote_set.difference(local_set)]),
"\n".join([f"{txid} - {height}" for txid, height in local_set.difference(remote_set)])
)
# log.warning("local: %s", local_history)
# log.warning("remote: %s", remote_history)
self.went_out_of_sync.put_nowait(address)
self._known_addresses_out_of_sync.add(address)
return False
else:
log.warning(
"%s is synced", address
)
return True
async def cache_transaction(self, txid, remote_height, check_local=True):

View file

@ -302,6 +302,9 @@ class WalletManager:
await self.ledger.broadcast(tx)
if blocking:
await self.ledger.wait(tx, timeout=None)
except CodeMessageError as err:
log.warning("transaction rejected, leaving reserved")
raise
except:
await self.ledger.release_tx(tx)
raise

View file

@ -1,6 +1,7 @@
import logging
import asyncio
import json
import time
from time import perf_counter
from operator import itemgetter
from typing import Dict, Optional, Tuple
@ -54,8 +55,8 @@ class ClientSession(BaseClientSession):
return result
async def send_request(self, method, args=()):
log.info("%i in flight, send %s%s to %s:%i ", self.pending_amount, method, tuple(args), *self.server)
self.pending_amount += 1
log.debug("send %s%s to %s:%i", method, tuple(args), *self.server)
try:
if method == 'server.version':
return await self.send_timed_server_version_request(args, self.timeout)
@ -63,11 +64,11 @@ class ClientSession(BaseClientSession):
while not request.done():
done, pending = await asyncio.wait([request], timeout=self.timeout)
if pending:
log.debug("Time since last packet: %s", perf_counter() - self.last_packet_received)
log.info("Time since last packet: %s", perf_counter() - self.last_packet_received)
if (perf_counter() - self.last_packet_received) < self.timeout:
continue
log.info("timeout sending %s to %s:%i", method, *self.server)
raise asyncio.TimeoutError
log.warning("timeout sending %s(%s) to %s:%i", method, str(args), *self.server)
raise asyncio.TimeoutError()
if done:
try:
return request.result()
@ -91,6 +92,7 @@ class ClientSession(BaseClientSession):
raise
finally:
self.pending_amount -= 1
log.info("%i in flight, finished %s%s ", self.pending_amount, method, tuple(args))
async def ensure_session(self):
# Handles reconnecting and maintaining a session alive
@ -144,12 +146,12 @@ class ClientSession(BaseClientSession):
controller.add(request.args)
def connection_lost(self, exc):
log.debug("Connection lost: %s:%d", *self.server)
log.warning("Connection lost: %s:%d", *self.server)
super().connection_lost(exc)
self.response_time = None
self.connection_latency = None
self._response_samples = 0
self.pending_amount = 0
# self.pending_amount = 0
self._on_disconnect_controller.add(True)
@ -274,8 +276,13 @@ class Network:
return self.rpc('blockchain.block.headers', [height, count, 0, b64], restricted)
# --- Subscribes, history and broadcasts are always aimed towards the master client directly
def get_history(self, address):
return self.rpc('blockchain.address.get_history', [address], True)
async def get_history(self, address):
log.info("get history %s", address)
start = time.perf_counter()
try:
return await self.rpc('blockchain.address.get_history', [address], True)
finally:
log.info("%s history took %s", address, time.perf_counter() - start)
def broadcast(self, raw_transaction):
return self.rpc('blockchain.transaction.broadcast', [raw_transaction], True)
@ -285,16 +292,18 @@ class Network:
async def subscribe_address(self, address, *addresses):
addresses = list((address, ) + addresses)
server_addr_and_port = self.client.server_address_and_port # on disconnect client will be None
try:
return await self.rpc('blockchain.address.subscribe', addresses, True)
except asyncio.TimeoutError:
log.warning(
"timed out subscribing to addresses from %s:%i",
*self.client.server_address_and_port
*server_addr_and_port
)
# abort and cancel, we can't lose a subscription, it will happen again on reconnect
if self.client:
self.client.abort()
log.warning("raise cancelled")
raise asyncio.CancelledError()
def unsubscribe_address(self, address):

View file

@ -480,6 +480,7 @@ class RPCSession(SessionBase):
await event.wait()
result = event.result
if isinstance(result, Exception):
self.logger.error("%s sending %s(%s)", result, method, "" if not args else ",".join([str(a) for a in args]))
raise result
return result

View file

@ -183,6 +183,7 @@ class BlockProcessor:
self.state_lock = asyncio.Lock()
self.search_cache = {}
self.history_cache = {}
async def run_in_thread_with_lock(self, func, *args):
# Run in a thread to prevent blocking. Shielded so that
@ -213,6 +214,7 @@ class BlockProcessor:
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear()
await self._maybe_flush()
processed_time = time.perf_counter() - start
self.block_count_metric.set(self.height)
@ -754,6 +756,8 @@ class LBRYBlockProcessor(BlockProcessor):
self.logger.info(f"LbryumX Block Processor - Validating signatures: {self.should_validate_signatures}")
self.sql: SQLDB = self.db.sql
self.timer = Timer('BlockProcessor')
self.block_notify = asyncio.Event()
self.block_notify.set()
def advance_blocks(self, blocks):
self.sql.begin()

View file

@ -213,10 +213,18 @@ class MemPool:
continue
hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
async with self.lock:
start = time.perf_counter()
touched = await self._process_mempool(hashes)
duration = time.perf_counter() - start
if duration >= 1:
self.logger.info("processed mempool in %s", duration)
synchronized_event.set()
synchronized_event.clear()
start = time.perf_counter()
await self.api.on_mempool(touched, height)
duration = time.perf_counter() - start
if duration >= 1:
self.logger.info("sent history notifications in %s", duration)
try:
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)

View file

@ -135,7 +135,7 @@ class SessionManager:
"docker_tag": DOCKER_TAG,
'version': lbry.__version__,
"min_version": util.version_string(VERSION.PROTOCOL_MIN),
"cpu_count": os.cpu_count()
"cpu_count": str(os.cpu_count())
})
session_count_metric = Gauge("session_count", "Number of connected client sessions", namespace=NAMESPACE,
labelnames=("version",))
@ -177,7 +177,7 @@ class SessionManager:
self.cur_group = SessionGroup(0)
self.txs_sent = 0
self.start_time = time.time()
self.history_cache = pylru.lrucache(256)
self.history_cache = self.bp.history_cache
self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
@ -608,26 +608,20 @@ class SessionManager:
async def limited_history(self, hashX):
"""A caching layer."""
hc = self.history_cache
if hashX not in hc:
if hashX not in self.history_cache:
# History DoS limit. Each element of history is about 99
# bytes when encoded as JSON. This limits resource usage
# on bloated history requests, and uses a smaller divisor
# so large requests are logged before refusing them.
limit = self.env.max_send // 97
hc[hashX] = await self.db.limited_history(hashX, limit=limit)
return hc[hashX]
self.history_cache[hashX] = await self.db.limited_history(hashX, limit=limit)
return self.history_cache[hashX]
async def _notify_sessions(self, height, touched):
"""Notify sessions about height changes and touched addresses."""
height_changed = height != self.notified_height
if height_changed:
await self._refresh_hsub_results(height)
# Invalidate our history cache for touched hashXs
hc = self.history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
if self.sessions:
await asyncio.wait([
session.notify(touched, height_changed) for session in self.sessions
@ -921,10 +915,15 @@ class LBRYElectrumX(SessionBase):
def sub_count(self):
return len(self.hashX_subs)
UGLY_COUNT = 0
async def notify(self, touched, height_changed):
"""Notify the client about changes to touched addresses (from mempool
updates or new blocks) and height.
"""
self.UGLY_COUNT += 1
if height_changed and self.subscribe_headers:
args = (await self.subscribe_headers_result(), )
try:
@ -940,6 +939,11 @@ class LBRYElectrumX(SessionBase):
for hashX in touched:
alias = self.hashX_subs[hashX]
if self.UGLY_COUNT == 25:
print('sleeping for ', hashX)
if not self.bp.block_notify.is_set():
await self.bp.block_notify.wait()
await asyncio.sleep(3)
status = await self.address_status(hashX)
changed[alias] = status
@ -960,13 +964,11 @@ class LBRYElectrumX(SessionBase):
method = 'blockchain.scripthash.subscribe'
else:
method = 'blockchain.address.subscribe'
try:
await self.send_notification(method, (alias, status))
except asyncio.TimeoutError:
self.logger.info("timeout sending address notification to %s", self.peer_address_str(for_log=True))
self.abort()
return
start = time.perf_counter()
if not self.bp.block_notify.is_set():
await self.bp.block_notify.wait()
t = asyncio.create_task(self.send_notification(method, (alias, status)))
t.add_done_callback(lambda _: self.logger.warning("sent notification to %s in %s", alias, time.perf_counter() - start))
if changed:
es = '' if len(changed) == 1 else 'es'
@ -1180,6 +1182,7 @@ class LBRYElectrumX(SessionBase):
"""
# Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
db_history = await self.session_mgr.limited_history(hashX)
mempool = await self.mempool.transaction_summaries(hashX)

View file

@ -40,6 +40,8 @@ class ClaimTestCase(CommandTestCase):
class ClaimSearchCommand(ClaimTestCase):
VERBOSITY = logging.WARNING
async def create_channel(self):
self.channel = await self.channel_create('@abc', '1.0')
self.channel_id = self.get_claim_id(self.channel)
@ -157,6 +159,60 @@ class ClaimSearchCommand(ClaimTestCase):
await self.stream_abandon(txid=signed2['txid'], nout=0)
await self.assertFindsClaims([], channel_ids=[channel_id2])
async def test_break_it(self):
await self.generate(5)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
address = await self.account.receiving.get_or_create_usable_address()
sendtxid = await self.blockchain.send_to_address(address, 1)
await self.confirm_tx(sendtxid)
await self.generate(7)
async def _doit(n):
try:
await self.daemon.jsonrpc_channel_create(
name=f'@arena{n}', bid='0.1', blocking=True
)
except InsufficientFundsError:
pass
def doit(n):
asyncio.create_task(_doit(n))
async def break_it():
count = 0
for _ in range(4):
for _ in range(10):
count += 1
if not count % 7:
asyncio.create_task(self.generate(1))
doit(count)
if self.ledger._known_addresses_out_of_sync:
print('out of sync', self.ledger._known_addresses_out_of_sync)
await asyncio.sleep(1)
await self.generate(1)
bp = self.conductor.spv_node.server.bp
break_task = asyncio.create_task(break_it())
address = await self.ledger.went_out_of_sync.get()
bp.block_notify.clear()
print('%s is out of sync' % address)
with self.assertRaises(InsufficientFundsError):
await self.daemon.jsonrpc_channel_create(
name=f'@derp', bid='0.1', blocking=True
)
self.assertTrue(False)
print("woohoo")
if not break_task.done():
break_task.cancel()
async def test_pagination(self):
await self.create_channel()
await self.create_lots_of_streams()

View file

@ -28,9 +28,10 @@ def mock_config():
class BlobExchangeTestBase(AsyncioTestCase):
async def asyncSetUp(self):
self.loop = asyncio.get_event_loop()
self.client_wallet_dir = tempfile.mkdtemp()
self.client_dir = tempfile.mkdtemp()
self.server_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.client_wallet_dir)
self.addCleanup(shutil.rmtree, self.client_dir)
self.addCleanup(shutil.rmtree, self.server_dir)
self.server_config = Config(data_dir=self.server_dir, download_dir=self.server_dir, wallet=self.server_dir,
@ -39,8 +40,8 @@ class BlobExchangeTestBase(AsyncioTestCase):
self.server_blob_manager = BlobManager(self.loop, self.server_dir, self.server_storage, self.server_config)
self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP')
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir, wallet=self.client_dir,
fixed_peers=[])
self.client_config = Config(data_dir=self.client_dir, download_dir=self.client_dir,
wallet=self.client_wallet_dir, fixed_peers=[])
self.client_storage = SQLiteStorage(self.client_config, os.path.join(self.client_dir, "lbrynet.sqlite"))
self.client_blob_manager = BlobManager(self.loop, self.client_dir, self.client_storage, self.client_config)
self.client_peer_manager = PeerManager(self.loop)

View file

@ -65,7 +65,7 @@ def get_claim_transaction(claim_name, claim=b''):
)
async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
async def get_mock_wallet(sd_hash, storage, wallet_dir, balance=10.0, fee=None):
claim = Claim()
if fee:
if fee['currency'] == 'LBC':
@ -97,7 +97,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None):
wallet = Wallet()
ledger = Ledger({
'db': Database(':memory:'),
'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
'headers': FakeHeaders(514082)
})
await ledger.db.open()
@ -136,7 +136,8 @@ class TestStreamManager(BlobExchangeTestBase):
self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
)
self.sd_hash = descriptor.sd_hash
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee)
self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, self.client_wallet_dir,
balance, fee)
analytics_manager = AnalyticsManager(
self.client_config,
binascii.hexlify(generate_id()).decode(),

View file

@ -1,4 +1,7 @@
import os
import unittest
import tempfile
import shutil
from binascii import hexlify, unhexlify
from itertools import cycle
@ -302,9 +305,11 @@ class TestTransactionSigning(AsyncioTestCase):
class TransactionIOBalancing(AsyncioTestCase):
async def asyncSetUp(self):
wallet_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, wallet_dir)
self.ledger = Ledger({
'db': Database(':memory:'),
'headers': Headers(':memory:')
'db': Database(os.path.join(wallet_dir, 'blockchain.db')),
'headers': Headers(':memory:'),
})
await self.ledger.db.open()
self.account = Account.from_dict(
@ -419,3 +424,68 @@ class TransactionIOBalancing(AsyncioTestCase):
self.assertListEqual([0.01, 1], self.inputs(tx))
# change is now needed to consume extra input
self.assertListEqual([0.97], self.outputs(tx))
async def test_basic_use_cases_sqlite(self):
self.ledger.coin_selection_strategy = 'sqlite'
self.ledger.fee_per_byte = int(.01*CENT)
# available UTXOs for filling missing inputs
utxos = await self.create_utxos([
1, 1, 3, 5, 10
])
# pay 3 coins (3.07 w/ fees)
tx = await self.tx(
[], # inputs
[self.txo(3)] # outputs
)
# there are 4x 1.0 utxos available
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 1.0, 1.0])
# a change of 0.93 is added to reach balance
self.assertListEqual(self.outputs(tx), [3, 0.93])
await self.ledger.release_outputs(utxos)
# pay 6.917 coins (7.00 w/ fees)
tx = await self.tx(
[], # inputs
[self.txo(6.917)] # outputs
)
self.assertListEqual(self.inputs(tx), [1.0, 1.0, 1.0, 1.0, 3.0])
self.assertListEqual(self.outputs(tx), [6.92])
await self.ledger.release_outputs(utxos)
# supplied input and output, but input is not enough to cover output
tx = await self.tx(
[self.txi(self.txo(10))], # inputs
[self.txo(11)] # outputs
)
# additional input is chosen (UTXO 1)
self.assertListEqual([10, 1.0, 1.0], self.inputs(tx))
# change is now needed to consume extra input
self.assertListEqual([11, 0.95], self.outputs(tx))
await self.ledger.release_outputs(utxos)
# liquidating a UTXO
tx = await self.tx(
[self.txi(self.txo(10))], # inputs
[] # outputs
)
self.assertListEqual([10], self.inputs(tx))
# missing change added to consume the amount
self.assertListEqual([9.98], self.outputs(tx))
await self.ledger.release_outputs(utxos)
# liquidating at a loss, requires adding extra inputs
tx = await self.tx(
[self.txi(self.txo(0.01))], # inputs
[] # outputs
)
# UTXO 1 is added to cover some of the fee
self.assertListEqual([0.01, 1], self.inputs(tx))
# change is now needed to consume extra input
self.assertListEqual([0.97], self.outputs(tx))