From c686187e3572d8776f36d5552db9a73c5fcc381a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 29 Aug 2022 13:51:19 -0400 Subject: [PATCH] add migrators to build new indexes --- hub/db/db.py | 2 +- hub/db/interface.py | 3 ++ hub/db/migrators/migrate10to11.py | 67 +++++++++++++++++++++++++++++++ hub/db/migrators/migrate9to10.py | 48 ++++++++++++++++++++++ hub/scribe/service.py | 6 +++ 5 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 hub/db/migrators/migrate10to11.py create mode 100644 hub/db/migrators/migrate9to10.py diff --git a/hub/db/db.py b/hub/db/db.py index 0b23383..825dab9 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -34,7 +34,7 @@ NAMESPACE = f"{PROMETHEUS_NAMESPACE}_db" class SecondaryDB: - DB_VERSIONS = [7, 8, 9] + DB_VERSIONS = [7, 8, 9, 10, 11] def __init__(self, coin, db_dir: str, secondary_name: str, max_open_files: int = -1, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, diff --git a/hub/db/interface.py b/hub/db/interface.py index 7031e66..8aa576e 100644 --- a/hub/db/interface.py +++ b/hub/db/interface.py @@ -292,6 +292,9 @@ class BasePrefixDB: def multi_delete(self, items: typing.List[typing.Tuple[bytes, bytes]]): self._op_stack.multi_delete([RevertableDelete(k, v) for k, v in items]) + def multi_put(self, items: typing.List[typing.Tuple[bytes, bytes]]): + self._op_stack.multi_put([RevertablePut(k, v) for k, v in items]) + def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None, iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True, include_value: bool = True, diff --git a/hub/db/migrators/migrate10to11.py b/hub/db/migrators/migrate10to11.py new file mode 100644 index 0000000..754001e --- /dev/null +++ b/hub/db/migrators/migrate10to11.py @@ -0,0 +1,67 @@ +import logging +from collections import defaultdict +from hub.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE + +FROM_VERSION = 10 
TO_VERSION = 11


def _batched(items, batch_size):
    """Yield consecutive slices of ``items`` holding at most ``batch_size`` entries.

    Slicing by offset copies every entry once; repeatedly splitting the list
    into ``head, tail`` would re-copy the whole remaining tail on each pass.
    """
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def migrate(db, batch_size: int = 100000):
    """Rebuild the effective amount index and mark the db as version 11.

    The migration runs in four steps:

    1. Every row currently returned by ``prefix_db.effective_amount.iterate``
       is deleted, ``batch_size`` rows per commit.
    2. ``prefix_db.active_amount`` is scanned. For rows whose
       ``activation_height`` is not above ``db.db_height`` the amount is added
       to the claim's effective amount, and additionally to its support amount
       when the row's ``txo_type`` equals ``ACTIVATED_SUPPORT_TXO_TYPE``.
    3. One packed ``effective_amount`` row per claim is written,
       ``batch_size`` rows per commit.
    4. ``db.db_version`` is set to 11, the db state is written and committed.

    Args:
        db: object exposing ``prefix_db``, ``db_height``, ``db_version`` and
            ``write_db_state()``.
        batch_size: maximum number of rows handed to a single
            ``multi_delete`` / ``multi_put`` call before committing.

    Raises:
        ValueError: if ``batch_size`` is not a positive number. Checked before
            anything is written, because a negative step would silently skip
            every row.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    log = logging.getLogger(__name__)
    prefix_db = db.prefix_db

    log.info("migrating the db to version 11")

    effective_amounts = defaultdict(int)
    support_amounts = defaultdict(int)

    log.info("deleting any existing effective amounts")

    # The stale rows are fully materialised before the first delete is issued,
    # so no write happens while the iterator is still being consumed.
    stale_rows = list(prefix_db.effective_amount.iterate(deserialize_key=False, deserialize_value=False))
    for batch in _batched(stale_rows, batch_size):
        prefix_db.multi_delete(batch)
        prefix_db.unsafe_commit()

    height = db.db_height
    log.info("calculating claim effective amounts for the new index at block %i", height)

    for cnt, (k, v) in enumerate(prefix_db.active_amount.iterate(), 1):
        # Amounts activating above the current height are not counted.
        if k.activation_height <= height:
            effective_amounts[k.claim_hash] += v.amount
            if k.txo_type == ACTIVATED_SUPPORT_TXO_TYPE:
                support_amounts[k.claim_hash] += v.amount
        if cnt % 1000000 == 0:
            log.info("scanned %i amounts for %i claims", cnt, len(effective_amounts))

    log.info("preparing to insert effective amounts")

    effective_amounts_to_put = [
        # Claims without any activated support get a support amount of 0.
        prefix_db.effective_amount.pack_item(claim_hash, effective_amount, support_amounts.get(claim_hash, 0))
        for claim_hash, effective_amount in effective_amounts.items()
    ]

    log.info("inserting %i effective amounts", len(effective_amounts_to_put))

    inserted = 0
    for batch in _batched(effective_amounts_to_put, batch_size):
        prefix_db.multi_put(batch)
        prefix_db.unsafe_commit()
        previous, inserted = inserted, inserted + len(batch)
        # Report each time another million rows has been passed; with the
        # default batch size this lands exactly on the multiples of 1,000,000.
        if inserted // 1000000 > previous // 1000000:
            log.info("inserted effective amounts for %i claims", inserted)

    log.info("finished building the effective amount index")

    db.db_version = 11
    db.write_db_state()
    db.prefix_db.unsafe_commit()
    log.info("finished migration to version 11")


# Patch envelope that closed this line in the original document (verbatim):
# diff --git a/hub/db/migrators/migrate9to10.py
# Patch envelope that opened this line in the original document (verbatim):
# b/hub/db/migrators/migrate9to10.py new file mode 100644 index 0000000..37c8491
# --- /dev/null
# +++ b/hub/db/migrators/migrate9to10.py
# @@ -0,0 +1,48 @@
import logging
from collections import defaultdict
from hub.db.revertable import RevertablePut  # NOTE(review): not referenced anywhere in this module

FROM_VERSION = 9
TO_VERSION = 10


def _batched(items, batch_size):
    """Yield consecutive slices of ``items`` holding at most ``batch_size`` entries.

    Slicing by offset copies every entry once; repeatedly splitting the list
    into ``head, tail`` would re-copy the whole remaining tail on each pass.
    """
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def migrate(db, batch_size: int = 10000):
    """Rebuild the repost count index and mark the db as version 10.

    The migration runs in four steps:

    1. Every row currently returned by ``prefix_db.reposted_count.iterate``
       is deleted, ``batch_size`` rows per commit.
    2. The values yielded by ``prefix_db.repost.iterate(include_key=False,
       deserialize_value=False)`` are tallied; each distinct value is used as
       the claim hash of a count row (presumably the raw hash of the reposted
       claim - confirm against the ``repost`` prefix definition).
    3. One packed ``reposted_count`` row per distinct value is written,
       ``batch_size`` rows per commit.
    4. ``db.db_version`` is set to 10, the db state is written and committed.

    Args:
        db: object exposing ``prefix_db``, ``db_version`` and
            ``write_db_state()``.
        batch_size: maximum number of rows handed to a single
            ``multi_delete`` / ``multi_put`` call before committing.

    Raises:
        ValueError: if ``batch_size`` is not a positive number. Checked before
            anything is written, because a negative step would silently skip
            every row.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    log = logging.getLogger(__name__)
    prefix_db = db.prefix_db

    log.info("migrating the db to version 10")

    repost_counts = defaultdict(int)
    log.info("deleting any existing repost counts")

    # The stale rows are fully materialised before the first delete is issued,
    # so no write happens while the iterator is still being consumed.
    stale_rows = list(prefix_db.reposted_count.iterate(deserialize_key=False, deserialize_value=False))
    for batch in _batched(stale_rows, batch_size):
        prefix_db.multi_delete(batch)
        prefix_db.unsafe_commit()

    log.info("counting reposts to build the new index")

    for reposted_claim_hash in prefix_db.repost.iterate(include_key=False, deserialize_value=False):
        repost_counts[reposted_claim_hash] += 1

    log.info("inserting repost counts")

    reposted_counts_to_put = [
        prefix_db.reposted_count.pack_item(claim_hash, count)
        for claim_hash, count in repost_counts.items()
    ]

    for batch in _batched(reposted_counts_to_put, batch_size):
        prefix_db.multi_put(batch)
        prefix_db.unsafe_commit()

    log.info("finished building the repost count index")

    db.db_version = 10
    db.write_db_state()
    db.prefix_db.unsafe_commit()
    log.info("finished migration to version 10")


# Patch envelope that closed this line in the original document (verbatim);
# the hunk is cut mid-statement and continues on the following line:
# diff --git a/hub/scribe/service.py b/hub/scribe/service.py
# index ba9dad8..204eed3 100644
# --- a/hub/scribe/service.py
# +++ b/hub/scribe/service.py
# @@ -1828,6 +1828,12 @@ class BlockchainProcessorService(BlockchainService):
#  elif self.db.db_version == 8:
#  from hub.db.migrators.migrate8to9 import migrate, FROM_VERSION, TO_VERSION
#  self.db._index_address_status = self.env.index_address_status
# + elif self.db.db_version == 9:
# + from hub.db.migrators.migrate9to10 import migrate,
FROM_VERSION, TO_VERSION + self.db._index_address_status = self.env.index_address_status + elif self.db.db_version == 10: + from hub.db.migrators.migrate10to11 import migrate, FROM_VERSION, TO_VERSION + self.db._index_address_status = self.env.index_address_status else: raise RuntimeError("unknown db version") self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")