diff --git a/scribe/reader/interface.py b/scribe/reader/interface.py index bd095c9..2e0afb0 100644 --- a/scribe/reader/interface.py +++ b/scribe/reader/interface.py @@ -84,6 +84,12 @@ class BaseBlockchainReader: ) async def poll_for_changes(self): + """ + Detect and handle if the db has advanced to a new block or unwound during a chain reorganization + + If a reorg is detected, this will first unwind() to the branching height and then advance() forward + to the new block(s). + """ await asyncio.get_event_loop().run_in_executor(self._executor, self._detect_changes) async def refresh_blocks_forever(self, synchronized: asyncio.Event): @@ -100,9 +106,15 @@ class BaseBlockchainReader: synchronized.set() def clear_caches(self): + """ + Called after finished advancing, used for invalidating caches + """ pass def advance(self, height: int): + """ + Advance to the given block height + """ tx_count = self.db.prefix_db.tx_count.get(height).tx_count assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts' assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}" @@ -110,10 +122,14 @@ class BaseBlockchainReader: self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False)) def unwind(self): + """ + Go backwards one block + """ self.db.tx_counts.pop() self.db.headers.pop() async def start(self): + # TODO: make the method here useful if not self._executor: self._executor = ThreadPoolExecutor(self._thread_workers, thread_name_prefix=self._thread_prefix) self.db._executor = self._executor