add migrators to build new indexes

This commit is contained in:
Jack Robison 2022-08-29 13:51:19 -04:00
parent ee02a80a98
commit c686187e35
5 changed files with 125 additions and 1 deletions

View file

@ -34,7 +34,7 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db"
class SecondaryDB:
DB_VERSIONS = [7, 8, 9]
DB_VERSIONS = [7, 8, 9, 10, 11]
def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,

View file

@ -292,6 +292,9 @@ class BasePrefixDB:
def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]):
self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items])
def multi_put(self, items: typing.List[typing.Tuple[bytes, bytes]]):
self._op_stack.multi_put([RevertablePut(k, v) for k, v in items])
def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None,
iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None,
reverse: bool = False, include_key: bool = True, include_value: bool = True,

View file

@ -0,0 +1,67 @@
import logging
from collections import defaultdict
from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE
FROM_VERSION = 10
TO_VERSION = 11
def migrate(db):
log = logging.getLogger(__name__)
prefix_db = db.prefix_db
log.info("migrating the db to version 11")
effective_amounts = defaultdict(int)
support_amounts = defaultdict(int)
log.info("deleting any existing effective amounts")
to_delete = list(prefix_db.effective_amount.iterate(deserialize_key=False, deserialize_value=False))
while to_delete:
batch, to_delete = to_delete[:100000], to_delete[100000:]
if batch:
prefix_db.multi_delete(batch)
prefix_db.unsafe_commit()
log.info("calculating claim effective amounts for the new index at block %i", db.db_height)
height = db.db_height
cnt = 0
for k, v in prefix_db.active_amount.iterate():
cnt += 1
claim_hash, activation_height, amount = k.claim_hash, k.activation_height, v.amount
if activation_height <= height:
effective_amounts[claim_hash] += amount
if k.txo_type == ACTIVATED_SUPPORT_TXO_TYPE:
support_amounts[claim_hash] += amount
if cnt % 1000000 == 0:
log.info("scanned %i amounts for %i claims", cnt, len(effective_amounts))
log.info("preparing to insert effective amounts")
effective_amounts_to_put = [
prefix_db.effective_amount.pack_item(claim_hash, effective_amount, support_amounts[claim_hash])
for claim_hash, effective_amount in effective_amounts.items()
]
log.info("inserting %i effective amounts", len(effective_amounts_to_put))
cnt = 0
while effective_amounts_to_put:
batch, effective_amounts_to_put = effective_amounts_to_put[:100000], effective_amounts_to_put[100000:]
if batch:
prefix_db.multi_put(batch)
prefix_db.unsafe_commit()
cnt += len(batch)
if cnt % 1000000 == 0:
log.info("inserted effective amounts for %i claims", cnt)
log.info("finished building the effective amount index")
db.db_version = 11
db.write_db_state()
db.prefix_db.unsafe_commit()
log.info("finished migration to version 11")

View file

@ -0,0 +1,48 @@
import logging
from collections import defaultdict
from hub.db.revertable import RevertablePut
FROM_VERSION = 9
TO_VERSION = 10
def migrate(db):
log = logging.getLogger(__name__)
prefix_db = db.prefix_db
log.info("migrating the db to version 10")
repost_counts = defaultdict(int)
log.info("deleting any existing repost counts")
to_delete = list(prefix_db.reposted_count.iterate(deserialize_key=False, deserialize_value=False))
while to_delete:
batch, to_delete = to_delete[:10000], to_delete[10000:]
if batch:
prefix_db.multi_delete(batch)
prefix_db.unsafe_commit()
log.info("counting reposts to build the new index")
for reposted_claim_hash in prefix_db.repost.iterate(include_key=False, deserialize_value=False):
repost_counts[reposted_claim_hash] += 1
log.info("inserting repost counts")
reposted_counts_to_put = [
prefix_db.reposted_count.pack_item(claim_hash, count)
for claim_hash, count in repost_counts.items()
]
while reposted_counts_to_put:
batch, reposted_counts_to_put = reposted_counts_to_put[:10000], reposted_counts_to_put[10000:]
if batch:
prefix_db.multi_put(batch)
prefix_db.unsafe_commit()
log.info("finished building the repost count index")
db.db_version = 10
db.write_db_state()
db.prefix_db.unsafe_commit()
log.info("finished migration to version 10")

View file

@ -1828,6 +1828,12 @@ class BlockchainProcessorService(BlockchainService):
elif self.db.db_version == 8:
from hub.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION
self.db._index_address_status = self.env.index_address_status
elif self.db.db_version == 9:
from hub.db.migrators.migrate9to10 import migrate, FROM_VERSION, TO_VERSION
self.db._index_address_status = self.env.index_address_status
elif self.db.db_version == 10:
from hub.db.migrators.migrate10to11 import migrate, FROM_VERSION, TO_VERSION
self.db._index_address_status = self.env.index_address_status
else:
raise RuntimeError("unknown db version")
self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")