diff --git a/lbry/db/database.py b/lbry/db/database.py index 93d299b32..c3f14264c 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -179,9 +179,6 @@ class Database: async def execute_fetchall(self, sql): return await self.run_in_executor(q.execute_fetchall, sql) - async def process_inputs_outputs(self): - return await self.run_in_executor(sync.process_inputs_outputs) - async def process_all_things_after_sync(self): return await self.run_in_executor(sync.process_all_things_after_sync) diff --git a/lbry/db/queries.py b/lbry/db/queries.py index 3e5a60e41..a58d6f4a4 100644 --- a/lbry/db/queries.py +++ b/lbry/db/queries.py @@ -27,8 +27,7 @@ from .tables import ( metadata, SCHEMA_VERSION, Version, Block, TX, TXO, TXI, txi_join_account, txo_join_account, - Claim, Support, Takeover, - PubkeyAddress, AccountAddress + Claim, Support, PubkeyAddress, AccountAddress ) @@ -158,33 +157,6 @@ def release_all_outputs(account_id): ) -def get_takeover_names(above_height, limit_height, offset, limit): - return context().fetchall( - select( - Takeover.c.normalized.label('_name'), - func.max(Takeover.c.height).label('_height'), - - ) - .where((Takeover.c.height < above_height) & (Takeover.c.height >= limit_height)) - .group_by(Takeover.c.normalized) - .limit(limit).offset(offset) - ) - - -def get_takeovers(above_height, limit_height, offset, limit): - return context().fetchall( - select( - Takeover.c.normalized, - Takeover.c.claim_hash, - Takeover.c.height, - ) - .select_from(Takeover) - .where((Takeover.c.height < above_height) & (Takeover.c.height >= limit_height)) - .group_by(Takeover.c.normalized) - .limit(limit).offset(offset) - ) - - def select_transactions(cols, account_ids=None, **constraints): s: Select = select(*cols).select_from(TX) if not {'tx_hash', 'tx_hash__in'}.intersection(constraints): diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 9b4dad933..8cf74ac33 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -18,7 +18,7 @@ from lbry.schema.result import Censor from lbry.schema.mime_types import guess_stream_type from .utils import pg_insert, chunk -from .tables import Block, TX, TXO, TXI, Claim, Tag, Takeover, Support +from .tables import Block, TX, TXO, TXI, Claim, Tag, Support from .constants import TXO_TYPES, STREAM_TYPES @@ -163,7 +163,6 @@ class ProgressUnit(Enum): TASKS = "tasks", None BLOCKS = "blocks", Block TXS = "txs", TX - TAKEOVERS = "takeovers", Takeover TXIS = "txis", TXI CLAIMS = "claims", Claim SUPPORTS = "supports", Support @@ -178,18 +177,20 @@ class ProgressUnit(Enum): class Event(Enum): + START = "blockchain.sync.start", ProgressUnit.BLOCKS + COMPLETE = "blockchain.sync.complete", ProgressUnit.BLOCKS + # full node specific sync events BLOCK_READ = "blockchain.sync.block.read", ProgressUnit.BLOCKS BLOCK_SAVE = "blockchain.sync.block.save", ProgressUnit.TXS - BLOCK_DONE = "blockchain.sync.block.done", ProgressUnit.TASKS - CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.TAKEOVERS + BLOCK_FILTER = "blockchain.sync.block.filter", ProgressUnit.BLOCKS CLAIM_META = "blockchain.sync.claim.update", ProgressUnit.CLAIMS - CLAIM_CALC = "blockchain.sync.claim.totals", ProgressUnit.CLAIMS + CLAIM_TRIE = "blockchain.sync.claim.takeovers", ProgressUnit.CLAIMS + STAKE_CALC = "blockchain.sync.claim.stakes", ProgressUnit.CLAIMS + CLAIM_CHAN = "blockchain.sync.claim.channels", ProgressUnit.CLAIMS CLAIM_SIGN = "blockchain.sync.claim.signatures", ProgressUnit.CLAIMS - SUPPORT_META = "blockchain.sync.support.update", ProgressUnit.SUPPORTS SUPPORT_SIGN = "blockchain.sync.support.signatures", ProgressUnit.SUPPORTS TRENDING_CALC = "blockchain.sync.trending", ProgressUnit.BLOCKS - TAKEOVER_INSERT = "blockchain.sync.takeover.insert", ProgressUnit.TAKEOVERS # full node + light client sync events INPUT_UPDATE = "db.sync.input", ProgressUnit.TXIS @@ -245,10 +246,10 @@ class ProgressContext: return self def __exit__(self, exc_type, exc_val, exc_tb): + self.ctx.message_queue.put(self.get_event_args(self.total)) + self.ctx.__exit__(exc_type, exc_val, exc_tb) if exc_type == BreakProgress: return True - self.ctx.message_queue.put(self.get_event_args(self.total)) - return self.ctx.__exit__(exc_type, exc_val, exc_tb) def start(self, total, extra=None): if not total: diff --git a/lbry/db/sync.py b/lbry/db/sync.py index 548df13d6..354e8d67f 100644 --- a/lbry/db/sync.py +++ b/lbry/db/sync.py @@ -10,46 +10,67 @@ from lbry.db.tables import ( def process_all_things_after_sync(): - process_inputs_outputs() - process_supports() - process_claim_deletes() - process_claim_changes() - - -def process_inputs_outputs(): - with progress(Event.INPUT_UPDATE) as p: p.start(2) - - if p.ctx.is_sqlite: - address_query = select(TXO.c.address).where(TXI.c.txo_hash == TXO.c.txo_hash) - set_addresses = ( - TXI.update() - .values(address=address_query.scalar_subquery()) - .where(TXI.c.address == None) - ) - else: - set_addresses = ( - TXI.update() - .values({TXI.c.address: TXO.c.address}) - .where((TXI.c.address == None) & (TXI.c.txo_hash == TXO.c.txo_hash)) - ) - - # 1. Update TXIs to have the address of TXO they are spending. - p.ctx.execute(set_addresses) + set_input_addresses(p.ctx) p.step(1) - - # 2. Update spent TXOs setting is_spent = True - set_is_spent = ( - TXO.update() - .values({TXO.c.is_spent: True}) - .where( - (TXO.c.is_spent == False) & - (TXO.c.txo_hash.in_(select(TXI.c.txo_hash))) - ) - ) - p.ctx.execute(set_is_spent) + update_spent_outputs(p.ctx) p.step(2) + with progress(Event.SUPPORT_DELETE) as p: + p.start(1) + sql = Support.delete().where(condition_spent_supports) + p.ctx.execute(sql) + with progress(Event.SUPPORT_INSERT) as p: + loader = p.ctx.get_bulk_loader() + for support in rows_to_txos(p.ctx.fetchall(select_missing_supports)): + loader.add_support(support) + loader.save() + with progress(Event.CLAIM_DELETE) as p: + p.start(1) + sql = Claim.delete().where(condition_spent_claims()) + p.ctx.execute(sql) + with progress(Event.CLAIM_INSERT) as p: + loader = p.ctx.get_bulk_loader() + for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)): + loader.add_claim(claim) + loader.save() + with progress(Event.CLAIM_UPDATE) as p: + loader = p.ctx.get_bulk_loader() + for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)): + loader.update_claim(claim) + loader.save() + + +def set_input_addresses(ctx): + # Update TXIs to have the address of TXO they are spending. + if ctx.is_sqlite: + address_query = select(TXO.c.address).where(TXI.c.txo_hash == TXO.c.txo_hash) + set_addresses = ( + TXI.update() + .values(address=address_query.scalar_subquery()) + .where(TXI.c.address == None) + ) + else: + set_addresses = ( + TXI.update() + .values({TXI.c.address: TXO.c.address}) + .where((TXI.c.address == None) & (TXI.c.txo_hash == TXO.c.txo_hash)) + ) + + ctx.execute(set_addresses) + + +def update_spent_outputs(ctx): + # Update spent TXOs setting is_spent = True + set_is_spent = ( + TXO.update() + .values({TXO.c.is_spent: True}) + .where( + (TXO.c.is_spent == False) & + (TXO.c.txo_hash.in_(select(TXI.c.txo_hash))) + ) + ) + ctx.execute(set_is_spent) def condition_spent_claims(claim_type: list = None): @@ -96,36 +117,3 @@ condition_spent_supports = ( select_missing_supports = ( select_txos(txo_type=TXO_TYPES['support'], is_spent=False, txo_id_not_in_support_table=True) ) - - -def process_supports(): - with progress(Event.SUPPORT_DELETE) as p: - p.start(1) - sql = Support.delete().where(condition_spent_supports) - p.ctx.execute(sql) - with progress(Event.SUPPORT_INSERT) as p: - loader = p.ctx.get_bulk_loader() - for support in rows_to_txos(p.ctx.fetchall(select_missing_supports)): - loader.add_support(support) - loader.save() - - -def process_claim_deletes(): - with progress(Event.CLAIM_DELETE) as p: - p.start(1) - sql = Claim.delete().where(condition_spent_claims()) - p.ctx.execute(sql) - - -def process_claim_changes(): - with progress(Event.CLAIM_INSERT) as p: - loader = p.ctx.get_bulk_loader() - for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)): - loader.add_claim(claim) - loader.save() - - with progress(Event.CLAIM_UPDATE) as p: - loader = p.ctx.get_bulk_loader() - for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)): - loader.update_claim(claim) - loader.save() diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 3fd5909ef..dcd61314c 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -178,8 +178,6 @@ Support = Table( Column('address', Text), Column('amount', BigInteger), Column('height', Integer), - Column('activation_height', Integer, nullable=True), - Column('expiration_height', Integer, nullable=True), # support metadata Column('emoji', Text), @@ -190,11 +188,3 @@ Support = Table( Column('signature_digest', LargeBinary, nullable=True), Column('is_signature_valid', Boolean, nullable=True), ) - - -Takeover = Table( - 'takeover', metadata, - Column('normalized', Text), - Column('claim_hash', LargeBinary), - Column('height', Integer), -)