forked from LBRYCommunity/lbry-sdk

Compare commits: master...spv-rocksd (20 commits)
Commits (SHA1):

3fff5ccb49
aa0f49633a
ede167bb56
c03b1b5a93
68a97b0e61
f86d940f20
314f8b0d42
5ec8f355d4
f46bf0924a
6cbe559778
4ab2d7e624
a82abb33ac
70e5ce4806
d960ba7412
18340c248d
9012db0cfb
7143b475a1
976387fefb
1c01faed28
df3254b371
12 changed files with 520 additions and 375 deletions

@@ -28,24 +28,32 @@ stages:
test:lint:
  stage: test
  script:
    - apt-get update
    - apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
    - make install tools
    - make lint

test:unit:
  stage: test
  script:
    - apt-get update
    - apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
    - make install tools
    - HOME=/tmp coverage run -p --source=lbry -m unittest discover -vv tests.unit

test:datanetwork-integration:
  stage: test
  script:
    - apt-get update
    - apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
    - pip install tox-travis
    - tox -e datanetwork --recreate

test:blockchain-integration:
  stage: test
  script:
    - apt-get update
    - apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
    - pip install tox-travis
    - tox -e blockchain

@@ -53,13 +61,15 @@ test:other-integration:
  stage: test
  script:
    - apt-get update
    - apt-get install -y --no-install-recommends ffmpeg
    - apt-get install -y --no-install-recommends ffmpeg liblz4-dev libsnappy-dev librocksdb-dev
    - pip install tox-travis
    - tox -e other

test:json-api:
  stage: test
  script:
    - apt-get update
    - apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
    - make install tools
    - HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py

@@ -10,6 +10,10 @@ ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT

RUN apt-get update && \
    apt-get -y --no-install-recommends install \
      liblz4-dev \
      libbz2-dev \
      libsnappy-dev \
      librocksdb-dev \
      wget \
      tar unzip \
      build-essential \

@@ -31,7 +35,8 @@ RUN chown -R $user:$user $projects_dir
USER $user
WORKDIR $projects_dir

RUN pip install uvloop
RUN pip install uvloop cython
RUN pip install python-rocksdb
RUN make install
RUN python3 docker/set_build.py
RUN rm ~/.cache -rf

@@ -48,5 +53,6 @@ ENV DB_DIRECTORY=$db_dir
ENV MAX_SESSIONS=1000000000
ENV MAX_SEND=1000000000000000000
ENV EVENT_LOOP_POLICY=uvloop
ENV DB_ENGINE=rocksdb
COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
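Since the image now installs python-rocksdb alongside librocksdb-dev and selects the engine with DB_ENGINE=rocksdb, a quick way to confirm the binding works inside the container is a small round-trip through the python-rocksdb API (a minimal sketch; the /tmp path is only illustrative):

```python
import rocksdb

# Open (or create) a throwaway database and round-trip one key/value pair.
db = rocksdb.DB("/tmp/rocksdb-sanity-check", rocksdb.Options(create_if_missing=True))
db.put(b"key", b"value")
assert db.get(b"key") == b"value"
print("python-rocksdb is working")
```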

@@ -131,7 +131,7 @@ class Ledger(metaclass=LedgerRegistry):
        self._on_transaction_controller = StreamController()
        self.on_transaction = self._on_transaction_controller.stream
        self.on_transaction.listen(
            lambda e: log.info(
            lambda e: log.debug(
                '(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
                self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
            )

@@ -10,7 +10,7 @@ from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.leveldb import FlushData, reopen_rocksdb_ctx, UTXO_PREFIX, HASHX_UTXO_PREFIX


class Prefetcher:

@@ -275,6 +275,7 @@ class BlockProcessor:
            await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
            await self.prefetcher.reset_height(self.height)
            self.reorg_count_metric.inc()
            await reopen_rocksdb_ctx(self.db.executor)

    async def reorg_hashes(self, count):
        """Return a pair (start, last, hashes) of blocks to back up during a

@@ -289,7 +290,7 @@ class BlockProcessor:
        self.logger.info(f'chain was reorganised replacing {count:,d} '
                         f'block{s} at heights {start:,d}-{last:,d}')

        return start, last, await self.db.fs_block_hashes(start, count)
        return start, last, self.db.fs_block_hashes(start, count)

    async def calc_reorg_range(self, count: Optional[int]):
        """Calculate the reorg range"""

@@ -307,7 +308,7 @@ class BlockProcessor:
            start = self.height - 1
            count = 1
            while start > 0:
                hashes = await self.db.fs_block_hashes(start, count)
                hashes = self.db.fs_block_hashes(start, count)
                hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
                d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
                n = diff_pos(hex_hashes, d_hex_hashes)

@@ -346,6 +347,7 @@ class BlockProcessor:
            self.db.flush_dbs(self.flush_data(), flush_utxos,
                              self.estimate_txs_remaining)
        await self.run_in_thread_with_lock(flush)
        await reopen_rocksdb_ctx(self.db.executor)

    async def _maybe_flush(self):
        # If caught up, flush everything as client queries are

@@ -584,9 +586,9 @@ class BlockProcessor:

        # Key: b'h' + compressed_tx_hash + tx_idx + tx_num
        # Value: hashX
        prefix = b'h' + tx_hash[:4] + idx_packed
        prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
        candidates = {db_key: hashX for db_key, hashX
                      in self.db.utxo_db.iterator(prefix=prefix)}
                      in self.db.db.iterator(prefix=prefix)}

        for hdb_key, hashX in candidates.items():
            tx_num_packed = hdb_key[-4:]

@@ -600,8 +602,8 @@ class BlockProcessor:

            # Key: b'u' + address_hashX + tx_idx + tx_num
            # Value: the UTXO value as a 64-bit unsigned integer
            udb_key = b'u' + hashX + hdb_key[-6:]
            utxo_value_packed = self.db.utxo_db.get(udb_key)
            udb_key = UTXO_PREFIX + hashX + hdb_key[-6:]
            utxo_value_packed = self.db.db.get(udb_key)
            if utxo_value_packed:
                # Remove both entries for this UTXO
                self.db_deletes.append(hdb_key)

@@ -36,7 +36,7 @@ class Env:
        self.loop_policy = self.set_event_loop_policy()
        self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
        self.db_dir = self.required('DB_DIRECTORY')
        self.db_engine = self.default('DB_ENGINE', 'leveldb')
        self.db_engine = self.default('DB_ENGINE', 'rocksdb')
        self.trending_algorithms = [
            trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
        ]
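The default engine flips from leveldb to rocksdb here, but operators can still force LevelDB through the environment. A rough illustration of how that default is resolved, assuming `Env.default` simply falls back to `os.environ` (the helper below is only a stand-in, not the real Env method):

```python
import os

# Hypothetical stand-in for Env.default('DB_ENGINE', 'rocksdb'):
# an exported DB_ENGINE wins, otherwise the new rocksdb default applies.
def env_default(name: str, fallback: str) -> str:
    return os.environ.get(name, fallback)

db_engine = env_default('DB_ENGINE', 'rocksdb')
print(db_engine)  # "leveldb" if DB_ENGINE=leveldb was exported, otherwise "rocksdb"
```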

@@ -20,6 +20,10 @@ from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN


HASHX_HISTORY_PREFIX = b'x'
HIST_STATE = b'state-hist'


class History:

    DB_VERSIONS = [0]

@@ -32,8 +36,8 @@ class History:
        self.unflushed_count = 0
        self.db = None

    def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
        self.db = db_class('hist', for_sync)
    def open_db(self, db, for_sync, utxo_flush_count, compacting):
        self.db = db  # db_class('hist', for_sync)
        self.read_state()
        self.clear_excess(utxo_flush_count)
        # An incomplete compaction needs to be cancelled otherwise

@@ -44,11 +48,11 @@ class History:

    def close_db(self):
        if self.db:
            self.db.close()
            # self.db.close()
            self.db = None

    def read_state(self):
        state = self.db.get(b'state\0\0')
        state = self.db.get(HIST_STATE)
        if state:
            state = ast.literal_eval(state.decode())
            if not isinstance(state, dict):

@@ -80,17 +84,18 @@ class History:
                         'excess history flushes...')

        keys = []
        for key, hist in self.db.iterator(prefix=b''):
            flush_id, = unpack_be_uint16_from(key[-2:])
        for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
            k = key[1:]
            flush_id, = unpack_be_uint16_from(k[-2:])
            if flush_id > utxo_flush_count:
                keys.append(key)
                keys.append(k)

        self.logger.info(f'deleting {len(keys):,d} history entries')

        self.flush_count = utxo_flush_count
        with self.db.write_batch() as batch:
            for key in keys:
                batch.delete(key)
                batch.delete(HASHX_HISTORY_PREFIX + key)
            self.write_state(batch)

        self.logger.info('deleted excess history entries')

@@ -105,7 +110,7 @@ class History:
        }
        # History entries are not prefixed; the suffix \0\0 ensures we
        # look similar to other entries and aren't interfered with
        batch.put(b'state\0\0', repr(state).encode())
        batch.put(HIST_STATE, repr(state).encode())

    def add_unflushed(self, hashXs_by_tx, first_tx_num):
        unflushed = self.unflushed

@@ -132,7 +137,7 @@ class History:
        with self.db.write_batch() as batch:
            for hashX in sorted(unflushed):
                key = hashX + flush_id
                batch.put(key, unflushed[hashX].tobytes())
                batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
            self.write_state(batch)

        count = len(unflushed)

@@ -154,16 +159,17 @@ class History:
            for hashX in sorted(hashXs):
                deletes = []
                puts = {}
                for key, hist in self.db.iterator(prefix=hashX, reverse=True):
                for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
                    k = key[1:]
                    a = array.array('I')
                    a.frombytes(hist)
                    # Remove all history entries >= tx_count
                    idx = bisect_left(a, tx_count)
                    nremoves += len(a) - idx
                    if idx > 0:
                        puts[key] = a[:idx].tobytes()
                        puts[k] = a[:idx].tobytes()
                        break
                    deletes.append(key)
                    deletes.append(k)

                for key in deletes:
                    batch.delete(key)

@@ -221,9 +227,9 @@ class History:
        with self.db.write_batch() as batch:
            # Important: delete first! The keyspace may overlap.
            for key in keys_to_delete:
                batch.delete(key)
                batch.delete(HASHX_HISTORY_PREFIX + key)
            for key, value in write_items:
                batch.put(key, value)
                batch.put(HASHX_HISTORY_PREFIX + key, value)
            self.write_state(batch)

    def _compact_hashX(self, hashX, hist_map, hist_list,

@@ -271,11 +277,12 @@ class History:

        key_len = HASHX_LEN + 2
        write_size = 0
        for key, hist in self.db.iterator(prefix=prefix):
        for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
            k = key[1:]
            # Ignore non-history entries
            if len(key) != key_len:
            if len(k) != key_len:
                continue
            hashX = key[:-2]
            hashX = k[:-2]
            if hashX != prior_hashX and prior_hashX:
                write_size += self._compact_hashX(prior_hashX, hist_map,
                                                  hist_list, write_items,

@@ -283,7 +290,7 @@ class History:
                hist_map.clear()
                hist_list.clear()
                prior_hashX = hashX
            hist_map[key] = hist
            hist_map[k] = hist
            hist_list.append(hist)

        if prior_hashX:
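History rows now share one database with everything else under the b'x' namespace, so every scan strips that byte before interpreting the key. A small sketch of the pattern, using a plain dict in place of the real key/value store:

```python
HASHX_HISTORY_PREFIX = b'x'

# Toy store standing in for the shared DB; only the b'x' rows are history.
store = {
    HASHX_HISTORY_PREFIX + b'\xaa\xbb' + b'\x00\x01': b'history-blob',
    b'u' + b'\xaa\xbb': b'utxo-blob',  # different namespace, must be skipped
}

def iter_history():
    for key, value in sorted(store.items()):
        if not key.startswith(HASHX_HISTORY_PREFIX):
            continue
        k = key[1:]  # drop the namespace byte, as the History methods do
        yield k, value

print(list(iter_history()))
```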

@@ -23,17 +23,19 @@ from bisect import bisect_right
from collections import namedtuple
from glob import glob
from struct import pack, unpack
from contextvars import ContextVar
from dataclasses import dataclass
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
import attr

from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from, pack_le_int32
from lbry.wallet.server.storage import db_class, RocksDB
from lbry.wallet.server.history import History


UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
HEADER_PREFIX = b'H'
TX_COUNT_PREFIX = b'T'
@@ -41,8 +43,302 @@ TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B'
TX_NUM_PREFIX = b'N'
BLOCK_HASH_PREFIX = b'C'
HISTORY_PREFIX = b'A'
HASHX_UTXO_PREFIX = b'h'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
UNDO_PREFIX = b'U'
UTXO_STATE = b'state-utxo'
HIST_STATE = b'state-hist'
TX_COUNTS_STATE = b'state-tx-counts'
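With every table folded into a single RocksDB instance, each logical table gets its own single-byte key prefix. A compact sketch of how such namespacing keeps the tables apart in one keyspace (the values below are illustrative only):

```python
UTXO_PREFIX = b'u'
HASHX_UTXO_PREFIX = b'h'

def utxo_key(hashX: bytes, tx_idx: bytes, tx_num: bytes) -> bytes:
    # b'u' + address_hashX + tx_idx + tx_num, matching the comment in the diff
    return UTXO_PREFIX + hashX + tx_idx + tx_num

def hashx_utxo_prefix(tx_hash: bytes, idx_packed: bytes) -> bytes:
    # b'h' + compressed_tx_hash + tx_idx; the writer appends tx_num to finish the key
    return HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed

print(utxo_key(b'\x01' * 11, b'\x00\x00', b'\x00\x00\x00\x01').hex())
```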

@dataclass
class RocksReaderContext:
    db: 'RocksDB'
    db_dir: str
    name: str
    merkle: Merkle
    tx_counts: List[int]
    block_txs_cache: pylru.lrucache
    merkle_tx_cache: pylru.lrucache
    txid_cache: pylru.lrucache

    def close(self):
        self.db.close()

    def reopen(self):
        self.db.close()
        self.db.open(self.name, create=False, read_only=True)

    def update_state(self):
        tx_counts = array.array("L")
        counts = self.db.get(TX_COUNTS_STATE)
        if not counts:
            self.tx_counts = list(map(unpack_be_uint64, self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)))
        else:
            tx_counts.frombytes(counts)
            self.tx_counts = tx_counts.tolist()

    def ctx_tx_hash(self, tx_num):
        tx_height = bisect_right(self.tx_counts, tx_num)
        if tx_height > len(self.tx_counts):
            return None, tx_height
        key = TX_HASH_PREFIX + util.pack_be_uint64(tx_num)
        if key in self.txid_cache:
            return self.txid_cache[key], tx_height
        tx_hash = self.db.get(key)
        if tx_height + 100 <= len(self.tx_counts):
            self.txid_cache[key] = tx_hash
        return tx_hash, tx_height

proc_ctx: ContextVar[Optional[RocksReaderContext]] = ContextVar('proc_ctx')


def _update_rocksdb_ctx():
    ctx = proc_ctx.get()
    ctx.update_state()


def _reopen_rocksdb_ctx():
    ctx = proc_ctx.get()
    ctx.db.close()
    ctx.db = RocksDB(ctx.db_dir, ctx.name, for_sync=False, read_only=True)
    ctx.update_state()


async def update_rocksdb_ctx(executor: ProcessPoolExecutor):
    loop = asyncio.get_event_loop()
    await asyncio.wait([loop.run_in_executor(executor, _update_rocksdb_ctx) for _ in range(executor._max_workers)])


async def reopen_rocksdb_ctx(executor: ProcessPoolExecutor):
    loop = asyncio.get_event_loop()
    await asyncio.wait([loop.run_in_executor(executor, _reopen_rocksdb_ctx) for _ in range(executor._max_workers)])


def _initializer(path, name):
    db = RocksDB(path, name, for_sync=False, read_only=True)
    proc_ctx.set(RocksReaderContext(db, path, name, Merkle(), [], pylru.lrucache(50000), pylru.lrucache(100000),
                                    pylru.lrucache(1000000)))


def _teardown():
    proc_ctx.get().close()
    proc_ctx.set(None)


async def initialize_executor(workers, db_dir, for_sync, name):
    executor = ProcessPoolExecutor(workers, initializer=_initializer, initargs=(db_dir, name))
    try:
        writer = RocksDB(db_dir, name, for_sync=for_sync, read_only=False)
        await update_rocksdb_ctx(executor)
    except Exception as err:
        await teardown_executor(executor)
        executor.shutdown(True)
        raise err
    return executor, writer


async def teardown_executor(executor: ProcessPoolExecutor):
    try:
        await asyncio.wait(
            [asyncio.get_event_loop().run_in_executor(executor, _teardown) for _ in range(executor._max_workers)]
        )
    finally:
        executor.shutdown(True)
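The reader pool leans on ProcessPoolExecutor's initializer hook plus a ContextVar so that each worker process holds its own read-only handle, and reopen_rocksdb_ctx fans a refresh out to every worker. A stripped-down sketch of that pattern with a dummy per-process resource in place of RocksDB:

```python
import asyncio
from contextvars import ContextVar
from concurrent.futures.process import ProcessPoolExecutor
from typing import Optional

worker_ctx: ContextVar[Optional[dict]] = ContextVar('worker_ctx')

def _init(db_dir: str):
    # Runs once per worker process; the real code opens a read-only RocksDB here.
    worker_ctx.set({'db_dir': db_dir, 'generation': 0})

def _reopen():
    # Counterpart of _reopen_rocksdb_ctx: refresh this worker's handle/state.
    ctx = worker_ctx.get()
    ctx['generation'] += 1
    return ctx['generation']

async def main():
    executor = ProcessPoolExecutor(2, initializer=_init, initargs=('/tmp/db',))
    loop = asyncio.get_event_loop()
    # Submit one refresh per worker, mirroring what reopen_rocksdb_ctx does.
    await asyncio.wait([loop.run_in_executor(executor, _reopen)
                        for _ in range(executor._max_workers)])
    executor.shutdown(True)

if __name__ == '__main__':
    asyncio.run(main())
```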

def lookup_hashXs_utxos(prevouts):
    """Return (hashX, suffix) pairs, or None if not found,
    for each prevout.
    """
    ctx = proc_ctx.get()
    iterator = ctx.db.iterator
    get = ctx.db.get

    def lookup_hashXs():
        """Return (hashX, suffix) pairs, or None if not found,
        for each prevout.
        """

        def lookup_hashX(tx_hash, tx_idx):
            idx_packed = pack('<H', tx_idx)
            # Key: b'h' + compressed_tx_hash + tx_idx + tx_num
            # Value: hashX
            prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
            # Find which entry, if any, the TX_HASH matches.
            for db_key, hashX in iterator(prefix=prefix):
                tx_num_packed = db_key[-4:]
                tx_num, = unpack('<I', tx_num_packed)
                hash, height = ctx.ctx_tx_hash(tx_num)
                if hash == tx_hash:
                    return hashX, idx_packed + tx_num_packed
            return None, None

        return [lookup_hashX(*prevout) for prevout in prevouts]

    def lookup_utxo(hashX, suffix):
        if not hashX:
            # This can happen when the daemon is a block ahead
            # of us and has mempool txs spending outputs from
            # that new block
            return None
        # Key: b'u' + address_hashX + tx_idx + tx_num
        # Value: the UTXO value as a 64-bit unsigned integer
        db_value = get(UTXO_PREFIX + hashX + suffix)
        if not db_value:
            # This can happen if the DB was updated between
            # getting the hashXs and getting the UTXOs
            return None
        value, = unpack('<Q', db_value)
        return hashX, value

    return [lookup_utxo(*hashX_pair) for hashX_pair in lookup_hashXs()]


def get_counts():
    ctx = proc_ctx.get()
    return ctx.tx_counts


def read_txids():
    return list(proc_ctx.get().db.iterator(prefix=TX_HASH_PREFIX, include_key=False))


def read_headers():
    return [
        header for header in proc_ctx.get().db.iterator(prefix=HEADER_PREFIX, include_key=False)
    ]


def read_block_file(path):
    with util.open_file(path) as f:
        return f.read(-1)


def tx_hash(self, tx_num):
    """Return a par (tx_hash, tx_height) for the given tx number.

    If the tx_height is not on disk, returns (None, tx_height)."""
    tx_height = bisect_right(self.tx_counts, tx_num)
    if tx_height > self.db_height:
        return None, tx_height
    return self.total_transactions[tx_num], tx_height

def read_utxos(hashX):
    utxos = []
    utxos_append = utxos.append
    s_unpack = unpack
    # Key: b'u' + address_hashX + tx_idx + tx_num
    # Value: the UTXO value as a 64-bit unsigned integer
    for db_key, db_value in proc_ctx.get().db.iterator(prefix=UTXO_PREFIX + hashX):
        tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
        value, = unpack('<Q', db_value)
        utxos_append((tx_num, tx_pos, value))
    return utxos


def limited_history(hashX, limit=None):
    ctx = proc_ctx.get()
    tx_counts = ctx.tx_counts
    db_height = len(tx_counts)
    cnt = 0
    txs = []

    for hist in ctx.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
        a = array.array('I')
        a.frombytes(hist)
        for tx_num in a:
            tx_height = bisect_right(tx_counts, tx_num)
            if tx_height > db_height:
                return
            txs.append((tx_num, tx_height))
            cnt += 1
            if limit and cnt >= limit:
                break
        if limit and cnt >= limit:
            break
    return txs


def tx_merkle(tx_num, tx_height):
    ctx = proc_ctx.get()
    db = ctx.db
    tx_counts = ctx.tx_counts
    db_height = len(tx_counts)

    if tx_height == -1:
        return {
            'block_height': -1
        }
    tx_pos = tx_num - tx_counts[tx_height - 1]

    uncached = None
    if (tx_num, tx_height) in ctx.merkle_tx_cache:
        return ctx.merkle_tx_cache[(tx_num, tx_height)]
    if tx_height not in ctx.block_txs_cache:
        block_txs = list(db.iterator(
            prefix=TX_HASH_PREFIX,
            start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
            stop=None if tx_height + 1 == len(tx_counts) else
            TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height] - 1), include_key=False
        ))
        # print("rocks txs", tx_height, list(map(hash_to_hex_str, block_txs)))
        if tx_height + 100 <= db_height:
            ctx.block_txs_cache[tx_height] = block_txs
    else:
        block_txs = ctx.block_txs_cache.get(tx_height, uncached)

    branch, root = ctx.merkle.branch_and_root(block_txs, tx_pos)
    merkle = {
        'block_height': tx_height,
        'merkle': [
            hash_to_hex_str(hash)
            for hash in branch
        ],
        'pos': tx_pos
    }
    if tx_height + 100 < db_height:
        ctx.merkle_tx_cache[(tx_num, tx_height)] = merkle
    return merkle


def transaction_info_get_batch(txids: Iterable[str]):
    ctx = proc_ctx.get()
    db_height = len(ctx.tx_counts)
    tx_counts = ctx.tx_counts
    tx_db_get = ctx.db.get
    tx_infos = {}

    for tx_hash in txids:
        tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
        tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
        tx = None
        tx_height = -1
        if tx_num is not None:
            tx_num = unpack_be_uint64(tx_num)
            tx_height = bisect_right(tx_counts, tx_num)
            if tx_height < db_height:
                tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
        tx_infos[tx_hash] = (
            None if not tx else tx.hex(), {'block_height': -1} if tx_height == -1 else tx_merkle(
                tx_num, tx_height
            )
        )

    return tx_infos


def _update_block_txs_cache(tx_num, tx_height):
    ctx = proc_ctx.get()
    db = ctx.db
    tx_counts = ctx.tx_counts
    db_height = len(tx_counts)


@attr.s(slots=True)
class FlushData:

@@ -80,7 +376,7 @@ class LevelDB:

        self.db_class = db_class(env.db_dir, self.env.db_engine)
        self.history = History()
        self.utxo_db = None
        self.db = None
        self.tx_counts = None
        self.headers = None
        self.last_flush = time.time()

@@ -91,9 +387,6 @@ class LevelDB:
        self.merkle = Merkle()
        self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)

        self.headers_db = None
        self.tx_db = None

        self._block_txs_cache = pylru.lrucache(50000)
        self._merkle_tx_cache = pylru.lrucache(100000)
        self.total_transactions = None

@@ -104,15 +397,9 @@ class LevelDB:
        # tx_counts[N] has the cumulative number of txs at the end of
        # height N. So tx_counts[0] is 1 - the genesis coinbase

        def get_counts():
            return tuple(
                util.unpack_be_uint64(tx_count)
                for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
            )

        tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
        assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
        self.tx_counts = array.array('I', tx_counts)
        self.tx_counts = tx_counts

        if self.tx_counts:
            assert self.db_tx_count == self.tx_counts[-1], \

@@ -121,12 +408,9 @@ class LevelDB:
            assert self.db_tx_count == 0

    async def _read_txids(self):
        def get_txids():
            return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False))

        start = time.perf_counter()
        self.logger.info("loading txids")
        txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids)
        txids = await asyncio.get_event_loop().run_in_executor(self.executor, read_txids)
        assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
        self.total_transactions = txids
        ts = time.perf_counter() - start

@@ -136,47 +420,26 @@ class LevelDB:
        if self.headers is not None:
            return

        def get_headers():
            return [
                header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False)
            ]

        headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
        headers = await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
        assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
        self.headers = headers

    async def _open_dbs(self, for_sync, compacting):
        name = f'lbry-{self.env.db_engine}'
        if self.executor is None:
            self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
            coin_path = os.path.join(self.env.db_dir, 'COIN')
            if not os.path.isfile(coin_path):
                with util.open_file(coin_path, create=True) as f:
                    f.write(f'ElectrumX databases and metadata for '
                            f'{self.coin.NAME} {self.coin.NET}'.encode())
            self.executor, self.db = await initialize_executor(
                max(1, os.cpu_count() - 1), self.env.db_dir, for_sync, name
            )

        assert self.headers_db is None
        self.headers_db = self.db_class('headers', for_sync)
        if self.headers_db.is_new:
            self.logger.info('created new headers db')
        self.logger.info(f'opened headers DB (for sync: {for_sync})')
        if self.db.is_new:
            self.logger.info('created new db: %s', name)
        self.logger.info(f'opened DB (for sync: {for_sync})')

        assert self.tx_db is None
        self.tx_db = self.db_class('tx', for_sync)
        if self.tx_db.is_new:
            self.logger.info('created new tx db')
        self.logger.info(f'opened tx DB (for sync: {for_sync})')

        assert self.utxo_db is None
        # First UTXO DB
        self.utxo_db = self.db_class('utxo', for_sync)
        if self.utxo_db.is_new:
            self.logger.info('created new utxo db')
        self.logger.info(f'opened utxo db (for sync: {for_sync})')
        self.read_utxo_state()

        # Then history DB
        self.utxo_flush_count = self.history.open_db(
            self.db_class, for_sync, self.utxo_flush_count, compacting
            self.db, for_sync, self.utxo_flush_count, compacting
        )
        self.clear_excess_undo_info()

@@ -187,12 +450,11 @@ class LevelDB:
        await self._read_headers()

    def close(self):
        self.utxo_db.close()
        self.db.close()
        self.history.close_db()
        self.headers_db.close()
        self.tx_db.close()
        self.executor.shutdown(wait=True)
        self.executor = None
        self.db = None

    async def open_for_compacting(self):
        await self._open_dbs(True, True)

@@ -211,18 +473,12 @@ class LevelDB:
        """Open the databases for serving. If they are already open they are
        closed first.
        """
        self.logger.info('closing DBs to re-open for serving')
        if self.utxo_db:
            self.logger.info('closing DBs to re-open for serving')
            self.utxo_db.close()
            self.history.close_db()
            self.utxo_db = None
        if self.headers_db:
            self.headers_db.close()
            self.headers_db = None
        if self.tx_db:
            self.tx_db.close()
            self.tx_db = None
        if self.db:
            return
            # self.logger.info('closing DBs to re-open for serving')
            # self.db.close()
            # self.history.close_db()
            # self.db = None

        await self._open_dbs(False, False)
        self.logger.info("opened for serving")

@@ -271,14 +527,14 @@ class LevelDB:
            self.flush_history()

            # Flush state last as it reads the wall time.
            with self.utxo_db.write_batch() as batch:
            with self.db.write_batch() as batch:
                if flush_utxos:
                    self.flush_utxo_db(batch, flush_data)
                self.flush_state(batch)

            # Update and put the wall time again - otherwise we drop the
            # time it took to commit the batch
            self.flush_state(self.utxo_db)
            self.flush_state(self.db)

        elapsed = self.last_flush - start_time
        self.logger.info(f'flush #{self.history.flush_count:,d} took '

@@ -286,7 +542,7 @@ class LevelDB:
                         f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')

        # Catch-up stats
        if self.utxo_db.for_sync:
        if self.db.for_sync:
            flush_interval = self.last_flush - prior_flush
            tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
            tx_per_sec_last = 1 + int(tx_delta / flush_interval)

@@ -317,18 +573,16 @@ class LevelDB:
        # Write the headers
        start_time = time.perf_counter()

        with self.headers_db.write_batch() as batch:
        with self.db.write_batch() as batch:
            batch_put = batch.put
            for i, header in enumerate(flush_data.headers):
                batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
                self.headers.append(header)
        flush_data.headers.clear()
            flush_data.headers.clear()

        height_start = self.fs_height + 1
        tx_num = prior_tx_count
            height_start = self.fs_height + 1
            tx_num = prior_tx_count

        with self.tx_db.write_batch() as batch:
            batch_put = batch.put
            for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
                tx_count = self.tx_counts[height_start]
                batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])

@@ -342,14 +596,22 @@ class LevelDB:
                    tx_num += 1
                    offset += 32

            a = array.array('L')
            a.fromlist(self.tx_counts)
            batch_put(TX_COUNTS_STATE, a.tobytes())

        flush_data.block_txs.clear()
        flush_data.block_hashes.clear()

        self.fs_height = flush_data.height
        self.fs_tx_count = flush_data.tx_count

        elapsed = time.perf_counter() - start_time
        self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')


    def flush_history(self):
        self.history.flush()

@@ -374,15 +636,15 @@ class LevelDB:
            # suffix = tx_idx + tx_num
            hashX = value[:-12]
            suffix = key[-2:] + value[-12:-8]
            batch_put(b'h' + key[:4] + suffix, hashX)
            batch_put(b'u' + hashX + suffix, value[-8:])
            batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
            batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
        flush_data.adds.clear()

        # New undo information
        self.flush_undo_infos(batch_put, flush_data.undo_infos)
        flush_data.undo_infos.clear()

        if self.utxo_db.for_sync:
        if self.db.for_sync:
            block_count = flush_data.height - self.db_height
            tx_count = flush_data.tx_count - self.db_tx_count
            elapsed = time.time() - start_time

@@ -416,7 +678,7 @@ class LevelDB:

        self.backup_fs(flush_data.height, flush_data.tx_count)
        self.history.backup(touched, flush_data.tx_count)
        with self.utxo_db.write_batch() as batch:
        with self.db.write_batch() as batch:
            self.flush_utxo_db(batch, flush_data)
            # Flush state last as it reads the wall time.
            self.flush_state(batch)

@@ -470,78 +732,12 @@ class LevelDB:
            return None, tx_height
        return self.total_transactions[tx_num], tx_height

    async def tx_merkle(self, tx_num, tx_height):
        if tx_height == -1:
            return {
                'block_height': -1
            }
        tx_counts = self.tx_counts
        tx_pos = tx_num - tx_counts[tx_height - 1]

        def _update_block_txs_cache():
            block_txs = list(self.tx_db.iterator(
                start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
                stop=None if tx_height + 1 == len(tx_counts) else
                TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False
            ))
            if tx_height + 100 > self.db_height:
                return block_txs
            self._block_txs_cache[tx_height] = block_txs

        uncached = None
        if (tx_num, tx_height) in self._merkle_tx_cache:
            return self._merkle_tx_cache[(tx_num, tx_height)]
        if tx_height not in self._block_txs_cache:
            uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache)
        block_txs = self._block_txs_cache.get(tx_height, uncached)
        branch, root = self.merkle.branch_and_root(block_txs, tx_pos)
        merkle = {
            'block_height': tx_height,
            'merkle': [
                hash_to_hex_str(hash)
                for hash in branch
            ],
            'pos': tx_pos
        }
        if tx_height + 100 < self.db_height:
            self._merkle_tx_cache[(tx_num, tx_height)] = merkle
        return merkle

    def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
        unpack_be_uint64 = util.unpack_be_uint64
        tx_counts = self.tx_counts
        tx_db_get = self.tx_db.get
        tx_infos = []

        for tx_hash in txids:
            tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
            tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
            tx = None
            tx_height = -1
            if tx_num is not None:
                tx_num = unpack_be_uint64(tx_num)
                tx_height = bisect_right(tx_counts, tx_num)
                if tx_height < self.db_height:
                    tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
            tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height))

        return tx_infos

    async def fs_transactions(self, txids):
        txs = await asyncio.get_event_loop().run_in_executor(
            self.executor, self._fs_transactions, txids
        return await asyncio.get_event_loop().run_in_executor(
            self.executor, transaction_info_get_batch, txids
        )
        unsorted_result = {}

        async def add_result(item):
            _txid, _tx, _tx_num, _tx_height = item
            unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height))

        if txs:
            await asyncio.gather(*map(add_result, txs))
        return {txid: unsorted_result[txid] for txid, _, _, _ in txs}

    async def fs_block_hashes(self, height, count):
    def fs_block_hashes(self, height, count):
        if height + count > len(self.headers):
            raise self.DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
        return [self.coin.header_hash(header) for header in self.headers[height:height + count]]

@@ -553,49 +749,9 @@ class LevelDB:
        transactions. By default returns at most 1000 entries. Set
        limit to None to get them all.
        """
        # def read_history():
        #     hashx_history = []
        #     for key, hist in self.history.db.iterator(prefix=hashX):
        #         a = array.array('I')
        #         a.frombytes(hist)
        #         for tx_num in a:
        #             tx_height = bisect_right(self.tx_counts, tx_num)
        #             if tx_height > self.db_height:
        #                 tx_hash = None
        #             else:
        #                 tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
        #
        #             hashx_history.append((tx_hash, tx_height))
        #             if limit and len(hashx_history) >= limit:
        #                 return hashx_history
        #     return hashx_history

        def read_history():
            db_height = self.db_height
            tx_counts = self.tx_counts
            tx_db_get = self.tx_db.get
            pack_be_uint64 = util.pack_be_uint64

            cnt = 0
            txs = []

            for hist in self.history.db.iterator(prefix=hashX, include_key=False):
                a = array.array('I')
                a.frombytes(hist)
                for tx_num in a:
                    tx_height = bisect_right(tx_counts, tx_num)
                    if tx_height > db_height:
                        return
                    txs.append((tx_num, tx_height))
                    cnt += 1
                    if limit and cnt >= limit:
                        break
                if limit and cnt >= limit:
                    break
            return txs

        while True:
            history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
            history = await asyncio.get_event_loop().run_in_executor(self.executor, limited_history, hashX, limit)
            if history is not None:
                return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history]
            self.logger.warning(f'limited_history: tx hash '

@@ -610,11 +766,11 @@ class LevelDB:

    def undo_key(self, height):
        """DB key for undo information at the given height."""
        return b'U' + pack('>I', height)
        return UNDO_PREFIX + pack('>I', height)

    def read_undo_info(self, height):
        """Read undo information from a file for the current height."""
        return self.utxo_db.get(self.undo_key(height))
        return self.db.get(self.undo_key(height))

    def flush_undo_infos(self, batch_put, undo_infos):
        """undo_infos is a list of (undo_info, height) pairs."""

@@ -631,11 +787,7 @@ class LevelDB:
        """Returns a raw block read from disk. Raises FileNotFoundError
        if the block isn't on-disk."""

        def read():
            with util.open_file(self.raw_block_path(height)) as f:
                return f.read(-1)

        return await asyncio.get_event_loop().run_in_executor(self.executor, read)
        return await asyncio.get_event_loop().run_in_executor(self.executor, read_block_file, self.raw_block_path(height))

    def write_raw_block(self, block, height):
        """Write a raw block to disk."""

@@ -650,17 +802,16 @@ class LevelDB:

    def clear_excess_undo_info(self):
        """Clear excess undo info. Only most recent N are kept."""
        prefix = b'U'
        min_height = self.min_undo_height(self.db_height)
        keys = []
        for key, hist in self.utxo_db.iterator(prefix=prefix):
        for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
            height, = unpack('>I', key[-4:])
            if height >= min_height:
                break
            keys.append(key)

        if keys:
            with self.utxo_db.write_batch() as batch:
            with self.db.write_batch() as batch:
                for key in keys:
                    batch.delete(key)
                self.logger.info(f'deleted {len(keys):,d} stale undo entries')

@@ -681,7 +832,7 @@ class LevelDB:
    # -- UTXO database

    def read_utxo_state(self):
        state = self.utxo_db.get(b'state')
        state = self.db.get(UTXO_STATE)
        if not state:
            self.db_height = -1
            self.db_tx_count = 0

@@ -724,7 +875,7 @@ class LevelDB:
        self.logger.info(f'height: {self.db_height:,d}')
        self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
        self.logger.info(f'tx count: {self.db_tx_count:,d}')
        if self.utxo_db.for_sync:
        if self.db.for_sync:
            self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
        if self.first_sync:
            self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')

@@ -741,32 +892,18 @@ class LevelDB:
            'first_sync': self.first_sync,
            'db_version': self.db_version,
        }
        batch.put(b'state', repr(state).encode())
        batch.put(UTXO_STATE, repr(state).encode())

    def set_flush_count(self, count):
        self.utxo_flush_count = count
        with self.utxo_db.write_batch() as batch:
        with self.db.write_batch() as batch:
            self.write_utxo_state(batch)

    async def all_utxos(self, hashX):
        """Return all UTXOs for an address sorted in no particular order."""
        def read_utxos():
            utxos = []
            utxos_append = utxos.append
            s_unpack = unpack
            fs_tx_hash = self.fs_tx_hash
            # Key: b'u' + address_hashX + tx_idx + tx_num
            # Value: the UTXO value as a 64-bit unsigned integer
            prefix = b'u' + hashX
            for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
                tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
                value, = unpack('<Q', db_value)
                tx_hash, height = fs_tx_hash(tx_num)
                utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value))
            return utxos

        while True:
            utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos)
            utxos = [UTXO(tx_num, tx_pos, *self.fs_tx_hash(tx_num), value=value) for (tx_num, tx_pos, value) in utxos]
            if all(utxo.tx_hash is not None for utxo in utxos):
                return utxos
            self.logger.warning(f'all_utxos: tx hash not '

@@ -779,45 +916,4 @@ class LevelDB:

        Used by the mempool code.
        """
        def lookup_hashXs():
            """Return (hashX, suffix) pairs, or None if not found,
            for each prevout.
            """
            def lookup_hashX(tx_hash, tx_idx):
                idx_packed = pack('<H', tx_idx)

                # Key: b'h' + compressed_tx_hash + tx_idx + tx_num
                # Value: hashX
                prefix = b'h' + tx_hash[:4] + idx_packed

                # Find which entry, if any, the TX_HASH matches.
                for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
                    tx_num_packed = db_key[-4:]
                    tx_num, = unpack('<I', tx_num_packed)
                    hash, height = self.fs_tx_hash(tx_num)
                    if hash == tx_hash:
                        return hashX, idx_packed + tx_num_packed
                return None, None
            return [lookup_hashX(*prevout) for prevout in prevouts]

        def lookup_utxos(hashX_pairs):
            def lookup_utxo(hashX, suffix):
                if not hashX:
                    # This can happen when the daemon is a block ahead
                    # of us and has mempool txs spending outputs from
                    # that new block
                    return None
                # Key: b'u' + address_hashX + tx_idx + tx_num
                # Value: the UTXO value as a 64-bit unsigned integer
                key = b'u' + hashX + suffix
                db_value = self.utxo_db.get(key)
                if not db_value:
                    # This can happen if the DB was updated between
                    # getting the hashXs and getting the UTXOs
                    return None
                value, = unpack('<Q', db_value)
                return hashX, value
            return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs]

        hashX_pairs = await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs)
        return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos, hashX_pairs)
        return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs_utxos, prevouts)
|
|||
def __init__(self, hash_func=double_sha256):
|
||||
self.hash_func = hash_func
|
||||
|
||||
def tree_depth(self, hash_count):
|
||||
return self.branch_length(hash_count) + 1
|
||||
@staticmethod
|
||||
def tree_depth(hash_count):
|
||||
return Merkle.branch_length(hash_count) + 1
|
||||
|
||||
def branch_length(self, hash_count):
|
||||
@staticmethod
|
||||
def branch_length(hash_count):
|
||||
"""Return the length of a merkle branch given the number of hashes."""
|
||||
if not isinstance(hash_count, int):
|
||||
raise TypeError('hash_count must be an integer')
|
||||
|
@ -54,7 +56,8 @@ class Merkle:
|
|||
raise ValueError('hash_count must be at least 1')
|
||||
return ceil(log(hash_count, 2))
|
||||
|
||||
def branch_and_root(self, hashes, index, length=None):
|
||||
@staticmethod
|
||||
def branch_and_root(hashes, index, length=None, hash_func=double_sha256):
|
||||
"""Return a (merkle branch, merkle_root) pair given hashes, and the
|
||||
index of one of those hashes.
|
||||
"""
|
||||
|
@ -64,7 +67,7 @@ class Merkle:
|
|||
# This also asserts hashes is not empty
|
||||
if not 0 <= index < len(hashes):
|
||||
raise ValueError(f"index '{index}/{len(hashes)}' out of range")
|
||||
natural_length = self.branch_length(len(hashes))
|
||||
natural_length = Merkle.branch_length(len(hashes))
|
||||
if length is None:
|
||||
length = natural_length
|
||||
else:
|
||||
|
@ -73,7 +76,6 @@ class Merkle:
|
|||
if length < natural_length:
|
||||
raise ValueError('length out of range')
|
||||
|
||||
hash_func = self.hash_func
|
||||
branch = []
|
||||
for _ in range(length):
|
||||
if len(hashes) & 1:
|
||||
|
@ -85,44 +87,47 @@ class Merkle:
|
|||
|
||||
return branch, hashes[0]
|
||||
|
||||
def root(self, hashes, length=None):
|
||||
@staticmethod
|
||||
def root(hashes, length=None):
|
||||
"""Return the merkle root of a non-empty iterable of binary hashes."""
|
||||
branch, root = self.branch_and_root(hashes, 0, length)
|
||||
branch, root = Merkle.branch_and_root(hashes, 0, length)
|
||||
return root
|
||||
|
||||
def root_from_proof(self, hash, branch, index):
|
||||
"""Return the merkle root given a hash, a merkle branch to it, and
|
||||
its index in the hashes array.
|
||||
# @staticmethod
|
||||
# def root_from_proof(hash, branch, index, hash_func=double_sha256):
|
||||
# """Return the merkle root given a hash, a merkle branch to it, and
|
||||
# its index in the hashes array.
|
||||
#
|
||||
# branch is an iterable sorted deepest to shallowest. If the
|
||||
# returned root is the expected value then the merkle proof is
|
||||
# verified.
|
||||
#
|
||||
# The caller should have confirmed the length of the branch with
|
||||
# branch_length(). Unfortunately this is not easily done for
|
||||
# bitcoin transactions as the number of transactions in a block
|
||||
# is unknown to an SPV client.
|
||||
# """
|
||||
# for elt in branch:
|
||||
# if index & 1:
|
||||
# hash = hash_func(elt + hash)
|
||||
# else:
|
||||
# hash = hash_func(hash + elt)
|
||||
# index >>= 1
|
||||
# if index:
|
||||
# raise ValueError('index out of range for branch')
|
||||
# return hash
|
||||
|
||||
branch is an iterable sorted deepest to shallowest. If the
|
||||
returned root is the expected value then the merkle proof is
|
||||
verified.
|
||||
|
||||
The caller should have confirmed the length of the branch with
|
||||
branch_length(). Unfortunately this is not easily done for
|
||||
bitcoin transactions as the number of transactions in a block
|
||||
is unknown to an SPV client.
|
||||
"""
|
||||
hash_func = self.hash_func
|
||||
for elt in branch:
|
||||
if index & 1:
|
||||
hash = hash_func(elt + hash)
|
||||
else:
|
||||
hash = hash_func(hash + elt)
|
||||
index >>= 1
|
||||
if index:
|
||||
raise ValueError('index out of range for branch')
|
||||
return hash
|
||||
|
||||
def level(self, hashes, depth_higher):
|
||||
@staticmethod
|
||||
def level(hashes, depth_higher):
|
||||
"""Return a level of the merkle tree of hashes the given depth
|
||||
higher than the bottom row of the original tree."""
|
||||
size = 1 << depth_higher
|
||||
root = self.root
|
||||
root = Merkle.root
|
||||
return [root(hashes[n: n + size], depth_higher)
|
||||
for n in range(0, len(hashes), size)]
|
||||
|
||||
def branch_and_root_from_level(self, level, leaf_hashes, index,
|
||||
@staticmethod
|
||||
def branch_and_root_from_level(level, leaf_hashes, index,
|
||||
depth_higher):
|
||||
"""Return a (merkle branch, merkle_root) pair when a merkle-tree has a
|
||||
level cached.
|
||||
|
@ -146,10 +151,10 @@ class Merkle:
|
|||
if not isinstance(leaf_hashes, list):
|
||||
raise TypeError("leaf_hashes must be a list")
|
||||
leaf_index = (index >> depth_higher) << depth_higher
|
||||
leaf_branch, leaf_root = self.branch_and_root(
|
||||
leaf_branch, leaf_root = Merkle.branch_and_root(
|
||||
leaf_hashes, index - leaf_index, depth_higher)
|
||||
index >>= depth_higher
|
||||
level_branch, root = self.branch_and_root(level, index)
|
||||
level_branch, root = Merkle.branch_and_root(level, index)
|
||||
# Check last so that we know index is in-range
|
||||
if leaf_root != level[index]:
|
||||
raise ValueError('leaf hashes inconsistent with level')
|
||||
|
@ -191,7 +196,7 @@ class MerkleCache:
|
|||
# Start from the beginning of any final partial segment.
|
||||
# Retain the value of depth_higher; in practice this is fine
|
||||
start = self._leaf_start(self.length)
|
||||
hashes = await self.source_func(start, length - start)
|
||||
hashes = self.source_func(start, length - start)
|
||||
self.level[start >> self.depth_higher:] = self._level(hashes)
|
||||
self.length = length
|
||||
|
||||
|
@ -203,7 +208,7 @@ class MerkleCache:
|
|||
level = self.level[:length >> self.depth_higher]
|
||||
leaf_start = self._leaf_start(length)
|
||||
count = min(self._segment_length(), length - leaf_start)
|
||||
hashes = await self.source_func(leaf_start, count)
|
||||
hashes = self.source_func(leaf_start, count)
|
||||
level += self._level(hashes)
|
||||
return level
|
||||
|
||||
|
@ -211,7 +216,7 @@ class MerkleCache:
|
|||
"""Call to initialize the cache to a source of given length."""
|
||||
self.length = length
|
||||
self.depth_higher = self.merkle.tree_depth(length) // 2
|
||||
self.level = self._level(await self.source_func(0, length))
|
||||
self.level = self._level(self.source_func(0, length))
|
||||
self.initialized.set()
|
||||
|
||||
def truncate(self, length):
|
||||
|
@ -245,7 +250,7 @@ class MerkleCache:
|
|||
await self._extend_to(length)
|
||||
leaf_start = self._leaf_start(index)
|
||||
count = min(self._segment_length(), length - leaf_start)
|
||||
leaf_hashes = await self.source_func(leaf_start, count)
|
||||
leaf_hashes = self.source_func(leaf_start, count)
|
||||
if length < self._segment_length():
|
||||
return self.merkle.branch_and_root(leaf_hashes, index)
|
||||
level = await self._level_for(length)
|
||||
|
|
|
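With branch_and_root and friends turned into staticmethods, callers no longer need a Merkle instance (or an event loop) to compute a branch. For reference, here is a self-contained reimplementation of the same bitcoin-style branch computation; it is an illustration only, not the class from lbry.wallet.server.merkle:

```python
import hashlib

def double_sha256(x: bytes) -> bytes:
    return hashlib.sha256(hashlib.sha256(x).digest()).digest()

def branch_and_root(hashes, index, hash_func=double_sha256):
    """Return (merkle branch, merkle root) for hashes[index], bitcoin-style."""
    hashes = list(hashes)
    branch = []
    while len(hashes) > 1:
        if len(hashes) & 1:
            hashes.append(hashes[-1])      # duplicate the last hash on odd levels
        branch.append(hashes[index ^ 1])   # sibling of the node being proven
        index >>= 1
        hashes = [hash_func(hashes[n] + hashes[n + 1])
                  for n in range(0, len(hashes), 2)]
    return branch, hashes[0]

leaves = [double_sha256(bytes([i])) for i in range(4)]
branch, root = branch_and_root(leaves, 2)
print(len(branch), root.hex())
```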

@@ -1274,11 +1274,11 @@ class LBRYElectrumX(SessionBase):
        hashX = self.address_to_hashX(address)
        return await self.hashX_unsubscribe(hashX, address)

    async def get_balance(self, hashX):
        utxos = await self.db.all_utxos(hashX)
        confirmed = sum(utxo.value for utxo in utxos)
        unconfirmed = await self.mempool.balance_delta(hashX)
        return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
    # async def get_balance(self, hashX):
    #     utxos = await self.db.all_utxos(hashX)
    #     confirmed = sum(utxo.value for utxo in utxos)
    #     unconfirmed = await self.mempool.balance_delta(hashX)
    #     return {'confirmed': confirmed, 'unconfirmed': unconfirmed}

    async def scripthash_get_balance(self, scripthash):
        """Return the confirmed and unconfirmed balance of a scripthash."""

@@ -1534,6 +1534,7 @@ class LBRYElectrumX(SessionBase):
        if block_hash:
            block = await self.daemon.deserialised_block(block_hash)
            height = block['height']
            # print('lbrycrd txs', height, block['tx'])
            try:
                pos = block['tx'].index(tx_hash)
            except ValueError:

@@ -1543,15 +1544,12 @@ class LBRYElectrumX(SessionBase):
            else:
                batch_result[tx_hash] = [raw_tx, {'block_height': -1}]

        def threaded_get_merkle():
            for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
                batch_result[tx_hash] = raw_tx, {
                    'merkle': self._get_merkle_branch(block_txs, pos),
                    'pos': pos,
                    'block_height': block_height
                }
        if needed_merkles:
            await asyncio.get_running_loop().run_in_executor(self.db.executor, threaded_get_merkle)
        for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
            batch_result[tx_hash] = raw_tx, {
                'merkle': self._get_merkle_branch(block_txs, pos),
                'pos': pos,
                'block_height': block_height
            }

        self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
        return batch_result

@@ -25,18 +25,18 @@ def db_class(db_dir, name):
class Storage:
    """Abstract base class of the DB backend abstraction."""

    def __init__(self, db_dir, name, for_sync):
    def __init__(self, db_dir, name, for_sync, read_only=False):
        self.db_dir = db_dir
        self.is_new = not os.path.exists(os.path.join(db_dir, name))
        self.for_sync = for_sync or self.is_new
        self.open(name, create=self.is_new)
        self.open(name, create=self.is_new, read_only=read_only)

    @classmethod
    def import_module(cls):
        """Import the DB engine module."""
        raise NotImplementedError

    def open(self, name, create):
    def open(self, name, create, read_only=False):
        """Open an existing database or create a new one."""
        raise NotImplementedError

@@ -77,7 +77,7 @@ class LevelDB(Storage):
        import plyvel
        cls.module = plyvel

    def open(self, name, create, lru_cache_size=None):
    def open(self, name, create, read_only=False):
        mof = 10000
        path = os.path.join(self.db_dir, name)
        # Use snappy compression (the default)

@@ -97,17 +97,18 @@ class RocksDB(Storage):
        import rocksdb
        cls.module = rocksdb

    def open(self, name, create):
        mof = 512 if self.for_sync else 128
    def open(self, name, create, read_only=False):
        mof = 10000
        path = os.path.join(self.db_dir, name)
        # Use snappy compression (the default)
        options = self.module.Options(create_if_missing=create,
                                      use_fsync=True,
                                      target_file_size_base=33554432,
                                      max_open_files=mof)
        self.db = self.module.DB(path, options)
        self.db = self.module.DB(path, options, read_only=read_only)
        self.get = self.db.get
        self.put = self.db.put
        self.multi_get = self.db.multi_get

    def close(self):
        # PyRocksDB doesn't provide a close method; hopefully this is enough

@@ -118,8 +119,8 @@ class RocksDB(Storage):
    def write_batch(self):
        return RocksDBWriteBatch(self.db)

    def iterator(self, prefix=b'', reverse=False):
        return RocksDBIterator(self.db, prefix, reverse)
    def iterator(self, **kwargs):
        return RocksDBIterator(self.db, **kwargs)


class RocksDBWriteBatch:

@@ -140,28 +141,43 @@ class RocksDBWriteBatch:
class RocksDBIterator:
    """An iterator for RocksDB."""

    def __init__(self, db, prefix, reverse):
    __slots__ = [
        'start',
        'prefix',
        'stop',
        'iterator',
        'include_key',
        'include_value',
        'prev',
        'reverse'
    ]

    def __init__(self, db, prefix=None, start=None, stop=None, include_key=True, include_value=True, reverse=False):
        self.start = start
        self.prefix = prefix
        if reverse:
            self.iterator = reversed(db.iteritems())
            nxt_prefix = util.increment_byte_string(prefix)
            if nxt_prefix:
                self.iterator.seek(nxt_prefix)
                try:
                    next(self.iterator)
                except StopIteration:
                    self.iterator.seek(nxt_prefix)
            else:
                self.iterator.seek_to_last()
        else:
            self.iterator = db.iteritems()
        self.stop = stop
        self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
        if prefix is not None and start is None:
            self.iterator.seek(prefix)
        elif start is not None:
            self.iterator.seek(start)
        self.include_key = include_key
        self.include_value = include_value
        self.prev = None
        self.reverse = reverse

    def __iter__(self):
        return self

    def __next__(self):
        k, v = next(self.iterator)
        if not k.startswith(self.prefix):
        if None not in (self.stop, self.prev) and self.prev.startswith(self.stop):
            raise StopIteration
        return k, v
        self.prev, v = next(self.iterator)
        prev = self.prev
        if self.prefix is not None and not prev.startswith(self.prefix):
            raise StopIteration
        if self.include_key and self.include_value:
            return prev, v
        elif self.include_key:
            return prev
        return v
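The rewritten RocksDBIterator is essentially a prefix scan over python-rocksdb's iteritems() cursor: seek to the prefix (or an explicit start key) and stop as soon as a key leaves the namespace, optionally yielding only keys or only values. A minimal sketch of that idiom against the raw binding (path and keys are illustrative):

```python
import rocksdb

db = rocksdb.DB("/tmp/prefix-scan-example", rocksdb.Options(create_if_missing=True))
db.put(b"u\x01", b"utxo-1")
db.put(b"u\x02", b"utxo-2")
db.put(b"x\x01", b"history-1")  # different namespace

def prefix_scan(db, prefix, include_value=True):
    it = db.iteritems()
    it.seek(prefix)                     # jump to the first candidate key
    for key, value in it:
        if not key.startswith(prefix):  # left the namespace: stop, as RocksDBIterator does
            break
        yield (key, value) if include_value else key

print(list(prefix_scan(db, b"u")))
```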

setup.py

@@ -10,6 +10,8 @@ with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
PLYVEL = []
if sys.platform.startswith('linux'):
    PLYVEL.append('plyvel==1.0.5')
    PLYVEL.append('python-rocksdb')


setup(
    name=__name__,

@@ -2,6 +2,15 @@ import logging
import asyncio
from binascii import hexlify
from lbry.testcase import CommandTestCase
from lbry.wallet.server.leveldb import proc_ctx


def get_txids(height):
    ctx = proc_ctx.get()
    return [
        ctx.ctx_tx_hash(tx_num)[0][::-1].hex()
        for tx_num in range(ctx.tx_counts[height - 1], ctx.tx_counts[height])
    ]


class BlockchainReorganizationTests(CommandTestCase):

@@ -11,18 +20,12 @@ class BlockchainReorganizationTests(CommandTestCase):
    async def assertBlockHash(self, height):
        bp = self.conductor.spv_node.server.bp

        def get_txids():
            return [
                bp.db.fs_tx_hash(tx_num)[0][::-1].hex()
                for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
            ]

        block_hash = await self.blockchain.get_block_hash(height)

        self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
        self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
        self.assertEqual(block_hash, (bp.db.fs_block_hashes(height, 1))[0][::-1].hex())

        txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids)
        txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids, height)
        txs = await bp.db.fs_transactions(txids)
        block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
        self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')