diff --git a/lbry/blockchain/sync/blocks.py b/lbry/blockchain/sync/blocks.py index 823b31db2..5760d6bd2 100644 --- a/lbry/blockchain/sync/blocks.py +++ b/lbry/blockchain/sync/blocks.py @@ -75,7 +75,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext): p.ctx.execute(text("ALTER TABLE txi DROP CONSTRAINT txi_pkey;")) p.step() # 3. insert - old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) + old_txi = table("old_txi", *(c.copy() for c in TXI.columns)) # pylint: disable=not-an-iterable columns = [c for c in old_txi.columns if c.name != "address"] + [TXO.c.address] join_txi_on_txo = old_txi.join(TXO, old_txi.c.txo_hash == TXO.c.txo_hash) select_txis = select(*columns).select_from(join_txi_on_txo) @@ -100,7 +100,7 @@ def sync_spends(initial_sync: bool, p: ProgressContext): p.ctx.execute(text("ALTER TABLE txo DROP CONSTRAINT txo_pkey;")) p.step() # 7. insert - old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) + old_txo = table("old_txo", *(c.copy() for c in TXO.columns)) # pylint: disable=not-an-iterable columns = [c for c in old_txo.columns if c.name != "spent_height"] insert_columns = columns + [TXO.c.spent_height] select_columns = columns + [ diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 5e77cd82f..9cc3a2ab1 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -130,10 +130,9 @@ def claims_insert( metadata = claim_metadata[i] if i < len(claim_metadata) else None if metadata is None: break - elif metadata['claim_hash'] != row.claim_hash: + if metadata['claim_hash'] != row.claim_hash: continue - else: - i += 1 + i += 1 txo, extra = row_to_claim_for_saving(row) extra.update({ 'short_url': metadata['short_url'], diff --git a/lbry/blockchain/transaction.py b/lbry/blockchain/transaction.py index 165110038..8025719b4 100644 --- a/lbry/blockchain/transaction.py +++ b/lbry/blockchain/transaction.py @@ -150,7 +150,7 @@ class Input(InputOutput): @classmethod def create_coinbase(cls) -> 'Input': - tx_ref = TXRefImmutable.from_hash(cls.NULL_HASH32, 0) + tx_ref = TXRefImmutable.from_hash(cls.NULL_HASH32, 0, 0) txo_ref = TXORef(tx_ref, 0) return cls(txo_ref, b'beef') diff --git a/lbry/console.py b/lbry/console.py index 8fba0fe49..b5b6bc1e5 100644 --- a/lbry/console.py +++ b/lbry/console.py @@ -6,7 +6,7 @@ from typing import Dict, Any from tempfile import TemporaryFile from tqdm.std import tqdm, Bar -from tqdm.utils import FormatReplace, _unicode, disp_len, disp_trim, _is_ascii, _unich +from tqdm.utils import FormatReplace, _unicode, disp_len, disp_trim, _is_ascii from lbry import __version__ from lbry.service.base import Service @@ -26,6 +26,8 @@ class RedirectOutput: self.stream_no = getattr(sys, stream_type).fileno() self.last_flush = time.time() self.last_read = 0 + self.backup = None + self.file = None def __enter__(self): self.backup = os.dup(self.stream_no) @@ -142,10 +144,10 @@ class Bar2(Bar): return ''.join(bar) -class tqdm2(tqdm): +class tqdm2(tqdm): # pylint: disable=invalid-name def __init__(self, initial=(0, 0), unit=('it', 'it'), total=(None, None), **kwargs): - self.n2 = self.last_print_n2 = initial[1] + self.n2 = self.last_print_n2 = initial[1] # pylint: disable=invalid-name self.unit2 = unit[1] self.total2 = total[1] super().__init__(initial=initial[0], unit=unit[0], total=total[0], **kwargs) @@ -171,7 +173,7 @@ class tqdm2(tqdm): @staticmethod def format_meter( - n, total, elapsed, ncols=None, prefix='', ascii=False, + n, total, elapsed, ncols=None, prefix='', ascii=False, # pylint: disable=redefined-builtin unit='it', unit_scale=False, rate=None, bar_format=None, postfix=None, unit_divisor=1000, **extra_kwargs ): @@ -251,7 +253,7 @@ class tqdm2(tqdm): # total is known: we can predict some stats if total: - n2, total2 = extra_kwargs['n2'], extra_kwargs['total2'] + n2, total2 = extra_kwargs['n2'], extra_kwargs['total2'] # pylint: disable=invalid-name # fractional and percentage progress frac = n / total @@ -411,43 +413,6 @@ class Advanced(Basic): if d['done'][0] == -1 or d['done'][0] == bar.total: bar.close() - def update_block_bars(self, event, d): - total_bar = self.bars[event[-4:]] - if event.endswith("read") and self.block_readers == 0: - total_bar.unpause() - if event.endswith("read") and d['step'] == d['total']: - self.block_readers -= 1 - - bar_name = f"block-{d['block_file']}" - bar = self.bars.get(bar_name) - if bar is None: - self.block_readers += 1 - return self.get_or_create_bar( - bar_name, - f" ├─ blk{d['block_file']:05}.dat reading", 'blocks', d['total'] - ) - - if event.endswith("save") and bar.unit == "blocks": - if self.block_savers == 0: - total_bar.unpause() - self.block_savers += 1 - bar.desc = f" ├─ blk{d['block_file']:05}.dat saving" - bar.unit = "txs" - bar.reset(d['total']) - return - - diff = d['step']-bar.n - bar.update(diff) - if event.endswith("save") and d['step'] == d['total']: - self.block_savers -= 1 - bar.close() - - total_bar.update(diff) - if total_bar.total == total_bar.n: - if total_bar.desc.endswith('txs saved'): - total_bar.desc = "├─ txs saved" - total_bar.refresh() - def update_other_bars(self, e, d): if d['total'] == 0: return @@ -465,11 +430,6 @@ class Advanced(Basic): #if d['step'] == d['total']: #bar.close() - def sync_complete(self): - self.bars['read'].postfix = (self.last_stats,) - for bar in self.bars.values(): - bar.close() - def on_sync_progress(self, event): e, d = event['event'], event.get('data', {}) if e.endswith(".init"): diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index 747535544..b15c1edae 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -14,9 +14,7 @@ from ..tables import ( ) from ..utils import query, in_account_ids from ..query_context import context -from ..constants import ( - TXO_TYPES, CLAIM_TYPE_CODES, CONTENT_TYPE_CODES, MAX_QUERY_VARIABLES -) +from ..constants import TXO_TYPES, CLAIM_TYPE_CODES, MAX_QUERY_VARIABLES log = logging.getLogger(__name__) @@ -326,8 +324,8 @@ def select_txos( include_is_my_input = True s = s.where( TXO.c.address.in_(my_addresses) | ( - (TXI.c.address != None) & - (TXI.c.address.in_(my_addresses)) + (TXI.c.address.isnot(None)) & + (TXI.c.address.in_(my_addresses)) ) ) else: @@ -338,13 +336,13 @@ def select_txos( if is_my_input: include_is_my_input = True s = s.where( - (TXI.c.address != None) & + (TXI.c.address.isnot(None)) & (TXI.c.address.in_(my_addresses)) ) elif is_my_input is False: include_is_my_input = True s = s.where( - (TXI.c.address == None) | + (TXI.c.address.is_(None)) | (TXI.c.address.notin_(my_addresses)) ) if exclude_internal_transfers: @@ -352,7 +350,7 @@ def select_txos( s = s.where( (TXO.c.txo_type != TXO_TYPES['other']) | (TXO.c.address.notin_(my_addresses)) - (TXI.c.address == None) | + (TXI.c.address.is_(None)) | (TXI.c.address.notin_(my_addresses)) ) joins = TXO.join(TX) @@ -439,15 +437,15 @@ def get_txos(no_tx=False, include_total=False, **constraints) -> Tuple[List[Outp select_columns.append(text(f"{1 if constraints['is_my_input'] else 0} AS is_my_input")) else: select_columns.append(( - (TXI.c.address != None) & - (TXI.c.address.in_(my_accounts)) - ).label('is_my_input')) + (TXI.c.address.isnot(None)) & + (TXI.c.address.in_(my_accounts)) + ).label('is_my_input')) if include_received_tips: support = TXO.alias('support') select_columns.append( select(func.coalesce(func.sum(support.c.amount), 0)) - .select_from(support).where( + .select_from(support).where( (support.c.claim_hash == TXO.c.claim_hash) & (support.c.txo_type == TXO_TYPES['support']) & (support.c.address.in_(my_accounts)) & @@ -562,8 +560,8 @@ def get_supports_summary(self, **constraints): def reserve_outputs(txo_hashes, is_reserved=True): context().execute( TXO.update() - .values(is_reserved=is_reserved) - .where(TXO.c.txo_hash.in_(txo_hashes)) + .values(is_reserved=is_reserved) + .where(TXO.c.txo_hash.in_(txo_hashes)) ) diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index bc1bf7122..e6b73ff07 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -9,7 +9,7 @@ from typing import Dict, List, Optional, Tuple from dataclasses import dataclass, field from contextvars import ContextVar -from sqlalchemy import create_engine, inspect, bindparam, func, exists, case, event as sqlalchemy_event +from sqlalchemy import create_engine, inspect, bindparam, func, exists, event as sqlalchemy_event from sqlalchemy.future import select from sqlalchemy.engine import Engine, Connection from sqlalchemy.sql import Insert @@ -25,7 +25,7 @@ from lbry.schema.tags import clean_tags from lbry.schema.result import Censor from lbry.schema.mime_types import guess_stream_type -from .utils import pg_insert, chunk +from .utils import pg_insert from .tables import Block, TX, TXO, TXI, Claim, Tag, Support from .constants import TXO_TYPES, STREAM_TYPES @@ -212,6 +212,7 @@ class Event: __slots__ = 'id', 'name', 'units' def __init__(self, name: str, units: Tuple[str]): + self.id = None self.name = name self.units = units diff --git a/lbry/db/sync.py b/lbry/db/sync.py index 19ac91a9f..5a9b9a536 100644 --- a/lbry/db/sync.py +++ b/lbry/db/sync.py @@ -1,40 +1,57 @@ from sqlalchemy.future import select from lbry.db.query_context import progress, Event -from lbry.db.tables import TXI, TXO -from .queries import rows_to_txos +from lbry.db.tables import TXI, TXO, Claim, Support +from .constants import TXO_TYPES, CLAIM_TYPE_CODES +from .queries import ( + rows_to_txos, where_unspent_txos, + where_abandoned_supports, + where_abandoned_claims +) + + +SPENDS_UPDATE_EVENT = Event.add("client.sync.spends.update", "steps") +CLAIMS_INSERT_EVENT = Event.add("client.sync.claims.insert", "claims") +CLAIMS_UPDATE_EVENT = Event.add("client.sync.claims.update", "claims") +CLAIMS_DELETE_EVENT = Event.add("client.sync.claims.delete", "claims") +SUPPORT_INSERT_EVENT = Event.add("client.sync.claims.insert", "supports") +SUPPORT_UPDATE_EVENT = Event.add("client.sync.claims.update", "supports") +SUPPORT_DELETE_EVENT = Event.add("client.sync.claims.delete", "supports") def process_all_things_after_sync(): - with progress(Event.INPUT_UPDATE) as p: + with progress(SPENDS_UPDATE_EVENT) as p: p.start(2) set_input_addresses(p.ctx) p.step(1) update_spent_outputs(p.ctx) p.step(2) - with progress(Event.SUPPORT_DELETE) as p: + with progress(SUPPORT_DELETE_EVENT) as p: p.start(1) - sql = Support.delete().where(condition_spent_supports) + sql = Support.delete().where(where_abandoned_supports()) p.ctx.execute(sql) - with progress(Event.SUPPORT_INSERT) as p: + with progress(SUPPORT_INSERT_EVENT) as p: loader = p.ctx.get_bulk_loader() - for support in rows_to_txos(p.ctx.fetchall(select_missing_supports)): + sql = where_unspent_txos(TXO_TYPES['support'], missing_in_supports_table=True) + for support in rows_to_txos(p.ctx.fetchall(sql)): loader.add_support(support) - loader.save() - with progress(Event.CLAIM_DELETE) as p: + loader.flush(Support) + with progress(CLAIMS_DELETE_EVENT) as p: p.start(1) - sql = Claim.delete().where(condition_spent_claims()) + sql = Claim.delete().where(where_abandoned_claims()) p.ctx.execute(sql) - with progress(Event.CLAIM_INSERT) as p: + with progress(CLAIMS_INSERT_EVENT) as p: loader = p.ctx.get_bulk_loader() - for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)): + sql = where_unspent_txos(CLAIM_TYPE_CODES, missing_in_claims_table=True) + for claim in rows_to_txos(p.ctx.fetchall(sql)): loader.add_claim(claim) - loader.save() - with progress(Event.CLAIM_UPDATE) as p: + loader.flush(Claim) + with progress(CLAIMS_UPDATE_EVENT) as p: loader = p.ctx.get_bulk_loader() - for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)): + sql = where_unspent_txos(CLAIM_TYPE_CODES, missing_or_stale_in_claims_table=True) + for claim in rows_to_txos(p.ctx.fetchall(sql)): loader.update_claim(claim) - loader.save() + loader.flush(Claim) def set_input_addresses(ctx): diff --git a/lbry/service/base.py b/lbry/service/base.py index 8ef67daa6..62c3b2493 100644 --- a/lbry/service/base.py +++ b/lbry/service/base.py @@ -92,12 +92,6 @@ class Service: async def get(self, uri, **kwargs): pass - async def get_block_address_filters(self): - raise NotImplementedError - - async def get_transaction_address_filters(self, block_hash): - raise NotImplementedError - def create_wallet(self, file_name): path = os.path.join(self.conf.wallet_dir, file_name) return self.wallets.add_from_path(path) @@ -194,7 +188,7 @@ class Service: async def wait(self, tx: Transaction, height=-1, timeout=1): raise NotImplementedError - async def resolve(self, accounts, urls, **kwargs): + async def resolve(self, urls, **kwargs): raise NotImplementedError async def search_claims(self, accounts, **kwargs) -> Tuple[List[Output], Optional[int], Censor]: @@ -227,7 +221,7 @@ class Service: async def _resolve_for_local_results(self, accounts, txos: Result) -> Result: results = [] response = await self.resolve( - accounts, [txo.permanent_url for txo in txos if txo.can_decode_claim] + [txo.permanent_url for txo in txos if txo.can_decode_claim], accounts=accounts ) for txo in txos: resolved = response.get(txo.permanent_url) if txo.can_decode_claim else None diff --git a/lbry/service/full_node.py b/lbry/service/full_node.py index facafeb20..18640cd22 100644 --- a/lbry/service/full_node.py +++ b/lbry/service/full_node.py @@ -32,11 +32,11 @@ class FullNode(Service): async def get_status(self): return 'everything is wonderful' - async def get_block_address_filters(self): - return { - hexlify(f['block_hash']).decode(): hexlify(f['block_filter']).decode() - for f in await self.db.get_block_address_filters() - } +# async def get_block_address_filters(self): +# return { +# hexlify(f['block_hash']).decode(): hexlify(f['block_filter']).decode() +# for f in await self.db.get_block_address_filters() +# } async def search_transactions(self, txids): tx_hashes = [unhexlify(txid)[::-1] for txid in txids] @@ -48,12 +48,6 @@ class FullNode(Service): async def search_claims(self, accounts, **kwargs): return await self.db.search_claims(**kwargs) - async def get_transaction_address_filters(self, block_hash): - return { - hexlify(f['tx_hash'][::-1]).decode(): hexlify(f['tx_filter']).decode() - for f in await self.db.get_transaction_address_filters(unhexlify(block_hash)) - } - async def broadcast(self, tx): return await self.chain.send_raw_transaction(hexlify(tx.raw).decode()) diff --git a/lbry/service/light_client.py b/lbry/service/light_client.py index ffdf881e0..8dd626ff3 100644 --- a/lbry/service/light_client.py +++ b/lbry/service/light_client.py @@ -32,7 +32,7 @@ class LightClient(Service): async def wait(self, tx: Transaction, height=-1, timeout=1): pass - async def resolve(self, accounts, urls, **kwargs): + async def resolve(self, urls, **kwargs): pass async def search_claims(self, accounts, **kwargs): diff --git a/setup.cfg b/setup.cfg index b40931ad3..e9d810917 100644 --- a/setup.cfg +++ b/setup.cfg @@ -41,4 +41,5 @@ disable= too-many-return-statements, too-many-instance-attributes, protected-access, - unused-argument + unused-argument, + bad-continuation