diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 3f13bf0e3..e3f548bbb 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -392,7 +392,7 @@ class BlockProcessor: async def write_state(self): def flush(): - with self.db.db.write_batch() as batch: + with self.db.db.write_batch(transaction=True) as batch: self.db.write_db_state(batch) await self.run_in_thread_with_lock(flush) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 1a109b9d3..c20d64d64 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -57,7 +57,7 @@ class Env: self.coin = Coin.lookup_coin_class(coin_name, network) self.es_index_prefix = self.default('ES_INDEX_PREFIX', '') self.es_mode = self.default('ES_MODE', 'writer') - self.cache_MB = self.integer('CACHE_MB', 1200) + self.cache_MB = self.integer('CACHE_MB', 4096) self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) # Server stuff self.tcp_port = self.integer('TCP_PORT', None) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index de1f3b2e6..f1ea674ea 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -8,7 +8,7 @@ """Interface to the blockchain database.""" - +import os import asyncio import array import time @@ -17,6 +17,7 @@ import struct import attr import zlib import base64 +import plyvel from typing import Optional, Iterable, Tuple, DefaultDict, Set, Dict, List from functools import partial from asyncio import sleep @@ -28,7 +29,6 @@ from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str from lbry.wallet.server.tx import TxInput from lbry.wallet.server.merkle import Merkle, MerkleCache -from lbry.wallet.server.storage import db_class from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue @@ -107,7 +107,6 @@ class LevelDB: self.logger.info(f'switching current directory to {env.db_dir}') - self.db_class = db_class(env.db_dir, self.env.db_engine) self.db = None self.hist_unflushed = defaultdict(partial(array.array, 'I')) @@ -771,12 +770,19 @@ class LevelDB: async def open_dbs(self): if self.db: return - assert self.db is None - self.db = self.db_class(f'lbry-{self.env.db_engine}', True) - if self.db.is_new: - self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}') + + path = os.path.join(self.env.db_dir, 'lbry-leveldb') + is_new = os.path.isdir(path) + self.db = plyvel.DB( + path, create_if_missing=True, max_open_files=512, + lru_cache_size=self.env.cache_MB * 1024 * 1024, write_buffer_size=64 * 1024 * 1024, + max_file_size=1024 * 1024 * 64, bloom_filter_bits=32 + ) + + if is_new: + self.logger.info('created new db: %s', f'lbry-leveldb') else: - self.logger.info(f'opened db: %s', f'lbry-{self.env.db_engine}') + self.logger.info(f'opened db: %s', f'lbry-leveldb') # read db state self.read_db_state() @@ -793,8 +799,6 @@ class LevelDB: self.logger.info(f'height: {self.db_height:,d}') self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}') self.logger.info(f'tx count: {self.db_tx_count:,d}') - if self.db.for_sync: - self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB') if self.first_sync: self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}') if self.hist_db_version not in self.DB_VERSIONS: @@ -859,7 +863,7 @@ class LevelDB: ) ) - with self.db.write_batch() as batch: + with self.db.write_batch(transaction=True) as batch: batch_put = batch.put batch_delete = batch.delete @@ -900,7 +904,7 @@ class LevelDB: self.hist_flush_count += 1 nremoves = 0 - with self.db.write_batch() as batch: + with self.db.write_batch(transaction=True) as batch: batch_put = batch.put batch_delete = batch.delete for op in flush_data.put_and_delete_ops: diff --git a/lbry/wallet/server/storage.py b/lbry/wallet/server/storage.py deleted file mode 100644 index 2d2b805e4..000000000 --- a/lbry/wallet/server/storage.py +++ /dev/null @@ -1,169 +0,0 @@ -# Copyright (c) 2016-2017, the ElectrumX authors -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -"""Backend database abstraction.""" - -import os -from functools import partial - -from lbry.wallet.server import util - - -def db_class(db_dir, name): - """Returns a DB engine class.""" - for db_class in util.subclasses(Storage): - if db_class.__name__.lower() == name.lower(): - db_class.import_module() - return partial(db_class, db_dir) - raise RuntimeError(f'unrecognised DB engine "{name}"') - - -class Storage: - """Abstract base class of the DB backend abstraction.""" - - def __init__(self, db_dir, name, for_sync): - self.db_dir = db_dir - self.is_new = not os.path.exists(os.path.join(db_dir, name)) - self.for_sync = for_sync or self.is_new - self.open(name, create=self.is_new) - - @classmethod - def import_module(cls): - """Import the DB engine module.""" - raise NotImplementedError - - def open(self, name, create): - """Open an existing database or create a new one.""" - raise NotImplementedError - - def close(self): - """Close an existing database.""" - raise NotImplementedError - - def get(self, key): - raise NotImplementedError - - def put(self, key, value): - raise NotImplementedError - - def write_batch(self): - """Return a context manager that provides `put` and `delete`. - - Changes should only be committed when the context manager - closes without an exception. - """ - raise NotImplementedError - - def iterator(self, prefix=b'', reverse=False): - """Return an iterator that yields (key, value) pairs from the - database sorted by key. - - If `prefix` is set, only keys starting with `prefix` will be - included. If `reverse` is True the items are returned in - reverse order. - """ - raise NotImplementedError - - -class LevelDB(Storage): - """LevelDB database engine.""" - - @classmethod - def import_module(cls): - import plyvel - cls.module = plyvel - - def open(self, name, create, lru_cache_size=None): - mof = 512 - path = os.path.join(self.db_dir, name) - # Use snappy compression (the default) - self.db = self.module.DB(path, create_if_missing=create, max_open_files=mof, lru_cache_size=4*1024*1024*1024, - write_buffer_size=64*1024*1024, max_file_size=1024*1024*64, - bloom_filter_bits=32) - self.close = self.db.close - self.get = self.db.get - self.put = self.db.put - self.iterator = self.db.iterator - self.write_batch = partial(self.db.write_batch, transaction=True) - - -class RocksDB(Storage): - """RocksDB database engine.""" - - @classmethod - def import_module(cls): - import rocksdb - cls.module = rocksdb - - def open(self, name, create): - mof = 512 if self.for_sync else 128 - path = os.path.join(self.db_dir, name) - # Use snappy compression (the default) - options = self.module.Options(create_if_missing=create, - use_fsync=True, - target_file_size_base=33554432, - max_open_files=mof) - self.db = self.module.DB(path, options) - self.get = self.db.get - self.put = self.db.put - - def close(self): - # PyRocksDB doesn't provide a close method; hopefully this is enough - self.db = self.get = self.put = None - import gc - gc.collect() - - def write_batch(self): - return RocksDBWriteBatch(self.db) - - def iterator(self, prefix=b'', reverse=False): - return RocksDBIterator(self.db, prefix, reverse) - - -class RocksDBWriteBatch: - """A write batch for RocksDB.""" - - def __init__(self, db): - self.batch = RocksDB.module.WriteBatch() - self.db = db - - def __enter__(self): - return self.batch - - def __exit__(self, exc_type, exc_val, exc_tb): - if not exc_val: - self.db.write(self.batch) - - -class RocksDBIterator: - """An iterator for RocksDB.""" - - def __init__(self, db, prefix, reverse): - self.prefix = prefix - if reverse: - self.iterator = reversed(db.iteritems()) - nxt_prefix = util.increment_byte_string(prefix) - if nxt_prefix: - self.iterator.seek(nxt_prefix) - try: - next(self.iterator) - except StopIteration: - self.iterator.seek(nxt_prefix) - else: - self.iterator.seek_to_last() - else: - self.iterator = db.iteritems() - self.iterator.seek(prefix) - - def __iter__(self): - return self - - def __next__(self): - k, v = next(self.iterator) - if not k.startswith(self.prefix): - raise StopIteration - return k, v