client sync algorithms
This commit is contained in:
parent
d74fa05a8b
commit
522dc72dc1
12 changed files with 516 additions and 109 deletions
|
@ -7,8 +7,8 @@ from sqlalchemy.future import select
|
|||
from sqlalchemy.schema import CreateTable
|
||||
|
||||
from lbry.db.tables import (
|
||||
Block as BlockTable, BlockFilter, BlockGroupFilter,
|
||||
TX, TXFilter, MempoolFilter, TXO, TXI, Claim, Tag, Support
|
||||
Block as BlockTable, BlockFilter,
|
||||
TX, TXFilter, TXO, TXI, Claim, Tag, Support
|
||||
)
|
||||
from lbry.db.tables import (
|
||||
pg_add_block_constraints_and_indexes,
|
||||
|
@ -232,12 +232,12 @@ def sync_filters(start, end, p: ProgressContext):
|
|||
|
||||
for height, addresses in fp.block_filters.items():
|
||||
loader.add_block_filter(
|
||||
height, create_address_filter(list(addresses))
|
||||
height, 1, create_address_filter(list(addresses))
|
||||
)
|
||||
|
||||
for group_filter in fp.group_filters:
|
||||
for height, addresses in group_filter.groups.items():
|
||||
loader.add_group_filter(
|
||||
loader.add_block_filter(
|
||||
height, group_filter.factor, create_address_filter(list(addresses))
|
||||
)
|
||||
|
||||
|
@ -281,7 +281,12 @@ def get_block_range_without_filters() -> Tuple[int, int]:
|
|||
func.coalesce(func.max(BlockTable.c.height), -1).label('end_height'),
|
||||
)
|
||||
.select_from(
|
||||
BlockTable.join(BlockFilter, BlockTable.c.height == BlockFilter.c.height, isouter=True)
|
||||
BlockTable.join(
|
||||
BlockFilter,
|
||||
(BlockTable.c.height == BlockFilter.c.height) &
|
||||
(BlockFilter.c.factor == 1),
|
||||
isouter=True
|
||||
)
|
||||
)
|
||||
.where(BlockFilter.c.height.is_(None))
|
||||
)
|
||||
|
@ -323,13 +328,11 @@ def delete_all_the_things(height: int, p: ProgressContext):
|
|||
),
|
||||
Claim.delete().where(constrain(Claim.c.height)),
|
||||
Support.delete().where(constrain(Support.c.height)),
|
||||
MempoolFilter.delete(),
|
||||
]
|
||||
if height > 0:
|
||||
deletes.extend([
|
||||
# TODO: block and tx filters need where() clauses (below actually breaks things)
|
||||
BlockFilter.delete().where(BlockFilter.c.height >= height),
|
||||
# TODO: group and tx filters need where() clauses (below actually breaks things)
|
||||
BlockGroupFilter.delete(),
|
||||
TXFilter.delete(),
|
||||
])
|
||||
for delete in p.iter(deletes):
|
||||
|
|
|
@ -2,6 +2,7 @@ import os
|
|||
import asyncio
|
||||
import tempfile
|
||||
import multiprocessing as mp
|
||||
from binascii import hexlify
|
||||
from typing import List, Optional, Iterable, Iterator, TypeVar, Generic, TYPE_CHECKING, Dict, Tuple
|
||||
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor
|
||||
from functools import partial
|
||||
|
@ -232,9 +233,6 @@ class Database:
|
|||
async def get_best_block_height(self) -> int:
|
||||
return await self.run(q.get_best_block_height)
|
||||
|
||||
async def get_best_block_filter(self) -> int:
|
||||
return await self.run(q.get_best_block_filter)
|
||||
|
||||
async def process_all_things_after_sync(self):
|
||||
return await self.run(sync.process_all_things_after_sync)
|
||||
|
||||
|
@ -242,13 +240,25 @@ class Database:
|
|||
return await self.run(q.get_block_headers, start_height, end_height)
|
||||
|
||||
async def get_filters(self, start_height: int, end_height: int = None, granularity: int = 0):
|
||||
return await self.run(q.get_filters, start_height, end_height, granularity)
|
||||
filters = []
|
||||
for row in await self.run(q.get_filters, start_height, end_height, granularity):
|
||||
record = {
|
||||
"height": row["height"],
|
||||
"filter": hexlify(row["address_filter"]).decode(),
|
||||
}
|
||||
if granularity == 0:
|
||||
record["txid"] = hexlify(row["tx_hash"][::-1]).decode()
|
||||
filters.append(record)
|
||||
return filters
|
||||
|
||||
async def get_missing_required_filters(self, height) -> Dict[int, Tuple[int, int]]:
|
||||
return await self.run(q.get_missing_required_filters, height)
|
||||
|
||||
async def insert_block(self, block):
|
||||
return await self.run(q.insert_block, block)
|
||||
|
||||
async def insert_block_filter(self, height: int, address_filter: bytes):
|
||||
return await self.run(q.insert_block_filter, height, address_filter)
|
||||
async def insert_block_filter(self, height: int, factor: int, address_filter: bytes):
|
||||
return await self.run(q.insert_block_filter, height, factor, address_filter)
|
||||
|
||||
async def insert_transaction(self, block_hash, tx):
|
||||
return await self.run(q.insert_transaction, block_hash, tx)
|
||||
|
|
|
@ -4,3 +4,4 @@ from .search import *
|
|||
from .resolve import *
|
||||
from .address import *
|
||||
from .wallet import *
|
||||
from .filters import *
|
||||
|
|
|
@ -1,17 +1,118 @@
|
|||
import logging
|
||||
from typing import Tuple, List, Optional
|
||||
from typing import Tuple, List, Set, Iterator, Optional
|
||||
|
||||
from sqlalchemy import func
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from lbry.crypto.hash import hash160
|
||||
from lbry.crypto.bip32 import PubKey
|
||||
|
||||
from ..utils import query
|
||||
from ..query_context import context
|
||||
from ..tables import TXO, PubkeyAddress, AccountAddress
|
||||
from .filters import get_filter_matchers, get_filter_matchers_at_granularity, has_sub_filters
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DatabaseAddressIterator:
|
||||
|
||||
def __init__(self, account_id, chain):
|
||||
self.account_id = account_id
|
||||
self.chain = chain
|
||||
self.n = -1
|
||||
|
||||
def __iter__(self) -> Iterator[Tuple[bytes, int, bool]]:
|
||||
with context().connect_streaming() as c:
|
||||
sql = (
|
||||
select(
|
||||
AccountAddress.c.pubkey,
|
||||
AccountAddress.c.n
|
||||
).where(
|
||||
(AccountAddress.c.account == self.account_id) &
|
||||
(AccountAddress.c.chain == self.chain)
|
||||
).order_by(AccountAddress.c.n)
|
||||
)
|
||||
for row in c.execute(sql):
|
||||
self.n = row['n']
|
||||
yield hash160(row['pubkey']), self.n, False
|
||||
|
||||
|
||||
class PersistingAddressIterator(DatabaseAddressIterator):
|
||||
|
||||
def __init__(self, account_id, chain, pubkey_bytes, chain_code, depth):
|
||||
super().__init__(account_id, chain)
|
||||
self.pubkey_bytes = pubkey_bytes
|
||||
self.chain_code = chain_code
|
||||
self.depth = depth
|
||||
self.pubkey_buffer = []
|
||||
|
||||
def flush(self):
|
||||
if self.pubkey_buffer:
|
||||
add_keys([{
|
||||
'account': self.account_id,
|
||||
'address': k.address,
|
||||
'chain': self.chain,
|
||||
'pubkey': k.pubkey_bytes,
|
||||
'chain_code': k.chain_code,
|
||||
'n': k.n,
|
||||
'depth': k.depth
|
||||
} for k in self.pubkey_buffer])
|
||||
self.pubkey_buffer.clear()
|
||||
|
||||
def __enter__(self) -> 'PersistingAddressIterator':
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.flush()
|
||||
|
||||
def __iter__(self) -> Iterator[Tuple[bytes, int, bool]]:
|
||||
yield from super().__iter__()
|
||||
pubkey = PubKey(context().ledger, self.pubkey_bytes, self.chain_code, 0, self.depth)
|
||||
while True:
|
||||
self.n += 1
|
||||
pubkey_child = pubkey.child(self.n)
|
||||
self.pubkey_buffer.append(pubkey_child)
|
||||
if len(self.pubkey_buffer) >= 900:
|
||||
self.flush()
|
||||
yield hash160(pubkey_child.pubkey_bytes), self.n, True
|
||||
|
||||
|
||||
def generate_addresses_using_filters(best_height, allowed_gap, address_manager) -> Set:
|
||||
need, have = set(), set()
|
||||
matchers = get_filter_matchers(best_height)
|
||||
with PersistingAddressIterator(*address_manager) as addresses:
|
||||
gap = 0
|
||||
for address_hash, n, is_new in addresses:
|
||||
gap += 1
|
||||
address_bytes = bytearray(address_hash)
|
||||
for granularity, height, matcher in matchers:
|
||||
if matcher.Match(address_bytes):
|
||||
gap = 0
|
||||
match = (granularity, height)
|
||||
if match not in need and match not in have:
|
||||
if has_sub_filters(granularity, height):
|
||||
have.add(match)
|
||||
else:
|
||||
need.add(match)
|
||||
if gap >= allowed_gap:
|
||||
break
|
||||
return need
|
||||
|
||||
|
||||
def get_missing_sub_filters_for_addresses(granularity, address_manager):
|
||||
need = set()
|
||||
with DatabaseAddressIterator(*address_manager) as addresses:
|
||||
for height, matcher in get_filter_matchers_at_granularity(granularity):
|
||||
for address_hash, n, is_new in addresses:
|
||||
address_bytes = bytearray(address_hash)
|
||||
if matcher.Match(address_bytes) and not has_sub_filters(granularity, height):
|
||||
need.add((height, granularity))
|
||||
break
|
||||
return need
|
||||
|
||||
|
||||
def update_address_used_times(addresses):
|
||||
context().execute(
|
||||
PubkeyAddress.update()
|
||||
|
@ -50,12 +151,12 @@ def get_addresses(cols=None, include_total=False, **constraints) -> Tuple[List[d
|
|||
|
||||
|
||||
def get_address_count(**constraints):
|
||||
count = select_addresses([func.count().label('total')], **constraints)
|
||||
return count[0]['total'] or 0
|
||||
count = select_addresses([func.count().label("total")], **constraints)
|
||||
return count[0]["total"] or 0
|
||||
|
||||
|
||||
def get_all_addresses(self):
|
||||
return context().execute(select(PubkeyAddress.c.address))
|
||||
def get_all_addresses():
|
||||
return [r["address"] for r in context().fetchall(select(PubkeyAddress.c.address))]
|
||||
|
||||
|
||||
def add_keys(pubkeys):
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
from math import log10
|
||||
from binascii import hexlify
|
||||
|
||||
from sqlalchemy import text, between
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from ..query_context import context
|
||||
from ..tables import (
|
||||
SCHEMA_VERSION, metadata, Version,
|
||||
Claim, Support, Block, BlockFilter, BlockGroupFilter, TX, TXFilter,
|
||||
Claim, Support, Block, TX,
|
||||
pg_add_account_address_constraints_and_indexes
|
||||
)
|
||||
|
||||
|
@ -24,10 +21,6 @@ def execute_fetchall(sql):
|
|||
return context().fetchall(text(sql))
|
||||
|
||||
|
||||
def has_filters():
|
||||
return context().has_records(BlockFilter)
|
||||
|
||||
|
||||
def has_claims():
|
||||
return context().has_records(Claim)
|
||||
|
||||
|
@ -40,10 +33,6 @@ def get_best_block_height():
|
|||
return context().fetchmax(Block.c.height, -1)
|
||||
|
||||
|
||||
def get_best_block_filter():
|
||||
return context().fetchmax(BlockFilter.c.height, -1)
|
||||
|
||||
|
||||
def insert_block(block):
|
||||
context().get_bulk_loader().add_block(block).flush(return_row_count_for_table=None)
|
||||
|
||||
|
@ -60,47 +49,6 @@ def get_block_headers(first, last=None):
|
|||
return context().fetchall(query)
|
||||
|
||||
|
||||
def insert_block_filter(height: int, address_filter: bytes):
|
||||
loader = context().get_bulk_loader()
|
||||
loader.add_block_filter(height, address_filter)
|
||||
loader.flush(return_row_count_for_table=None)
|
||||
|
||||
|
||||
def get_filters(start_height, end_height=None, granularity=0):
|
||||
assert granularity >= 0, "filter granularity must be 0 or positive number"
|
||||
if granularity == 0:
|
||||
query = (
|
||||
select('*').select_from(TXFilter)
|
||||
.where(between(TXFilter.c.height, start_height, end_height))
|
||||
.order_by(TXFilter.c.height)
|
||||
)
|
||||
elif granularity == 1:
|
||||
query = (
|
||||
select('*').select_from(BlockFilter)
|
||||
.where(between(BlockFilter.c.height, start_height, end_height))
|
||||
.order_by(BlockFilter.c.height)
|
||||
)
|
||||
else:
|
||||
query = (
|
||||
select('*').select_from(BlockGroupFilter)
|
||||
.where(
|
||||
(BlockGroupFilter.c.height == start_height) &
|
||||
(BlockGroupFilter.c.factor == log10(granularity))
|
||||
)
|
||||
.order_by(BlockGroupFilter.c.height)
|
||||
)
|
||||
result = []
|
||||
for row in context().fetchall(query):
|
||||
record = {
|
||||
"height": row["height"],
|
||||
"filter": hexlify(row["address_filter"]).decode(),
|
||||
}
|
||||
if granularity == 0:
|
||||
record["txid"] = hexlify(row["tx_hash"][::-1]).decode()
|
||||
result.append(record)
|
||||
return result
|
||||
|
||||
|
||||
def insert_transaction(block_hash, tx):
|
||||
context().get_bulk_loader().add_transaction(block_hash, tx).flush(TX)
|
||||
|
||||
|
@ -125,7 +73,7 @@ def disable_trigger_and_constraints(table_name):
|
|||
ctx = context()
|
||||
if ctx.is_postgres:
|
||||
ctx.execute(text(f"ALTER TABLE {table_name} DISABLE TRIGGER ALL;"))
|
||||
if table_name in ('tag', 'stake', 'block_group_filter', 'mempool_filter'):
|
||||
if table_name in ('tag', 'stake', 'block_filter', 'mempool_filter'):
|
||||
return
|
||||
if ctx.is_postgres:
|
||||
ctx.execute(text(
|
||||
|
|
153
lbry/db/queries/filters.py
Normal file
153
lbry/db/queries/filters.py
Normal file
|
@ -0,0 +1,153 @@
|
|||
from math import log10
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
|
||||
from sqlalchemy import between, func, or_
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from lbry.blockchain.block import PyBIP158, get_address_filter
|
||||
|
||||
from ..query_context import context
|
||||
from ..tables import BlockFilter, TXFilter
|
||||
|
||||
|
||||
def has_filters():
|
||||
return context().has_records(BlockFilter)
|
||||
|
||||
|
||||
def has_sub_filters(granularity: int, height: int):
|
||||
if granularity >= 3:
|
||||
sub_filter_size = 10**(granularity-1)
|
||||
sub_filters_count = context().fetchtotal(
|
||||
(BlockFilter.c.factor == granularity-1) &
|
||||
between(BlockFilter.c.height, height, height + sub_filter_size * 9)
|
||||
)
|
||||
return sub_filters_count == 10
|
||||
elif granularity == 2:
|
||||
sub_filters_count = context().fetchtotal(
|
||||
(BlockFilter.c.factor == 1) &
|
||||
between(BlockFilter.c.height, height, height + 99)
|
||||
)
|
||||
return sub_filters_count == 100
|
||||
elif granularity == 1:
|
||||
tx_filters_count = context().fetchtotal(TXFilter.c.height == height)
|
||||
return tx_filters_count > 0
|
||||
|
||||
|
||||
def get_filters(start_height, end_height=None, granularity=0):
|
||||
assert granularity >= 0, "filter granularity must be 0 or positive number"
|
||||
if granularity == 0:
|
||||
query = (
|
||||
select(TXFilter.c.height, TXFilter.c.address_filter, TXFilter.c.tx_hash)
|
||||
.select_from(TXFilter)
|
||||
.where(between(TXFilter.c.height, start_height, end_height))
|
||||
.order_by(TXFilter.c.height)
|
||||
)
|
||||
else:
|
||||
factor = granularity if granularity <= 4 else log10(granularity)
|
||||
if end_height is None:
|
||||
height_condition = (BlockFilter.c.height == start_height)
|
||||
elif end_height == -1:
|
||||
height_condition = (BlockFilter.c.height >= start_height)
|
||||
else:
|
||||
height_condition = between(BlockFilter.c.height, start_height, end_height)
|
||||
query = (
|
||||
select(BlockFilter.c.height, BlockFilter.c.address_filter)
|
||||
.select_from(BlockFilter)
|
||||
.where(height_condition & (BlockFilter.c.factor == factor))
|
||||
.order_by(BlockFilter.c.height)
|
||||
)
|
||||
return context().fetchall(query)
|
||||
|
||||
|
||||
def get_minimal_required_filter_ranges(height) -> Dict[int, Tuple[int, int]]:
|
||||
minimal = {}
|
||||
if height >= 10_000:
|
||||
minimal[4] = (0, ((height // 10_000)-1) * 10_000)
|
||||
if height >= 1_000:
|
||||
start = height - height % 10_000
|
||||
minimal[3] = (start, start+(((height-start) // 1_000)-1) * 1_000)
|
||||
if height >= 100:
|
||||
start = height - height % 1_000
|
||||
minimal[2] = (start, start+(((height-start) // 100)-1) * 100)
|
||||
start = height - height % 100
|
||||
if start < height:
|
||||
minimal[1] = (start, height)
|
||||
return minimal
|
||||
|
||||
|
||||
def get_maximum_known_filters() -> Dict[str, Optional[int]]:
|
||||
query = select(
|
||||
select(func.max(BlockFilter.c.height))
|
||||
.where(BlockFilter.c.factor == 1)
|
||||
.scalar_subquery().label('1'),
|
||||
select(func.max(BlockFilter.c.height))
|
||||
.where(BlockFilter.c.factor == 2)
|
||||
.scalar_subquery().label('2'),
|
||||
select(func.max(BlockFilter.c.height))
|
||||
.where(BlockFilter.c.factor == 3)
|
||||
.scalar_subquery().label('3'),
|
||||
select(func.max(BlockFilter.c.height))
|
||||
.where(BlockFilter.c.factor == 4)
|
||||
.scalar_subquery().label('4'),
|
||||
)
|
||||
return context().fetchone(query)
|
||||
|
||||
|
||||
def get_missing_required_filters(height) -> Dict[int, Tuple[int, int]]:
|
||||
known_filters = get_maximum_known_filters()
|
||||
missing_filters = {}
|
||||
for granularity, (start, end) in get_minimal_required_filter_ranges(height).items():
|
||||
known_height = known_filters.get(str(granularity))
|
||||
if known_height is not None and known_height > start:
|
||||
if granularity == 1:
|
||||
adjusted_height = known_height + 1
|
||||
else:
|
||||
adjusted_height = known_height + 10**granularity
|
||||
if adjusted_height <= end:
|
||||
missing_filters[granularity] = (adjusted_height, end)
|
||||
else:
|
||||
missing_filters[granularity] = (start, end)
|
||||
return missing_filters
|
||||
|
||||
|
||||
def get_filter_matchers(height) -> List[Tuple[int, int, PyBIP158]]:
|
||||
conditions = []
|
||||
for granularity, (start, end) in get_minimal_required_filter_ranges(height).items():
|
||||
conditions.append(
|
||||
(BlockFilter.c.factor == granularity) &
|
||||
between(BlockFilter.c.height, start, end)
|
||||
)
|
||||
query = (
|
||||
select(BlockFilter.c.factor, BlockFilter.c.height, BlockFilter.c.address_filter)
|
||||
.select_from(BlockFilter)
|
||||
.where(or_(*conditions))
|
||||
.order_by(BlockFilter.c.height.desc())
|
||||
)
|
||||
return [
|
||||
(bf["factor"], bf["height"], get_address_filter(bf["address_filter"]))
|
||||
for bf in context().fetchall(query)
|
||||
]
|
||||
|
||||
|
||||
def get_filter_matchers_at_granularity(granularity) -> List[Tuple[int, PyBIP158]]:
|
||||
query = (
|
||||
select(BlockFilter.c.height, BlockFilter.c.address_filter)
|
||||
.where(BlockFilter.c.factor == granularity)
|
||||
.order_by(BlockFilter.c.height.desc())
|
||||
)
|
||||
return [
|
||||
(bf["height"], get_address_filter(bf["address_filter"]))
|
||||
for bf in context().fetchall(query)
|
||||
]
|
||||
|
||||
|
||||
def insert_block_filter(height: int, factor: int, address_filter: bytes):
|
||||
loader = context().get_bulk_loader()
|
||||
loader.add_block_filter(height, factor, address_filter)
|
||||
loader.flush(return_row_count_for_table=None)
|
||||
|
||||
|
||||
def insert_tx_filter(tx_hash: bytes, height: int, address_filter: bytes):
|
||||
loader = context().get_bulk_loader()
|
||||
loader.add_transaction_filter(tx_hash, height, address_filter)
|
||||
loader.flush(return_row_count_for_table=None)
|
|
@ -27,7 +27,7 @@ from lbry.schema.mime_types import guess_stream_type
|
|||
|
||||
from .utils import pg_insert
|
||||
from .tables import (
|
||||
Block, BlockFilter, BlockGroupFilter,
|
||||
Block, BlockFilter,
|
||||
TX, TXFilter, TXO, TXI, Claim, Tag, Support
|
||||
)
|
||||
from .constants import TXO_TYPES, STREAM_TYPES
|
||||
|
@ -444,7 +444,6 @@ class BulkLoader:
|
|||
self.delete_tags = []
|
||||
self.tx_filters = []
|
||||
self.block_filters = []
|
||||
self.group_filters = []
|
||||
|
||||
@staticmethod
|
||||
def block_to_row(block: Block) -> dict:
|
||||
|
@ -656,14 +655,8 @@ class BulkLoader:
|
|||
self.add_transaction(block.block_hash, tx)
|
||||
return self
|
||||
|
||||
def add_block_filter(self, height: int, address_filter: bytes):
|
||||
def add_block_filter(self, height: int, factor: int, address_filter: bytes):
|
||||
self.block_filters.append({
|
||||
'height': height,
|
||||
'address_filter': address_filter
|
||||
})
|
||||
|
||||
def add_group_filter(self, height: int, factor: int, address_filter: bytes):
|
||||
self.group_filters.append({
|
||||
'height': height,
|
||||
'factor': factor,
|
||||
'address_filter': address_filter
|
||||
|
@ -725,7 +718,6 @@ class BulkLoader:
|
|||
return (
|
||||
(Block.insert(), self.blocks),
|
||||
(BlockFilter.insert(), self.block_filters),
|
||||
(BlockGroupFilter.insert(), self.group_filters),
|
||||
(TX.insert(), self.txs),
|
||||
(TXFilter.insert(), self.tx_filters),
|
||||
(TXO.insert(), self.txos),
|
||||
|
|
|
@ -66,24 +66,15 @@ pg_add_block_constraints_and_indexes = [
|
|||
|
||||
BlockFilter = Table(
|
||||
'block_filter', metadata,
|
||||
Column('height', Integer, primary_key=True),
|
||||
Column('address_filter', LargeBinary),
|
||||
)
|
||||
|
||||
pg_add_block_filter_constraints_and_indexes = [
|
||||
"ALTER TABLE block_filter ADD PRIMARY KEY (height);",
|
||||
"ALTER TABLE block_filter ADD CONSTRAINT fk_block_filter"
|
||||
" FOREIGN KEY (height) REFERENCES block (height) ON DELETE CASCADE;",
|
||||
]
|
||||
|
||||
|
||||
BlockGroupFilter = Table(
|
||||
'block_group_filter', metadata,
|
||||
Column('height', Integer),
|
||||
Column('factor', SmallInteger),
|
||||
Column('address_filter', LargeBinary),
|
||||
)
|
||||
|
||||
pg_add_block_filter_constraints_and_indexes = [
|
||||
"ALTER TABLE block_filter ADD PRIMARY KEY (height, factor);",
|
||||
]
|
||||
|
||||
|
||||
TX = Table(
|
||||
'tx', metadata,
|
||||
|
@ -116,13 +107,6 @@ pg_add_tx_filter_constraints_and_indexes = [
|
|||
]
|
||||
|
||||
|
||||
MempoolFilter = Table(
|
||||
'mempool_filter', metadata,
|
||||
Column('filter_number', Integer),
|
||||
Column('mempool_filter', LargeBinary),
|
||||
)
|
||||
|
||||
|
||||
TXO = Table(
|
||||
'txo', metadata,
|
||||
Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)),
|
||||
|
|
|
@ -1248,8 +1248,8 @@ class API:
|
|||
start_height: int, # starting height of block range or just single block
|
||||
end_height: int = None, # return a range of blocks from start_height to end_height
|
||||
granularity: int = None, # 0 - individual tx filters, 1 - block per filter,
|
||||
# 1000, 10000, 100000 blocks per filter
|
||||
) -> list: # blocks
|
||||
# 2 or 100, 3 or 1000, 4 or 10000 blocks per filter
|
||||
) -> list: # filters
|
||||
"""
|
||||
List address filters
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ import asyncio
|
|||
import time
|
||||
from asyncio.runners import _cancel_all_tasks # type: ignore
|
||||
import unittest
|
||||
import multiprocessing as mp
|
||||
from unittest.case import _Outcome
|
||||
from typing import Optional, List, Union, Tuple
|
||||
from binascii import unhexlify, hexlify
|
||||
|
@ -18,9 +19,10 @@ from distutils.dir_util import remove_tree
|
|||
|
||||
import ecdsa
|
||||
|
||||
from lbry.db import Database
|
||||
from lbry.db import Database, queries as q
|
||||
from lbry.db.query_context import initialize, uninitialize
|
||||
from lbry.blockchain import (
|
||||
RegTestLedger, Transaction, Input, Output, dewies_to_lbc
|
||||
Ledger, RegTestLedger, Transaction, Input, Output, dewies_to_lbc
|
||||
)
|
||||
from lbry.blockchain.block import Block
|
||||
from lbry.blockchain.bcd_data_stream import BCDataStream
|
||||
|
@ -229,7 +231,17 @@ class AdvanceTimeTestCase(AsyncioTestCase):
|
|||
await asyncio.sleep(0)
|
||||
|
||||
|
||||
class UnitDBTestCase(AsyncioTestCase):
|
||||
class UnitDBTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.ledger = Ledger(Config.with_null_dir().set(db_url='sqlite:///:memory:'))
|
||||
initialize(self.ledger, mp.Queue(), mp.Event())
|
||||
q.check_version_and_create_tables()
|
||||
self.addCleanup(uninitialize)
|
||||
|
||||
|
||||
class AsyncUnitDBTestCase(AsyncioTestCase):
|
||||
|
||||
async def asyncSetUp(self):
|
||||
await super().asyncSetUp()
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
from .wallet import Wallet, ENCRYPT_ON_DISK
|
||||
from .manager import WalletManager
|
||||
from .account import Account, SingleKey, HierarchicalDeterministic
|
||||
from .account import Account, SingleKey, HierarchicalDeterministic, AddressManager
|
||||
|
|
203
tests/unit/wallet/test_sync.py
Normal file
203
tests/unit/wallet/test_sync.py
Normal file
|
@ -0,0 +1,203 @@
|
|||
from binascii import hexlify
|
||||
from sqlalchemy.future import select
|
||||
|
||||
from lbry.crypto.hash import hash160
|
||||
from lbry.crypto.bip32 import from_extended_key_string
|
||||
from lbry.blockchain.block import create_address_filter
|
||||
from lbry.db import queries as q
|
||||
from lbry.db.tables import AccountAddress
|
||||
from lbry.db.query_context import context
|
||||
from lbry.testcase import UnitDBTestCase
|
||||
|
||||
|
||||
class TestMissingRequiredFiltersCalculation(UnitDBTestCase):
|
||||
|
||||
def test_get_missing_required_filters(self):
|
||||
self.assertEqual(q.get_missing_required_filters(99), {1: (0, 99)})
|
||||
self.assertEqual(q.get_missing_required_filters(100), {100: (0, 0)})
|
||||
self.assertEqual(q.get_missing_required_filters(199), {100: (0, 0), 1: (100, 199)})
|
||||
self.assertEqual(q.get_missing_required_filters(201), {100: (0, 100), 1: (200, 201)})
|
||||
# all filters missing
|
||||
self.assertEqual(q.get_missing_required_filters(134_567), {
|
||||
10_000: (0, 120_000),
|
||||
1_000: (130_000, 133_000),
|
||||
100: (134_000, 134_400),
|
||||
1: (134_500, 134_567)
|
||||
})
|
||||
|
||||
q.insert_block_filter(110_000, 4, b'beef')
|
||||
q.insert_block_filter(129_000, 3, b'beef')
|
||||
q.insert_block_filter(133_900, 2, b'beef')
|
||||
q.insert_block_filter(134_499, 1, b'beef')
|
||||
# we we have some filters, but not recent enough (all except 10k are adjusted)
|
||||
self.assertEqual(q.get_missing_required_filters(134_567), {
|
||||
10_000: (120_000, 120_000), # 0 -> 120_000
|
||||
1_000: (130_000, 133_000),
|
||||
100: (134_000, 134_400),
|
||||
1: (134_500, 134_567)
|
||||
})
|
||||
|
||||
q.insert_block_filter(132_000, 3, b'beef')
|
||||
q.insert_block_filter(134_300, 2, b'beef')
|
||||
q.insert_block_filter(134_550, 1, b'beef')
|
||||
# all filters get adjusted because we have recent of each
|
||||
self.assertEqual(q.get_missing_required_filters(134_567), {
|
||||
10_000: (120_000, 120_000), # 0 -> 120_000
|
||||
1_000: (133_000, 133_000), # 130_000 -> 133_000
|
||||
100: (134_400, 134_400), # 134_000 -> 134_400
|
||||
1: (134_551, 134_567) # 134_500 -> 134_551
|
||||
})
|
||||
|
||||
q.insert_block_filter(120_000, 4, b'beef')
|
||||
q.insert_block_filter(133_000, 3, b'beef')
|
||||
q.insert_block_filter(134_400, 2, b'beef')
|
||||
q.insert_block_filter(134_566, 1, b'beef')
|
||||
# we have latest filters for all except latest single block
|
||||
self.assertEqual(q.get_missing_required_filters(134_567), {
|
||||
1: (134_567, 134_567) # 134_551 -> 134_567
|
||||
})
|
||||
|
||||
q.insert_block_filter(134_567, 1, b'beef')
|
||||
# we have all latest filters
|
||||
self.assertEqual(q.get_missing_required_filters(134_567), {})
|
||||
|
||||
|
||||
class TestAddressGeneration(UnitDBTestCase):
|
||||
|
||||
RECEIVING_KEY_N = 0
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.root_pubkey = from_extended_key_string(
|
||||
self.ledger,
|
||||
'xpub661MyMwAqRbcGWtPvbWh9sc2BCfw2cTeVDYF23o3N1t6UZ5wv3EMmDgp66FxH'
|
||||
'uDtWdft3B5eL5xQtyzAtkdmhhC95gjRjLzSTdkho95asu9'
|
||||
)
|
||||
self.receiving_pubkey = self.root_pubkey.child(self.RECEIVING_KEY_N)
|
||||
|
||||
def generate(self, loops, is_new_starts=0):
|
||||
with q.PersistingAddressIterator(
|
||||
self.root_pubkey.address, self.RECEIVING_KEY_N,
|
||||
self.receiving_pubkey.pubkey_bytes,
|
||||
self.receiving_pubkey.chain_code,
|
||||
self.receiving_pubkey.depth
|
||||
) as generator:
|
||||
for address, n, is_new in generator:
|
||||
if n >= is_new_starts:
|
||||
self.assertTrue(is_new)
|
||||
else:
|
||||
self.assertFalse(is_new)
|
||||
if n == loops:
|
||||
break
|
||||
|
||||
@staticmethod
|
||||
def get_ordered_addresses():
|
||||
return [r["address"] for r in context().fetchall(
|
||||
select(AccountAddress.c.address).order_by(AccountAddress.c.n)
|
||||
)]
|
||||
|
||||
def test_generator_persisting(self):
|
||||
expected = [self.receiving_pubkey.child(n).addresses for n in range(30)]
|
||||
self.assertEqual([], self.get_ordered_addresses())
|
||||
self.generate(5, 0)
|
||||
self.assertEqual(expected[:6], self.get_ordered_addresses())
|
||||
self.generate(7, 6)
|
||||
self.assertEqual(expected[:8], self.get_ordered_addresses())
|
||||
self.generate(12, 8)
|
||||
self.assertEqual(expected[:13], self.get_ordered_addresses())
|
||||
|
||||
@staticmethod
|
||||
def insert_sub_filters(granularity, addresses, height):
|
||||
if granularity >= 2:
|
||||
end = height + 10 ** granularity
|
||||
step = 1 if granularity == 2 else 10 ** (granularity - 1)
|
||||
for i, height in enumerate(range(height, end, step)):
|
||||
if i == 3:
|
||||
q.insert_block_filter(height, granularity - 1, create_address_filter(addresses))
|
||||
else:
|
||||
q.insert_block_filter(height, granularity - 1, create_address_filter([b'beef']))
|
||||
elif granularity == 1:
|
||||
q.insert_tx_filter(hexlify(f'tx{height}'.encode()), height, create_address_filter(addresses))
|
||||
|
||||
def test_generate_from_filters(self):
|
||||
# 15 addresses will get generated, 9 due to filters and 6 due to gap
|
||||
pubkeys = [self.receiving_pubkey.child(n) for n in range(15)]
|
||||
hashes = [hash160(key.pubkey_bytes) for key in pubkeys]
|
||||
|
||||
# create all required filters (include 9 of the addresses in the filters)
|
||||
|
||||
q.insert_block_filter(0, 4, create_address_filter(hashes[0:1]))
|
||||
q.insert_block_filter(100_000, 4, create_address_filter(hashes[1:2]))
|
||||
q.insert_block_filter(110_000, 4, create_address_filter([b'beef']))
|
||||
q.insert_block_filter(120_000, 4, create_address_filter(hashes[2:3]))
|
||||
|
||||
q.insert_block_filter(130_000, 3, create_address_filter(hashes[3:4]))
|
||||
q.insert_block_filter(131_000, 3, create_address_filter([b'beef']))
|
||||
q.insert_block_filter(133_000, 3, create_address_filter(hashes[4:5]))
|
||||
|
||||
q.insert_block_filter(134_000, 2, create_address_filter(hashes[5:6]))
|
||||
q.insert_block_filter(134_200, 2, create_address_filter([b'beef']))
|
||||
q.insert_block_filter(134_400, 2, create_address_filter(hashes[6:7]))
|
||||
|
||||
q.insert_block_filter(134_500, 1, create_address_filter(hashes[7:8]))
|
||||
q.insert_block_filter(134_566, 1, create_address_filter([b'beef']))
|
||||
q.insert_block_filter(134_567, 1, create_address_filter(hashes[8:9]))
|
||||
|
||||
# check that all required filters did get created
|
||||
self.assertEqual(q.get_missing_required_filters(134_567), {})
|
||||
|
||||
# generate addresses with 6 address gap, returns new sub filters needed
|
||||
self.assertEqual(
|
||||
q.generate_addresses_using_filters(134_567, 6, (
|
||||
self.root_pubkey.address, self.RECEIVING_KEY_N,
|
||||
self.receiving_pubkey.pubkey_bytes,
|
||||
self.receiving_pubkey.chain_code,
|
||||
self.receiving_pubkey.depth
|
||||
)), {
|
||||
(1, 134500),
|
||||
(1, 134567),
|
||||
(2, 134000),
|
||||
(2, 134400),
|
||||
(3, 130000),
|
||||
(3, 133000),
|
||||
(4, 0),
|
||||
(4, 100000),
|
||||
(4, 120000)
|
||||
}
|
||||
)
|
||||
|
||||
# all addresses generated
|
||||
self.assertEqual([key.address for key in pubkeys], self.get_ordered_addresses())
|
||||
|
||||
# "download" missing sub filters
|
||||
self.insert_sub_filters(4, hashes[0:1], 0)
|
||||
self.insert_sub_filters(4, hashes[1:2], 100_000)
|
||||
self.insert_sub_filters(4, hashes[2:3], 120_000)
|
||||
self.insert_sub_filters(3, hashes[3:4], 130_000)
|
||||
self.insert_sub_filters(3, hashes[4:5], 133_000)
|
||||
self.insert_sub_filters(2, hashes[5:6], 134_000)
|
||||
self.insert_sub_filters(2, hashes[6:7], 134_400)
|
||||
self.insert_sub_filters(1, hashes[7:8], 134_500)
|
||||
self.insert_sub_filters(1, hashes[8:9], 134_567)
|
||||
|
||||
# no sub filters needed to be downloaded now when re-checking all addresses
|
||||
self.assertEqual(
|
||||
q.generate_addresses_using_filters(134_567, 6, (
|
||||
self.root_pubkey.address, self.RECEIVING_KEY_N,
|
||||
self.receiving_pubkey.pubkey_bytes,
|
||||
self.receiving_pubkey.chain_code,
|
||||
self.receiving_pubkey.depth
|
||||
)), set()
|
||||
)
|
||||
|
||||
# no new addresses should have been generated
|
||||
self.assertEqual([key.address for key in pubkeys], self.get_ordered_addresses())
|
||||
|
||||
self.assertEqual(
|
||||
q.generate_addresses_using_filters(134_567, 6, (
|
||||
self.root_pubkey.address, self.RECEIVING_KEY_N,
|
||||
self.receiving_pubkey.pubkey_bytes,
|
||||
self.receiving_pubkey.chain_code,
|
||||
self.receiving_pubkey.depth
|
||||
)), set()
|
||||
)
|
Loading…
Add table
Reference in a new issue