lbry-sdk/lbry/db/database.py

331 lines
12 KiB
Python
Raw Normal View History

2020-05-01 15:29:44 +02:00
import os
2020-04-11 23:27:41 +02:00
import asyncio
2020-05-20 23:59:26 +02:00
import tempfile
2020-06-05 06:35:22 +02:00
import multiprocessing as mp
2020-06-22 01:51:09 +02:00
from typing import List, Optional, Iterable, Iterator, TypeVar, Generic, TYPE_CHECKING, Dict
2020-05-01 15:29:44 +02:00
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
2020-04-11 23:27:41 +02:00
2020-05-01 15:29:44 +02:00
from sqlalchemy import create_engine, text
2020-04-11 23:27:41 +02:00
2020-06-05 06:35:22 +02:00
from lbry.event import EventController
2020-05-01 15:29:44 +02:00
from lbry.crypto.bip32 import PubKey
from lbry.blockchain.transaction import Transaction, Output
2020-06-22 01:51:09 +02:00
from .constants import TXO_TYPES, CLAIM_TYPE_CODES
2020-06-28 04:24:59 +02:00
from .query_context import initialize, uninitialize, ProgressPublisher
2020-05-01 15:29:44 +02:00
from . import queries as q
2020-06-05 06:35:22 +02:00
from . import sync
2020-04-11 23:27:41 +02:00
if TYPE_CHECKING:
from lbry.blockchain.ledger import Ledger
2020-05-01 15:29:44 +02:00
def clean_wallet_account_ids(constraints):
    """Replace wallet/account objects in *constraints* with lists of their ids.

    The ``wallet``, ``account`` and ``accounts`` keys are removed from the
    dict (mutated in place) and, where applicable, replaced by:

    * ``wallet_account_ids`` -- ids of every account in the wallet, set
      whenever a truthy ``wallet`` was supplied;
    * ``account_ids`` -- ids of the selected accounts: the explicit
      ``accounts`` list if non-empty, else the single ``account``, else all
      of the wallet's accounts.

    Returns ``None``; the result is the mutation of *constraints*.
    """
    wallet = constraints.pop('wallet', None)
    single = constraints.pop('account', None)
    selected = constraints.pop('accounts', [])

    # An explicit list of accounts wins over a lone ``account`` argument.
    if single and not selected:
        selected = [single]

    if wallet:
        constraints['wallet_account_ids'] = [acct.id for acct in wallet.accounts]
        # Nothing selected explicitly: fall back to every account in the wallet.
        selected = selected or wallet.accounts

    if selected:
        constraints['account_ids'] = [acct.id for acct in selected]
2020-04-11 23:27:41 +02:00
async def _first_channel_private_key(accounts, txo):
    """Return the first truthy private key any account holds for *txo*'s channel, else None."""
    for account in accounts:
        key = await account.get_channel_private_key(
            txo.claim.channel.public_key_bytes
        )
        if key:
            return key
    return None


async def add_channel_keys_to_txo_results(accounts: List, txos: Iterable[Output]):
    """Attach channel private keys to channel txos, following ``txo.channel`` links.

    For every txo whose claim is a channel, each account is asked in order for
    the private key matching the channel's public key; the first truthy
    answer is stored on ``txo.private_key``.  Every non-``None``
    ``txo.channel`` encountered is collected and then processed the same way
    by a recursive call.

    :param accounts: accounts queried via ``get_channel_private_key``.
    :param txos: outputs to annotate in place.
    """
    linked_channels = set()
    for txo in txos:
        if txo.claim.is_channel:
            key = await _first_channel_private_key(accounts, txo)
            if key:
                txo.private_key = key
        if txo.channel is not None:
            linked_channels.add(txo.channel)

    if not linked_channels:
        return
    # The channels referenced by these txos are txos too and may need keys.
    await add_channel_keys_to_txo_results(accounts, linked_channels)
2020-04-11 23:27:41 +02:00
2020-06-05 06:35:22 +02:00
ResultType = TypeVar('ResultType')


class Result(Generic[ResultType]):
    """A page of query rows together with the overall total.

    Behaves like a read-only sequence over ``rows`` (indexing, iteration,
    ``len`` and ``repr`` all delegate to it) while also carrying ``total``
    and an optional ``censor`` object alongside.
    """

    __slots__ = ('rows', 'total', 'censor')

    def __init__(self, rows: List[ResultType], total, censor=None):
        self.rows, self.total, self.censor = rows, total, censor

    def __getitem__(self, item: int) -> ResultType:
        rows = self.rows
        return rows[item]

    def __iter__(self) -> Iterator[ResultType]:
        rows = self.rows
        return iter(rows)

    def __len__(self):
        rows = self.rows
        return len(rows)

    def __repr__(self):
        rows = self.rows
        return repr(rows)
2020-04-11 23:27:41 +02:00
class Database:
    """Asynchronous facade over the blocking functions in ``queries`` and ``sync``.

    Every public coroutine hands a function from ``q`` (``queries``) or
    ``sync`` to :meth:`run`, which executes it in ``self.executor``.  The
    executor is created by :meth:`open`: a process pool when more than one
    worker is configured, otherwise a single-threaded pool.  Progress
    messages travel through ``message_queue`` to ``progress_publisher`` and
    are exposed to listeners as ``on_progress``.
    """

    def __init__(self, ledger: 'Ledger'):
        """Store configuration taken from *ledger*; nothing is opened yet.

        :param ledger: supplies ``conf.db_url_or_default`` and ``conf.workers``
            and is passed to the worker initializer and to ``PubKey``.
        """
        self.url = ledger.conf.db_url_or_default
        self.ledger = ledger
        self.workers = self._normalize_worker_processes(ledger.conf.workers)
        # Created in open(), reset to None in close().
        self.executor: Optional[Executor] = None
        self.message_queue = mp.Queue()
        self.stop_event = mp.Event()
        self._on_progress_controller = EventController()
        self.on_progress = self._on_progress_controller.stream
        self.progress_publisher = ProgressPublisher(
            self.message_queue, self._on_progress_controller
        )

    @staticmethod
    def _normalize_worker_processes(workers):
        """Translate the configured worker count into a usable positive int.

        ``0`` means "one worker per CPU", a positive value is used as is, and
        anything negative falls back to a single worker.
        """
        if workers == 0:
            # os.cpu_count() returns None when the count cannot be determined;
            # fall back to one worker so ``self.workers > 1`` stays valid.
            return os.cpu_count() or 1
        if workers > 0:
            return workers
        return 1

    @classmethod
    def temp_from_url_regtest(cls, db_url, lbrycrd_dir=None):
        """Build a regtest database for *db_url* with a fresh temporary config directory."""
        from lbry import Config, RegTestLedger  # pylint: disable=import-outside-toplevel
        directory = tempfile.mkdtemp()
        conf = Config.with_same_dir(directory).set(db_url=db_url)
        if lbrycrd_dir is not None:
            conf.lbrycrd_dir = lbrycrd_dir
        ledger = RegTestLedger(conf)
        return cls(ledger)

    @classmethod
    def temp_sqlite_regtest(cls, lbrycrd_dir=None):
        """Build a regtest database using the default db url inside a temporary directory."""
        from lbry import Config, RegTestLedger  # pylint: disable=import-outside-toplevel
        directory = tempfile.mkdtemp()
        conf = Config.with_same_dir(directory).set(blockchain="regtest")
        if lbrycrd_dir is not None:
            conf.lbrycrd_dir = lbrycrd_dir
        ledger = RegTestLedger(conf)
        return cls(ledger)

    @classmethod
    def temp_sqlite(cls):
        """Build a database whose config lives in a fresh temporary directory."""
        from lbry import Config, Ledger  # pylint: disable=import-outside-toplevel
        conf = Config.with_same_dir(tempfile.mkdtemp())
        return cls(Ledger(conf))

    @classmethod
    def from_url(cls, db_url):
        """Build a database for *db_url* using a config created by ``Config.with_null_dir()``."""
        from lbry import Config, Ledger  # pylint: disable=import-outside-toplevel
        return cls(Ledger(Config.with_null_dir().set(db_url=db_url)))

    @classmethod
    def in_memory(cls):
        """Build a database backed by an in-memory SQLite url."""
        return cls.from_url('sqlite:///:memory:')

    def sync_create(self, name):
        """Blocking: issue ``CREATE DATABASE <name>`` against ``self.url``.

        NOTE(security): *name* is interpolated straight into the statement,
        so it must come from a trusted source.
        """
        engine = create_engine(self.url)
        try:
            with engine.connect() as db:
                # Presumably ends the implicit transaction so the following
                # statement runs outside of one -- confirm for the target backend.
                db.execute(text("COMMIT"))
                db.execute(text(f"CREATE DATABASE {name}"))
        finally:
            # The engine is private to this call; release its connection pool.
            engine.dispose()

    async def create(self, name):
        """Run :meth:`sync_create` in the event loop's default executor."""
        return await asyncio.get_running_loop().run_in_executor(None, self.sync_create, name)

    def sync_drop(self, name):
        """Blocking: issue ``DROP DATABASE IF EXISTS <name>`` against ``self.url``.

        NOTE(security): *name* is interpolated straight into the statement,
        so it must come from a trusted source.
        """
        engine = create_engine(self.url)
        try:
            with engine.connect() as db:
                # Presumably ends the implicit transaction so the following
                # statement runs outside of one -- confirm for the target backend.
                db.execute(text("COMMIT"))
                db.execute(text(f"DROP DATABASE IF EXISTS {name}"))
        finally:
            # The engine is private to this call; release its connection pool.
            engine.dispose()

    async def drop(self, name):
        """Run :meth:`sync_drop` in the event loop's default executor."""
        return await asyncio.get_running_loop().run_in_executor(None, self.sync_drop, name)

    async def open(self):
        """Start the progress publisher and the worker executor, then check/create tables.

        Raises ``AssertionError`` if the database is already open.
        """
        assert self.executor is None, "Database already open."
        self.progress_publisher.start()
        kwargs = {
            "initializer": initialize,
            "initargs": (
                self.ledger,
                self.message_queue, self.stop_event
            )
        }
        if self.workers > 1:
            self.executor = ProcessPoolExecutor(max_workers=self.workers, **kwargs)
        else:
            self.executor = ThreadPoolExecutor(max_workers=1, **kwargs)
        return await self.run(q.check_version_and_create_tables)

    async def close(self):
        """Stop the progress publisher and shut the executor down, if one is open."""
        self.progress_publisher.stop()
        if self.executor is None:
            return
        try:
            # Only the thread-pool worker is explicitly uninitialized here.
            if isinstance(self.executor, ThreadPoolExecutor):
                await self.run(uninitialize)
        finally:
            # Always release the executor, even when uninitialize() fails,
            # so the database can be re-opened and workers are not leaked.
            self.executor.shutdown()
            self.executor = None
            # fixes "OSError: handle is closed"
            # seems to only happen when running in PyCharm
            # https://github.com/python/cpython/pull/6084#issuecomment-564585446
            # TODO: delete this in Python 3.8/3.9?
            try:
                from concurrent.futures.process import _threads_wakeups  # pylint: disable=import-outside-toplevel
            except ImportError:
                # Private CPython detail; nothing to clean up if it is absent.
                pass
            else:
                _threads_wakeups.clear()

    async def run(self, func, *args, **kwargs):
        """Execute ``func(*args, **kwargs)`` in ``self.executor`` and return its result.

        Keyword arguments are first passed through
        ``clean_wallet_account_ids`` so that wallet/account objects are
        replaced by id lists before being handed to the worker.
        """
        if kwargs:
            clean_wallet_account_ids(kwargs)
        return await asyncio.get_running_loop().run_in_executor(
            self.executor, partial(func, *args, **kwargs)
        )

    async def fetch_result(self, func, *args, **kwargs) -> Result:
        """Run *func*, which must return ``(rows, total)``, and wrap it in a ``Result``."""
        rows, total = await self.run(func, *args, **kwargs)
        return Result(rows, total)

    async def execute(self, sql):
        """Run ``q.execute(sql)`` in the executor."""
        return await self.run(q.execute, sql)

    async def execute_fetchall(self, sql):
        """Run ``q.execute_fetchall(sql)`` in the executor."""
        return await self.run(q.execute_fetchall, sql)

    async def has_filters(self):
        """Run ``q.has_filters`` in the executor."""
        return await self.run(q.has_filters)

    async def has_claims(self):
        """Run ``q.has_claims`` in the executor."""
        return await self.run(q.has_claims)

    async def has_supports(self):
        """Run ``q.has_supports`` in the executor."""
        return await self.run(q.has_supports)

    async def get_best_block_height(self) -> int:
        """Run ``q.get_best_block_height`` in the executor."""
        return await self.run(q.get_best_block_height)

    async def process_all_things_after_sync(self):
        """Run ``sync.process_all_things_after_sync`` in the executor."""
        return await self.run(sync.process_all_things_after_sync)

    async def insert_block(self, block):
        """Run ``q.insert_block(block)`` in the executor."""
        return await self.run(q.insert_block, block)

    async def insert_transaction(self, block_hash, tx):
        """Run ``q.insert_transaction(block_hash, tx)`` in the executor."""
        return await self.run(q.insert_transaction, block_hash, tx)

    async def update_address_used_times(self, addresses):
        """Run ``q.update_address_used_times(addresses)`` in the executor."""
        return await self.run(q.update_address_used_times, addresses)

    async def reserve_outputs(self, txos, is_reserved=True):
        """Set the reserved flag for *txos*; does nothing (returns None) when *txos* is empty."""
        txo_hashes = [txo.hash for txo in txos]
        if not txo_hashes:
            return None
        return await self.run(
            q.reserve_outputs, txo_hashes, is_reserved
        )

    async def release_outputs(self, txos):
        """Clear the reserved flag for *txos*."""
        return await self.reserve_outputs(txos, is_reserved=False)

    async def release_tx(self, tx):
        """Release the outputs referenced by the inputs of *tx*."""
        return await self.release_outputs([txi.txo_ref.txo for txi in tx.inputs])

    async def release_all_outputs(self, account):
        """Run ``q.release_all_outputs`` for ``account.id``."""
        return await self.run(q.release_all_outputs, account.id)

    async def get_balance(self, **constraints):
        """Run ``q.get_balance`` with *constraints*."""
        return await self.run(q.get_balance, **constraints)

    async def get_report(self, accounts):
        """Run ``q.get_report`` for *accounts* (converted to id lists by :meth:`run`)."""
        return await self.run(q.get_report, accounts=accounts)

    async def get_addresses(self, **constraints) -> Result[dict]:
        """Fetch address rows; when key columns are present, rebuild ``PubKey`` objects.

        If the first row contains ``pubkey``, every row's ``pubkey``,
        ``chain_code``, ``n`` and ``depth`` entries are consumed and replaced
        by a single ``pubkey`` entry holding a ``PubKey`` instance.
        """
        addresses = await self.fetch_result(q.get_addresses, **constraints)
        if addresses and 'pubkey' in addresses[0]:
            for address in addresses:
                address['pubkey'] = PubKey(
                    self.ledger, bytes(address.pop('pubkey')), bytes(address.pop('chain_code')),
                    address.pop('n'), address.pop('depth')
                )
        return addresses

    async def get_all_addresses(self):
        """Run ``q.get_all_addresses`` in the executor."""
        return await self.run(q.get_all_addresses)

    async def get_address(self, **constraints):
        """Return the first address matching *constraints*, or None if there is none."""
        for address in await self.get_addresses(limit=1, **constraints):
            return address
        return None

    async def add_keys(self, account, chain, pubkeys):
        """Run ``q.add_keys(account, chain, pubkeys)`` in the executor."""
        return await self.run(q.add_keys, account, chain, pubkeys)

    async def get_transactions(self, **constraints) -> Result[Transaction]:
        """Fetch transactions matching *constraints* as a ``Result``."""
        return await self.fetch_result(q.get_transactions, **constraints)

    async def get_transaction(self, **constraints) -> Optional[Transaction]:
        """Return the first transaction matching *constraints*, or None."""
        txs = await self.get_transactions(limit=1, **constraints)
        if txs:
            return txs[0]
        return None

    async def get_purchases(self, **constraints) -> Result[Output]:
        """Fetch purchases matching *constraints* as a ``Result``."""
        return await self.fetch_result(q.get_purchases, **constraints)

    async def search_claims(self, **constraints) -> Result[Output]:
        """Run ``q.search_claims`` and wrap its ``(claims, total, censor)`` in a ``Result``."""
        #assert set(constraints).issubset(SEARCH_PARAMS), \
        #    f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
        claims, total, censor = await self.run(q.search_claims, **constraints)
        return Result(claims, total, censor)

    async def protobuf_search_claims(self, **constraints) -> str:
        """Run ``q.protobuf_search_claims`` with *constraints*."""
        return await self.run(q.protobuf_search_claims, **constraints)

    async def search_supports(self, **constraints) -> Result[Output]:
        """Fetch supports matching *constraints* as a ``Result``."""
        return await self.fetch_result(q.search_supports, **constraints)

    async def resolve(self, urls, **kwargs) -> Dict[str, Output]:
        """Run ``q.resolve(urls, **kwargs)`` in the executor."""
        return await self.run(q.resolve, urls, **kwargs)

    async def protobuf_resolve(self, urls, **kwargs) -> str:
        """Run ``q.protobuf_resolve(urls, **kwargs)`` in the executor."""
        return await self.run(q.protobuf_resolve, urls, **kwargs)

    async def get_txo_sum(self, **constraints) -> int:
        """Run ``q.get_txo_sum`` with *constraints*."""
        return await self.run(q.get_txo_sum, **constraints)

    async def get_txo_plot(self, **constraints) -> List[dict]:
        """Run ``q.get_txo_plot`` with *constraints*."""
        return await self.run(q.get_txo_plot, **constraints)

    async def get_txos(self, **constraints) -> Result[Output]:
        """Fetch txos; when a wallet is given, attach channel private keys to them.

        ``wallet=None`` is treated the same as no wallet at all, matching
        how ``clean_wallet_account_ids`` handles it.
        """
        txos = await self.fetch_result(q.get_txos, **constraints)
        wallet = constraints.get('wallet')
        if wallet is not None:
            await add_channel_keys_to_txo_results(wallet.accounts, txos)
        return txos

    async def get_utxos(self, **constraints) -> Result[Output]:
        """Fetch txos constrained to ``spent_height=0``."""
        return await self.get_txos(spent_height=0, **constraints)

    async def get_supports(self, **constraints) -> Result[Output]:
        """Fetch unspent txos of the ``support`` type."""
        return await self.get_utxos(txo_type=TXO_TYPES['support'], **constraints)

    async def get_claims(self, **constraints) -> Result[Output]:
        """Fetch claim txos; defaults to every claim type unless ``txo_type`` is given."""
        if 'txo_type' not in constraints:
            constraints['txo_type__in'] = CLAIM_TYPE_CODES
        # Same query and key handling as get_txos(), so delegate to it.
        return await self.get_txos(**constraints)

    async def get_streams(self, **constraints) -> Result[Output]:
        """Fetch claims of the ``stream`` type."""
        return await self.get_claims(txo_type=TXO_TYPES['stream'], **constraints)

    async def get_channels(self, **constraints) -> Result[Output]:
        """Fetch claims of the ``channel`` type."""
        return await self.get_claims(txo_type=TXO_TYPES['channel'], **constraints)

    async def get_collections(self, **constraints) -> Result[Output]:
        """Fetch claims of the ``collection`` type."""
        return await self.get_claims(txo_type=TXO_TYPES['collection'], **constraints)