Compare commits

...

20 commits

Author SHA1 Message Date
Jack Robison
3fff5ccb49
use array.array to serialize TX_COUNTS_STATE 2020-12-01 18:47:16 -05:00
Jack Robison
aa0f49633a
off by one 2020-12-01 18:47:16 -05:00
Jack Robison
ede167bb56
initialize TX_COUNTS_STATE from existing TX_COUNT_PREFIX 2020-12-01 18:47:16 -05:00
Jack Robison
c03b1b5a93
fix block_txs iterator stop condition 2020-12-01 17:00:35 -05:00
Jack Robison
68a97b0e61
remove dead code 2020-12-01 17:00:03 -05:00
Jack Robison
f86d940f20
TX_COUNTS_STATE 2020-12-01 11:30:14 -05:00
Jack Robison
314f8b0d42
fix uncaught error in chris45 test 2020-12-01 11:30:14 -05:00
Jack Robison
5ec8f355d4
cleanup 2020-12-01 11:30:14 -05:00
Jack Robison
f46bf0924a
update dockerfile 2020-12-01 10:11:13 -05:00
Jack Robison
6cbe559778
constants 2020-11-30 14:40:17 -05:00
Jack Robison
4ab2d7e624
ci 2020-11-30 14:40:09 -05:00
Jack Robison
a82abb33ac
fix 2020-11-30 14:26:04 -05:00
Jack Robison
70e5ce4806
rocksdb 2020-11-30 09:49:01 -05:00
Jack Robison
d960ba7412
combine leveldb databases 2020-11-30 09:49:01 -05:00
Jack Robison
18340c248d
merge tx_db and headers_db 2020-11-30 09:47:38 -05:00
Jack Robison
9012db0cfb
use rocksdb by default 2020-11-29 14:30:02 -05:00
Jack Robison
7143b475a1
RocksReaderContext 2020-11-29 14:30:02 -05:00
Jack Robison
976387fefb
RocksDBIterator 2020-11-29 14:30:02 -05:00
Jack Robison
1c01faed28
add rocksdb requirement, update dockerfile 2020-11-29 13:36:28 -05:00
Jack Robison
df3254b371
Merkle staticmethods 2020-11-29 13:35:43 -05:00
12 changed files with 520 additions and 375 deletions

View file

@ -28,24 +28,32 @@ stages:
test:lint:
stage: test
script:
- apt-get update
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
- make install tools
- make lint
test:unit:
stage: test
script:
- apt-get update
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
- make install tools
- HOME=/tmp coverage run -p --source=lbry -m unittest discover -vv tests.unit
test:datanetwork-integration:
stage: test
script:
- apt-get update
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
- pip install tox-travis
- tox -e datanetwork --recreate
test:blockchain-integration:
stage: test
script:
- apt-get update
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
- pip install tox-travis
- tox -e blockchain
@ -53,13 +61,15 @@ test:other-integration:
stage: test
script:
- apt-get update
- apt-get install -y --no-install-recommends ffmpeg
- apt-get install -y --no-install-recommends ffmpeg liblz4-dev libsnappy-dev librocksdb-dev
- pip install tox-travis
- tox -e other
test:json-api:
stage: test
script:
- apt-get update
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
- make install tools
- HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py

View file

@ -10,6 +10,10 @@ ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT
RUN apt-get update && \
apt-get -y --no-install-recommends install \
liblz4-dev \
libbz2-dev \
libsnappy-dev \
librocksdb-dev \
wget \
tar unzip \
build-essential \
@ -31,7 +35,8 @@ RUN chown -R $user:$user $projects_dir
USER $user
WORKDIR $projects_dir
RUN pip install uvloop
RUN pip install uvloop cython
RUN pip install python-rocksdb
RUN make install
RUN python3 docker/set_build.py
RUN rm ~/.cache -rf
@ -48,5 +53,6 @@ ENV DB_DIRECTORY=$db_dir
ENV MAX_SESSIONS=1000000000
ENV MAX_SEND=1000000000000000000
ENV EVENT_LOOP_POLICY=uvloop
ENV DB_ENGINE=rocksdb
COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]

View file

@ -131,7 +131,7 @@ class Ledger(metaclass=LedgerRegistry):
self._on_transaction_controller = StreamController()
self.on_transaction = self._on_transaction_controller.stream
self.on_transaction.listen(
lambda e: log.info(
lambda e: log.debug(
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
)

View file

@ -10,7 +10,7 @@ from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.server.leveldb import FlushData, reopen_rocksdb_ctx, UTXO_PREFIX, HASHX_UTXO_PREFIX
class Prefetcher:
@ -275,6 +275,7 @@ class BlockProcessor:
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc()
await reopen_rocksdb_ctx(self.db.executor)
async def reorg_hashes(self, count):
"""Return a pair (start, last, hashes) of blocks to back up during a
@ -289,7 +290,7 @@ class BlockProcessor:
self.logger.info(f'chain was reorganised replacing {count:,d} '
f'block{s} at heights {start:,d}-{last:,d}')
return start, last, await self.db.fs_block_hashes(start, count)
return start, last, self.db.fs_block_hashes(start, count)
async def calc_reorg_range(self, count: Optional[int]):
"""Calculate the reorg range"""
@ -307,7 +308,7 @@ class BlockProcessor:
start = self.height - 1
count = 1
while start > 0:
hashes = await self.db.fs_block_hashes(start, count)
hashes = self.db.fs_block_hashes(start, count)
hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = diff_pos(hex_hashes, d_hex_hashes)
@ -346,6 +347,7 @@ class BlockProcessor:
self.db.flush_dbs(self.flush_data(), flush_utxos,
self.estimate_txs_remaining)
await self.run_in_thread_with_lock(flush)
await reopen_rocksdb_ctx(self.db.executor)
async def _maybe_flush(self):
# If caught up, flush everything as client queries are
@ -584,9 +586,9 @@ class BlockProcessor:
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
candidates = {db_key: hashX for db_key, hashX
in self.db.utxo_db.iterator(prefix=prefix)}
in self.db.db.iterator(prefix=prefix)}
for hdb_key, hashX in candidates.items():
tx_num_packed = hdb_key[-4:]
@ -600,8 +602,8 @@ class BlockProcessor:
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
udb_key = b'u' + hashX + hdb_key[-6:]
utxo_value_packed = self.db.utxo_db.get(udb_key)
udb_key = UTXO_PREFIX + hashX + hdb_key[-6:]
utxo_value_packed = self.db.db.get(udb_key)
if utxo_value_packed:
# Remove both entries for this UTXO
self.db_deletes.append(hdb_key)

View file

@ -36,7 +36,7 @@ class Env:
self.loop_policy = self.set_event_loop_policy()
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
self.db_dir = self.required('DB_DIRECTORY')
self.db_engine = self.default('DB_ENGINE', 'leveldb')
self.db_engine = self.default('DB_ENGINE', 'rocksdb')
self.trending_algorithms = [
trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
]

View file

@ -20,6 +20,10 @@ from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
HASHX_HISTORY_PREFIX = b'x'
HIST_STATE = b'state-hist'
class History:
DB_VERSIONS = [0]
@ -32,8 +36,8 @@ class History:
self.unflushed_count = 0
self.db = None
def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
self.db = db_class('hist', for_sync)
def open_db(self, db, for_sync, utxo_flush_count, compacting):
self.db = db #db_class('hist', for_sync)
self.read_state()
self.clear_excess(utxo_flush_count)
# An incomplete compaction needs to be cancelled otherwise
@ -44,11 +48,11 @@ class History:
def close_db(self):
if self.db:
self.db.close()
# self.db.close()
self.db = None
def read_state(self):
state = self.db.get(b'state\0\0')
state = self.db.get(HIST_STATE)
if state:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
@ -80,17 +84,18 @@ class History:
'excess history flushes...')
keys = []
for key, hist in self.db.iterator(prefix=b''):
flush_id, = unpack_be_uint16_from(key[-2:])
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
k = key[1:]
flush_id, = unpack_be_uint16_from(k[-2:])
if flush_id > utxo_flush_count:
keys.append(key)
keys.append(k)
self.logger.info(f'deleting {len(keys):,d} history entries')
self.flush_count = utxo_flush_count
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
batch.delete(HASHX_HISTORY_PREFIX + key)
self.write_state(batch)
self.logger.info('deleted excess history entries')
@ -105,7 +110,7 @@ class History:
}
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(b'state\0\0', repr(state).encode())
batch.put(HIST_STATE, repr(state).encode())
def add_unflushed(self, hashXs_by_tx, first_tx_num):
unflushed = self.unflushed
@ -132,7 +137,7 @@ class History:
with self.db.write_batch() as batch:
for hashX in sorted(unflushed):
key = hashX + flush_id
batch.put(key, unflushed[hashX].tobytes())
batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
self.write_state(batch)
count = len(unflushed)
@ -154,16 +159,17 @@ class History:
for hashX in sorted(hashXs):
deletes = []
puts = {}
for key, hist in self.db.iterator(prefix=hashX, reverse=True):
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
k = key[1:]
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= tx_count
idx = bisect_left(a, tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[key] = a[:idx].tobytes()
puts[k] = a[:idx].tobytes()
break
deletes.append(key)
deletes.append(k)
for key in deletes:
batch.delete(key)
@ -221,9 +227,9 @@ class History:
with self.db.write_batch() as batch:
# Important: delete first! The keyspace may overlap.
for key in keys_to_delete:
batch.delete(key)
batch.delete(HASHX_HISTORY_PREFIX + key)
for key, value in write_items:
batch.put(key, value)
batch.put(HASHX_HISTORY_PREFIX + key, value)
self.write_state(batch)
def _compact_hashX(self, hashX, hist_map, hist_list,
@ -271,11 +277,12 @@ class History:
key_len = HASHX_LEN + 2
write_size = 0
for key, hist in self.db.iterator(prefix=prefix):
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
k = key[1:]
# Ignore non-history entries
if len(key) != key_len:
if len(k) != key_len:
continue
hashX = key[:-2]
hashX = k[:-2]
if hashX != prior_hashX and prior_hashX:
write_size += self._compact_hashX(prior_hashX, hist_map,
hist_list, write_items,
@ -283,7 +290,7 @@ class History:
hist_map.clear()
hist_list.clear()
prior_hashX = hashX
hist_map[key] = hist
hist_map[k] = hist
hist_list.append(hist)
if prior_hashX:

View file

@ -23,17 +23,19 @@ from bisect import bisect_right
from collections import namedtuple
from glob import glob
from struct import pack, unpack
from contextvars import ContextVar
from dataclasses import dataclass
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
import attr
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from, pack_le_int32
from lbry.wallet.server.storage import db_class, RocksDB
from lbry.wallet.server.history import History
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
HEADER_PREFIX = b'H'
TX_COUNT_PREFIX = b'T'
@ -41,8 +43,302 @@ TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B'
TX_NUM_PREFIX = b'N'
BLOCK_HASH_PREFIX = b'C'
HISTORY_PREFIX = b'A'
HASHX_UTXO_PREFIX = b'h'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
UNDO_PREFIX = b'U'
UTXO_STATE = b'state-utxo'
HIST_STATE = b'state-hist'
TX_COUNTS_STATE = b'state-tx-counts'
@dataclass
class RocksReaderContext:
db: 'RocksDB'
db_dir: str
name: str
merkle: Merkle
tx_counts: List[int]
block_txs_cache: pylru.lrucache
merkle_tx_cache: pylru.lrucache
txid_cache: pylru.lrucache
def close(self):
self.db.close()
def reopen(self):
self.db.close()
self.db.open(self.name, create=False, read_only=True)
def update_state(self):
tx_counts = array.array("L")
counts = self.db.get(TX_COUNTS_STATE)
if not counts:
self.tx_counts = list(map(unpack_be_uint64, self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)))
else:
tx_counts.frombytes(counts)
self.tx_counts = tx_counts.tolist()
def ctx_tx_hash(self, tx_num):
tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height > len(self.tx_counts):
return None, tx_height
key = TX_HASH_PREFIX + util.pack_be_uint64(tx_num)
if key in self.txid_cache:
return self.txid_cache[key], tx_height
tx_hash = self.db.get(key)
if tx_height + 100 <= len(self.tx_counts):
self.txid_cache[key] = tx_hash
return tx_hash, tx_height
proc_ctx: ContextVar[Optional[RocksReaderContext]] = ContextVar('proc_ctx')
def _update_rocksdb_ctx():
ctx = proc_ctx.get()
ctx.update_state()
def _reopen_rocksdb_ctx():
ctx = proc_ctx.get()
ctx.db.close()
ctx.db = RocksDB(ctx.db_dir, ctx.name, for_sync=False, read_only=True)
ctx.update_state()
async def update_rocksdb_ctx(executor: ProcessPoolExecutor):
loop = asyncio.get_event_loop()
await asyncio.wait([loop.run_in_executor(executor, _update_rocksdb_ctx) for _ in range(executor._max_workers)])
async def reopen_rocksdb_ctx(executor: ProcessPoolExecutor):
loop = asyncio.get_event_loop()
await asyncio.wait([loop.run_in_executor(executor, _reopen_rocksdb_ctx) for _ in range(executor._max_workers)])
def _initializer(path, name):
db = RocksDB(path, name, for_sync=False, read_only=True)
proc_ctx.set(RocksReaderContext(db, path, name, Merkle(), [], pylru.lrucache(50000), pylru.lrucache(100000),
pylru.lrucache(1000000)))
def _teardown():
proc_ctx.get().close()
proc_ctx.set(None)
async def initialize_executor(workers, db_dir, for_sync, name):
executor = ProcessPoolExecutor(workers, initializer=_initializer, initargs=(db_dir, name))
try:
writer = RocksDB(db_dir, name, for_sync=for_sync, read_only=False)
await update_rocksdb_ctx(executor)
except Exception as err:
await teardown_executor(executor)
executor.shutdown(True)
raise err
return executor, writer
async def teardown_executor(executor: ProcessPoolExecutor):
try:
await asyncio.wait(
[asyncio.get_event_loop().run_in_executor(executor, _teardown) for _ in range(executor._max_workers)]
)
finally:
executor.shutdown(True)
def lookup_hashXs_utxos(prevouts):
"""Return (hashX, suffix) pairs, or None if not found,
for each prevout.
"""
ctx = proc_ctx.get()
iterator = ctx.db.iterator
get = ctx.db.get
def lookup_hashXs():
"""Return (hashX, suffix) pairs, or None if not found,
for each prevout.
"""
def lookup_hashX(tx_hash, tx_idx):
idx_packed = pack('<H', tx_idx)
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX
prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches.
for db_key, hashX in iterator(prefix=prefix):
tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed)
hash, height = ctx.ctx_tx_hash(tx_num)
if hash == tx_hash:
return hashX, idx_packed + tx_num_packed
return None, None
return [lookup_hashX(*prevout) for prevout in prevouts]
def lookup_utxo(hashX, suffix):
if not hashX:
# This can happen when the daemon is a block ahead
# of us and has mempool txs spending outputs from
# that new block
return None
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
db_value = get(UTXO_PREFIX + hashX + suffix)
if not db_value:
# This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs
return None
value, = unpack('<Q', db_value)
return hashX, value
return [lookup_utxo(*hashX_pair) for hashX_pair in lookup_hashXs()]
def get_counts():
ctx = proc_ctx.get()
return ctx.tx_counts
def read_txids():
return list(proc_ctx.get().db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
def read_headers():
return [
header for header in proc_ctx.get().db.iterator(prefix=HEADER_PREFIX, include_key=False)
]
def read_block_file(path):
with util.open_file(path) as f:
return f.read(-1)
def tx_hash(self, tx_num):
"""Return a par (tx_hash, tx_height) for the given tx number.
If the tx_height is not on disk, returns (None, tx_height)."""
tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height > self.db_height:
return None, tx_height
return self.total_transactions[tx_num], tx_height
def read_utxos(hashX):
utxos = []
utxos_append = utxos.append
s_unpack = unpack
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
for db_key, db_value in proc_ctx.get().db.iterator(prefix=UTXO_PREFIX + hashX):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
utxos_append((tx_num, tx_pos, value))
return utxos
def limited_history(hashX, limit=None):
ctx = proc_ctx.get()
tx_counts = ctx.tx_counts
db_height = len(tx_counts)
cnt = 0
txs = []
for hist in ctx.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
tx_height = bisect_right(tx_counts, tx_num)
if tx_height > db_height:
return
txs.append((tx_num, tx_height))
cnt += 1
if limit and cnt >= limit:
break
if limit and cnt >= limit:
break
return txs
def tx_merkle(tx_num, tx_height):
ctx = proc_ctx.get()
db = ctx.db
tx_counts = ctx.tx_counts
db_height = len(tx_counts)
if tx_height == -1:
return {
'block_height': -1
}
tx_pos = tx_num - tx_counts[tx_height - 1]
uncached = None
if (tx_num, tx_height) in ctx.merkle_tx_cache:
return ctx.merkle_tx_cache[(tx_num, tx_height)]
if tx_height not in ctx.block_txs_cache:
block_txs = list(db.iterator(
prefix=TX_HASH_PREFIX,
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height] - 1), include_key=False
))
# print("rocks txs", tx_height, list(map(hash_to_hex_str, block_txs)))
if tx_height + 100 <= db_height:
ctx.block_txs_cache[tx_height] = block_txs
else:
block_txs = ctx.block_txs_cache.get(tx_height, uncached)
branch, root = ctx.merkle.branch_and_root(block_txs, tx_pos)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 100 < db_height:
ctx.merkle_tx_cache[(tx_num, tx_height)] = merkle
return merkle
def transaction_info_get_batch(txids: Iterable[str]):
ctx = proc_ctx.get()
db_height = len(ctx.tx_counts)
tx_counts = ctx.tx_counts
tx_db_get = ctx.db.get
tx_infos = {}
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx = None
tx_height = -1
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
tx_infos[tx_hash] = (
None if not tx else tx.hex(), {'block_height': -1} if tx_height == -1 else tx_merkle(
tx_num, tx_height
)
)
return tx_infos
def _update_block_txs_cache(tx_num, tx_height):
ctx = proc_ctx.get()
db = ctx.db
tx_counts = ctx.tx_counts
db_height = len(tx_counts)
@attr.s(slots=True)
class FlushData:
@ -80,7 +376,7 @@ class LevelDB:
self.db_class = db_class(env.db_dir, self.env.db_engine)
self.history = History()
self.utxo_db = None
self.db = None
self.tx_counts = None
self.headers = None
self.last_flush = time.time()
@ -91,9 +387,6 @@ class LevelDB:
self.merkle = Merkle()
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
self.headers_db = None
self.tx_db = None
self._block_txs_cache = pylru.lrucache(50000)
self._merkle_tx_cache = pylru.lrucache(100000)
self.total_transactions = None
@ -104,15 +397,9 @@ class LevelDB:
# tx_counts[N] has the cumulative number of txs at the end of
# height N. So tx_counts[0] is 1 - the genesis coinbase
def get_counts():
return tuple(
util.unpack_be_uint64(tx_count)
for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
)
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
self.tx_counts = array.array('I', tx_counts)
self.tx_counts = tx_counts
if self.tx_counts:
assert self.db_tx_count == self.tx_counts[-1], \
@ -121,12 +408,9 @@ class LevelDB:
assert self.db_tx_count == 0
async def _read_txids(self):
def get_txids():
return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
start = time.perf_counter()
self.logger.info("loading txids")
txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids)
txids = await asyncio.get_event_loop().run_in_executor(self.executor, read_txids)
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
self.total_transactions = txids
ts = time.perf_counter() - start
@ -136,47 +420,26 @@ class LevelDB:
if self.headers is not None:
return
def get_headers():
return [
header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False)
]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
headers = await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
self.headers = headers
async def _open_dbs(self, for_sync, compacting):
name = f'lbry-{self.env.db_engine}'
if self.executor is None:
self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
coin_path = os.path.join(self.env.db_dir, 'COIN')
if not os.path.isfile(coin_path):
with util.open_file(coin_path, create=True) as f:
f.write(f'ElectrumX databases and metadata for '
f'{self.coin.NAME} {self.coin.NET}'.encode())
self.executor, self.db = await initialize_executor(
max(1, os.cpu_count() - 1), self.env.db_dir, for_sync, name
)
assert self.headers_db is None
self.headers_db = self.db_class('headers', for_sync)
if self.headers_db.is_new:
self.logger.info('created new headers db')
self.logger.info(f'opened headers DB (for sync: {for_sync})')
if self.db.is_new:
self.logger.info('created new db: %s', name)
self.logger.info(f'opened DB (for sync: {for_sync})')
assert self.tx_db is None
self.tx_db = self.db_class('tx', for_sync)
if self.tx_db.is_new:
self.logger.info('created new tx db')
self.logger.info(f'opened tx DB (for sync: {for_sync})')
assert self.utxo_db is None
# First UTXO DB
self.utxo_db = self.db_class('utxo', for_sync)
if self.utxo_db.is_new:
self.logger.info('created new utxo db')
self.logger.info(f'opened utxo db (for sync: {for_sync})')
self.read_utxo_state()
# Then history DB
self.utxo_flush_count = self.history.open_db(
self.db_class, for_sync, self.utxo_flush_count, compacting
self.db, for_sync, self.utxo_flush_count, compacting
)
self.clear_excess_undo_info()
@ -187,12 +450,11 @@ class LevelDB:
await self._read_headers()
def close(self):
self.utxo_db.close()
self.db.close()
self.history.close_db()
self.headers_db.close()
self.tx_db.close()
self.executor.shutdown(wait=True)
self.executor = None
self.db = None
async def open_for_compacting(self):
await self._open_dbs(True, True)
@ -211,18 +473,12 @@ class LevelDB:
"""Open the databases for serving. If they are already open they are
closed first.
"""
self.logger.info('closing DBs to re-open for serving')
if self.utxo_db:
self.logger.info('closing DBs to re-open for serving')
self.utxo_db.close()
self.history.close_db()
self.utxo_db = None
if self.headers_db:
self.headers_db.close()
self.headers_db = None
if self.tx_db:
self.tx_db.close()
self.tx_db = None
if self.db:
return
# self.logger.info('closing DBs to re-open for serving')
# self.db.close()
# self.history.close_db()
# self.db = None
await self._open_dbs(False, False)
self.logger.info("opened for serving")
@ -271,14 +527,14 @@ class LevelDB:
self.flush_history()
# Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
if flush_utxos:
self.flush_utxo_db(batch, flush_data)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.utxo_db)
self.flush_state(self.db)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
@ -286,7 +542,7 @@ class LevelDB:
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.utxo_db.for_sync:
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
@ -317,18 +573,16 @@ class LevelDB:
# Write the headers
start_time = time.perf_counter()
with self.headers_db.write_batch() as batch:
with self.db.write_batch() as batch:
batch_put = batch.put
for i, header in enumerate(flush_data.headers):
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
self.headers.append(header)
flush_data.headers.clear()
flush_data.headers.clear()
height_start = self.fs_height + 1
tx_num = prior_tx_count
height_start = self.fs_height + 1
tx_num = prior_tx_count
with self.tx_db.write_batch() as batch:
batch_put = batch.put
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
tx_count = self.tx_counts[height_start]
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
@ -342,14 +596,22 @@ class LevelDB:
tx_num += 1
offset += 32
a = array.array('L')
a.fromlist(self.tx_counts)
batch_put(TX_COUNTS_STATE, a.tobytes())
flush_data.block_txs.clear()
flush_data.block_hashes.clear()
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
elapsed = time.perf_counter() - start_time
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
def flush_history(self):
self.history.flush()
@ -374,15 +636,15 @@ class LevelDB:
# suffix = tx_idx + tx_num
hashX = value[:-12]
suffix = key[-2:] + value[-12:-8]
batch_put(b'h' + key[:4] + suffix, hashX)
batch_put(b'u' + hashX + suffix, value[-8:])
batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
flush_data.adds.clear()
# New undo information
self.flush_undo_infos(batch_put, flush_data.undo_infos)
flush_data.undo_infos.clear()
if self.utxo_db.for_sync:
if self.db.for_sync:
block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time
@ -416,7 +678,7 @@ class LevelDB:
self.backup_fs(flush_data.height, flush_data.tx_count)
self.history.backup(touched, flush_data.tx_count)
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
self.flush_utxo_db(batch, flush_data)
# Flush state last as it reads the wall time.
self.flush_state(batch)
@ -470,78 +732,12 @@ class LevelDB:
return None, tx_height
return self.total_transactions[tx_num], tx_height
async def tx_merkle(self, tx_num, tx_height):
if tx_height == -1:
return {
'block_height': -1
}
tx_counts = self.tx_counts
tx_pos = tx_num - tx_counts[tx_height - 1]
def _update_block_txs_cache():
block_txs = list(self.tx_db.iterator(
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
stop=None if tx_height + 1 == len(tx_counts) else
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False
))
if tx_height + 100 > self.db_height:
return block_txs
self._block_txs_cache[tx_height] = block_txs
uncached = None
if (tx_num, tx_height) in self._merkle_tx_cache:
return self._merkle_tx_cache[(tx_num, tx_height)]
if tx_height not in self._block_txs_cache:
uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache)
block_txs = self._block_txs_cache.get(tx_height, uncached)
branch, root = self.merkle.branch_and_root(block_txs, tx_pos)
merkle = {
'block_height': tx_height,
'merkle': [
hash_to_hex_str(hash)
for hash in branch
],
'pos': tx_pos
}
if tx_height + 100 < self.db_height:
self._merkle_tx_cache[(tx_num, tx_height)] = merkle
return merkle
def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
tx_infos = []
for tx_hash in txids:
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
tx = None
tx_height = -1
if tx_num is not None:
tx_num = unpack_be_uint64(tx_num)
tx_height = bisect_right(tx_counts, tx_num)
if tx_height < self.db_height:
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height))
return tx_infos
async def fs_transactions(self, txids):
txs = await asyncio.get_event_loop().run_in_executor(
self.executor, self._fs_transactions, txids
return await asyncio.get_event_loop().run_in_executor(
self.executor, transaction_info_get_batch, txids
)
unsorted_result = {}
async def add_result(item):
_txid, _tx, _tx_num, _tx_height = item
unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height))
if txs:
await asyncio.gather(*map(add_result, txs))
return {txid: unsorted_result[txid] for txid, _, _, _ in txs}
async def fs_block_hashes(self, height, count):
def fs_block_hashes(self, height, count):
if height + count > len(self.headers):
raise self.DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
@ -553,49 +749,9 @@ class LevelDB:
transactions. By default returns at most 1000 entries. Set
limit to None to get them all.
"""
# def read_history():
# hashx_history = []
# for key, hist in self.history.db.iterator(prefix=hashX):
# a = array.array('I')
# a.frombytes(hist)
# for tx_num in a:
# tx_height = bisect_right(self.tx_counts, tx_num)
# if tx_height > self.db_height:
# tx_hash = None
# else:
# tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
#
# hashx_history.append((tx_hash, tx_height))
# if limit and len(hashx_history) >= limit:
# return hashx_history
# return hashx_history
def read_history():
db_height = self.db_height
tx_counts = self.tx_counts
tx_db_get = self.tx_db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0
txs = []
for hist in self.history.db.iterator(prefix=hashX, include_key=False):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
tx_height = bisect_right(tx_counts, tx_num)
if tx_height > db_height:
return
txs.append((tx_num, tx_height))
cnt += 1
if limit and cnt >= limit:
break
if limit and cnt >= limit:
break
return txs
while True:
history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
history = await asyncio.get_event_loop().run_in_executor(self.executor, limited_history, hashX, limit)
if history is not None:
return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history]
self.logger.warning(f'limited_history: tx hash '
@ -610,11 +766,11 @@ class LevelDB:
def undo_key(self, height):
"""DB key for undo information at the given height."""
return b'U' + pack('>I', height)
return UNDO_PREFIX + pack('>I', height)
def read_undo_info(self, height):
"""Read undo information from a file for the current height."""
return self.utxo_db.get(self.undo_key(height))
return self.db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos):
"""undo_infos is a list of (undo_info, height) pairs."""
@ -631,11 +787,7 @@ class LevelDB:
"""Returns a raw block read from disk. Raises FileNotFoundError
if the block isn't on-disk."""
def read():
with util.open_file(self.raw_block_path(height)) as f:
return f.read(-1)
return await asyncio.get_event_loop().run_in_executor(self.executor, read)
return await asyncio.get_event_loop().run_in_executor(self.executor, read_block_file, self.raw_block_path(height))
def write_raw_block(self, block, height):
"""Write a raw block to disk."""
@ -650,17 +802,16 @@ class LevelDB:
def clear_excess_undo_info(self):
"""Clear excess undo info. Only most recent N are kept."""
prefix = b'U'
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.utxo_db.iterator(prefix=prefix):
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
keys.append(key)
if keys:
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
@ -681,7 +832,7 @@ class LevelDB:
# -- UTXO database
def read_utxo_state(self):
state = self.utxo_db.get(b'state')
state = self.db.get(UTXO_STATE)
if not state:
self.db_height = -1
self.db_tx_count = 0
@ -724,7 +875,7 @@ class LevelDB:
self.logger.info(f'height: {self.db_height:,d}')
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
self.logger.info(f'tx count: {self.db_tx_count:,d}')
if self.utxo_db.for_sync:
if self.db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
@ -741,32 +892,18 @@ class LevelDB:
'first_sync': self.first_sync,
'db_version': self.db_version,
}
batch.put(b'state', repr(state).encode())
batch.put(UTXO_STATE, repr(state).encode())
def set_flush_count(self, count):
self.utxo_flush_count = count
with self.utxo_db.write_batch() as batch:
with self.db.write_batch() as batch:
self.write_utxo_state(batch)
async def all_utxos(self, hashX):
"""Return all UTXOs for an address sorted in no particular order."""
def read_utxos():
utxos = []
utxos_append = utxos.append
s_unpack = unpack
fs_tx_hash = self.fs_tx_hash
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
tx_hash, height = fs_tx_hash(tx_num)
utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value))
return utxos
while True:
utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos)
utxos = [UTXO(tx_num, tx_pos, *self.fs_tx_hash(tx_num), value=value) for (tx_num, tx_pos, value) in utxos]
if all(utxo.tx_hash is not None for utxo in utxos):
return utxos
self.logger.warning(f'all_utxos: tx hash not '
@ -779,45 +916,4 @@ class LevelDB:
Used by the mempool code.
"""
def lookup_hashXs():
"""Return (hashX, suffix) pairs, or None if not found,
for each prevout.
"""
def lookup_hashX(tx_hash, tx_idx):
idx_packed = pack('<H', tx_idx)
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches.
for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num)
if hash == tx_hash:
return hashX, idx_packed + tx_num_packed
return None, None
return [lookup_hashX(*prevout) for prevout in prevouts]
def lookup_utxos(hashX_pairs):
def lookup_utxo(hashX, suffix):
if not hashX:
# This can happen when the daemon is a block ahead
# of us and has mempool txs spending outputs from
# that new block
return None
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
key = b'u' + hashX + suffix
db_value = self.utxo_db.get(key)
if not db_value:
# This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs
return None
value, = unpack('<Q', db_value)
return hashX, value
return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs]
hashX_pairs = await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs)
return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos, hashX_pairs)
return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs_utxos, prevouts)

View file

@ -43,10 +43,12 @@ class Merkle:
def __init__(self, hash_func=double_sha256):
self.hash_func = hash_func
def tree_depth(self, hash_count):
return self.branch_length(hash_count) + 1
@staticmethod
def tree_depth(hash_count):
return Merkle.branch_length(hash_count) + 1
def branch_length(self, hash_count):
@staticmethod
def branch_length(hash_count):
"""Return the length of a merkle branch given the number of hashes."""
if not isinstance(hash_count, int):
raise TypeError('hash_count must be an integer')
@ -54,7 +56,8 @@ class Merkle:
raise ValueError('hash_count must be at least 1')
return ceil(log(hash_count, 2))
def branch_and_root(self, hashes, index, length=None):
@staticmethod
def branch_and_root(hashes, index, length=None, hash_func=double_sha256):
"""Return a (merkle branch, merkle_root) pair given hashes, and the
index of one of those hashes.
"""
@ -64,7 +67,7 @@ class Merkle:
# This also asserts hashes is not empty
if not 0 <= index < len(hashes):
raise ValueError(f"index '{index}/{len(hashes)}' out of range")
natural_length = self.branch_length(len(hashes))
natural_length = Merkle.branch_length(len(hashes))
if length is None:
length = natural_length
else:
@ -73,7 +76,6 @@ class Merkle:
if length < natural_length:
raise ValueError('length out of range')
hash_func = self.hash_func
branch = []
for _ in range(length):
if len(hashes) & 1:
@ -85,44 +87,47 @@ class Merkle:
return branch, hashes[0]
def root(self, hashes, length=None):
@staticmethod
def root(hashes, length=None):
"""Return the merkle root of a non-empty iterable of binary hashes."""
branch, root = self.branch_and_root(hashes, 0, length)
branch, root = Merkle.branch_and_root(hashes, 0, length)
return root
def root_from_proof(self, hash, branch, index):
"""Return the merkle root given a hash, a merkle branch to it, and
its index in the hashes array.
# @staticmethod
# def root_from_proof(hash, branch, index, hash_func=double_sha256):
# """Return the merkle root given a hash, a merkle branch to it, and
# its index in the hashes array.
#
# branch is an iterable sorted deepest to shallowest. If the
# returned root is the expected value then the merkle proof is
# verified.
#
# The caller should have confirmed the length of the branch with
# branch_length(). Unfortunately this is not easily done for
# bitcoin transactions as the number of transactions in a block
# is unknown to an SPV client.
# """
# for elt in branch:
# if index & 1:
# hash = hash_func(elt + hash)
# else:
# hash = hash_func(hash + elt)
# index >>= 1
# if index:
# raise ValueError('index out of range for branch')
# return hash
branch is an iterable sorted deepest to shallowest. If the
returned root is the expected value then the merkle proof is
verified.
The caller should have confirmed the length of the branch with
branch_length(). Unfortunately this is not easily done for
bitcoin transactions as the number of transactions in a block
is unknown to an SPV client.
"""
hash_func = self.hash_func
for elt in branch:
if index & 1:
hash = hash_func(elt + hash)
else:
hash = hash_func(hash + elt)
index >>= 1
if index:
raise ValueError('index out of range for branch')
return hash
def level(self, hashes, depth_higher):
@staticmethod
def level(hashes, depth_higher):
"""Return a level of the merkle tree of hashes the given depth
higher than the bottom row of the original tree."""
size = 1 << depth_higher
root = self.root
root = Merkle.root
return [root(hashes[n: n + size], depth_higher)
for n in range(0, len(hashes), size)]
def branch_and_root_from_level(self, level, leaf_hashes, index,
@staticmethod
def branch_and_root_from_level(level, leaf_hashes, index,
depth_higher):
"""Return a (merkle branch, merkle_root) pair when a merkle-tree has a
level cached.
@ -146,10 +151,10 @@ class Merkle:
if not isinstance(leaf_hashes, list):
raise TypeError("leaf_hashes must be a list")
leaf_index = (index >> depth_higher) << depth_higher
leaf_branch, leaf_root = self.branch_and_root(
leaf_branch, leaf_root = Merkle.branch_and_root(
leaf_hashes, index - leaf_index, depth_higher)
index >>= depth_higher
level_branch, root = self.branch_and_root(level, index)
level_branch, root = Merkle.branch_and_root(level, index)
# Check last so that we know index is in-range
if leaf_root != level[index]:
raise ValueError('leaf hashes inconsistent with level')
@ -191,7 +196,7 @@ class MerkleCache:
# Start from the beginning of any final partial segment.
# Retain the value of depth_higher; in practice this is fine
start = self._leaf_start(self.length)
hashes = await self.source_func(start, length - start)
hashes = self.source_func(start, length - start)
self.level[start >> self.depth_higher:] = self._level(hashes)
self.length = length
@ -203,7 +208,7 @@ class MerkleCache:
level = self.level[:length >> self.depth_higher]
leaf_start = self._leaf_start(length)
count = min(self._segment_length(), length - leaf_start)
hashes = await self.source_func(leaf_start, count)
hashes = self.source_func(leaf_start, count)
level += self._level(hashes)
return level
@ -211,7 +216,7 @@ class MerkleCache:
"""Call to initialize the cache to a source of given length."""
self.length = length
self.depth_higher = self.merkle.tree_depth(length) // 2
self.level = self._level(await self.source_func(0, length))
self.level = self._level(self.source_func(0, length))
self.initialized.set()
def truncate(self, length):
@ -245,7 +250,7 @@ class MerkleCache:
await self._extend_to(length)
leaf_start = self._leaf_start(index)
count = min(self._segment_length(), length - leaf_start)
leaf_hashes = await self.source_func(leaf_start, count)
leaf_hashes = self.source_func(leaf_start, count)
if length < self._segment_length():
return self.merkle.branch_and_root(leaf_hashes, index)
level = await self._level_for(length)

View file

@ -1274,11 +1274,11 @@ class LBRYElectrumX(SessionBase):
hashX = self.address_to_hashX(address)
return await self.hashX_unsubscribe(hashX, address)
async def get_balance(self, hashX):
utxos = await self.db.all_utxos(hashX)
confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = await self.mempool.balance_delta(hashX)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
# async def get_balance(self, hashX):
# utxos = await self.db.all_utxos(hashX)
# confirmed = sum(utxo.value for utxo in utxos)
# unconfirmed = await self.mempool.balance_delta(hashX)
# return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
async def scripthash_get_balance(self, scripthash):
"""Return the confirmed and unconfirmed balance of a scripthash."""
@ -1534,6 +1534,7 @@ class LBRYElectrumX(SessionBase):
if block_hash:
block = await self.daemon.deserialised_block(block_hash)
height = block['height']
# print('lbrycrd txs', height, block['tx'])
try:
pos = block['tx'].index(tx_hash)
except ValueError:
@ -1543,15 +1544,12 @@ class LBRYElectrumX(SessionBase):
else:
batch_result[tx_hash] = [raw_tx, {'block_height': -1}]
def threaded_get_merkle():
for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
batch_result[tx_hash] = raw_tx, {
'merkle': self._get_merkle_branch(block_txs, pos),
'pos': pos,
'block_height': block_height
}
if needed_merkles:
await asyncio.get_running_loop().run_in_executor(self.db.executor, threaded_get_merkle)
for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
batch_result[tx_hash] = raw_tx, {
'merkle': self._get_merkle_branch(block_txs, pos),
'pos': pos,
'block_height': block_height
}
self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
return batch_result

View file

@ -25,18 +25,18 @@ def db_class(db_dir, name):
class Storage:
"""Abstract base class of the DB backend abstraction."""
def __init__(self, db_dir, name, for_sync):
def __init__(self, db_dir, name, for_sync, read_only=False):
self.db_dir = db_dir
self.is_new = not os.path.exists(os.path.join(db_dir, name))
self.for_sync = for_sync or self.is_new
self.open(name, create=self.is_new)
self.open(name, create=self.is_new, read_only=read_only)
@classmethod
def import_module(cls):
"""Import the DB engine module."""
raise NotImplementedError
def open(self, name, create):
def open(self, name, create, read_only=False):
"""Open an existing database or create a new one."""
raise NotImplementedError
@ -77,7 +77,7 @@ class LevelDB(Storage):
import plyvel
cls.module = plyvel
def open(self, name, create, lru_cache_size=None):
def open(self, name, create, read_only=False):
mof = 10000
path = os.path.join(self.db_dir, name)
# Use snappy compression (the default)
@ -97,17 +97,18 @@ class RocksDB(Storage):
import rocksdb
cls.module = rocksdb
def open(self, name, create):
mof = 512 if self.for_sync else 128
def open(self, name, create, read_only=False):
mof = 10000
path = os.path.join(self.db_dir, name)
# Use snappy compression (the default)
options = self.module.Options(create_if_missing=create,
use_fsync=True,
target_file_size_base=33554432,
max_open_files=mof)
self.db = self.module.DB(path, options)
self.db = self.module.DB(path, options, read_only=read_only)
self.get = self.db.get
self.put = self.db.put
self.multi_get = self.db.multi_get
def close(self):
# PyRocksDB doesn't provide a close method; hopefully this is enough
@ -118,8 +119,8 @@ class RocksDB(Storage):
def write_batch(self):
return RocksDBWriteBatch(self.db)
def iterator(self, prefix=b'', reverse=False):
return RocksDBIterator(self.db, prefix, reverse)
def iterator(self, **kwargs):
return RocksDBIterator(self.db, **kwargs)
class RocksDBWriteBatch:
@ -140,28 +141,43 @@ class RocksDBWriteBatch:
class RocksDBIterator:
"""An iterator for RocksDB."""
def __init__(self, db, prefix, reverse):
__slots__ = [
'start',
'prefix',
'stop',
'iterator',
'include_key',
'include_value',
'prev',
'reverse'
]
def __init__(self, db, prefix=None, start=None, stop=None, include_key=True, include_value=True, reverse=False):
self.start = start
self.prefix = prefix
if reverse:
self.iterator = reversed(db.iteritems())
nxt_prefix = util.increment_byte_string(prefix)
if nxt_prefix:
self.iterator.seek(nxt_prefix)
try:
next(self.iterator)
except StopIteration:
self.iterator.seek(nxt_prefix)
else:
self.iterator.seek_to_last()
else:
self.iterator = db.iteritems()
self.stop = stop
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
if prefix is not None and start is None:
self.iterator.seek(prefix)
elif start is not None:
self.iterator.seek(start)
self.include_key = include_key
self.include_value = include_value
self.prev = None
self.reverse = reverse
def __iter__(self):
return self
def __next__(self):
k, v = next(self.iterator)
if not k.startswith(self.prefix):
if None not in (self.stop, self.prev) and self.prev.startswith(self.stop):
raise StopIteration
return k, v
self.prev, v = next(self.iterator)
prev = self.prev
if self.prefix is not None and not prev.startswith(self.prefix):
raise StopIteration
if self.include_key and self.include_value:
return prev, v
elif self.include_key:
return prev
return v

View file

@ -10,6 +10,8 @@ with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
PLYVEL = []
if sys.platform.startswith('linux'):
PLYVEL.append('plyvel==1.0.5')
PLYVEL.append('python-rocksdb')
setup(
name=__name__,

View file

@ -2,6 +2,15 @@ import logging
import asyncio
from binascii import hexlify
from lbry.testcase import CommandTestCase
from lbry.wallet.server.leveldb import proc_ctx
def get_txids(height):
ctx = proc_ctx.get()
return [
ctx.ctx_tx_hash(tx_num)[0][::-1].hex()
for tx_num in range(ctx.tx_counts[height - 1], ctx.tx_counts[height])
]
class BlockchainReorganizationTests(CommandTestCase):
@ -11,18 +20,12 @@ class BlockchainReorganizationTests(CommandTestCase):
async def assertBlockHash(self, height):
bp = self.conductor.spv_node.server.bp
def get_txids():
return [
bp.db.fs_tx_hash(tx_num)[0][::-1].hex()
for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
]
block_hash = await self.blockchain.get_block_hash(height)
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
self.assertEqual(block_hash, (bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids)
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids, height)
txs = await bp.db.fs_transactions(txids)
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')