Compare commits
20 commits
master
...
spv-rocksd
Author | SHA1 | Date | |
---|---|---|---|
|
3fff5ccb49 | ||
|
aa0f49633a | ||
|
ede167bb56 | ||
|
c03b1b5a93 | ||
|
68a97b0e61 | ||
|
f86d940f20 | ||
|
314f8b0d42 | ||
|
5ec8f355d4 | ||
|
f46bf0924a | ||
|
6cbe559778 | ||
|
4ab2d7e624 | ||
|
a82abb33ac | ||
|
70e5ce4806 | ||
|
d960ba7412 | ||
|
18340c248d | ||
|
9012db0cfb | ||
|
7143b475a1 | ||
|
976387fefb | ||
|
1c01faed28 | ||
|
df3254b371 |
12 changed files with 520 additions and 375 deletions
|
@ -28,24 +28,32 @@ stages:
|
||||||
test:lint:
|
test:lint:
|
||||||
stage: test
|
stage: test
|
||||||
script:
|
script:
|
||||||
|
- apt-get update
|
||||||
|
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
|
||||||
- make install tools
|
- make install tools
|
||||||
- make lint
|
- make lint
|
||||||
|
|
||||||
test:unit:
|
test:unit:
|
||||||
stage: test
|
stage: test
|
||||||
script:
|
script:
|
||||||
|
- apt-get update
|
||||||
|
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
|
||||||
- make install tools
|
- make install tools
|
||||||
- HOME=/tmp coverage run -p --source=lbry -m unittest discover -vv tests.unit
|
- HOME=/tmp coverage run -p --source=lbry -m unittest discover -vv tests.unit
|
||||||
|
|
||||||
test:datanetwork-integration:
|
test:datanetwork-integration:
|
||||||
stage: test
|
stage: test
|
||||||
script:
|
script:
|
||||||
|
- apt-get update
|
||||||
|
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
|
||||||
- pip install tox-travis
|
- pip install tox-travis
|
||||||
- tox -e datanetwork --recreate
|
- tox -e datanetwork --recreate
|
||||||
|
|
||||||
test:blockchain-integration:
|
test:blockchain-integration:
|
||||||
stage: test
|
stage: test
|
||||||
script:
|
script:
|
||||||
|
- apt-get update
|
||||||
|
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
|
||||||
- pip install tox-travis
|
- pip install tox-travis
|
||||||
- tox -e blockchain
|
- tox -e blockchain
|
||||||
|
|
||||||
|
@ -53,13 +61,15 @@ test:other-integration:
|
||||||
stage: test
|
stage: test
|
||||||
script:
|
script:
|
||||||
- apt-get update
|
- apt-get update
|
||||||
- apt-get install -y --no-install-recommends ffmpeg
|
- apt-get install -y --no-install-recommends ffmpeg liblz4-dev libsnappy-dev librocksdb-dev
|
||||||
- pip install tox-travis
|
- pip install tox-travis
|
||||||
- tox -e other
|
- tox -e other
|
||||||
|
|
||||||
test:json-api:
|
test:json-api:
|
||||||
stage: test
|
stage: test
|
||||||
script:
|
script:
|
||||||
|
- apt-get update
|
||||||
|
- apt-get install -y liblz4-dev libsnappy-dev librocksdb-dev
|
||||||
- make install tools
|
- make install tools
|
||||||
- HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py
|
- HOME=/tmp coverage run -p --source=lbry scripts/generate_json_api.py
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,10 @@ ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT
|
||||||
|
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get -y --no-install-recommends install \
|
apt-get -y --no-install-recommends install \
|
||||||
|
liblz4-dev \
|
||||||
|
libbz2-dev \
|
||||||
|
libsnappy-dev \
|
||||||
|
librocksdb-dev \
|
||||||
wget \
|
wget \
|
||||||
tar unzip \
|
tar unzip \
|
||||||
build-essential \
|
build-essential \
|
||||||
|
@ -31,7 +35,8 @@ RUN chown -R $user:$user $projects_dir
|
||||||
USER $user
|
USER $user
|
||||||
WORKDIR $projects_dir
|
WORKDIR $projects_dir
|
||||||
|
|
||||||
RUN pip install uvloop
|
RUN pip install uvloop cython
|
||||||
|
RUN pip install python-rocksdb
|
||||||
RUN make install
|
RUN make install
|
||||||
RUN python3 docker/set_build.py
|
RUN python3 docker/set_build.py
|
||||||
RUN rm ~/.cache -rf
|
RUN rm ~/.cache -rf
|
||||||
|
@ -48,5 +53,6 @@ ENV DB_DIRECTORY=$db_dir
|
||||||
ENV MAX_SESSIONS=1000000000
|
ENV MAX_SESSIONS=1000000000
|
||||||
ENV MAX_SEND=1000000000000000000
|
ENV MAX_SEND=1000000000000000000
|
||||||
ENV EVENT_LOOP_POLICY=uvloop
|
ENV EVENT_LOOP_POLICY=uvloop
|
||||||
|
ENV DB_ENGINE=rocksdb
|
||||||
COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
|
COPY ./docker/wallet_server_entrypoint.sh /entrypoint.sh
|
||||||
ENTRYPOINT ["/entrypoint.sh"]
|
ENTRYPOINT ["/entrypoint.sh"]
|
||||||
|
|
|
@ -131,7 +131,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self._on_transaction_controller = StreamController()
|
self._on_transaction_controller = StreamController()
|
||||||
self.on_transaction = self._on_transaction_controller.stream
|
self.on_transaction = self._on_transaction_controller.stream
|
||||||
self.on_transaction.listen(
|
self.on_transaction.listen(
|
||||||
lambda e: log.info(
|
lambda e: log.debug(
|
||||||
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
|
'(%s) on_transaction: address=%s, height=%s, is_verified=%s, tx.id=%s',
|
||||||
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
|
self.get_id(), e.address, e.tx.height, e.tx.is_verified, e.tx.id
|
||||||
)
|
)
|
||||||
|
|
|
@ -10,7 +10,7 @@ from lbry.wallet.server.db.writer import SQLDB
|
||||||
from lbry.wallet.server.daemon import DaemonError
|
from lbry.wallet.server.daemon import DaemonError
|
||||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||||
from lbry.wallet.server.util import chunks, class_logger
|
from lbry.wallet.server.util import chunks, class_logger
|
||||||
from lbry.wallet.server.leveldb import FlushData
|
from lbry.wallet.server.leveldb import FlushData, reopen_rocksdb_ctx, UTXO_PREFIX, HASHX_UTXO_PREFIX
|
||||||
|
|
||||||
|
|
||||||
class Prefetcher:
|
class Prefetcher:
|
||||||
|
@ -275,6 +275,7 @@ class BlockProcessor:
|
||||||
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
|
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
|
||||||
await self.prefetcher.reset_height(self.height)
|
await self.prefetcher.reset_height(self.height)
|
||||||
self.reorg_count_metric.inc()
|
self.reorg_count_metric.inc()
|
||||||
|
await reopen_rocksdb_ctx(self.db.executor)
|
||||||
|
|
||||||
async def reorg_hashes(self, count):
|
async def reorg_hashes(self, count):
|
||||||
"""Return a pair (start, last, hashes) of blocks to back up during a
|
"""Return a pair (start, last, hashes) of blocks to back up during a
|
||||||
|
@ -289,7 +290,7 @@ class BlockProcessor:
|
||||||
self.logger.info(f'chain was reorganised replacing {count:,d} '
|
self.logger.info(f'chain was reorganised replacing {count:,d} '
|
||||||
f'block{s} at heights {start:,d}-{last:,d}')
|
f'block{s} at heights {start:,d}-{last:,d}')
|
||||||
|
|
||||||
return start, last, await self.db.fs_block_hashes(start, count)
|
return start, last, self.db.fs_block_hashes(start, count)
|
||||||
|
|
||||||
async def calc_reorg_range(self, count: Optional[int]):
|
async def calc_reorg_range(self, count: Optional[int]):
|
||||||
"""Calculate the reorg range"""
|
"""Calculate the reorg range"""
|
||||||
|
@ -307,7 +308,7 @@ class BlockProcessor:
|
||||||
start = self.height - 1
|
start = self.height - 1
|
||||||
count = 1
|
count = 1
|
||||||
while start > 0:
|
while start > 0:
|
||||||
hashes = await self.db.fs_block_hashes(start, count)
|
hashes = self.db.fs_block_hashes(start, count)
|
||||||
hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
|
hex_hashes = [hash_to_hex_str(hash) for hash in hashes]
|
||||||
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
|
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
|
||||||
n = diff_pos(hex_hashes, d_hex_hashes)
|
n = diff_pos(hex_hashes, d_hex_hashes)
|
||||||
|
@ -346,6 +347,7 @@ class BlockProcessor:
|
||||||
self.db.flush_dbs(self.flush_data(), flush_utxos,
|
self.db.flush_dbs(self.flush_data(), flush_utxos,
|
||||||
self.estimate_txs_remaining)
|
self.estimate_txs_remaining)
|
||||||
await self.run_in_thread_with_lock(flush)
|
await self.run_in_thread_with_lock(flush)
|
||||||
|
await reopen_rocksdb_ctx(self.db.executor)
|
||||||
|
|
||||||
async def _maybe_flush(self):
|
async def _maybe_flush(self):
|
||||||
# If caught up, flush everything as client queries are
|
# If caught up, flush everything as client queries are
|
||||||
|
@ -584,9 +586,9 @@ class BlockProcessor:
|
||||||
|
|
||||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||||
# Value: hashX
|
# Value: hashX
|
||||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
|
||||||
candidates = {db_key: hashX for db_key, hashX
|
candidates = {db_key: hashX for db_key, hashX
|
||||||
in self.db.utxo_db.iterator(prefix=prefix)}
|
in self.db.db.iterator(prefix=prefix)}
|
||||||
|
|
||||||
for hdb_key, hashX in candidates.items():
|
for hdb_key, hashX in candidates.items():
|
||||||
tx_num_packed = hdb_key[-4:]
|
tx_num_packed = hdb_key[-4:]
|
||||||
|
@ -600,8 +602,8 @@ class BlockProcessor:
|
||||||
|
|
||||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||||
# Value: the UTXO value as a 64-bit unsigned integer
|
# Value: the UTXO value as a 64-bit unsigned integer
|
||||||
udb_key = b'u' + hashX + hdb_key[-6:]
|
udb_key = UTXO_PREFIX + hashX + hdb_key[-6:]
|
||||||
utxo_value_packed = self.db.utxo_db.get(udb_key)
|
utxo_value_packed = self.db.db.get(udb_key)
|
||||||
if utxo_value_packed:
|
if utxo_value_packed:
|
||||||
# Remove both entries for this UTXO
|
# Remove both entries for this UTXO
|
||||||
self.db_deletes.append(hdb_key)
|
self.db_deletes.append(hdb_key)
|
||||||
|
|
|
@ -36,7 +36,7 @@ class Env:
|
||||||
self.loop_policy = self.set_event_loop_policy()
|
self.loop_policy = self.set_event_loop_policy()
|
||||||
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
|
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
|
||||||
self.db_dir = self.required('DB_DIRECTORY')
|
self.db_dir = self.required('DB_DIRECTORY')
|
||||||
self.db_engine = self.default('DB_ENGINE', 'leveldb')
|
self.db_engine = self.default('DB_ENGINE', 'rocksdb')
|
||||||
self.trending_algorithms = [
|
self.trending_algorithms = [
|
||||||
trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
|
trending for trending in set(self.default('TRENDING_ALGORITHMS', 'zscore').split(' ')) if trending
|
||||||
]
|
]
|
||||||
|
|
|
@ -20,6 +20,10 @@ from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
|
||||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||||
|
|
||||||
|
|
||||||
|
HASHX_HISTORY_PREFIX = b'x'
|
||||||
|
HIST_STATE = b'state-hist'
|
||||||
|
|
||||||
|
|
||||||
class History:
|
class History:
|
||||||
|
|
||||||
DB_VERSIONS = [0]
|
DB_VERSIONS = [0]
|
||||||
|
@ -32,8 +36,8 @@ class History:
|
||||||
self.unflushed_count = 0
|
self.unflushed_count = 0
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
||||||
def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
|
def open_db(self, db, for_sync, utxo_flush_count, compacting):
|
||||||
self.db = db_class('hist', for_sync)
|
self.db = db #db_class('hist', for_sync)
|
||||||
self.read_state()
|
self.read_state()
|
||||||
self.clear_excess(utxo_flush_count)
|
self.clear_excess(utxo_flush_count)
|
||||||
# An incomplete compaction needs to be cancelled otherwise
|
# An incomplete compaction needs to be cancelled otherwise
|
||||||
|
@ -44,11 +48,11 @@ class History:
|
||||||
|
|
||||||
def close_db(self):
|
def close_db(self):
|
||||||
if self.db:
|
if self.db:
|
||||||
self.db.close()
|
# self.db.close()
|
||||||
self.db = None
|
self.db = None
|
||||||
|
|
||||||
def read_state(self):
|
def read_state(self):
|
||||||
state = self.db.get(b'state\0\0')
|
state = self.db.get(HIST_STATE)
|
||||||
if state:
|
if state:
|
||||||
state = ast.literal_eval(state.decode())
|
state = ast.literal_eval(state.decode())
|
||||||
if not isinstance(state, dict):
|
if not isinstance(state, dict):
|
||||||
|
@ -80,17 +84,18 @@ class History:
|
||||||
'excess history flushes...')
|
'excess history flushes...')
|
||||||
|
|
||||||
keys = []
|
keys = []
|
||||||
for key, hist in self.db.iterator(prefix=b''):
|
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
|
||||||
flush_id, = unpack_be_uint16_from(key[-2:])
|
k = key[1:]
|
||||||
|
flush_id, = unpack_be_uint16_from(k[-2:])
|
||||||
if flush_id > utxo_flush_count:
|
if flush_id > utxo_flush_count:
|
||||||
keys.append(key)
|
keys.append(k)
|
||||||
|
|
||||||
self.logger.info(f'deleting {len(keys):,d} history entries')
|
self.logger.info(f'deleting {len(keys):,d} history entries')
|
||||||
|
|
||||||
self.flush_count = utxo_flush_count
|
self.flush_count = utxo_flush_count
|
||||||
with self.db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
for key in keys:
|
for key in keys:
|
||||||
batch.delete(key)
|
batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||||
self.write_state(batch)
|
self.write_state(batch)
|
||||||
|
|
||||||
self.logger.info('deleted excess history entries')
|
self.logger.info('deleted excess history entries')
|
||||||
|
@ -105,7 +110,7 @@ class History:
|
||||||
}
|
}
|
||||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||||
# look similar to other entries and aren't interfered with
|
# look similar to other entries and aren't interfered with
|
||||||
batch.put(b'state\0\0', repr(state).encode())
|
batch.put(HIST_STATE, repr(state).encode())
|
||||||
|
|
||||||
def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
||||||
unflushed = self.unflushed
|
unflushed = self.unflushed
|
||||||
|
@ -132,7 +137,7 @@ class History:
|
||||||
with self.db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
for hashX in sorted(unflushed):
|
for hashX in sorted(unflushed):
|
||||||
key = hashX + flush_id
|
key = hashX + flush_id
|
||||||
batch.put(key, unflushed[hashX].tobytes())
|
batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
|
||||||
self.write_state(batch)
|
self.write_state(batch)
|
||||||
|
|
||||||
count = len(unflushed)
|
count = len(unflushed)
|
||||||
|
@ -154,16 +159,17 @@ class History:
|
||||||
for hashX in sorted(hashXs):
|
for hashX in sorted(hashXs):
|
||||||
deletes = []
|
deletes = []
|
||||||
puts = {}
|
puts = {}
|
||||||
for key, hist in self.db.iterator(prefix=hashX, reverse=True):
|
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
|
||||||
|
k = key[1:]
|
||||||
a = array.array('I')
|
a = array.array('I')
|
||||||
a.frombytes(hist)
|
a.frombytes(hist)
|
||||||
# Remove all history entries >= tx_count
|
# Remove all history entries >= tx_count
|
||||||
idx = bisect_left(a, tx_count)
|
idx = bisect_left(a, tx_count)
|
||||||
nremoves += len(a) - idx
|
nremoves += len(a) - idx
|
||||||
if idx > 0:
|
if idx > 0:
|
||||||
puts[key] = a[:idx].tobytes()
|
puts[k] = a[:idx].tobytes()
|
||||||
break
|
break
|
||||||
deletes.append(key)
|
deletes.append(k)
|
||||||
|
|
||||||
for key in deletes:
|
for key in deletes:
|
||||||
batch.delete(key)
|
batch.delete(key)
|
||||||
|
@ -221,9 +227,9 @@ class History:
|
||||||
with self.db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
# Important: delete first! The keyspace may overlap.
|
# Important: delete first! The keyspace may overlap.
|
||||||
for key in keys_to_delete:
|
for key in keys_to_delete:
|
||||||
batch.delete(key)
|
batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||||
for key, value in write_items:
|
for key, value in write_items:
|
||||||
batch.put(key, value)
|
batch.put(HASHX_HISTORY_PREFIX + key, value)
|
||||||
self.write_state(batch)
|
self.write_state(batch)
|
||||||
|
|
||||||
def _compact_hashX(self, hashX, hist_map, hist_list,
|
def _compact_hashX(self, hashX, hist_map, hist_list,
|
||||||
|
@ -271,11 +277,12 @@ class History:
|
||||||
|
|
||||||
key_len = HASHX_LEN + 2
|
key_len = HASHX_LEN + 2
|
||||||
write_size = 0
|
write_size = 0
|
||||||
for key, hist in self.db.iterator(prefix=prefix):
|
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
|
||||||
|
k = key[1:]
|
||||||
# Ignore non-history entries
|
# Ignore non-history entries
|
||||||
if len(key) != key_len:
|
if len(k) != key_len:
|
||||||
continue
|
continue
|
||||||
hashX = key[:-2]
|
hashX = k[:-2]
|
||||||
if hashX != prior_hashX and prior_hashX:
|
if hashX != prior_hashX and prior_hashX:
|
||||||
write_size += self._compact_hashX(prior_hashX, hist_map,
|
write_size += self._compact_hashX(prior_hashX, hist_map,
|
||||||
hist_list, write_items,
|
hist_list, write_items,
|
||||||
|
@ -283,7 +290,7 @@ class History:
|
||||||
hist_map.clear()
|
hist_map.clear()
|
||||||
hist_list.clear()
|
hist_list.clear()
|
||||||
prior_hashX = hashX
|
prior_hashX = hashX
|
||||||
hist_map[key] = hist
|
hist_map[k] = hist
|
||||||
hist_list.append(hist)
|
hist_list.append(hist)
|
||||||
|
|
||||||
if prior_hashX:
|
if prior_hashX:
|
||||||
|
|
|
@ -23,17 +23,19 @@ from bisect import bisect_right
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from glob import glob
|
from glob import glob
|
||||||
from struct import pack, unpack
|
from struct import pack, unpack
|
||||||
|
from contextvars import ContextVar
|
||||||
|
from dataclasses import dataclass
|
||||||
from concurrent.futures.thread import ThreadPoolExecutor
|
from concurrent.futures.thread import ThreadPoolExecutor
|
||||||
|
from concurrent.futures.process import ProcessPoolExecutor
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
from lbry.wallet.server import util
|
from lbry.wallet.server import util
|
||||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||||
from lbry.wallet.server.merkle import Merkle, MerkleCache
|
from lbry.wallet.server.merkle import Merkle, MerkleCache
|
||||||
from lbry.wallet.server.util import formatted_time
|
from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from, pack_le_int32
|
||||||
from lbry.wallet.server.storage import db_class
|
from lbry.wallet.server.storage import db_class, RocksDB
|
||||||
from lbry.wallet.server.history import History
|
from lbry.wallet.server.history import History
|
||||||
|
|
||||||
|
|
||||||
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
|
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
|
||||||
HEADER_PREFIX = b'H'
|
HEADER_PREFIX = b'H'
|
||||||
TX_COUNT_PREFIX = b'T'
|
TX_COUNT_PREFIX = b'T'
|
||||||
|
@ -41,8 +43,302 @@ TX_HASH_PREFIX = b'X'
|
||||||
TX_PREFIX = b'B'
|
TX_PREFIX = b'B'
|
||||||
TX_NUM_PREFIX = b'N'
|
TX_NUM_PREFIX = b'N'
|
||||||
BLOCK_HASH_PREFIX = b'C'
|
BLOCK_HASH_PREFIX = b'C'
|
||||||
|
HISTORY_PREFIX = b'A'
|
||||||
|
HASHX_UTXO_PREFIX = b'h'
|
||||||
|
UTXO_PREFIX = b'u'
|
||||||
|
HASHX_HISTORY_PREFIX = b'x'
|
||||||
|
UNDO_PREFIX = b'U'
|
||||||
|
UTXO_STATE = b'state-utxo'
|
||||||
|
HIST_STATE = b'state-hist'
|
||||||
|
TX_COUNTS_STATE = b'state-tx-counts'
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RocksReaderContext:
|
||||||
|
db: 'RocksDB'
|
||||||
|
db_dir: str
|
||||||
|
name: str
|
||||||
|
merkle: Merkle
|
||||||
|
tx_counts: List[int]
|
||||||
|
block_txs_cache: pylru.lrucache
|
||||||
|
merkle_tx_cache: pylru.lrucache
|
||||||
|
txid_cache: pylru.lrucache
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.db.close()
|
||||||
|
|
||||||
|
def reopen(self):
|
||||||
|
self.db.close()
|
||||||
|
self.db.open(self.name, create=False, read_only=True)
|
||||||
|
|
||||||
|
def update_state(self):
|
||||||
|
tx_counts = array.array("L")
|
||||||
|
counts = self.db.get(TX_COUNTS_STATE)
|
||||||
|
if not counts:
|
||||||
|
self.tx_counts = list(map(unpack_be_uint64, self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)))
|
||||||
|
else:
|
||||||
|
tx_counts.frombytes(counts)
|
||||||
|
self.tx_counts = tx_counts.tolist()
|
||||||
|
|
||||||
|
def ctx_tx_hash(self, tx_num):
|
||||||
|
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||||
|
if tx_height > len(self.tx_counts):
|
||||||
|
return None, tx_height
|
||||||
|
key = TX_HASH_PREFIX + util.pack_be_uint64(tx_num)
|
||||||
|
if key in self.txid_cache:
|
||||||
|
return self.txid_cache[key], tx_height
|
||||||
|
tx_hash = self.db.get(key)
|
||||||
|
if tx_height + 100 <= len(self.tx_counts):
|
||||||
|
self.txid_cache[key] = tx_hash
|
||||||
|
return tx_hash, tx_height
|
||||||
|
|
||||||
|
|
||||||
|
proc_ctx: ContextVar[Optional[RocksReaderContext]] = ContextVar('proc_ctx')
|
||||||
|
|
||||||
|
|
||||||
|
def _update_rocksdb_ctx():
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
ctx.update_state()
|
||||||
|
|
||||||
|
|
||||||
|
def _reopen_rocksdb_ctx():
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
ctx.db.close()
|
||||||
|
ctx.db = RocksDB(ctx.db_dir, ctx.name, for_sync=False, read_only=True)
|
||||||
|
ctx.update_state()
|
||||||
|
|
||||||
|
|
||||||
|
async def update_rocksdb_ctx(executor: ProcessPoolExecutor):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
await asyncio.wait([loop.run_in_executor(executor, _update_rocksdb_ctx) for _ in range(executor._max_workers)])
|
||||||
|
|
||||||
|
|
||||||
|
async def reopen_rocksdb_ctx(executor: ProcessPoolExecutor):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
await asyncio.wait([loop.run_in_executor(executor, _reopen_rocksdb_ctx) for _ in range(executor._max_workers)])
|
||||||
|
|
||||||
|
|
||||||
|
def _initializer(path, name):
|
||||||
|
db = RocksDB(path, name, for_sync=False, read_only=True)
|
||||||
|
proc_ctx.set(RocksReaderContext(db, path, name, Merkle(), [], pylru.lrucache(50000), pylru.lrucache(100000),
|
||||||
|
pylru.lrucache(1000000)))
|
||||||
|
|
||||||
|
|
||||||
|
def _teardown():
|
||||||
|
proc_ctx.get().close()
|
||||||
|
proc_ctx.set(None)
|
||||||
|
|
||||||
|
|
||||||
|
async def initialize_executor(workers, db_dir, for_sync, name):
|
||||||
|
executor = ProcessPoolExecutor(workers, initializer=_initializer, initargs=(db_dir, name))
|
||||||
|
try:
|
||||||
|
writer = RocksDB(db_dir, name, for_sync=for_sync, read_only=False)
|
||||||
|
await update_rocksdb_ctx(executor)
|
||||||
|
except Exception as err:
|
||||||
|
await teardown_executor(executor)
|
||||||
|
executor.shutdown(True)
|
||||||
|
raise err
|
||||||
|
return executor, writer
|
||||||
|
|
||||||
|
|
||||||
|
async def teardown_executor(executor: ProcessPoolExecutor):
|
||||||
|
try:
|
||||||
|
await asyncio.wait(
|
||||||
|
[asyncio.get_event_loop().run_in_executor(executor, _teardown) for _ in range(executor._max_workers)]
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
executor.shutdown(True)
|
||||||
|
|
||||||
|
|
||||||
|
def lookup_hashXs_utxos(prevouts):
|
||||||
|
"""Return (hashX, suffix) pairs, or None if not found,
|
||||||
|
for each prevout.
|
||||||
|
"""
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
iterator = ctx.db.iterator
|
||||||
|
get = ctx.db.get
|
||||||
|
|
||||||
|
def lookup_hashXs():
|
||||||
|
"""Return (hashX, suffix) pairs, or None if not found,
|
||||||
|
for each prevout.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def lookup_hashX(tx_hash, tx_idx):
|
||||||
|
idx_packed = pack('<H', tx_idx)
|
||||||
|
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||||
|
# Value: hashX
|
||||||
|
prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
|
||||||
|
# Find which entry, if any, the TX_HASH matches.
|
||||||
|
for db_key, hashX in iterator(prefix=prefix):
|
||||||
|
tx_num_packed = db_key[-4:]
|
||||||
|
tx_num, = unpack('<I', tx_num_packed)
|
||||||
|
hash, height = ctx.ctx_tx_hash(tx_num)
|
||||||
|
if hash == tx_hash:
|
||||||
|
return hashX, idx_packed + tx_num_packed
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
return [lookup_hashX(*prevout) for prevout in prevouts]
|
||||||
|
|
||||||
|
def lookup_utxo(hashX, suffix):
|
||||||
|
if not hashX:
|
||||||
|
# This can happen when the daemon is a block ahead
|
||||||
|
# of us and has mempool txs spending outputs from
|
||||||
|
# that new block
|
||||||
|
return None
|
||||||
|
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||||
|
# Value: the UTXO value as a 64-bit unsigned integer
|
||||||
|
db_value = get(UTXO_PREFIX + hashX + suffix)
|
||||||
|
if not db_value:
|
||||||
|
# This can happen if the DB was updated between
|
||||||
|
# getting the hashXs and getting the UTXOs
|
||||||
|
return None
|
||||||
|
value, = unpack('<Q', db_value)
|
||||||
|
return hashX, value
|
||||||
|
|
||||||
|
return [lookup_utxo(*hashX_pair) for hashX_pair in lookup_hashXs()]
|
||||||
|
|
||||||
|
|
||||||
|
def get_counts():
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
return ctx.tx_counts
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def read_txids():
|
||||||
|
return list(proc_ctx.get().db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
|
||||||
|
|
||||||
|
|
||||||
|
def read_headers():
|
||||||
|
return [
|
||||||
|
header for header in proc_ctx.get().db.iterator(prefix=HEADER_PREFIX, include_key=False)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def read_block_file(path):
|
||||||
|
with util.open_file(path) as f:
|
||||||
|
return f.read(-1)
|
||||||
|
|
||||||
|
|
||||||
|
def tx_hash(self, tx_num):
|
||||||
|
"""Return a par (tx_hash, tx_height) for the given tx number.
|
||||||
|
|
||||||
|
If the tx_height is not on disk, returns (None, tx_height)."""
|
||||||
|
tx_height = bisect_right(self.tx_counts, tx_num)
|
||||||
|
if tx_height > self.db_height:
|
||||||
|
return None, tx_height
|
||||||
|
return self.total_transactions[tx_num], tx_height
|
||||||
|
|
||||||
|
|
||||||
|
def read_utxos(hashX):
|
||||||
|
utxos = []
|
||||||
|
utxos_append = utxos.append
|
||||||
|
s_unpack = unpack
|
||||||
|
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||||
|
# Value: the UTXO value as a 64-bit unsigned integer
|
||||||
|
for db_key, db_value in proc_ctx.get().db.iterator(prefix=UTXO_PREFIX + hashX):
|
||||||
|
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
|
||||||
|
value, = unpack('<Q', db_value)
|
||||||
|
utxos_append((tx_num, tx_pos, value))
|
||||||
|
return utxos
|
||||||
|
|
||||||
|
|
||||||
|
def limited_history(hashX, limit=None):
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
tx_counts = ctx.tx_counts
|
||||||
|
db_height = len(tx_counts)
|
||||||
|
cnt = 0
|
||||||
|
txs = []
|
||||||
|
|
||||||
|
for hist in ctx.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
|
||||||
|
a = array.array('I')
|
||||||
|
a.frombytes(hist)
|
||||||
|
for tx_num in a:
|
||||||
|
tx_height = bisect_right(tx_counts, tx_num)
|
||||||
|
if tx_height > db_height:
|
||||||
|
return
|
||||||
|
txs.append((tx_num, tx_height))
|
||||||
|
cnt += 1
|
||||||
|
if limit and cnt >= limit:
|
||||||
|
break
|
||||||
|
if limit and cnt >= limit:
|
||||||
|
break
|
||||||
|
return txs
|
||||||
|
|
||||||
|
|
||||||
|
def tx_merkle(tx_num, tx_height):
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
db = ctx.db
|
||||||
|
tx_counts = ctx.tx_counts
|
||||||
|
db_height = len(tx_counts)
|
||||||
|
|
||||||
|
if tx_height == -1:
|
||||||
|
return {
|
||||||
|
'block_height': -1
|
||||||
|
}
|
||||||
|
tx_pos = tx_num - tx_counts[tx_height - 1]
|
||||||
|
|
||||||
|
uncached = None
|
||||||
|
if (tx_num, tx_height) in ctx.merkle_tx_cache:
|
||||||
|
return ctx.merkle_tx_cache[(tx_num, tx_height)]
|
||||||
|
if tx_height not in ctx.block_txs_cache:
|
||||||
|
block_txs = list(db.iterator(
|
||||||
|
prefix=TX_HASH_PREFIX,
|
||||||
|
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
|
||||||
|
stop=None if tx_height + 1 == len(tx_counts) else
|
||||||
|
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height] - 1), include_key=False
|
||||||
|
))
|
||||||
|
# print("rocks txs", tx_height, list(map(hash_to_hex_str, block_txs)))
|
||||||
|
if tx_height + 100 <= db_height:
|
||||||
|
ctx.block_txs_cache[tx_height] = block_txs
|
||||||
|
else:
|
||||||
|
block_txs = ctx.block_txs_cache.get(tx_height, uncached)
|
||||||
|
|
||||||
|
branch, root = ctx.merkle.branch_and_root(block_txs, tx_pos)
|
||||||
|
merkle = {
|
||||||
|
'block_height': tx_height,
|
||||||
|
'merkle': [
|
||||||
|
hash_to_hex_str(hash)
|
||||||
|
for hash in branch
|
||||||
|
],
|
||||||
|
'pos': tx_pos
|
||||||
|
}
|
||||||
|
if tx_height + 100 < db_height:
|
||||||
|
ctx.merkle_tx_cache[(tx_num, tx_height)] = merkle
|
||||||
|
return merkle
|
||||||
|
|
||||||
|
|
||||||
|
def transaction_info_get_batch(txids: Iterable[str]):
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
db_height = len(ctx.tx_counts)
|
||||||
|
tx_counts = ctx.tx_counts
|
||||||
|
tx_db_get = ctx.db.get
|
||||||
|
tx_infos = {}
|
||||||
|
|
||||||
|
for tx_hash in txids:
|
||||||
|
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
||||||
|
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
|
||||||
|
tx = None
|
||||||
|
tx_height = -1
|
||||||
|
if tx_num is not None:
|
||||||
|
tx_num = unpack_be_uint64(tx_num)
|
||||||
|
tx_height = bisect_right(tx_counts, tx_num)
|
||||||
|
if tx_height < db_height:
|
||||||
|
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
|
||||||
|
tx_infos[tx_hash] = (
|
||||||
|
None if not tx else tx.hex(), {'block_height': -1} if tx_height == -1 else tx_merkle(
|
||||||
|
tx_num, tx_height
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return tx_infos
|
||||||
|
|
||||||
|
|
||||||
|
def _update_block_txs_cache(tx_num, tx_height):
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
db = ctx.db
|
||||||
|
tx_counts = ctx.tx_counts
|
||||||
|
db_height = len(tx_counts)
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True)
|
@attr.s(slots=True)
|
||||||
class FlushData:
|
class FlushData:
|
||||||
|
@ -80,7 +376,7 @@ class LevelDB:
|
||||||
|
|
||||||
self.db_class = db_class(env.db_dir, self.env.db_engine)
|
self.db_class = db_class(env.db_dir, self.env.db_engine)
|
||||||
self.history = History()
|
self.history = History()
|
||||||
self.utxo_db = None
|
self.db = None
|
||||||
self.tx_counts = None
|
self.tx_counts = None
|
||||||
self.headers = None
|
self.headers = None
|
||||||
self.last_flush = time.time()
|
self.last_flush = time.time()
|
||||||
|
@ -91,9 +387,6 @@ class LevelDB:
|
||||||
self.merkle = Merkle()
|
self.merkle = Merkle()
|
||||||
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
|
||||||
|
|
||||||
self.headers_db = None
|
|
||||||
self.tx_db = None
|
|
||||||
|
|
||||||
self._block_txs_cache = pylru.lrucache(50000)
|
self._block_txs_cache = pylru.lrucache(50000)
|
||||||
self._merkle_tx_cache = pylru.lrucache(100000)
|
self._merkle_tx_cache = pylru.lrucache(100000)
|
||||||
self.total_transactions = None
|
self.total_transactions = None
|
||||||
|
@ -104,15 +397,9 @@ class LevelDB:
|
||||||
# tx_counts[N] has the cumulative number of txs at the end of
|
# tx_counts[N] has the cumulative number of txs at the end of
|
||||||
# height N. So tx_counts[0] is 1 - the genesis coinbase
|
# height N. So tx_counts[0] is 1 - the genesis coinbase
|
||||||
|
|
||||||
def get_counts():
|
|
||||||
return tuple(
|
|
||||||
util.unpack_be_uint64(tx_count)
|
|
||||||
for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
|
|
||||||
)
|
|
||||||
|
|
||||||
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
|
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
|
||||||
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
|
assert len(tx_counts) == self.db_height + 1, f"{len(tx_counts)} vs {self.db_height + 1}"
|
||||||
self.tx_counts = array.array('I', tx_counts)
|
self.tx_counts = tx_counts
|
||||||
|
|
||||||
if self.tx_counts:
|
if self.tx_counts:
|
||||||
assert self.db_tx_count == self.tx_counts[-1], \
|
assert self.db_tx_count == self.tx_counts[-1], \
|
||||||
|
@ -121,12 +408,9 @@ class LevelDB:
|
||||||
assert self.db_tx_count == 0
|
assert self.db_tx_count == 0
|
||||||
|
|
||||||
async def _read_txids(self):
|
async def _read_txids(self):
|
||||||
def get_txids():
|
|
||||||
return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
|
|
||||||
|
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
self.logger.info("loading txids")
|
self.logger.info("loading txids")
|
||||||
txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids)
|
txids = await asyncio.get_event_loop().run_in_executor(self.executor, read_txids)
|
||||||
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
|
assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1]
|
||||||
self.total_transactions = txids
|
self.total_transactions = txids
|
||||||
ts = time.perf_counter() - start
|
ts = time.perf_counter() - start
|
||||||
|
@ -136,47 +420,26 @@ class LevelDB:
|
||||||
if self.headers is not None:
|
if self.headers is not None:
|
||||||
return
|
return
|
||||||
|
|
||||||
def get_headers():
|
headers = await asyncio.get_event_loop().run_in_executor(self.executor, read_headers)
|
||||||
return [
|
|
||||||
header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False)
|
|
||||||
]
|
|
||||||
|
|
||||||
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
|
|
||||||
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
|
assert len(headers) - 1 == self.db_height, f"{len(headers)} vs {self.db_height}"
|
||||||
self.headers = headers
|
self.headers = headers
|
||||||
|
|
||||||
async def _open_dbs(self, for_sync, compacting):
|
async def _open_dbs(self, for_sync, compacting):
|
||||||
|
name = f'lbry-{self.env.db_engine}'
|
||||||
if self.executor is None:
|
if self.executor is None:
|
||||||
self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1))
|
self.executor, self.db = await initialize_executor(
|
||||||
coin_path = os.path.join(self.env.db_dir, 'COIN')
|
max(1, os.cpu_count() - 1), self.env.db_dir, for_sync, name
|
||||||
if not os.path.isfile(coin_path):
|
)
|
||||||
with util.open_file(coin_path, create=True) as f:
|
|
||||||
f.write(f'ElectrumX databases and metadata for '
|
|
||||||
f'{self.coin.NAME} {self.coin.NET}'.encode())
|
|
||||||
|
|
||||||
assert self.headers_db is None
|
if self.db.is_new:
|
||||||
self.headers_db = self.db_class('headers', for_sync)
|
self.logger.info('created new db: %s', name)
|
||||||
if self.headers_db.is_new:
|
self.logger.info(f'opened DB (for sync: {for_sync})')
|
||||||
self.logger.info('created new headers db')
|
|
||||||
self.logger.info(f'opened headers DB (for sync: {for_sync})')
|
|
||||||
|
|
||||||
assert self.tx_db is None
|
|
||||||
self.tx_db = self.db_class('tx', for_sync)
|
|
||||||
if self.tx_db.is_new:
|
|
||||||
self.logger.info('created new tx db')
|
|
||||||
self.logger.info(f'opened tx DB (for sync: {for_sync})')
|
|
||||||
|
|
||||||
assert self.utxo_db is None
|
|
||||||
# First UTXO DB
|
|
||||||
self.utxo_db = self.db_class('utxo', for_sync)
|
|
||||||
if self.utxo_db.is_new:
|
|
||||||
self.logger.info('created new utxo db')
|
|
||||||
self.logger.info(f'opened utxo db (for sync: {for_sync})')
|
|
||||||
self.read_utxo_state()
|
self.read_utxo_state()
|
||||||
|
|
||||||
# Then history DB
|
# Then history DB
|
||||||
self.utxo_flush_count = self.history.open_db(
|
self.utxo_flush_count = self.history.open_db(
|
||||||
self.db_class, for_sync, self.utxo_flush_count, compacting
|
self.db, for_sync, self.utxo_flush_count, compacting
|
||||||
)
|
)
|
||||||
self.clear_excess_undo_info()
|
self.clear_excess_undo_info()
|
||||||
|
|
||||||
|
@ -187,12 +450,11 @@ class LevelDB:
|
||||||
await self._read_headers()
|
await self._read_headers()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.utxo_db.close()
|
self.db.close()
|
||||||
self.history.close_db()
|
self.history.close_db()
|
||||||
self.headers_db.close()
|
|
||||||
self.tx_db.close()
|
|
||||||
self.executor.shutdown(wait=True)
|
self.executor.shutdown(wait=True)
|
||||||
self.executor = None
|
self.executor = None
|
||||||
|
self.db = None
|
||||||
|
|
||||||
async def open_for_compacting(self):
|
async def open_for_compacting(self):
|
||||||
await self._open_dbs(True, True)
|
await self._open_dbs(True, True)
|
||||||
|
@ -211,18 +473,12 @@ class LevelDB:
|
||||||
"""Open the databases for serving. If they are already open they are
|
"""Open the databases for serving. If they are already open they are
|
||||||
closed first.
|
closed first.
|
||||||
"""
|
"""
|
||||||
self.logger.info('closing DBs to re-open for serving')
|
if self.db:
|
||||||
if self.utxo_db:
|
return
|
||||||
self.logger.info('closing DBs to re-open for serving')
|
# self.logger.info('closing DBs to re-open for serving')
|
||||||
self.utxo_db.close()
|
# self.db.close()
|
||||||
self.history.close_db()
|
# self.history.close_db()
|
||||||
self.utxo_db = None
|
# self.db = None
|
||||||
if self.headers_db:
|
|
||||||
self.headers_db.close()
|
|
||||||
self.headers_db = None
|
|
||||||
if self.tx_db:
|
|
||||||
self.tx_db.close()
|
|
||||||
self.tx_db = None
|
|
||||||
|
|
||||||
await self._open_dbs(False, False)
|
await self._open_dbs(False, False)
|
||||||
self.logger.info("opened for serving")
|
self.logger.info("opened for serving")
|
||||||
|
@ -271,14 +527,14 @@ class LevelDB:
|
||||||
self.flush_history()
|
self.flush_history()
|
||||||
|
|
||||||
# Flush state last as it reads the wall time.
|
# Flush state last as it reads the wall time.
|
||||||
with self.utxo_db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
if flush_utxos:
|
if flush_utxos:
|
||||||
self.flush_utxo_db(batch, flush_data)
|
self.flush_utxo_db(batch, flush_data)
|
||||||
self.flush_state(batch)
|
self.flush_state(batch)
|
||||||
|
|
||||||
# Update and put the wall time again - otherwise we drop the
|
# Update and put the wall time again - otherwise we drop the
|
||||||
# time it took to commit the batch
|
# time it took to commit the batch
|
||||||
self.flush_state(self.utxo_db)
|
self.flush_state(self.db)
|
||||||
|
|
||||||
elapsed = self.last_flush - start_time
|
elapsed = self.last_flush - start_time
|
||||||
self.logger.info(f'flush #{self.history.flush_count:,d} took '
|
self.logger.info(f'flush #{self.history.flush_count:,d} took '
|
||||||
|
@ -286,7 +542,7 @@ class LevelDB:
|
||||||
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||||
|
|
||||||
# Catch-up stats
|
# Catch-up stats
|
||||||
if self.utxo_db.for_sync:
|
if self.db.for_sync:
|
||||||
flush_interval = self.last_flush - prior_flush
|
flush_interval = self.last_flush - prior_flush
|
||||||
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
|
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
|
||||||
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
|
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
|
||||||
|
@ -317,7 +573,7 @@ class LevelDB:
|
||||||
# Write the headers
|
# Write the headers
|
||||||
start_time = time.perf_counter()
|
start_time = time.perf_counter()
|
||||||
|
|
||||||
with self.headers_db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
batch_put = batch.put
|
batch_put = batch.put
|
||||||
for i, header in enumerate(flush_data.headers):
|
for i, header in enumerate(flush_data.headers):
|
||||||
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
|
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
|
||||||
|
@ -327,8 +583,6 @@ class LevelDB:
|
||||||
height_start = self.fs_height + 1
|
height_start = self.fs_height + 1
|
||||||
tx_num = prior_tx_count
|
tx_num = prior_tx_count
|
||||||
|
|
||||||
with self.tx_db.write_batch() as batch:
|
|
||||||
batch_put = batch.put
|
|
||||||
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
|
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
|
||||||
tx_count = self.tx_counts[height_start]
|
tx_count = self.tx_counts[height_start]
|
||||||
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
|
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
|
||||||
|
@ -342,14 +596,22 @@ class LevelDB:
|
||||||
tx_num += 1
|
tx_num += 1
|
||||||
offset += 32
|
offset += 32
|
||||||
|
|
||||||
|
a = array.array('L')
|
||||||
|
a.fromlist(self.tx_counts)
|
||||||
|
batch_put(TX_COUNTS_STATE, a.tobytes())
|
||||||
|
|
||||||
flush_data.block_txs.clear()
|
flush_data.block_txs.clear()
|
||||||
flush_data.block_hashes.clear()
|
flush_data.block_hashes.clear()
|
||||||
|
|
||||||
self.fs_height = flush_data.height
|
self.fs_height = flush_data.height
|
||||||
self.fs_tx_count = flush_data.tx_count
|
self.fs_tx_count = flush_data.tx_count
|
||||||
|
|
||||||
elapsed = time.perf_counter() - start_time
|
elapsed = time.perf_counter() - start_time
|
||||||
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
|
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def flush_history(self):
|
def flush_history(self):
|
||||||
self.history.flush()
|
self.history.flush()
|
||||||
|
|
||||||
|
@ -374,15 +636,15 @@ class LevelDB:
|
||||||
# suffix = tx_idx + tx_num
|
# suffix = tx_idx + tx_num
|
||||||
hashX = value[:-12]
|
hashX = value[:-12]
|
||||||
suffix = key[-2:] + value[-12:-8]
|
suffix = key[-2:] + value[-12:-8]
|
||||||
batch_put(b'h' + key[:4] + suffix, hashX)
|
batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
|
||||||
batch_put(b'u' + hashX + suffix, value[-8:])
|
batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
|
||||||
flush_data.adds.clear()
|
flush_data.adds.clear()
|
||||||
|
|
||||||
# New undo information
|
# New undo information
|
||||||
self.flush_undo_infos(batch_put, flush_data.undo_infos)
|
self.flush_undo_infos(batch_put, flush_data.undo_infos)
|
||||||
flush_data.undo_infos.clear()
|
flush_data.undo_infos.clear()
|
||||||
|
|
||||||
if self.utxo_db.for_sync:
|
if self.db.for_sync:
|
||||||
block_count = flush_data.height - self.db_height
|
block_count = flush_data.height - self.db_height
|
||||||
tx_count = flush_data.tx_count - self.db_tx_count
|
tx_count = flush_data.tx_count - self.db_tx_count
|
||||||
elapsed = time.time() - start_time
|
elapsed = time.time() - start_time
|
||||||
|
@ -416,7 +678,7 @@ class LevelDB:
|
||||||
|
|
||||||
self.backup_fs(flush_data.height, flush_data.tx_count)
|
self.backup_fs(flush_data.height, flush_data.tx_count)
|
||||||
self.history.backup(touched, flush_data.tx_count)
|
self.history.backup(touched, flush_data.tx_count)
|
||||||
with self.utxo_db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
self.flush_utxo_db(batch, flush_data)
|
self.flush_utxo_db(batch, flush_data)
|
||||||
# Flush state last as it reads the wall time.
|
# Flush state last as it reads the wall time.
|
||||||
self.flush_state(batch)
|
self.flush_state(batch)
|
||||||
|
@ -470,78 +732,12 @@ class LevelDB:
|
||||||
return None, tx_height
|
return None, tx_height
|
||||||
return self.total_transactions[tx_num], tx_height
|
return self.total_transactions[tx_num], tx_height
|
||||||
|
|
||||||
async def tx_merkle(self, tx_num, tx_height):
|
|
||||||
if tx_height == -1:
|
|
||||||
return {
|
|
||||||
'block_height': -1
|
|
||||||
}
|
|
||||||
tx_counts = self.tx_counts
|
|
||||||
tx_pos = tx_num - tx_counts[tx_height - 1]
|
|
||||||
|
|
||||||
def _update_block_txs_cache():
|
|
||||||
block_txs = list(self.tx_db.iterator(
|
|
||||||
start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]),
|
|
||||||
stop=None if tx_height + 1 == len(tx_counts) else
|
|
||||||
TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False
|
|
||||||
))
|
|
||||||
if tx_height + 100 > self.db_height:
|
|
||||||
return block_txs
|
|
||||||
self._block_txs_cache[tx_height] = block_txs
|
|
||||||
|
|
||||||
uncached = None
|
|
||||||
if (tx_num, tx_height) in self._merkle_tx_cache:
|
|
||||||
return self._merkle_tx_cache[(tx_num, tx_height)]
|
|
||||||
if tx_height not in self._block_txs_cache:
|
|
||||||
uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache)
|
|
||||||
block_txs = self._block_txs_cache.get(tx_height, uncached)
|
|
||||||
branch, root = self.merkle.branch_and_root(block_txs, tx_pos)
|
|
||||||
merkle = {
|
|
||||||
'block_height': tx_height,
|
|
||||||
'merkle': [
|
|
||||||
hash_to_hex_str(hash)
|
|
||||||
for hash in branch
|
|
||||||
],
|
|
||||||
'pos': tx_pos
|
|
||||||
}
|
|
||||||
if tx_height + 100 < self.db_height:
|
|
||||||
self._merkle_tx_cache[(tx_num, tx_height)] = merkle
|
|
||||||
return merkle
|
|
||||||
|
|
||||||
def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]:
|
|
||||||
unpack_be_uint64 = util.unpack_be_uint64
|
|
||||||
tx_counts = self.tx_counts
|
|
||||||
tx_db_get = self.tx_db.get
|
|
||||||
tx_infos = []
|
|
||||||
|
|
||||||
for tx_hash in txids:
|
|
||||||
tx_hash_bytes = bytes.fromhex(tx_hash)[::-1]
|
|
||||||
tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes)
|
|
||||||
tx = None
|
|
||||||
tx_height = -1
|
|
||||||
if tx_num is not None:
|
|
||||||
tx_num = unpack_be_uint64(tx_num)
|
|
||||||
tx_height = bisect_right(tx_counts, tx_num)
|
|
||||||
if tx_height < self.db_height:
|
|
||||||
tx = tx_db_get(TX_PREFIX + tx_hash_bytes)
|
|
||||||
tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height))
|
|
||||||
|
|
||||||
return tx_infos
|
|
||||||
|
|
||||||
async def fs_transactions(self, txids):
|
async def fs_transactions(self, txids):
|
||||||
txs = await asyncio.get_event_loop().run_in_executor(
|
return await asyncio.get_event_loop().run_in_executor(
|
||||||
self.executor, self._fs_transactions, txids
|
self.executor, transaction_info_get_batch, txids
|
||||||
)
|
)
|
||||||
unsorted_result = {}
|
|
||||||
|
|
||||||
async def add_result(item):
|
def fs_block_hashes(self, height, count):
|
||||||
_txid, _tx, _tx_num, _tx_height = item
|
|
||||||
unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height))
|
|
||||||
|
|
||||||
if txs:
|
|
||||||
await asyncio.gather(*map(add_result, txs))
|
|
||||||
return {txid: unsorted_result[txid] for txid, _, _, _ in txs}
|
|
||||||
|
|
||||||
async def fs_block_hashes(self, height, count):
|
|
||||||
if height + count > len(self.headers):
|
if height + count > len(self.headers):
|
||||||
raise self.DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
|
raise self.DBError(f'only got {len(self.headers) - height:,d} headers starting at {height:,d}, not {count:,d}')
|
||||||
return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
|
return [self.coin.header_hash(header) for header in self.headers[height:height + count]]
|
||||||
|
@ -553,49 +749,9 @@ class LevelDB:
|
||||||
transactions. By default returns at most 1000 entries. Set
|
transactions. By default returns at most 1000 entries. Set
|
||||||
limit to None to get them all.
|
limit to None to get them all.
|
||||||
"""
|
"""
|
||||||
# def read_history():
|
|
||||||
# hashx_history = []
|
|
||||||
# for key, hist in self.history.db.iterator(prefix=hashX):
|
|
||||||
# a = array.array('I')
|
|
||||||
# a.frombytes(hist)
|
|
||||||
# for tx_num in a:
|
|
||||||
# tx_height = bisect_right(self.tx_counts, tx_num)
|
|
||||||
# if tx_height > self.db_height:
|
|
||||||
# tx_hash = None
|
|
||||||
# else:
|
|
||||||
# tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
|
|
||||||
#
|
|
||||||
# hashx_history.append((tx_hash, tx_height))
|
|
||||||
# if limit and len(hashx_history) >= limit:
|
|
||||||
# return hashx_history
|
|
||||||
# return hashx_history
|
|
||||||
|
|
||||||
def read_history():
|
|
||||||
db_height = self.db_height
|
|
||||||
tx_counts = self.tx_counts
|
|
||||||
tx_db_get = self.tx_db.get
|
|
||||||
pack_be_uint64 = util.pack_be_uint64
|
|
||||||
|
|
||||||
cnt = 0
|
|
||||||
txs = []
|
|
||||||
|
|
||||||
for hist in self.history.db.iterator(prefix=hashX, include_key=False):
|
|
||||||
a = array.array('I')
|
|
||||||
a.frombytes(hist)
|
|
||||||
for tx_num in a:
|
|
||||||
tx_height = bisect_right(tx_counts, tx_num)
|
|
||||||
if tx_height > db_height:
|
|
||||||
return
|
|
||||||
txs.append((tx_num, tx_height))
|
|
||||||
cnt += 1
|
|
||||||
if limit and cnt >= limit:
|
|
||||||
break
|
|
||||||
if limit and cnt >= limit:
|
|
||||||
break
|
|
||||||
return txs
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history)
|
history = await asyncio.get_event_loop().run_in_executor(self.executor, limited_history, hashX, limit)
|
||||||
if history is not None:
|
if history is not None:
|
||||||
return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history]
|
return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history]
|
||||||
self.logger.warning(f'limited_history: tx hash '
|
self.logger.warning(f'limited_history: tx hash '
|
||||||
|
@ -610,11 +766,11 @@ class LevelDB:
|
||||||
|
|
||||||
def undo_key(self, height):
|
def undo_key(self, height):
|
||||||
"""DB key for undo information at the given height."""
|
"""DB key for undo information at the given height."""
|
||||||
return b'U' + pack('>I', height)
|
return UNDO_PREFIX + pack('>I', height)
|
||||||
|
|
||||||
def read_undo_info(self, height):
|
def read_undo_info(self, height):
|
||||||
"""Read undo information from a file for the current height."""
|
"""Read undo information from a file for the current height."""
|
||||||
return self.utxo_db.get(self.undo_key(height))
|
return self.db.get(self.undo_key(height))
|
||||||
|
|
||||||
def flush_undo_infos(self, batch_put, undo_infos):
|
def flush_undo_infos(self, batch_put, undo_infos):
|
||||||
"""undo_infos is a list of (undo_info, height) pairs."""
|
"""undo_infos is a list of (undo_info, height) pairs."""
|
||||||
|
@ -631,11 +787,7 @@ class LevelDB:
|
||||||
"""Returns a raw block read from disk. Raises FileNotFoundError
|
"""Returns a raw block read from disk. Raises FileNotFoundError
|
||||||
if the block isn't on-disk."""
|
if the block isn't on-disk."""
|
||||||
|
|
||||||
def read():
|
return await asyncio.get_event_loop().run_in_executor(self.executor, read_block_file, self.raw_block_path(height))
|
||||||
with util.open_file(self.raw_block_path(height)) as f:
|
|
||||||
return f.read(-1)
|
|
||||||
|
|
||||||
return await asyncio.get_event_loop().run_in_executor(self.executor, read)
|
|
||||||
|
|
||||||
def write_raw_block(self, block, height):
|
def write_raw_block(self, block, height):
|
||||||
"""Write a raw block to disk."""
|
"""Write a raw block to disk."""
|
||||||
|
@ -650,17 +802,16 @@ class LevelDB:
|
||||||
|
|
||||||
def clear_excess_undo_info(self):
|
def clear_excess_undo_info(self):
|
||||||
"""Clear excess undo info. Only most recent N are kept."""
|
"""Clear excess undo info. Only most recent N are kept."""
|
||||||
prefix = b'U'
|
|
||||||
min_height = self.min_undo_height(self.db_height)
|
min_height = self.min_undo_height(self.db_height)
|
||||||
keys = []
|
keys = []
|
||||||
for key, hist in self.utxo_db.iterator(prefix=prefix):
|
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
|
||||||
height, = unpack('>I', key[-4:])
|
height, = unpack('>I', key[-4:])
|
||||||
if height >= min_height:
|
if height >= min_height:
|
||||||
break
|
break
|
||||||
keys.append(key)
|
keys.append(key)
|
||||||
|
|
||||||
if keys:
|
if keys:
|
||||||
with self.utxo_db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
for key in keys:
|
for key in keys:
|
||||||
batch.delete(key)
|
batch.delete(key)
|
||||||
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
|
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
|
||||||
|
@ -681,7 +832,7 @@ class LevelDB:
|
||||||
# -- UTXO database
|
# -- UTXO database
|
||||||
|
|
||||||
def read_utxo_state(self):
|
def read_utxo_state(self):
|
||||||
state = self.utxo_db.get(b'state')
|
state = self.db.get(UTXO_STATE)
|
||||||
if not state:
|
if not state:
|
||||||
self.db_height = -1
|
self.db_height = -1
|
||||||
self.db_tx_count = 0
|
self.db_tx_count = 0
|
||||||
|
@ -724,7 +875,7 @@ class LevelDB:
|
||||||
self.logger.info(f'height: {self.db_height:,d}')
|
self.logger.info(f'height: {self.db_height:,d}')
|
||||||
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
|
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
|
||||||
self.logger.info(f'tx count: {self.db_tx_count:,d}')
|
self.logger.info(f'tx count: {self.db_tx_count:,d}')
|
||||||
if self.utxo_db.for_sync:
|
if self.db.for_sync:
|
||||||
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
|
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
|
||||||
if self.first_sync:
|
if self.first_sync:
|
||||||
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
|
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
|
||||||
|
@ -741,32 +892,18 @@ class LevelDB:
|
||||||
'first_sync': self.first_sync,
|
'first_sync': self.first_sync,
|
||||||
'db_version': self.db_version,
|
'db_version': self.db_version,
|
||||||
}
|
}
|
||||||
batch.put(b'state', repr(state).encode())
|
batch.put(UTXO_STATE, repr(state).encode())
|
||||||
|
|
||||||
def set_flush_count(self, count):
|
def set_flush_count(self, count):
|
||||||
self.utxo_flush_count = count
|
self.utxo_flush_count = count
|
||||||
with self.utxo_db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
self.write_utxo_state(batch)
|
self.write_utxo_state(batch)
|
||||||
|
|
||||||
async def all_utxos(self, hashX):
|
async def all_utxos(self, hashX):
|
||||||
"""Return all UTXOs for an address sorted in no particular order."""
|
"""Return all UTXOs for an address sorted in no particular order."""
|
||||||
def read_utxos():
|
|
||||||
utxos = []
|
|
||||||
utxos_append = utxos.append
|
|
||||||
s_unpack = unpack
|
|
||||||
fs_tx_hash = self.fs_tx_hash
|
|
||||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
|
||||||
# Value: the UTXO value as a 64-bit unsigned integer
|
|
||||||
prefix = b'u' + hashX
|
|
||||||
for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
|
|
||||||
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
|
|
||||||
value, = unpack('<Q', db_value)
|
|
||||||
tx_hash, height = fs_tx_hash(tx_num)
|
|
||||||
utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value))
|
|
||||||
return utxos
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos)
|
utxos = await asyncio.get_event_loop().run_in_executor(self.executor, read_utxos)
|
||||||
|
utxos = [UTXO(tx_num, tx_pos, *self.fs_tx_hash(tx_num), value=value) for (tx_num, tx_pos, value) in utxos]
|
||||||
if all(utxo.tx_hash is not None for utxo in utxos):
|
if all(utxo.tx_hash is not None for utxo in utxos):
|
||||||
return utxos
|
return utxos
|
||||||
self.logger.warning(f'all_utxos: tx hash not '
|
self.logger.warning(f'all_utxos: tx hash not '
|
||||||
|
@ -779,45 +916,4 @@ class LevelDB:
|
||||||
|
|
||||||
Used by the mempool code.
|
Used by the mempool code.
|
||||||
"""
|
"""
|
||||||
def lookup_hashXs():
|
return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs_utxos, prevouts)
|
||||||
"""Return (hashX, suffix) pairs, or None if not found,
|
|
||||||
for each prevout.
|
|
||||||
"""
|
|
||||||
def lookup_hashX(tx_hash, tx_idx):
|
|
||||||
idx_packed = pack('<H', tx_idx)
|
|
||||||
|
|
||||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
|
||||||
# Value: hashX
|
|
||||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
|
||||||
|
|
||||||
# Find which entry, if any, the TX_HASH matches.
|
|
||||||
for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
|
|
||||||
tx_num_packed = db_key[-4:]
|
|
||||||
tx_num, = unpack('<I', tx_num_packed)
|
|
||||||
hash, height = self.fs_tx_hash(tx_num)
|
|
||||||
if hash == tx_hash:
|
|
||||||
return hashX, idx_packed + tx_num_packed
|
|
||||||
return None, None
|
|
||||||
return [lookup_hashX(*prevout) for prevout in prevouts]
|
|
||||||
|
|
||||||
def lookup_utxos(hashX_pairs):
|
|
||||||
def lookup_utxo(hashX, suffix):
|
|
||||||
if not hashX:
|
|
||||||
# This can happen when the daemon is a block ahead
|
|
||||||
# of us and has mempool txs spending outputs from
|
|
||||||
# that new block
|
|
||||||
return None
|
|
||||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
|
||||||
# Value: the UTXO value as a 64-bit unsigned integer
|
|
||||||
key = b'u' + hashX + suffix
|
|
||||||
db_value = self.utxo_db.get(key)
|
|
||||||
if not db_value:
|
|
||||||
# This can happen if the DB was updated between
|
|
||||||
# getting the hashXs and getting the UTXOs
|
|
||||||
return None
|
|
||||||
value, = unpack('<Q', db_value)
|
|
||||||
return hashX, value
|
|
||||||
return [lookup_utxo(*hashX_pair) for hashX_pair in hashX_pairs]
|
|
||||||
|
|
||||||
hashX_pairs = await asyncio.get_event_loop().run_in_executor(self.executor, lookup_hashXs)
|
|
||||||
return await asyncio.get_event_loop().run_in_executor(self.executor, lookup_utxos, hashX_pairs)
|
|
||||||
|
|
|
@ -43,10 +43,12 @@ class Merkle:
|
||||||
def __init__(self, hash_func=double_sha256):
|
def __init__(self, hash_func=double_sha256):
|
||||||
self.hash_func = hash_func
|
self.hash_func = hash_func
|
||||||
|
|
||||||
def tree_depth(self, hash_count):
|
@staticmethod
|
||||||
return self.branch_length(hash_count) + 1
|
def tree_depth(hash_count):
|
||||||
|
return Merkle.branch_length(hash_count) + 1
|
||||||
|
|
||||||
def branch_length(self, hash_count):
|
@staticmethod
|
||||||
|
def branch_length(hash_count):
|
||||||
"""Return the length of a merkle branch given the number of hashes."""
|
"""Return the length of a merkle branch given the number of hashes."""
|
||||||
if not isinstance(hash_count, int):
|
if not isinstance(hash_count, int):
|
||||||
raise TypeError('hash_count must be an integer')
|
raise TypeError('hash_count must be an integer')
|
||||||
|
@ -54,7 +56,8 @@ class Merkle:
|
||||||
raise ValueError('hash_count must be at least 1')
|
raise ValueError('hash_count must be at least 1')
|
||||||
return ceil(log(hash_count, 2))
|
return ceil(log(hash_count, 2))
|
||||||
|
|
||||||
def branch_and_root(self, hashes, index, length=None):
|
@staticmethod
|
||||||
|
def branch_and_root(hashes, index, length=None, hash_func=double_sha256):
|
||||||
"""Return a (merkle branch, merkle_root) pair given hashes, and the
|
"""Return a (merkle branch, merkle_root) pair given hashes, and the
|
||||||
index of one of those hashes.
|
index of one of those hashes.
|
||||||
"""
|
"""
|
||||||
|
@ -64,7 +67,7 @@ class Merkle:
|
||||||
# This also asserts hashes is not empty
|
# This also asserts hashes is not empty
|
||||||
if not 0 <= index < len(hashes):
|
if not 0 <= index < len(hashes):
|
||||||
raise ValueError(f"index '{index}/{len(hashes)}' out of range")
|
raise ValueError(f"index '{index}/{len(hashes)}' out of range")
|
||||||
natural_length = self.branch_length(len(hashes))
|
natural_length = Merkle.branch_length(len(hashes))
|
||||||
if length is None:
|
if length is None:
|
||||||
length = natural_length
|
length = natural_length
|
||||||
else:
|
else:
|
||||||
|
@ -73,7 +76,6 @@ class Merkle:
|
||||||
if length < natural_length:
|
if length < natural_length:
|
||||||
raise ValueError('length out of range')
|
raise ValueError('length out of range')
|
||||||
|
|
||||||
hash_func = self.hash_func
|
|
||||||
branch = []
|
branch = []
|
||||||
for _ in range(length):
|
for _ in range(length):
|
||||||
if len(hashes) & 1:
|
if len(hashes) & 1:
|
||||||
|
@ -85,44 +87,47 @@ class Merkle:
|
||||||
|
|
||||||
return branch, hashes[0]
|
return branch, hashes[0]
|
||||||
|
|
||||||
def root(self, hashes, length=None):
|
@staticmethod
|
||||||
|
def root(hashes, length=None):
|
||||||
"""Return the merkle root of a non-empty iterable of binary hashes."""
|
"""Return the merkle root of a non-empty iterable of binary hashes."""
|
||||||
branch, root = self.branch_and_root(hashes, 0, length)
|
branch, root = Merkle.branch_and_root(hashes, 0, length)
|
||||||
return root
|
return root
|
||||||
|
|
||||||
def root_from_proof(self, hash, branch, index):
|
# @staticmethod
|
||||||
"""Return the merkle root given a hash, a merkle branch to it, and
|
# def root_from_proof(hash, branch, index, hash_func=double_sha256):
|
||||||
its index in the hashes array.
|
# """Return the merkle root given a hash, a merkle branch to it, and
|
||||||
|
# its index in the hashes array.
|
||||||
|
#
|
||||||
|
# branch is an iterable sorted deepest to shallowest. If the
|
||||||
|
# returned root is the expected value then the merkle proof is
|
||||||
|
# verified.
|
||||||
|
#
|
||||||
|
# The caller should have confirmed the length of the branch with
|
||||||
|
# branch_length(). Unfortunately this is not easily done for
|
||||||
|
# bitcoin transactions as the number of transactions in a block
|
||||||
|
# is unknown to an SPV client.
|
||||||
|
# """
|
||||||
|
# for elt in branch:
|
||||||
|
# if index & 1:
|
||||||
|
# hash = hash_func(elt + hash)
|
||||||
|
# else:
|
||||||
|
# hash = hash_func(hash + elt)
|
||||||
|
# index >>= 1
|
||||||
|
# if index:
|
||||||
|
# raise ValueError('index out of range for branch')
|
||||||
|
# return hash
|
||||||
|
|
||||||
branch is an iterable sorted deepest to shallowest. If the
|
@staticmethod
|
||||||
returned root is the expected value then the merkle proof is
|
def level(hashes, depth_higher):
|
||||||
verified.
|
|
||||||
|
|
||||||
The caller should have confirmed the length of the branch with
|
|
||||||
branch_length(). Unfortunately this is not easily done for
|
|
||||||
bitcoin transactions as the number of transactions in a block
|
|
||||||
is unknown to an SPV client.
|
|
||||||
"""
|
|
||||||
hash_func = self.hash_func
|
|
||||||
for elt in branch:
|
|
||||||
if index & 1:
|
|
||||||
hash = hash_func(elt + hash)
|
|
||||||
else:
|
|
||||||
hash = hash_func(hash + elt)
|
|
||||||
index >>= 1
|
|
||||||
if index:
|
|
||||||
raise ValueError('index out of range for branch')
|
|
||||||
return hash
|
|
||||||
|
|
||||||
def level(self, hashes, depth_higher):
|
|
||||||
"""Return a level of the merkle tree of hashes the given depth
|
"""Return a level of the merkle tree of hashes the given depth
|
||||||
higher than the bottom row of the original tree."""
|
higher than the bottom row of the original tree."""
|
||||||
size = 1 << depth_higher
|
size = 1 << depth_higher
|
||||||
root = self.root
|
root = Merkle.root
|
||||||
return [root(hashes[n: n + size], depth_higher)
|
return [root(hashes[n: n + size], depth_higher)
|
||||||
for n in range(0, len(hashes), size)]
|
for n in range(0, len(hashes), size)]
|
||||||
|
|
||||||
def branch_and_root_from_level(self, level, leaf_hashes, index,
|
@staticmethod
|
||||||
|
def branch_and_root_from_level(level, leaf_hashes, index,
|
||||||
depth_higher):
|
depth_higher):
|
||||||
"""Return a (merkle branch, merkle_root) pair when a merkle-tree has a
|
"""Return a (merkle branch, merkle_root) pair when a merkle-tree has a
|
||||||
level cached.
|
level cached.
|
||||||
|
@ -146,10 +151,10 @@ class Merkle:
|
||||||
if not isinstance(leaf_hashes, list):
|
if not isinstance(leaf_hashes, list):
|
||||||
raise TypeError("leaf_hashes must be a list")
|
raise TypeError("leaf_hashes must be a list")
|
||||||
leaf_index = (index >> depth_higher) << depth_higher
|
leaf_index = (index >> depth_higher) << depth_higher
|
||||||
leaf_branch, leaf_root = self.branch_and_root(
|
leaf_branch, leaf_root = Merkle.branch_and_root(
|
||||||
leaf_hashes, index - leaf_index, depth_higher)
|
leaf_hashes, index - leaf_index, depth_higher)
|
||||||
index >>= depth_higher
|
index >>= depth_higher
|
||||||
level_branch, root = self.branch_and_root(level, index)
|
level_branch, root = Merkle.branch_and_root(level, index)
|
||||||
# Check last so that we know index is in-range
|
# Check last so that we know index is in-range
|
||||||
if leaf_root != level[index]:
|
if leaf_root != level[index]:
|
||||||
raise ValueError('leaf hashes inconsistent with level')
|
raise ValueError('leaf hashes inconsistent with level')
|
||||||
|
@ -191,7 +196,7 @@ class MerkleCache:
|
||||||
# Start from the beginning of any final partial segment.
|
# Start from the beginning of any final partial segment.
|
||||||
# Retain the value of depth_higher; in practice this is fine
|
# Retain the value of depth_higher; in practice this is fine
|
||||||
start = self._leaf_start(self.length)
|
start = self._leaf_start(self.length)
|
||||||
hashes = await self.source_func(start, length - start)
|
hashes = self.source_func(start, length - start)
|
||||||
self.level[start >> self.depth_higher:] = self._level(hashes)
|
self.level[start >> self.depth_higher:] = self._level(hashes)
|
||||||
self.length = length
|
self.length = length
|
||||||
|
|
||||||
|
@ -203,7 +208,7 @@ class MerkleCache:
|
||||||
level = self.level[:length >> self.depth_higher]
|
level = self.level[:length >> self.depth_higher]
|
||||||
leaf_start = self._leaf_start(length)
|
leaf_start = self._leaf_start(length)
|
||||||
count = min(self._segment_length(), length - leaf_start)
|
count = min(self._segment_length(), length - leaf_start)
|
||||||
hashes = await self.source_func(leaf_start, count)
|
hashes = self.source_func(leaf_start, count)
|
||||||
level += self._level(hashes)
|
level += self._level(hashes)
|
||||||
return level
|
return level
|
||||||
|
|
||||||
|
@ -211,7 +216,7 @@ class MerkleCache:
|
||||||
"""Call to initialize the cache to a source of given length."""
|
"""Call to initialize the cache to a source of given length."""
|
||||||
self.length = length
|
self.length = length
|
||||||
self.depth_higher = self.merkle.tree_depth(length) // 2
|
self.depth_higher = self.merkle.tree_depth(length) // 2
|
||||||
self.level = self._level(await self.source_func(0, length))
|
self.level = self._level(self.source_func(0, length))
|
||||||
self.initialized.set()
|
self.initialized.set()
|
||||||
|
|
||||||
def truncate(self, length):
|
def truncate(self, length):
|
||||||
|
@ -245,7 +250,7 @@ class MerkleCache:
|
||||||
await self._extend_to(length)
|
await self._extend_to(length)
|
||||||
leaf_start = self._leaf_start(index)
|
leaf_start = self._leaf_start(index)
|
||||||
count = min(self._segment_length(), length - leaf_start)
|
count = min(self._segment_length(), length - leaf_start)
|
||||||
leaf_hashes = await self.source_func(leaf_start, count)
|
leaf_hashes = self.source_func(leaf_start, count)
|
||||||
if length < self._segment_length():
|
if length < self._segment_length():
|
||||||
return self.merkle.branch_and_root(leaf_hashes, index)
|
return self.merkle.branch_and_root(leaf_hashes, index)
|
||||||
level = await self._level_for(length)
|
level = await self._level_for(length)
|
||||||
|
|
|
@ -1274,11 +1274,11 @@ class LBRYElectrumX(SessionBase):
|
||||||
hashX = self.address_to_hashX(address)
|
hashX = self.address_to_hashX(address)
|
||||||
return await self.hashX_unsubscribe(hashX, address)
|
return await self.hashX_unsubscribe(hashX, address)
|
||||||
|
|
||||||
async def get_balance(self, hashX):
|
# async def get_balance(self, hashX):
|
||||||
utxos = await self.db.all_utxos(hashX)
|
# utxos = await self.db.all_utxos(hashX)
|
||||||
confirmed = sum(utxo.value for utxo in utxos)
|
# confirmed = sum(utxo.value for utxo in utxos)
|
||||||
unconfirmed = await self.mempool.balance_delta(hashX)
|
# unconfirmed = await self.mempool.balance_delta(hashX)
|
||||||
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
|
# return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
|
||||||
|
|
||||||
async def scripthash_get_balance(self, scripthash):
|
async def scripthash_get_balance(self, scripthash):
|
||||||
"""Return the confirmed and unconfirmed balance of a scripthash."""
|
"""Return the confirmed and unconfirmed balance of a scripthash."""
|
||||||
|
@ -1534,6 +1534,7 @@ class LBRYElectrumX(SessionBase):
|
||||||
if block_hash:
|
if block_hash:
|
||||||
block = await self.daemon.deserialised_block(block_hash)
|
block = await self.daemon.deserialised_block(block_hash)
|
||||||
height = block['height']
|
height = block['height']
|
||||||
|
# print('lbrycrd txs', height, block['tx'])
|
||||||
try:
|
try:
|
||||||
pos = block['tx'].index(tx_hash)
|
pos = block['tx'].index(tx_hash)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
@ -1543,15 +1544,12 @@ class LBRYElectrumX(SessionBase):
|
||||||
else:
|
else:
|
||||||
batch_result[tx_hash] = [raw_tx, {'block_height': -1}]
|
batch_result[tx_hash] = [raw_tx, {'block_height': -1}]
|
||||||
|
|
||||||
def threaded_get_merkle():
|
|
||||||
for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
|
for tx_hash, (raw_tx, block_txs, pos, block_height) in needed_merkles.items():
|
||||||
batch_result[tx_hash] = raw_tx, {
|
batch_result[tx_hash] = raw_tx, {
|
||||||
'merkle': self._get_merkle_branch(block_txs, pos),
|
'merkle': self._get_merkle_branch(block_txs, pos),
|
||||||
'pos': pos,
|
'pos': pos,
|
||||||
'block_height': block_height
|
'block_height': block_height
|
||||||
}
|
}
|
||||||
if needed_merkles:
|
|
||||||
await asyncio.get_running_loop().run_in_executor(self.db.executor, threaded_get_merkle)
|
|
||||||
|
|
||||||
self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
|
self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
|
||||||
return batch_result
|
return batch_result
|
||||||
|
|
|
@ -25,18 +25,18 @@ def db_class(db_dir, name):
|
||||||
class Storage:
|
class Storage:
|
||||||
"""Abstract base class of the DB backend abstraction."""
|
"""Abstract base class of the DB backend abstraction."""
|
||||||
|
|
||||||
def __init__(self, db_dir, name, for_sync):
|
def __init__(self, db_dir, name, for_sync, read_only=False):
|
||||||
self.db_dir = db_dir
|
self.db_dir = db_dir
|
||||||
self.is_new = not os.path.exists(os.path.join(db_dir, name))
|
self.is_new = not os.path.exists(os.path.join(db_dir, name))
|
||||||
self.for_sync = for_sync or self.is_new
|
self.for_sync = for_sync or self.is_new
|
||||||
self.open(name, create=self.is_new)
|
self.open(name, create=self.is_new, read_only=read_only)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def import_module(cls):
|
def import_module(cls):
|
||||||
"""Import the DB engine module."""
|
"""Import the DB engine module."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def open(self, name, create):
|
def open(self, name, create, read_only=False):
|
||||||
"""Open an existing database or create a new one."""
|
"""Open an existing database or create a new one."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ class LevelDB(Storage):
|
||||||
import plyvel
|
import plyvel
|
||||||
cls.module = plyvel
|
cls.module = plyvel
|
||||||
|
|
||||||
def open(self, name, create, lru_cache_size=None):
|
def open(self, name, create, read_only=False):
|
||||||
mof = 10000
|
mof = 10000
|
||||||
path = os.path.join(self.db_dir, name)
|
path = os.path.join(self.db_dir, name)
|
||||||
# Use snappy compression (the default)
|
# Use snappy compression (the default)
|
||||||
|
@ -97,17 +97,18 @@ class RocksDB(Storage):
|
||||||
import rocksdb
|
import rocksdb
|
||||||
cls.module = rocksdb
|
cls.module = rocksdb
|
||||||
|
|
||||||
def open(self, name, create):
|
def open(self, name, create, read_only=False):
|
||||||
mof = 512 if self.for_sync else 128
|
mof = 10000
|
||||||
path = os.path.join(self.db_dir, name)
|
path = os.path.join(self.db_dir, name)
|
||||||
# Use snappy compression (the default)
|
# Use snappy compression (the default)
|
||||||
options = self.module.Options(create_if_missing=create,
|
options = self.module.Options(create_if_missing=create,
|
||||||
use_fsync=True,
|
use_fsync=True,
|
||||||
target_file_size_base=33554432,
|
target_file_size_base=33554432,
|
||||||
max_open_files=mof)
|
max_open_files=mof)
|
||||||
self.db = self.module.DB(path, options)
|
self.db = self.module.DB(path, options, read_only=read_only)
|
||||||
self.get = self.db.get
|
self.get = self.db.get
|
||||||
self.put = self.db.put
|
self.put = self.db.put
|
||||||
|
self.multi_get = self.db.multi_get
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
# PyRocksDB doesn't provide a close method; hopefully this is enough
|
# PyRocksDB doesn't provide a close method; hopefully this is enough
|
||||||
|
@ -118,8 +119,8 @@ class RocksDB(Storage):
|
||||||
def write_batch(self):
|
def write_batch(self):
|
||||||
return RocksDBWriteBatch(self.db)
|
return RocksDBWriteBatch(self.db)
|
||||||
|
|
||||||
def iterator(self, prefix=b'', reverse=False):
|
def iterator(self, **kwargs):
|
||||||
return RocksDBIterator(self.db, prefix, reverse)
|
return RocksDBIterator(self.db, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class RocksDBWriteBatch:
|
class RocksDBWriteBatch:
|
||||||
|
@ -140,28 +141,43 @@ class RocksDBWriteBatch:
|
||||||
class RocksDBIterator:
|
class RocksDBIterator:
|
||||||
"""An iterator for RocksDB."""
|
"""An iterator for RocksDB."""
|
||||||
|
|
||||||
def __init__(self, db, prefix, reverse):
|
__slots__ = [
|
||||||
|
'start',
|
||||||
|
'prefix',
|
||||||
|
'stop',
|
||||||
|
'iterator',
|
||||||
|
'include_key',
|
||||||
|
'include_value',
|
||||||
|
'prev',
|
||||||
|
'reverse'
|
||||||
|
]
|
||||||
|
|
||||||
|
def __init__(self, db, prefix=None, start=None, stop=None, include_key=True, include_value=True, reverse=False):
|
||||||
|
self.start = start
|
||||||
self.prefix = prefix
|
self.prefix = prefix
|
||||||
if reverse:
|
self.stop = stop
|
||||||
self.iterator = reversed(db.iteritems())
|
self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
|
||||||
nxt_prefix = util.increment_byte_string(prefix)
|
if prefix is not None and start is None:
|
||||||
if nxt_prefix:
|
|
||||||
self.iterator.seek(nxt_prefix)
|
|
||||||
try:
|
|
||||||
next(self.iterator)
|
|
||||||
except StopIteration:
|
|
||||||
self.iterator.seek(nxt_prefix)
|
|
||||||
else:
|
|
||||||
self.iterator.seek_to_last()
|
|
||||||
else:
|
|
||||||
self.iterator = db.iteritems()
|
|
||||||
self.iterator.seek(prefix)
|
self.iterator.seek(prefix)
|
||||||
|
elif start is not None:
|
||||||
|
self.iterator.seek(start)
|
||||||
|
self.include_key = include_key
|
||||||
|
self.include_value = include_value
|
||||||
|
self.prev = None
|
||||||
|
self.reverse = reverse
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __next__(self):
|
def __next__(self):
|
||||||
k, v = next(self.iterator)
|
if None not in (self.stop, self.prev) and self.prev.startswith(self.stop):
|
||||||
if not k.startswith(self.prefix):
|
|
||||||
raise StopIteration
|
raise StopIteration
|
||||||
return k, v
|
self.prev, v = next(self.iterator)
|
||||||
|
prev = self.prev
|
||||||
|
if self.prefix is not None and not prev.startswith(self.prefix):
|
||||||
|
raise StopIteration
|
||||||
|
if self.include_key and self.include_value:
|
||||||
|
return prev, v
|
||||||
|
elif self.include_key:
|
||||||
|
return prev
|
||||||
|
return v
|
||||||
|
|
2
setup.py
2
setup.py
|
@ -10,6 +10,8 @@ with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
|
||||||
PLYVEL = []
|
PLYVEL = []
|
||||||
if sys.platform.startswith('linux'):
|
if sys.platform.startswith('linux'):
|
||||||
PLYVEL.append('plyvel==1.0.5')
|
PLYVEL.append('plyvel==1.0.5')
|
||||||
|
PLYVEL.append('python-rocksdb')
|
||||||
|
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name=__name__,
|
name=__name__,
|
||||||
|
|
|
@ -2,6 +2,15 @@ import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
from lbry.testcase import CommandTestCase
|
from lbry.testcase import CommandTestCase
|
||||||
|
from lbry.wallet.server.leveldb import proc_ctx
|
||||||
|
|
||||||
|
|
||||||
|
def get_txids(height):
|
||||||
|
ctx = proc_ctx.get()
|
||||||
|
return [
|
||||||
|
ctx.ctx_tx_hash(tx_num)[0][::-1].hex()
|
||||||
|
for tx_num in range(ctx.tx_counts[height - 1], ctx.tx_counts[height])
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
class BlockchainReorganizationTests(CommandTestCase):
|
class BlockchainReorganizationTests(CommandTestCase):
|
||||||
|
@ -11,18 +20,12 @@ class BlockchainReorganizationTests(CommandTestCase):
|
||||||
async def assertBlockHash(self, height):
|
async def assertBlockHash(self, height):
|
||||||
bp = self.conductor.spv_node.server.bp
|
bp = self.conductor.spv_node.server.bp
|
||||||
|
|
||||||
def get_txids():
|
|
||||||
return [
|
|
||||||
bp.db.fs_tx_hash(tx_num)[0][::-1].hex()
|
|
||||||
for tx_num in range(bp.db.tx_counts[height - 1], bp.db.tx_counts[height])
|
|
||||||
]
|
|
||||||
|
|
||||||
block_hash = await self.blockchain.get_block_hash(height)
|
block_hash = await self.blockchain.get_block_hash(height)
|
||||||
|
|
||||||
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
|
self.assertEqual(block_hash, (await self.ledger.headers.hash(height)).decode())
|
||||||
self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
self.assertEqual(block_hash, (bp.db.fs_block_hashes(height, 1))[0][::-1].hex())
|
||||||
|
|
||||||
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids)
|
txids = await asyncio.get_event_loop().run_in_executor(bp.db.executor, get_txids, height)
|
||||||
txs = await bp.db.fs_transactions(txids)
|
txs = await bp.db.fs_transactions(txids)
|
||||||
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
block_txs = (await bp.daemon.deserialised_block(block_hash))['tx']
|
||||||
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions')
|
||||||
|
|
Loading…
Add table
Reference in a new issue