removed Takeovers for now

This commit is contained in:
Lex Berezhny 2020-06-26 10:39:16 -04:00
parent e64b108404
commit 4cb4659489
5 changed files with 68 additions and 120 deletions

View file

@ -179,9 +179,6 @@ class Database:
async def execute_fetchall(self, sql):
return await self.run_in_executor(q.execute_fetchall, sql)
async def process_inputs_outputs(self):
return await self.run_in_executor(sync.process_inputs_outputs)
async def process_all_things_after_sync(self):
return await self.run_in_executor(sync.process_all_things_after_sync)

View file

@ -27,8 +27,7 @@ from .tables import (
metadata,
SCHEMA_VERSION, Version,
Block, TX, TXO, TXI, txi_join_account, txo_join_account,
Claim, Support, Takeover,
PubkeyAddress, AccountAddress
Claim, Support, PubkeyAddress, AccountAddress
)
@ -158,33 +157,6 @@ def release_all_outputs(account_id):
)
def get_takeover_names(above_height, limit_height, offset, limit):
return context().fetchall(
select(
Takeover.c.normalized.label('_name'),
func.max(Takeover.c.height).label('_height'),
)
.where((Takeover.c.height < above_height) & (Takeover.c.height >= limit_height))
.group_by(Takeover.c.normalized)
.limit(limit).offset(offset)
)
def get_takeovers(above_height, limit_height, offset, limit):
return context().fetchall(
select(
Takeover.c.normalized,
Takeover.c.claim_hash,
Takeover.c.height,
)
.select_from(Takeover)
.where((Takeover.c.height < above_height) & (Takeover.c.height >= limit_height))
.group_by(Takeover.c.normalized)
.limit(limit).offset(offset)
)
def select_transactions(cols, account_ids=None, **constraints):
s: Select = select(*cols).select_from(TX)
if not {'tx_hash', 'tx_hash__in'}.intersection(constraints):

View file

@ -18,7 +18,7 @@ from lbry.schema.result import Censor
from lbry.schema.mime_types import guess_stream_type
from .utils import pg_insert, chunk
from .tables import Block, TX, TXO, TXI, Claim, Tag, Takeover, Support
from .tables import Block, TX, TXO, TXI, Claim, Tag, Support
from .constants import TXO_TYPES, STREAM_TYPES
@ -163,7 +163,6 @@ class ProgressUnit(Enum):
TASKS = "tasks", None
BLOCKS = "blocks", Block
TXS = "txs", TX
TAKEOVERS = "takeovers", Takeover
TXIS = "txis", TXI
CLAIMS = "claims", Claim
SUPPORTS = "supports", Support
@ -178,18 +177,20 @@ class ProgressUnit(Enum):
class Event(Enum):
START = "blockchain.sync.start", ProgressUnit.BLOCKS
COMPLETE = "blockchain.sync.complete", ProgressUnit.BLOCKS
# full node specific sync events
BLOCK_READ = "blockchain.sync.block.read", ProgressUnit.BLOCKS
BLOCK_SAVE = "blockchain.sync.block.save", ProgressUnit.TXS
BLOCK_DONE = "blockchain.sync.block.done", ProgressUnit.TASKS
CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.TAKEOVERS
BLOCK_FILTER = "blockchain.sync.block.filter", ProgressUnit.BLOCKS
CLAIM_META = "blockchain.sync.claim.update", ProgressUnit.CLAIMS
CLAIM_CALC = "blockchain.sync.claim.totals", ProgressUnit.CLAIMS
CLAIM_TRIE = "blockchain.sync.claim.takeovers", ProgressUnit.CLAIMS
STAKE_CALC = "blockchain.sync.claim.stakes", ProgressUnit.CLAIMS
CLAIM_CHAN = "blockchain.sync.claim.channels", ProgressUnit.CLAIMS
CLAIM_SIGN = "blockchain.sync.claim.signatures", ProgressUnit.CLAIMS
SUPPORT_META = "blockchain.sync.support.update", ProgressUnit.SUPPORTS
SUPPORT_SIGN = "blockchain.sync.support.signatures", ProgressUnit.SUPPORTS
TRENDING_CALC = "blockchain.sync.trending", ProgressUnit.BLOCKS
TAKEOVER_INSERT = "blockchain.sync.takeover.insert", ProgressUnit.TAKEOVERS
# full node + light client sync events
INPUT_UPDATE = "db.sync.input", ProgressUnit.TXIS
@ -245,10 +246,10 @@ class ProgressContext:
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.ctx.message_queue.put(self.get_event_args(self.total))
self.ctx.__exit__(exc_type, exc_val, exc_tb)
if exc_type == BreakProgress:
return True
self.ctx.message_queue.put(self.get_event_args(self.total))
return self.ctx.__exit__(exc_type, exc_val, exc_tb)
def start(self, total, extra=None):
if not total:

View file

@ -10,46 +10,67 @@ from lbry.db.tables import (
def process_all_things_after_sync():
process_inputs_outputs()
process_supports()
process_claim_deletes()
process_claim_changes()
def process_inputs_outputs():
with progress(Event.INPUT_UPDATE) as p:
p.start(2)
if p.ctx.is_sqlite:
address_query = select(TXO.c.address).where(TXI.c.txo_hash == TXO.c.txo_hash)
set_addresses = (
TXI.update()
.values(address=address_query.scalar_subquery())
.where(TXI.c.address == None)
)
else:
set_addresses = (
TXI.update()
.values({TXI.c.address: TXO.c.address})
.where((TXI.c.address == None) & (TXI.c.txo_hash == TXO.c.txo_hash))
)
# 1. Update TXIs to have the address of TXO they are spending.
p.ctx.execute(set_addresses)
set_input_addresses(p.ctx)
p.step(1)
# 2. Update spent TXOs setting is_spent = True
set_is_spent = (
TXO.update()
.values({TXO.c.is_spent: True})
.where(
(TXO.c.is_spent == False) &
(TXO.c.txo_hash.in_(select(TXI.c.txo_hash)))
)
)
p.ctx.execute(set_is_spent)
update_spent_outputs(p.ctx)
p.step(2)
with progress(Event.SUPPORT_DELETE) as p:
p.start(1)
sql = Support.delete().where(condition_spent_supports)
p.ctx.execute(sql)
with progress(Event.SUPPORT_INSERT) as p:
loader = p.ctx.get_bulk_loader()
for support in rows_to_txos(p.ctx.fetchall(select_missing_supports)):
loader.add_support(support)
loader.save()
with progress(Event.CLAIM_DELETE) as p:
p.start(1)
sql = Claim.delete().where(condition_spent_claims())
p.ctx.execute(sql)
with progress(Event.CLAIM_INSERT) as p:
loader = p.ctx.get_bulk_loader()
for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)):
loader.add_claim(claim)
loader.save()
with progress(Event.CLAIM_UPDATE) as p:
loader = p.ctx.get_bulk_loader()
for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)):
loader.update_claim(claim)
loader.save()
def set_input_addresses(ctx):
# Update TXIs to have the address of TXO they are spending.
if ctx.is_sqlite:
address_query = select(TXO.c.address).where(TXI.c.txo_hash == TXO.c.txo_hash)
set_addresses = (
TXI.update()
.values(address=address_query.scalar_subquery())
.where(TXI.c.address == None)
)
else:
set_addresses = (
TXI.update()
.values({TXI.c.address: TXO.c.address})
.where((TXI.c.address == None) & (TXI.c.txo_hash == TXO.c.txo_hash))
)
ctx.execute(set_addresses)
def update_spent_outputs(ctx):
# Update spent TXOs setting is_spent = True
set_is_spent = (
TXO.update()
.values({TXO.c.is_spent: True})
.where(
(TXO.c.is_spent == False) &
(TXO.c.txo_hash.in_(select(TXI.c.txo_hash)))
)
)
ctx.execute(set_is_spent)
def condition_spent_claims(claim_type: list = None):
@ -96,36 +117,3 @@ condition_spent_supports = (
select_missing_supports = (
select_txos(txo_type=TXO_TYPES['support'], is_spent=False, txo_id_not_in_support_table=True)
)
def process_supports():
with progress(Event.SUPPORT_DELETE) as p:
p.start(1)
sql = Support.delete().where(condition_spent_supports)
p.ctx.execute(sql)
with progress(Event.SUPPORT_INSERT) as p:
loader = p.ctx.get_bulk_loader()
for support in rows_to_txos(p.ctx.fetchall(select_missing_supports)):
loader.add_support(support)
loader.save()
def process_claim_deletes():
with progress(Event.CLAIM_DELETE) as p:
p.start(1)
sql = Claim.delete().where(condition_spent_claims())
p.ctx.execute(sql)
def process_claim_changes():
with progress(Event.CLAIM_INSERT) as p:
loader = p.ctx.get_bulk_loader()
for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)):
loader.add_claim(claim)
loader.save()
with progress(Event.CLAIM_UPDATE) as p:
loader = p.ctx.get_bulk_loader()
for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)):
loader.update_claim(claim)
loader.save()

View file

@ -178,8 +178,6 @@ Support = Table(
Column('address', Text),
Column('amount', BigInteger),
Column('height', Integer),
Column('activation_height', Integer, nullable=True),
Column('expiration_height', Integer, nullable=True),
# support metadata
Column('emoji', Text),
@ -190,11 +188,3 @@ Support = Table(
Column('signature_digest', LargeBinary, nullable=True),
Column('is_signature_valid', Boolean, nullable=True),
)
Takeover = Table(
'takeover', metadata,
Column('normalized', Text),
Column('claim_hash', LargeBinary),
Column('height', Integer),
)