This commit is contained in:
Lex Berezhny 2020-07-13 15:45:21 -04:00
parent 8fd92cb649
commit 248e04089b
11 changed files with 70 additions and 106 deletions

View file

@ -75,7 +75,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;")) p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;"))
p.step() p.step()
# 3. insert # 3. insert
old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) # pylint: disable=not-an-iterable
columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address] columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address]
join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash) join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash)
select_txis = select(*columns).select_from(join_txi_on_txo) select_txis = select(*columns).select_from(join_txi_on_txo)
@ -100,7 +100,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;")) p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;"))
p.step() p.step()
# 7. insert # 7. insert
old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) # pylint: disable=not-an-iterable
columns = [c for c in old_txo.columns if c.name != "spent_height"] columns = [c for c in old_txo.columns if c.name != "spent_height"]
insert_columns = columns + [TXO.c.spent_height] insert_columns = columns + [TXO.c.spent_height]
select_columns = columns + [ select_columns = columns + [

View file

@ -130,10 +130,9 @@ def claims_insert(
metadata = claim_metadata[i] if i < len(claim_metadata) else None metadata = claim_metadata[i] if i < len(claim_metadata) else None
if metadata is None: if metadata is None:
break break
elif metadata['claim_hash'] != row.claim_hash: if metadata['claim_hash'] != row.claim_hash:
continue continue
else: i += 1
i += 1
txo, extra = row_to_claim_for_saving(row) txo, extra = row_to_claim_for_saving(row)
extra.update({ extra.update({
'short_url': metadata['short_url'], 'short_url': metadata['short_url'],

View file

@ -150,7 +150,7 @@ class Input(InputOutput):
@classmethod @classmethod
def create_coinbase(cls) -> 'Input': def create_coinbase(cls) -> 'Input':
tx_ref = TXRefImmutable.from_hash(cls.NULL_HASH32, 0) tx_ref = TXRefImmutable.from_hash(cls.NULL_HASH32, 0, 0)
txo_ref = TXORef(tx_ref, 0) txo_ref = TXORef(tx_ref, 0)
return cls(txo_ref, b'beef') return cls(txo_ref, b'beef')

View file

@ -6,7 +6,7 @@ from typing import Dict, Any
from tempfile import TemporaryFile from tempfile import TemporaryFile
from tqdm.std import tqdm, Bar from tqdm.std import tqdm, Bar
from tqdm.utils import FormatReplace, _unicode, disp_len, disp_trim, _is_ascii, _unich from tqdm.utils import FormatReplace, _unicode, disp_len, disp_trim, _is_ascii
from lbry import __version__ from lbry import __version__
from lbry.service.base import Service from lbry.service.base import Service
@ -26,6 +26,8 @@ class RedirectOutput:
self.stream_no = getattr(sys, stream_type).fileno() self.stream_no = getattr(sys, stream_type).fileno()
self.last_flush = time.time() self.last_flush = time.time()
self.last_read = 0 self.last_read = 0
self.backup = None
self.file = None
def __enter__(self): def __enter__(self):
self.backup = os.dup(self.stream_no) self.backup = os.dup(self.stream_no)
@ -142,10 +144,10 @@ class Bar2(Bar):
return ''.join(bar) return ''.join(bar)
class tqdm2(tqdm): class tqdm2(tqdm): # pylint: disable=invalid-name
def __init__(self, initial=(0, 0), unit=('it', 'it'), total=(None, None), **kwargs): def __init__(self, initial=(0, 0), unit=('it', 'it'), total=(None, None), **kwargs):
self.n2 = self.last_print_n2 = initial[1] self.n2 = self.last_print_n2 = initial[1] # pylint: disable=invalid-name
self.unit2 = unit[1] self.unit2 = unit[1]
self.total2 = total[1] self.total2 = total[1]
super().__init__(initial=initial[0], unit=unit[0], total=total[0], **kwargs) super().__init__(initial=initial[0], unit=unit[0], total=total[0], **kwargs)
@ -171,7 +173,7 @@ class tqdm2(tqdm):
@staticmethod @staticmethod
def format_meter( def format_meter(
n, total, elapsed, ncols=None, prefix='', ascii=False, n, total, elapsed, ncols=None, prefix='', ascii=False, # pylint: disable=redefined-builtin
unit='it', unit_scale=False, rate=None, bar_format=None, unit='it', unit_scale=False, rate=None, bar_format=None,
postfix=None, unit_divisor=1000, **extra_kwargs postfix=None, unit_divisor=1000, **extra_kwargs
): ):
@ -251,7 +253,7 @@ class tqdm2(tqdm):
# total is known: we can predict some stats # total is known: we can predict some stats
if total: if total:
n2, total2 = extra_kwargs['n2'], extra_kwargs['total2'] n2, total2 = extra_kwargs['n2'], extra_kwargs['total2'] # pylint: disable=invalid-name
# fractional and percentage progress # fractional and percentage progress
frac = n / total frac = n / total
@ -411,43 +413,6 @@ class Advanced(Basic):
if d['done'][0] == -1 or d['done'][0] == bar.total: if d['done'][0] == -1 or d['done'][0] == bar.total:
bar.close() bar.close()
def update_block_bars(self, event, d):
total_bar = self.bars[event[-4:]]
if event.endswith("read") and self.block_readers == 0:
total_bar.unpause()
if event.endswith("read") and d['step'] == d['total']:
self.block_readers -= 1
bar_name = f"block-{d['block_file']}"
bar = self.bars.get(bar_name)
if bar is None:
self.block_readers += 1
return self.get_or_create_bar(
bar_name,
f" ├─ blk{d['block_file']:05}.dat reading", 'blocks', d['total']
)
if event.endswith("save") and bar.unit == "blocks":
if self.block_savers == 0:
total_bar.unpause()
self.block_savers += 1
bar.desc = f" ├─ blk{d['block_file']:05}.dat saving"
bar.unit = "txs"
bar.reset(d['total'])
return
diff = d['step']-bar.n
bar.update(diff)
if event.endswith("save") and d['step'] == d['total']:
self.block_savers -= 1
bar.close()
total_bar.update(diff)
if total_bar.total == total_bar.n:
if total_bar.desc.endswith('txs saved'):
total_bar.desc = "├─ txs saved"
total_bar.refresh()
def update_other_bars(self, e, d): def update_other_bars(self, e, d):
if d['total'] == 0: if d['total'] == 0:
return return
@ -465,11 +430,6 @@ class Advanced(Basic):
#if d['step'] == d['total']: #if d['step'] == d['total']:
#bar.close() #bar.close()
def sync_complete(self):
self.bars['read'].postfix = (self.last_stats,)
for bar in self.bars.values():
bar.close()
def on_sync_progress(self, event): def on_sync_progress(self, event):
e, d = event['event'], event.get('data', {}) e, d = event['event'], event.get('data', {})
if e.endswith(".init"): if e.endswith(".init"):

View file

@ -14,9 +14,7 @@ from ..tables import (
) )
from ..utils import query, in_account_ids from ..utils import query, in_account_ids
from ..query_context import context from ..query_context import context
from ..constants import ( from ..constants import TXO_TYPES, CLAIM_TYPE_CODES, MAX_QUERY_VARIABLES
TXO_TYPES, CLAIM_TYPE_CODES, CONTENT_TYPE_CODES, MAX_QUERY_VARIABLES
)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -326,8 +324,8 @@ def select_txos(
include_is_my_input = True include_is_my_input = True
s = s.where( s = s.where(
TXO.c.address.in_(my_addresses) | ( TXO.c.address.in_(my_addresses) | (
(TXI.c.address != None) & (TXI.c.address.isnot(None)) &
(TXI.c.address.in_(my_addresses)) (TXI.c.address.in_(my_addresses))
) )
) )
else: else:
@ -338,13 +336,13 @@ def select_txos(
if is_my_input: if is_my_input:
include_is_my_input = True include_is_my_input = True
s = s.where( s = s.where(
(TXI.c.address != None) & (TXI.c.address.isnot(None)) &
(TXI.c.address.in_(my_addresses)) (TXI.c.address.in_(my_addresses))
) )
elif is_my_input is False: elif is_my_input is False:
include_is_my_input = True include_is_my_input = True
s = s.where( s = s.where(
(TXI.c.address == None) | (TXI.c.address.is_(None)) |
(TXI.c.address.notin_(my_addresses)) (TXI.c.address.notin_(my_addresses))
) )
if exclude_internal_transfers: if exclude_internal_transfers:
@ -352,7 +350,7 @@ def select_txos(
s = s.where( s = s.where(
(TXO.c.txo_type != TXO_TYPES['other']) | (TXO.c.txo_type != TXO_TYPES['other']) |
(TXO.c.address.notin_(my_addresses)) (TXO.c.address.notin_(my_addresses))
(TXI.c.address == None) | (TXI.c.address.is_(None)) |
(TXI.c.address.notin_(my_addresses)) (TXI.c.address.notin_(my_addresses))
) )
joins = TXO.join(TX) joins = TXO.join(TX)
@ -439,15 +437,15 @@ def get_txos(no_tx=False, include_total=False, **constraints) -> Tuple[List[Outp
select_columns.append(text(f"{1 if constraints['is_my_input'] else 0} AS is_my_input")) select_columns.append(text(f"{1 if constraints['is_my_input'] else 0} AS is_my_input"))
else: else:
select_columns.append(( select_columns.append((
(TXI.c.address != None) & (TXI.c.address.isnot(None)) &
(TXI.c.address.in_(my_accounts)) (TXI.c.address.in_(my_accounts))
).label('is_my_input')) ).label('is_my_input'))
if include_received_tips: if include_received_tips:
support = TXO.alias('support') support = TXO.alias('support')
select_columns.append( select_columns.append(
select(func.coalesce(func.sum(support.c.amount), 0)) select(func.coalesce(func.sum(support.c.amount), 0))
.select_from(support).where( .select_from(support).where(
(support.c.claim_hash == TXO.c.claim_hash) & (support.c.claim_hash == TXO.c.claim_hash) &
(support.c.txo_type == TXO_TYPES['support']) & (support.c.txo_type == TXO_TYPES['support']) &
(support.c.address.in_(my_accounts)) & (support.c.address.in_(my_accounts)) &
@ -562,8 +560,8 @@ def get_supports_summary(self, **constraints):
def reserve_outputs(txo_hashes, is_reserved=True): def reserve_outputs(txo_hashes, is_reserved=True):
context().execute( context().execute(
TXO.update() TXO.update()
.values(is_reserved=is_reserved) .values(is_reserved=is_reserved)
.where(TXO.c.txo_hash.in_(txo_hashes)) .where(TXO.c.txo_hash.in_(txo_hashes))
) )

View file

@ -9,7 +9,7 @@ from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field from dataclasses import dataclass, field
from contextvars import ContextVar from contextvars import ContextVar
from sqlalchemy import create_engine, inspect, bindparam, func, exists, case, event as sqlalchemy_event from sqlalchemy import create_engine, inspect, bindparam, func, exists, event as sqlalchemy_event
from sqlalchemy.future import select from sqlalchemy.future import select
from sqlalchemy.engine import Engine, Connection from sqlalchemy.engine import Engine, Connection
from sqlalchemy.sql import Insert from sqlalchemy.sql import Insert
@ -25,7 +25,7 @@ from lbry.schema.tags import clean_tags
from lbry.schema.result import Censor from lbry.schema.result import Censor
from lbry.schema.mime_types import guess_stream_type from lbry.schema.mime_types import guess_stream_type
from .utils import pg_insert, chunk from .utils import pg_insert
from .tables import Block, TX, TXO, TXI, Claim, Tag, Support from .tables import Block, TX, TXO, TXI, Claim, Tag, Support
from .constants import TXO_TYPES, STREAM_TYPES from .constants import TXO_TYPES, STREAM_TYPES
@ -212,6 +212,7 @@ class Event:
__slots__ = 'id', 'name', 'units' __slots__ = 'id', 'name', 'units'
def __init__(self, name: str, units: Tuple[str]): def __init__(self, name: str, units: Tuple[str]):
self.id = None
self.name = name self.name = name
self.units = units self.units = units

View file

@ -1,40 +1,57 @@
from sqlalchemy.future import select from sqlalchemy.future import select
from lbry.db.query_context import progress, Event from lbry.db.query_context import progress, Event
from lbry.db.tables import TXI, TXO from lbry.db.tables import TXI, TXO, Claim, Support
from .queries import rows_to_txos from .constants import TXO_TYPES, CLAIM_TYPE_CODES
from .queries import (
rows_to_txos, where_unspent_txos,
where_abandoned_supports,
where_abandoned_claims
)
SPENDS_UPDATE_EVENT = Event.add("client.sync.spends.update", "steps")
CLAIMS_INSERT_EVENT = Event.add("client.sync.claims.insert", "claims")
CLAIMS_UPDATE_EVENT = Event.add("client.sync.claims.update", "claims")
CLAIMS_DELETE_EVENT = Event.add("client.sync.claims.delete", "claims")
SUPPORT_INSERT_EVENT = Event.add("client.sync.claims.insert", "supports")
SUPPORT_UPDATE_EVENT = Event.add("client.sync.claims.update", "supports")
SUPPORT_DELETE_EVENT = Event.add("client.sync.claims.delete", "supports")
def process_all_things_after_sync(): def process_all_things_after_sync():
with progress(Event.INPUT_UPDATE) as p: with progress(SPENDS_UPDATE_EVENT) as p:
p.start(2) p.start(2)
set_input_addresses(p.ctx) set_input_addresses(p.ctx)
p.step(1) p.step(1)
update_spent_outputs(p.ctx) update_spent_outputs(p.ctx)
p.step(2) p.step(2)
with progress(Event.SUPPORT_DELETE) as p: with progress(SUPPORT_DELETE_EVENT) as p:
p.start(1) p.start(1)
sql = Support.delete().where(condition_spent_supports) sql = Support.delete().where(where_abandoned_supports())
p.ctx.execute(sql) p.ctx.execute(sql)
with progress(Event.SUPPORT_INSERT) as p: with progress(SUPPORT_INSERT_EVENT) as p:
loader = p.ctx.get_bulk_loader() loader = p.ctx.get_bulk_loader()
for support in rows_to_txos(p.ctx.fetchall(select_missing_supports)): sql = where_unspent_txos(TXO_TYPES['support'], missing_in_supports_table=True)
for support in rows_to_txos(p.ctx.fetchall(sql)):
loader.add_support(support) loader.add_support(support)
loader.save() loader.flush(Support)
with progress(Event.CLAIM_DELETE) as p: with progress(CLAIMS_DELETE_EVENT) as p:
p.start(1) p.start(1)
sql = Claim.delete().where(condition_spent_claims()) sql = Claim.delete().where(where_abandoned_claims())
p.ctx.execute(sql) p.ctx.execute(sql)
with progress(Event.CLAIM_INSERT) as p: with progress(CLAIMS_INSERT_EVENT) as p:
loader = p.ctx.get_bulk_loader() loader = p.ctx.get_bulk_loader()
for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)): sql = where_unspent_txos(CLAIM_TYPE_CODES, missing_in_claims_table=True)
for claim in rows_to_txos(p.ctx.fetchall(sql)):
loader.add_claim(claim) loader.add_claim(claim)
loader.save() loader.flush(Claim)
with progress(Event.CLAIM_UPDATE) as p: with progress(CLAIMS_UPDATE_EVENT) as p:
loader = p.ctx.get_bulk_loader() loader = p.ctx.get_bulk_loader()
for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)): sql = where_unspent_txos(CLAIM_TYPE_CODES, missing_or_stale_in_claims_table=True)
for claim in rows_to_txos(p.ctx.fetchall(sql)):
loader.update_claim(claim) loader.update_claim(claim)
loader.save() loader.flush(Claim)
def set_input_addresses(ctx): def set_input_addresses(ctx):

View file

@ -92,12 +92,6 @@ class Service:
async def get(self, uri, **kwargs): async def get(self, uri, **kwargs):
pass pass
async def get_block_address_filters(self):
raise NotImplementedError
async def get_transaction_address_filters(self, block_hash):
raise NotImplementedError
def create_wallet(self, file_name): def create_wallet(self, file_name):
path = os.path.join(self.conf.wallet_dir, file_name) path = os.path.join(self.conf.wallet_dir, file_name)
return self.wallets.add_from_path(path) return self.wallets.add_from_path(path)
@ -194,7 +188,7 @@ class Service:
async def wait(self, tx: Transaction, height=-1, timeout=1): async def wait(self, tx: Transaction, height=-1, timeout=1):
raise NotImplementedError raise NotImplementedError
async def resolve(self, accounts, urls, **kwargs): async def resolve(self, urls, **kwargs):
raise NotImplementedError raise NotImplementedError
async def search_claims(self, accounts, **kwargs) -> Tuple[List[Output], Optional[int], Censor]: async def search_claims(self, accounts, **kwargs) -> Tuple[List[Output], Optional[int], Censor]:
@ -227,7 +221,7 @@ class Service:
async def _resolve_for_local_results(self, accounts, txos: Result) -> Result: async def _resolve_for_local_results(self, accounts, txos: Result) -> Result:
results = [] results = []
response = await self.resolve( response = await self.resolve(
accounts, [txo.permanent_url for txo in txos if txo.can_decode_claim] [txo.permanent_url for txo in txos if txo.can_decode_claim], accounts=accounts
) )
for txo in txos: for txo in txos:
resolved = response.get(txo.permanent_url) if txo.can_decode_claim else None resolved = response.get(txo.permanent_url) if txo.can_decode_claim else None

View file

@ -32,11 +32,11 @@ class FullNode(Service):
async def get_status(self): async def get_status(self):
return 'everything is wonderful' return 'everything is wonderful'
async def get_block_address_filters(self): # async def get_block_address_filters(self):
return { # return {
hexlify(f['block_hash']).decode(): hexlify(f['block_filter']).decode() # hexlify(f['block_hash']).decode(): hexlify(f['block_filter']).decode()
for f in await self.db.get_block_address_filters() # for f in await self.db.get_block_address_filters()
} # }
async def search_transactions(self, txids): async def search_transactions(self, txids):
tx_hashes = [unhexlify(txid)[::-1] for txid in txids] tx_hashes = [unhexlify(txid)[::-1] for txid in txids]
@ -48,12 +48,6 @@ class FullNode(Service):
async def search_claims(self, accounts, **kwargs): async def search_claims(self, accounts, **kwargs):
return await self.db.search_claims(**kwargs) return await self.db.search_claims(**kwargs)
async def get_transaction_address_filters(self, block_hash):
return {
hexlify(f['tx_hash'][::-1]).decode(): hexlify(f['tx_filter']).decode()
for f in await self.db.get_transaction_address_filters(unhexlify(block_hash))
}
async def broadcast(self, tx): async def broadcast(self, tx):
return await self.chain.send_raw_transaction(hexlify(tx.raw).decode()) return await self.chain.send_raw_transaction(hexlify(tx.raw).decode())

View file

@ -32,7 +32,7 @@ class LightClient(Service):
async def wait(self, tx: Transaction, height=-1, timeout=1): async def wait(self, tx: Transaction, height=-1, timeout=1):
pass pass
async def resolve(self, accounts, urls, **kwargs): async def resolve(self, urls, **kwargs):
pass pass
async def search_claims(self, accounts, **kwargs): async def search_claims(self, accounts, **kwargs):

View file

@ -41,4 +41,5 @@ disable=
too-many-return-statements, too-many-return-statements,
too-many-instance-attributes, too-many-instance-attributes,
protected-access, protected-access,
unused-argument unused-argument,
bad-continuation