From bdc7f4b3f5f76db945299084c92940d058dd5d1e Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Mon, 9 Nov 2020 13:09:00 -0500
Subject: [PATCH] combine tx_count_db and hashes_db, add tx db

---
 lbry/wallet/server/block_processor.py |  6 +--
 lbry/wallet/server/leveldb.py         | 59 ++++++++++++---------------
 2 files changed, 29 insertions(+), 36 deletions(-)

diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index 7fe0ae059..68c735265 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -171,7 +171,7 @@ class BlockProcessor:
 
         # Caches of unflushed items.
         self.headers = []
-        self.tx_hashes = []
+        self.block_txs = []
         self.undo_infos = []
 
         # UTXO cache
@@ -337,7 +337,7 @@ class BlockProcessor:
         """The data for a flush.  The lock must be taken."""
         assert self.state_lock.locked()
         return FlushData(self.height, self.tx_count, self.headers,
-                         self.tx_hashes, self.undo_infos, self.utxo_cache,
+                         self.block_txs, self.undo_infos, self.utxo_cache,
                          self.db_deletes, self.tip)
 
     async def flush(self, flush_utxos):
@@ -404,7 +404,7 @@ class BlockProcessor:
         self.tip = self.coin.header_hash(headers[-1])
 
     def advance_txs(self, height, txs, header):
-        self.tx_hashes.append(b''.join(tx_hash for tx, tx_hash in txs))
+        self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx for tx, _ in txs]))
 
         # Use local vars for speed in the loops
         undo_info = []
diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index ccd3fcd60..a7f84afe6 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -35,6 +35,7 @@ UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
 HEADER_PREFIX = b'H'
 TX_COUNT_PREFIX = b'T'
 TX_HASH_PREFIX = b'X'
+TX_PREFIX = b'B'
 
 
 @attr.s(slots=True)
@@ -42,7 +43,7 @@ class FlushData:
     height = attr.ib()
     tx_count = attr.ib()
     headers = attr.ib()
-    block_tx_hashes = attr.ib()
+    block_txs = attr.ib()
     # The following are flushed to the UTXO DB if undo_infos is not None
     undo_infos = attr.ib()
     adds = attr.ib()
@@ -82,9 +83,8 @@ class LevelDB:
         self.merkle = Merkle()
         self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes)
 
-        self.hashes_db = None
         self.headers_db = None
-        self.tx_count_db = None
+        self.tx_db = None
 
     async def _read_tx_counts(self):
         if self.tx_counts is not None:
@@ -95,7 +95,7 @@ class LevelDB:
         def get_counts():
             return tuple(
                 util.unpack_be_uint64(tx_count)
-                for tx_count in self.tx_count_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
+                for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
             )
 
         tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
@@ -123,17 +123,11 @@ class LevelDB:
             self.logger.info('created new headers db')
         self.logger.info(f'opened headers DB (for sync: {for_sync})')
 
-        assert self.tx_count_db is None
-        self.tx_count_db = self.db_class('tx_count', for_sync)
-        if self.tx_count_db.is_new:
-            self.logger.info('created new tx count db')
-        self.logger.info(f'opened tx count DB (for sync: {for_sync})')
-
-        assert self.hashes_db is None
-        self.hashes_db = self.db_class('hashes', for_sync)
-        if self.hashes_db.is_new:
-            self.logger.info('created new tx hashes db')
-        self.logger.info(f'opened tx hashes DB (for sync: {for_sync})')
+        assert self.tx_db is None
+        self.tx_db = self.db_class('tx', for_sync)
+        if self.tx_db.is_new:
+            self.logger.info('created new tx db')
+        self.logger.info(f'opened tx DB (for sync: {for_sync})')
 
         assert self.utxo_db is None
         # First UTXO DB
@@ -156,8 +150,7 @@ class LevelDB:
         self.utxo_db.close()
         self.history.close_db()
         self.headers_db.close()
-        self.tx_count_db.close()
-        self.hashes_db.close()
+        self.tx_db.close()
         self.executor.shutdown(wait=True)
         self.executor = None
 
@@ -187,12 +180,9 @@ class LevelDB:
         if self.headers_db:
             self.headers_db.close()
             self.headers_db = None
-        if self.tx_count_db:
-            self.tx_count_db.close()
-            self.tx_count_db = None
-        if self.hashes_db:
-            self.hashes_db.close()
-            self.hashes_db = None
+        if self.tx_db:
+            self.tx_db.close()
+            self.tx_db = None
 
         await self._open_dbs(False, False)
         self.logger.info("opened for serving")
@@ -217,7 +207,7 @@ class LevelDB:
         assert flush_data.height == self.fs_height == self.db_height
         assert flush_data.tip == self.db_tip
         assert not flush_data.headers
-        assert not flush_data.block_tx_hashes
+        assert not flush_data.block_txs
         assert not flush_data.adds
         assert not flush_data.deletes
         assert not flush_data.undo_infos
@@ -275,30 +265,33 @@ class LevelDB:
         """
         prior_tx_count = (self.tx_counts[self.fs_height]
                           if self.fs_height >= 0 else 0)
-        assert len(flush_data.block_tx_hashes) == len(flush_data.headers)
+        assert len(flush_data.block_txs) == len(flush_data.headers)
         assert flush_data.height == self.fs_height + len(flush_data.headers)
         assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
                                        else 0)
         assert len(self.tx_counts) == flush_data.height + 1
-        assert len(b''.join(flush_data.block_tx_hashes)) // 32 == flush_data.tx_count - prior_tx_count
+        assert len(
+            b''.join(hashes for hashes, _ in flush_data.block_txs)
+        ) // 32 == flush_data.tx_count - prior_tx_count
 
         # Write the headers, tx counts, and tx hashes
         start_time = time.perf_counter()
         height_start = self.fs_height + 1
         tx_num = prior_tx_count
 
-        for header, tx_hashes in zip(flush_data.headers, flush_data.block_tx_hashes):
+        for header, (tx_hashes, txs) in zip(flush_data.headers, flush_data.block_txs):
             tx_count = self.tx_counts[height_start]
             self.headers_db.put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
-            self.tx_count_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
+            self.tx_db.put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
             height_start += 1
             offset = 0
             while offset < len(tx_hashes):
-                self.hashes_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                self.tx_db.put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
+                self.tx_db.put(TX_PREFIX + util.pack_be_uint64(tx_num), txs[offset // 32])
                 tx_num += 1
                 offset += 32
 
-        flush_data.block_tx_hashes.clear()
+        flush_data.block_txs.clear()
         self.fs_height = flush_data.height
         self.fs_tx_count = flush_data.tx_count
         flush_data.headers.clear()
@@ -362,7 +355,7 @@ class LevelDB:
     def flush_backup(self, flush_data, touched):
         """Like flush_dbs() but when backing up.  All UTXOs are flushed."""
         assert not flush_data.headers
-        assert not flush_data.block_tx_hashes
+        assert not flush_data.block_txs
         assert flush_data.height < self.db_height
         self.history.assert_flushed()
 
@@ -431,7 +424,7 @@ class LevelDB:
         if tx_height > self.db_height:
             tx_hash = None
         else:
-            tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
+            tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
         return tx_hash, tx_height
 
     async def fs_block_hashes(self, height, count):
@@ -463,7 +456,7 @@ class LevelDB:
                     if tx_height > self.db_height:
                         tx_hash = None
                     else:
-                        tx_hash = self.hashes_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
+                        tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num))
                     hashx_history.append((tx_hash, tx_height))
                     if limit and len(hashx_history) >= limit:
                         return hashx_history