lbry-sdk/lbry/db/sync.py

104 lines
3.7 KiB
Python
Raw Normal View History

2020-06-05 06:35:22 +02:00
from sqlalchemy.future import select
2020-06-19 20:28:34 +02:00
from lbry.db.query_context import progress, Event
2020-08-20 19:31:58 +02:00
from lbry.db.tables import TX, TXI, TXO, Claim, Support
2020-07-13 21:45:21 +02:00
from .constants import TXO_TYPES, CLAIM_TYPE_CODES
from .queries import (
2020-08-20 19:31:58 +02:00
BASE_SELECT_TXO_COLUMNS,
2020-07-13 21:45:21 +02:00
rows_to_txos, where_unspent_txos,
where_abandoned_supports,
where_abandoned_claims
)
SPENDS_UPDATE_EVENT = Event.add("client.sync.spends.update", "steps")
CLAIMS_INSERT_EVENT = Event.add("client.sync.claims.insert", "claims")
CLAIMS_UPDATE_EVENT = Event.add("client.sync.claims.update", "claims")
CLAIMS_DELETE_EVENT = Event.add("client.sync.claims.delete", "claims")
2020-07-13 21:50:00 +02:00
SUPPORT_INSERT_EVENT = Event.add("client.sync.supports.insert", "supports")
SUPPORT_UPDATE_EVENT = Event.add("client.sync.supports.update", "supports")
SUPPORT_DELETE_EVENT = Event.add("client.sync.supports.delete", "supports")
2020-06-05 06:35:22 +02:00
2020-06-19 20:28:34 +02:00
def process_all_things_after_sync():
2020-07-13 21:45:21 +02:00
with progress(SPENDS_UPDATE_EVENT) as p:
2020-06-26 16:39:16 +02:00
p.start(2)
update_spent_outputs(p.ctx)
2020-08-20 19:31:58 +02:00
p.step(1)
set_input_addresses(p.ctx)
2020-06-26 16:39:16 +02:00
p.step(2)
2020-07-13 21:45:21 +02:00
with progress(SUPPORT_DELETE_EVENT) as p:
2020-06-26 16:39:16 +02:00
p.start(1)
2020-07-13 21:45:21 +02:00
sql = Support.delete().where(where_abandoned_supports())
2020-06-26 16:39:16 +02:00
p.ctx.execute(sql)
2020-07-13 21:45:21 +02:00
with progress(SUPPORT_INSERT_EVENT) as p:
2020-06-26 16:39:16 +02:00
loader = p.ctx.get_bulk_loader()
2020-08-20 19:31:58 +02:00
sql = (
select(*BASE_SELECT_TXO_COLUMNS)
.where(where_unspent_txos(TXO_TYPES['support'], missing_in_supports_table=True))
.select_from(TXO.join(TX))
)
2020-07-13 21:45:21 +02:00
for support in rows_to_txos(p.ctx.fetchall(sql)):
2020-06-26 16:39:16 +02:00
loader.add_support(support)
2020-07-13 21:45:21 +02:00
loader.flush(Support)
with progress(CLAIMS_DELETE_EVENT) as p:
2020-06-26 16:39:16 +02:00
p.start(1)
2020-07-13 21:45:21 +02:00
sql = Claim.delete().where(where_abandoned_claims())
2020-06-26 16:39:16 +02:00
p.ctx.execute(sql)
2020-07-13 21:45:21 +02:00
with progress(CLAIMS_INSERT_EVENT) as p:
2020-06-26 16:39:16 +02:00
loader = p.ctx.get_bulk_loader()
2020-08-20 19:31:58 +02:00
sql = (
select(*BASE_SELECT_TXO_COLUMNS)
.where(where_unspent_txos(CLAIM_TYPE_CODES, missing_in_claims_table=True))
.select_from(TXO.join(TX))
)
2020-07-13 21:45:21 +02:00
for claim in rows_to_txos(p.ctx.fetchall(sql)):
2020-08-20 19:31:58 +02:00
loader.add_claim(claim, '', 0, 0, 0, 0, staked_support_amount=0, staked_support_count=0)
2020-07-13 21:45:21 +02:00
loader.flush(Claim)
with progress(CLAIMS_UPDATE_EVENT) as p:
2020-06-26 16:39:16 +02:00
loader = p.ctx.get_bulk_loader()
2020-08-20 19:31:58 +02:00
sql = (
select(*BASE_SELECT_TXO_COLUMNS)
.where(where_unspent_txos(CLAIM_TYPE_CODES, missing_or_stale_in_claims_table=True))
.select_from(TXO.join(TX))
)
2020-07-13 21:45:21 +02:00
for claim in rows_to_txos(p.ctx.fetchall(sql)):
2020-06-26 16:39:16 +02:00
loader.update_claim(claim)
2020-07-13 21:45:21 +02:00
loader.flush(Claim)
2020-06-19 20:28:34 +02:00
2020-06-26 16:39:16 +02:00
def set_input_addresses(ctx):
# Update TXIs to have the address of TXO they are spending.
if ctx.is_sqlite:
address_query = select(TXO.c.address).where(TXI.c.txo_hash == TXO.c.txo_hash)
set_addresses = (
TXI.update()
.values(address=address_query.scalar_subquery())
2020-07-12 00:18:33 +02:00
.where(TXI.c.address.is_(None))
2020-06-26 16:39:16 +02:00
)
else:
set_addresses = (
TXI.update()
.values({TXI.c.address: TXO.c.address})
2020-07-12 00:18:33 +02:00
.where((TXI.c.address.is_(None)) & (TXI.c.txo_hash == TXO.c.txo_hash))
2020-06-26 16:39:16 +02:00
)
ctx.execute(set_addresses)
2020-06-19 20:28:34 +02:00
2020-06-26 16:39:16 +02:00
def update_spent_outputs(ctx):
2020-07-06 05:03:45 +02:00
# Update spent TXOs setting spent_height
set_spent_height = (
2020-06-26 16:39:16 +02:00
TXO.update()
2020-07-06 05:03:45 +02:00
.values({
TXO.c.spent_height: (
select(TXI.c.height)
.where(TXI.c.txo_hash == TXO.c.txo_hash)
2020-07-07 04:42:15 +02:00
.scalar_subquery()
2020-07-06 05:03:45 +02:00
)
}).where(
(TXO.c.spent_height == 0) &
2020-07-12 22:27:51 +02:00
(TXO.c.txo_hash.in_(select(TXI.c.txo_hash).where(TXI.c.address.is_(None))))
2020-06-19 20:28:34 +02:00
)
2020-06-26 16:39:16 +02:00
)
2020-07-06 05:03:45 +02:00
ctx.execute(set_spent_height)