delete lbry/wallet/server/storage.py

-expose leveldb lru cache size as `CACHE_MB` hub param
This commit is contained in:
Jack Robison 2021-07-28 13:59:56 -04:00 committed by Victor Shyba
parent e94a2c7c94
commit 94e0624024
4 changed files with 18 additions and 183 deletions

View file

@ -392,7 +392,7 @@ class BlockProcessor:
async def write_state(self): async def write_state(self):
def flush(): def flush():
with self.db.db.write_batch() as batch: with self.db.db.write_batch(transaction=True) as batch:
self.db.write_db_state(batch) self.db.write_db_state(batch)
await self.run_in_thread_with_lock(flush) await self.run_in_thread_with_lock(flush)

View file

@ -57,7 +57,7 @@ class Env:
self.coin = Coin.lookup_coin_class(coin_name, network) self.coin = Coin.lookup_coin_class(coin_name, network)
self.es_index_prefix = self.default('ES_INDEX_PREFIX', '') self.es_index_prefix = self.default('ES_INDEX_PREFIX', '')
self.es_mode = self.default('ES_MODE', 'writer') self.es_mode = self.default('ES_MODE', 'writer')
self.cache_MB = self.integer('CACHE_MB', 1200) self.cache_MB = self.integer('CACHE_MB', 4096)
self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
# Server stuff # Server stuff
self.tcp_port = self.integer('TCP_PORT', None) self.tcp_port = self.integer('TCP_PORT', None)

View file

@ -8,7 +8,7 @@
"""Interface to the blockchain database.""" """Interface to the blockchain database."""
import os
import asyncio import asyncio
import array import array
import time import time
@ -17,6 +17,7 @@ import struct
import attr import attr
import zlib import zlib
import base64 import base64
import plyvel
from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List
from functools import partial from functools import partial
from asyncio import sleep from asyncio import sleep
@ -28,7 +29,6 @@ from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str from lbry.wallet.server.hash import hash_to_hex_str
from lbry.wallet.server.tx import TxInput from lbry.wallet.server.tx import TxInput
from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
@ -107,7 +107,6 @@ class LevelDB:
self.logger.info(f'switching current directory to {env.db_dir}') self.logger.info(f'switching current directory to {env.db_dir}')
self.db_class = db_class(env.db_dir, self.env.db_engine)
self.db = None self.db = None
self.hist_unflushed = defaultdict(partial(array.array, 'I')) self.hist_unflushed = defaultdict(partial(array.array, 'I'))
@ -771,12 +770,19 @@ class LevelDB:
async def open_dbs(self): async def open_dbs(self):
if self.db: if self.db:
return return
assert self.db is None
self.db = self.db_class(f'lbry-{self.env.db_engine}', True) path = os.path.join(self.env.db_dir, 'lbry-leveldb')
if self.db.is_new: is_new = os.path.isdir(path)
self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}') self.db = plyvel.DB(
path, create_if_missing=True, max_open_files=512,
lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
)
if is_new:
self.logger.info('created new db: %s', f'lbry-leveldb')
else: else:
self.logger.info(f'opened db: %s', f'lbry-{self.env.db_engine}') self.logger.info(f'opened db: %s', f'lbry-leveldb')
# read db state # read db state
self.read_db_state() self.read_db_state()
@ -793,8 +799,6 @@ class LevelDB:
self.logger.info(f'height: {self.db_height:,d}') self.logger.info(f'height: {self.db_height:,d}')
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}') self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
self.logger.info(f'tx count: {self.db_tx_count:,d}') self.logger.info(f'tx count: {self.db_tx_count:,d}')
if self.db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync: if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}') self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
if self.hist_db_version not in self.DB_VERSIONS: if self.hist_db_version not in self.DB_VERSIONS:
@ -859,7 +863,7 @@ class LevelDB:
) )
) )
with self.db.write_batch() as batch: with self.db.write_batch(transaction=True) as batch:
batch_put = batch.put batch_put = batch.put
batch_delete = batch.delete batch_delete = batch.delete
@ -900,7 +904,7 @@ class LevelDB:
self.hist_flush_count += 1 self.hist_flush_count += 1
nremoves = 0 nremoves = 0
with self.db.write_batch() as batch: with self.db.write_batch(transaction=True) as batch:
batch_put = batch.put batch_put = batch.put
batch_delete = batch.delete batch_delete = batch.delete
for op in flush_data.put_and_delete_ops: for op in flush_data.put_and_delete_ops:

View file

@ -1,169 +0,0 @@
# Copyright (c) 2016-2017, the ElectrumX authors
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
"""Backend database abstraction."""
import os
from functools import partial
from lbry.wallet.server import util
def db_class(db_dir, name):
"""Returns a DB engine class."""
for db_class in util.subclasses(Storage):
if db_class.__name__.lower() == name.lower():
db_class.import_module()
return partial(db_class, db_dir)
raise RuntimeError(f'unrecognised DB engine "{name}"')
class Storage:
"""Abstract base class of the DB backend abstraction."""
def __init__(self, db_dir, name, for_sync):
self.db_dir = db_dir
self.is_new = not os.path.exists(os.path.join(db_dir, name))
self.for_sync = for_sync or self.is_new
self.open(name, create=self.is_new)
@classmethod
def import_module(cls):
"""Import the DB engine module."""
raise NotImplementedError
def open(self, name, create):
"""Open an existing database or create a new one."""
raise NotImplementedError
def close(self):
"""Close an existing database."""
raise NotImplementedError
def get(self, key):
raise NotImplementedError
def put(self, key, value):
raise NotImplementedError
def write_batch(self):
"""Return a context manager that provides `put` and `delete`.
Changes should only be committed when the context manager
closes without an exception.
"""
raise NotImplementedError
def iterator(self, prefix=b'', reverse=False):
"""Return an iterator that yields (key, value) pairs from the
database sorted by key.
If `prefix` is set, only keys starting with `prefix` will be
included. If `reverse` is True the items are returned in
reverse order.
"""
raise NotImplementedError
class LevelDB(Storage):
"""LevelDB database engine."""
@classmethod
def import_module(cls):
import plyvel
cls.module = plyvel
def open(self, name, create, lru_cache_size=None):
mof = 512
path = os.path.join(self.db_dir, name)
# Use snappy compression (the default)
self.db = self.module.DB(path, create_if_missing=create, max_open_files=mof, lru_cache_size=4*1024*1024*1024,
write_buffer_size=64*1024*1024, max_file_size=1024*1024*64,
bloom_filter_bits=32)
self.close = self.db.close
self.get = self.db.get
self.put = self.db.put
self.iterator = self.db.iterator
self.write_batch = partial(self.db.write_batch, transaction=True)
class RocksDB(Storage):
"""RocksDB database engine."""
@classmethod
def import_module(cls):
import rocksdb
cls.module = rocksdb
def open(self, name, create):
mof = 512 if self.for_sync else 128
path = os.path.join(self.db_dir, name)
# Use snappy compression (the default)
options = self.module.Options(create_if_missing=create,
use_fsync=True,
target_file_size_base=33554432,
max_open_files=mof)
self.db = self.module.DB(path, options)
self.get = self.db.get
self.put = self.db.put
def close(self):
# PyRocksDB doesn't provide a close method; hopefully this is enough
self.db = self.get = self.put = None
import gc
gc.collect()
def write_batch(self):
return RocksDBWriteBatch(self.db)
def iterator(self, prefix=b'', reverse=False):
return RocksDBIterator(self.db, prefix, reverse)
class RocksDBWriteBatch:
"""A write batch for RocksDB."""
def __init__(self, db):
self.batch = RocksDB.module.WriteBatch()
self.db = db
def __enter__(self):
return self.batch
def __exit__(self, exc_type, exc_val, exc_tb):
if not exc_val:
self.db.write(self.batch)
class RocksDBIterator:
"""An iterator for RocksDB."""
def __init__(self, db, prefix, reverse):
self.prefix = prefix
if reverse:
self.iterator = reversed(db.iteritems())
nxt_prefix = util.increment_byte_string(prefix)
if nxt_prefix:
self.iterator.seek(nxt_prefix)
try:
next(self.iterator)
except StopIteration:
self.iterator.seek(nxt_prefix)
else:
self.iterator.seek_to_last()
else:
self.iterator = db.iteritems()
self.iterator.seek(prefix)
def __iter__(self):
return self
def __next__(self):
k, v = next(self.iterator)
if not k.startswith(self.prefix):
raise StopIteration
return k, v