Got tests to pass on SQLite and PostgreSQL and added filter groups at various block height scales

Lex Berezhny 2020-09-21 10:26:19 -04:00
parent 5b9e40c0c0
commit 0b34c4ea86
11 changed files with 496 additions and 100 deletions

View file

@@ -1,5 +1,5 @@
import struct
from typing import NamedTuple, List, Tuple
from typing import NamedTuple, List
from chiabip158 import PyBIP158 # pylint: disable=no-name-in-module
@@ -11,12 +11,12 @@ from lbry.blockchain.bcd_data_stream import BCDataStream
ZERO_BLOCK = bytes((0,)*32)
def create_block_filter(address_hashes: List[bytearray]) -> bytes:
return bytes(PyBIP158(address_hashes).GetEncoded())
def create_address_filter(address_hashes: List[bytes]) -> bytes:
return bytes(PyBIP158([bytearray(a) for a in address_hashes]).GetEncoded())
def get_block_filter(block_filter: bytes) -> PyBIP158:
return PyBIP158(bytearray(block_filter))
def get_address_filter(address_filter: bytes) -> PyBIP158:
return PyBIP158(bytearray(address_filter))
class Block(NamedTuple):
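For illustration, a minimal round trip with the two helpers above (not part of this commit; the MatchAny call is an assumption about the chiabip158 binding):

# Illustrative sketch, not part of this commit.
hashes = [bytes(20), bytes([1] * 20)]            # stand-ins for hash160(address) values
encoded = create_address_filter(hashes)          # serialized BIP158-style filter bytes
probe = get_address_filter(encoded)              # decode back into a PyBIP158 matcher
assert probe.MatchAny([bytearray(h) for h in hashes])  # MatchAny is assumed to exist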

View file

@@ -1,24 +1,29 @@
import logging
from typing import Tuple
from sqlalchemy import table, bindparam, text, func, union
from sqlalchemy import table, text, func, union, between
from sqlalchemy.future import select
from sqlalchemy.schema import CreateTable
from lbry.db.tables import Block as BlockTable, TX, TXO, TXI, Claim, Tag, Support, BlockFilter
from lbry.db.tables import (
join_block_filter,
Block as BlockTable, BlockFilter, BlockGroupFilter,
TX, TXFilter, MempoolFilter, TXO, TXI, Claim, Tag, Support
)
from lbry.db.tables import (
pg_add_block_constraints_and_indexes,
pg_add_block_filter_constraints_and_indexes,
pg_add_tx_constraints_and_indexes,
pg_add_tx_filter_constraints_and_indexes,
pg_add_txo_constraints_and_indexes,
pg_add_txi_constraints_and_indexes
pg_add_txi_constraints_and_indexes,
)
from lbry.db.query_context import ProgressContext, event_emitter, context
from lbry.db.sync import set_input_addresses, update_spent_outputs
from lbry.blockchain.block import Block, create_block_filter
from lbry.blockchain.block import Block, create_address_filter
from lbry.blockchain.bcd_data_stream import BCDataStream
from .context import get_or_initialize_lbrycrd
from .filter_builder import FilterBuilder
log = logging.getLogger(__name__)
@@ -63,17 +68,9 @@ def sync_block_file(
return last_block_processed
@event_emitter("blockchain.sync.spends.main", "steps")
def sync_spends(initial_sync: bool, p: ProgressContext):
if initial_sync:
p.start(
7 +
len(pg_add_block_constraints_and_indexes) +
len(pg_add_block_filter_constraints_and_indexes) +
len(pg_add_tx_constraints_and_indexes) +
len(pg_add_txi_constraints_and_indexes) +
len(pg_add_txo_constraints_and_indexes)
)
@event_emitter("blockchain.sync.blocks.indexes", "steps")
def blocks_constraints_and_indexes(p: ProgressContext):
p.start(1 + len(pg_add_block_constraints_and_indexes))
if p.ctx.is_postgres:
p.ctx.execute_notx(text("VACUUM ANALYZE block;"))
p.step()
@@ -81,14 +78,25 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
if p.ctx.is_postgres:
p.ctx.execute(text(constraint))
p.step()
@event_emitter("blockchain.sync.blocks.vacuum", "steps")
def blocks_vacuum(p: ProgressContext):
p.start(1)
if p.ctx.is_postgres:
p.ctx.execute_notx(text("VACUUM ANALYZE block_filter;"))
p.step()
for constraint in pg_add_block_filter_constraints_and_indexes:
if p.ctx.is_postgres:
p.ctx.execute(text(constraint))
p.ctx.execute_notx(text("VACUUM block;"))
p.step()
@event_emitter("blockchain.sync.spends.main", "steps")
def sync_spends(initial_sync: bool, p: ProgressContext):
if initial_sync:
p.start(
7 +
len(pg_add_tx_constraints_and_indexes) +
len(pg_add_txi_constraints_and_indexes) +
len(pg_add_txo_constraints_and_indexes)
)
# 1. tx table stuff
if p.ctx.is_postgres:
p.ctx.execute_notx(text("VACUUM ANALYZE tx;"))
@@ -99,8 +107,6 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
p.step()
# A. Update TXIs to have the address of TXO they are spending.
# 2. txi table reshuffling
p.ctx.execute(text("ALTER TABLE txi RENAME TO old_txi;"))
p.ctx.execute(CreateTable(TXI, include_foreign_key_constraints=[]))
if p.ctx.is_postgres:
@@ -171,41 +177,96 @@ def sync_spends(initial_sync: bool, p: ProgressContext):
p.step()
@event_emitter("blockchain.sync.filter.generate", "blocks")
def sync_block_filters(blocks, p: ProgressContext):
p.start(
len(blocks), progress_id=blocks[0][1], label=f"generate filters {blocks[0][1]}-{blocks[-1][1]}"
@event_emitter("blockchain.sync.filters.generate", "blocks", throttle=100)
def sync_filters(start, end, p: ProgressContext):
fp = FilterBuilder(start, end)
p.start((end-start)+1, progress_id=start, label=f"generate filters {start}-{end}")
with p.ctx.connect_streaming() as c:
loader = p.ctx.get_bulk_loader()
tx_hash, height, addresses, last_added = None, None, set(), None
address_to_hash = p.ctx.ledger.address_to_hash160
for row in c.execute(get_block_tx_addresses_sql(*fp.query_heights)):
if tx_hash != row.tx_hash:
if tx_hash is not None:
last_added = tx_hash
fp.add(tx_hash, height, addresses)
tx_hash, height, addresses = row.tx_hash, row.height, set()
addresses.add(address_to_hash(row.address))
if all([last_added, tx_hash]) and last_added != tx_hash:  # pick up the last tx
fp.add(tx_hash, height, addresses)
for tx_hash, height, addresses in fp.tx_filters:
loader.add_transaction_filter(
tx_hash, height, create_address_filter(list(addresses))
)
for block_hash, block_height in blocks:
block_filter = create_block_filter([
bytearray(p.ctx.ledger.address_to_hash160(r["address"]))
for r in get_block_tx_addresses(block_height)
])
p.ctx.execute(
BlockFilter.insert().values(block_hash=block_hash, block_filter=block_filter)
for height, addresses in fp.block_filters.items():
loader.add_block_filter(
height, create_address_filter(list(addresses))
)
for group_filter in fp.group_filters:
for height, addresses in group_filter.groups.items():
loader.add_group_filter(
height, group_filter.factor, create_address_filter(list(addresses))
)
p.add(loader.flush(BlockFilter))
@event_emitter("blockchain.sync.filters.indexes", "steps")
def filters_constraints_and_indexes(p: ProgressContext):
constraints = (
pg_add_tx_filter_constraints_and_indexes +
pg_add_block_filter_constraints_and_indexes
)
p.start(2 + len(constraints))
if p.ctx.is_postgres:
p.ctx.execute_notx(text("VACUUM ANALYZE block_filter;"))
p.step()
if p.ctx.is_postgres:
p.ctx.execute_notx(text("VACUUM ANALYZE tx_filter;"))
p.step()
for constraint in constraints:
if p.ctx.is_postgres:
p.ctx.execute(text(constraint))
p.step()
def get_blocks_without_filters():
result = []
for block in context().fetchall(
select(BlockTable.c.block_hash, BlockTable.c.height).select_from(
join_block_filter
).where(BlockFilter.c.block_filter.is_(None))):
result.append((block['block_hash'], block['height']))
return result
@event_emitter("blockchain.sync.filters.vacuum", "steps")
def filters_vacuum(p: ProgressContext):
p.start(2)
if p.ctx.is_postgres:
p.ctx.execute_notx(text("VACUUM block_filter;"))
p.step()
if p.ctx.is_postgres:
p.ctx.execute_notx(text("VACUUM tx_filter;"))
p.step()
def get_block_tx_addresses(block_height):
return context().fetchall(
union(
select(TXO.c.address).select_from(TXO)
.where((TXO.c.address.isnot(None)) & (TXO.c.height == block_height)),
select(TXI.c.address).select_from(TXI.join(TXO))
.where((TXI.c.address.isnot(None)) & (TXI.c.height == block_height)),
def get_block_range_without_filters() -> Tuple[int, int]:
sql = (
select(
func.coalesce(func.min(BlockTable.c.height), -1).label('start_height'),
func.coalesce(func.max(BlockTable.c.height), -1).label('end_height'),
)
.select_from(BlockTable)
.where(BlockTable.c.height.notin_(select(BlockFilter.c.height)))
)
result = context().fetchone(sql)
return result['start_height'], result['end_height']
def get_block_tx_addresses_sql(start_height, end_height):
return union(
select(TXO.c.tx_hash, TXO.c.height, TXO.c.address).where(
(TXO.c.address.isnot(None)) & between(TXO.c.height, start_height, end_height)
),
select(TXI.c.tx_hash, TXI.c.height, TXI.c.address).where(
(TXI.c.address.isnot(None)) & between(TXI.c.height, start_height, end_height)
),
).order_by('height', 'tx_hash')
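The ORDER BY height, tx_hash above is what lets sync_filters detect transaction boundaries while streaming rows; for reference, compiling the statement shows the shape of the generated query (illustrative, values arbitrary):

# Illustrative sketch, not part of this commit: inspect the generated SQL.
stmt = get_block_tx_addresses_sql(810_000, 819_999)
print(str(stmt))  # UNION of (tx_hash, height, address) rows from txo and txi,
                  # restricted to non-NULL addresses within the height range,
                  # ordered by height then tx_hash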
@event_emitter("blockchain.sync.rewind.main", "steps")
@@ -222,6 +283,11 @@ def rewind(height: int, p: ProgressContext):
),
Claim.delete().where(Claim.c.height >= height),
Support.delete().where(Support.c.height >= height),
BlockFilter.delete().where(BlockFilter.c.height >= height),
# TODO: group and tx filters need where() clauses (below actually breaks things)
BlockGroupFilter.delete(),
TXFilter.delete(),
MempoolFilter.delete()
]
for delete in p.iter(deletes):
p.ctx.execute(delete)

View file

@@ -0,0 +1,79 @@
from typing import Dict
def split_range_into_10k_batches(start, end):
batch = [start, end]
batches = [batch]
for block in range(start, end+1):
if 0 < block != batch[0] and block % 10_000 == 0:
batch = [block, block]
batches.append(batch)
else:
batch[1] = block
return batches
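Batches are cut on 10,000-block boundaries, which matches the widest group filter (factor 4, i.e. 10**4 blocks), so presumably no factor-4 group ever spans two workers. For illustration (the values mirror the unit test at the end of this commit):

# Illustrative sketch, not part of this commit.
assert split_range_into_10k_batches(0, 37_645) == [
    [0, 9_999], [10_000, 19_999], [20_000, 29_999], [30_000, 37_645],
]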
class GroupFilter:
"""
Collects addresses into buckets of specific sizes defined by 10 raised to power of factor.
eg. a factor of 2 (10**2) would create block buckets 100-199, 200-299, etc
a factor of 3 (10**3) would create block buckets 1000-1999, 2000-2999, etc
"""
def __init__(self, start, end, factor):
self.start = start
self.end = end
self.factor = factor
self.resolution = resolution = 10**factor
last_height_in_group, groups = resolution-1, {}
for block in range(start, end+1):
if block % resolution == last_height_in_group:
groups[block-last_height_in_group] = set()
self.last_height_in_group = last_height_in_group
self.groups: Dict[int, set] = groups
@property
def coverage(self):
return list(self.groups.keys())
def add(self, height, addresses):
group = self.groups.get(height - (height % self.resolution))
if group is not None:
group.update(addresses)
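Note that a group only exists when its last block (e.g. xx99 for factor 2) falls inside the start-end range; addresses added at heights whose group was never created are silently ignored, as the unit tests at the end of this commit confirm:

# Illustrative sketch, not part of this commit.
gf = GroupFilter(1893, 1905, 2)
assert gf.coverage == [1800]         # only 1800-1899; its last block 1899 is in range
gf.add(1901, {'addr_hash'})          # 1900-1999 group was not created, so this is dropped
gf.add(1898, {'addr_hash'})
assert gf.groups == {1800: {'addr_hash'}}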
class FilterBuilder:
"""
Creates filter groups, calculates the necessary block range to fulfill creation
of filter groups and collects tx filters, block filters and group filters.
"""
def __init__(self, start, end):
self.start = start
self.end = end
self.group_filters = [
GroupFilter(start, end, 4),
GroupFilter(start, end, 3),
GroupFilter(start, end, 2),
]
self.start_tx_height, self.end_tx_height = self._calculate_tx_heights_for_query()
self.tx_filters = []
self.block_filters: Dict[int, set] = {}
def _calculate_tx_heights_for_query(self):
for group_filter in self.group_filters:
if group_filter.groups:
return group_filter.coverage[0], self.end
return self.start, self.end
@property
def query_heights(self):
return self.start_tx_height, self.end_tx_height
def add(self, tx_hash, height, addresses):
if self.start <= height <= self.end:
self.tx_filters.append((tx_hash, height, addresses))
block_filter = self.block_filters.get(height)
if block_filter is None:
block_filter = self.block_filters[height] = set()
block_filter.update(addresses)
for group_filter in self.group_filters:
group_filter.add(height, addresses)
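A worked example of how query_heights widens the requested range so the group filters can be built in full (values taken from the unit tests at the end of this commit):

# Illustrative sketch, not part of this commit.
fb = FilterBuilder(818_813, 819_999)
assert fb.query_heights == (810_000, 819_999)              # factor-4 group starts at 810,000
assert fb.group_filters[0].coverage == [810_000]           # factor 4
assert fb.group_filters[1].coverage == [818_000, 819_000]  # factor 3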

View file

@@ -13,12 +13,14 @@ from lbry.blockchain.lbrycrd import Lbrycrd
from lbry.error import LbrycrdEventSubscriptionError
from . import blocks as block_phase, claims as claim_phase, supports as support_phase
from .filter_builder import split_range_into_10k_batches
log = logging.getLogger(__name__)
BLOCKS_INIT_EVENT = Event.add("blockchain.sync.blocks.init", "steps")
BLOCKS_MAIN_EVENT = Event.add("blockchain.sync.blocks.main", "blocks", "txs")
FILTER_MAIN_EVENT = Event.add("blockchain.sync.filter.main", "blocks")
FILTER_INIT_EVENT = Event.add("blockchain.sync.filters.init", "steps")
FILTER_MAIN_EVENT = Event.add("blockchain.sync.filters.main", "blocks")
CLAIMS_INIT_EVENT = Event.add("blockchain.sync.claims.init", "steps")
CLAIMS_MAIN_EVENT = Event.add("blockchain.sync.claims.main", "claims")
TRENDS_INIT_EVENT = Event.add("blockchain.sync.trends.init", "steps")
@@ -137,24 +139,39 @@ class BlockchainSync(Sync):
})
completed = await self.run_tasks(tasks)
if completed:
if starting_height == 0:
await self.db.run(block_phase.blocks_constraints_and_indexes)
else:
await self.db.run(block_phase.blocks_vacuum)
best_height_processed = max(f.result() for f in completed)
return starting_height, best_height_processed
async def sync_filters(self):
# if not self.conf.spv_address_filters:
# return
blocks = await self.db.run(block_phase.get_blocks_without_filters)
batch_size = (len(blocks) // self.db.workers) + 1
batches = [
blocks[index:index + batch_size] for index in range(0, len(blocks), batch_size)
]
if batches:
if not self.conf.spv_address_filters:
return
with Progress(self.db.message_queue, FILTER_INIT_EVENT) as p:
p.start(2)
initial_sync = not await self.db.has_filters()
p.step()
if initial_sync:
blocks = [0, await self.db.get_best_block_height()]
else:
blocks = await self.db.run(block_phase.get_block_range_without_filters)
if blocks != (-1, -1):
batches = split_range_into_10k_batches(*blocks)
p.step()
else:
p.step()
return
with Progress(self.db.message_queue, FILTER_MAIN_EVENT) as p:
p.start(len(blocks))
p.start((blocks[1]-blocks[0])+1)
await self.run_tasks([
self.db.run(block_phase.sync_block_filters, batch)
for batch in batches
self.db.run(block_phase.sync_filters, *batch) for batch in batches
])
if initial_sync:
await self.db.run(block_phase.filters_constraints_and_indexes)
else:
await self.db.run(block_phase.filters_vacuum)
async def sync_spends(self, blocks_added):
if blocks_added:

View file

@@ -199,6 +199,9 @@ class Database:
async def execute_fetchall(self, sql):
return await self.run(q.execute_fetchall, sql)
async def has_filters(self):
return await self.run(q.has_filters)
async def has_claims(self):
return await self.run(q.has_claims)

View file

@@ -2,7 +2,7 @@ from sqlalchemy import text
from sqlalchemy.future import select
from ..query_context import context
from ..tables import SCHEMA_VERSION, metadata, Version, Claim, Support, Block, TX
from ..tables import SCHEMA_VERSION, metadata, Version, Claim, Support, Block, BlockFilter, TX
def execute(sql):
@@ -13,6 +13,10 @@ def execute_fetchall(sql):
return context().fetchall(text(sql))
def has_filters():
return context().has_records(BlockFilter)
def has_claims():
return context().has_records(Claim)
@@ -22,7 +26,7 @@ def has_supports():
def get_best_block_height():
context().fetchmax(Block.c.height, -1)
return context().fetchmax(Block.c.height, -1)
def insert_block(block):
@@ -50,7 +54,7 @@ def disable_trigger_and_constraints(table_name):
ctx = context()
if ctx.is_postgres:
ctx.execute(text(f"ALTER TABLE {table_name} DISABLE TRIGGER ALL;"))
if table_name in ('tag', 'stake'):
if table_name in ('tag', 'stake', 'block_group_filter', 'mempool_filter'):
return
if ctx.is_postgres:
ctx.execute(text(

View file

@@ -26,7 +26,10 @@ from lbry.schema.result import Censor
from lbry.schema.mime_types import guess_stream_type
from .utils import pg_insert
from .tables import Block, TX, TXO, TXI, Claim, Tag, Support
from .tables import (
Block, BlockFilter, BlockGroupFilter,
TX, TXFilter, TXO, TXI, Claim, Tag, Support
)
from .constants import TXO_TYPES, STREAM_TYPES
@@ -414,6 +417,9 @@ class BulkLoader:
self.tags = []
self.update_claims = []
self.delete_tags = []
self.tx_filters = []
self.block_filters = []
self.group_filters = []
@staticmethod
def block_to_row(block: Block) -> dict:
@@ -429,7 +435,7 @@
def tx_to_row(block_hash: bytes, tx: Transaction) -> dict:
row = {
'tx_hash': tx.hash,
'block_hash': block_hash,
#'block_hash': block_hash,
'raw': tx.raw,
'height': tx.height,
'position': tx.position,
@@ -621,6 +627,19 @@ class BulkLoader:
self.add_transaction(block.block_hash, tx)
return self
def add_block_filter(self, height: int, address_filter: bytes):
self.block_filters.append({
'height': height,
'address_filter': address_filter
})
def add_group_filter(self, height: int, factor: int, address_filter: bytes):
self.group_filters.append({
'height': height,
'factor': factor,
'address_filter': address_filter
})
def add_transaction(self, block_hash: bytes, tx: Transaction):
self.txs.append(self.tx_to_row(block_hash, tx))
for txi in tx.inputs:
@@ -630,6 +649,13 @@ class BulkLoader:
self.txos.append(self.txo_to_row(tx, txo))
return self
def add_transaction_filter(self, tx_hash: bytes, height: int, address_filter: bytes):
self.tx_filters.append({
'tx_hash': tx_hash,
'height': height,
'address_filter': address_filter
})
def add_support(self, txo: Output, **extra):
self.supports.append(self.support_to_row(txo, **extra))
@@ -669,7 +695,10 @@ class BulkLoader:
def get_queries(self):
return (
(Block.insert(), self.blocks),
(BlockFilter.insert(), self.block_filters),
(BlockGroupFilter.insert(), self.group_filters),
(TX.insert(), self.txs),
(TXFilter.insert(), self.tx_filters),
(TXO.insert(), self.txos),
(TXI.insert(), self.txis),
(Claim.insert(), self.claims),

View file

@@ -40,34 +40,41 @@ AccountAddress = Table(
Block = Table(
'block', metadata,
Column('block_hash', LargeBinary, primary_key=True),
Column('height', Integer, primary_key=True),
Column('block_hash', LargeBinary),
Column('previous_hash', LargeBinary),
Column('file_number', SmallInteger),
Column('height', Integer),
Column('timestamp', Integer),
)
pg_add_block_constraints_and_indexes = [
"ALTER TABLE block ADD PRIMARY KEY (block_hash);"
"ALTER TABLE block ADD PRIMARY KEY (height);",
]
BlockFilter = Table(
'block_filter', metadata,
Column('block_hash', LargeBinary, primary_key=True),
Column('block_filter', LargeBinary, nullable=True),
Column('height', Integer, primary_key=True),
Column('address_filter', LargeBinary),
)
join_block_filter = Block.join(BlockFilter, BlockFilter.columns.block_hash == Block.columns.block_hash, full=True)
pg_add_block_filter_constraints_and_indexes = [
"ALTER TABLE block_filter ADD PRIMARY KEY (block_hash);",
"ALTER TABLE block_filter ADD PRIMARY KEY (height);",
"ALTER TABLE block_filter ADD CONSTRAINT fk_block_filter"
" FOREIGN KEY(block_hash) "
" REFERENCES block(block_hash) "
" ON DELETE CASCADE;"
" FOREIGN KEY (height) REFERENCES block (height) ON DELETE CASCADE;",
]
BlockGroupFilter = Table(
'block_group_filter', metadata,
Column('height', Integer),
Column('factor', SmallInteger),
Column('address_filter', LargeBinary),
)
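A group filter covering heights 810,000-819,999 at factor 4 would be stored as a single row keyed by height=810000 and factor=4. A hedged sketch of reading one back, using the context()/select helpers imported elsewhere in this commit (get_group_filter itself is hypothetical, not part of the commit):

# Illustrative sketch, not part of this commit.
from sqlalchemy.future import select
from lbry.db.query_context import context

def get_group_filter(height: int, factor: int):
    resolution = 10 ** factor
    return context().fetchone(
        select(BlockGroupFilter.c.address_filter).where(
            (BlockGroupFilter.c.height == height - (height % resolution)) &
            (BlockGroupFilter.c.factor == factor)
        )
    )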
TX = Table(
'tx', metadata,
Column('block_hash', LargeBinary, nullable=True),
Column('tx_hash', LargeBinary, primary_key=True),
Column('raw', LargeBinary),
Column('height', Integer),
@@ -76,15 +83,34 @@ TX = Table(
Column('day', Integer, nullable=True),
Column('is_verified', Boolean, server_default='FALSE'),
Column('purchased_claim_hash', LargeBinary, nullable=True),
Column('tx_filter', LargeBinary, nullable=True)
)
pg_add_tx_constraints_and_indexes = [
"ALTER TABLE tx ADD PRIMARY KEY (tx_hash);",
]
TXFilter = Table(
'tx_filter', metadata,
Column('tx_hash', LargeBinary, primary_key=True),
Column('height', Integer),
Column('address_filter', LargeBinary),
)
pg_add_tx_filter_constraints_and_indexes = [
"ALTER TABLE tx_filter ADD PRIMARY KEY (tx_hash);",
"ALTER TABLE tx_filter ADD CONSTRAINT fk_tx_filter"
" FOREIGN KEY (tx_hash) REFERENCES tx (tx_hash) ON DELETE CASCADE;"
]
MempoolFilter = Table(
'mempool_filter', metadata,
Column('filter_number', Integer),
Column('mempool_filter', LargeBinary),
)
TXO = Table(
'txo', metadata,
Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)),
@@ -114,7 +140,6 @@ TXO = Table(
txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address)
pg_add_txo_constraints_and_indexes = [
"ALTER TABLE txo ADD PRIMARY KEY (txo_hash);",
# find appropriate channel public key for signing a content claim
@@ -133,7 +158,7 @@ pg_add_txo_constraints_and_indexes = [
f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) "
f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
"CREATE INDEX txo_tx_height ON txo (height);"
"CREATE INDEX txo_height ON txo (height);",
]
@@ -148,10 +173,9 @@ TXI = Table(
txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address)
pg_add_txi_constraints_and_indexes = [
"ALTER TABLE txi ADD PRIMARY KEY (txo_hash);",
"CREATE INDEX txi_tx_height ON txi (height);"
"CREATE INDEX txi_height ON txi (height);",
]
@@ -214,14 +238,12 @@ Claim = Table(
Column('trending_global', BigInteger, server_default='0'),
)
Tag = Table(
'tag', metadata,
Column('claim_hash', LargeBinary),
Column('tag', Text),
)
pg_add_claim_and_tag_constraints_and_indexes = [
"ALTER TABLE claim ADD PRIMARY KEY (claim_hash);",
# for checking if claim is up-to-date
@@ -259,7 +281,6 @@ Support = Table(
Column('is_signature_valid', Boolean, nullable=True),
)
pg_add_support_constraints_and_indexes = [
"ALTER TABLE support ADD PRIMARY KEY (txo_hash);",
# used to count()/sum() supports signed by channel

View file

@@ -769,10 +769,23 @@ class EventGenerator:
yield from self.blocks_main_start()
for block_file in self.block_files:
yield from self.blocks_file(*block_file)
if self.initial_sync:
yield from self.blocks_indexes()
else:
yield from self.blocks_vacuum()
yield from self.blocks_main_finish()
yield from self.spends_steps()
yield from self.filters_init()
if self.block_files:
yield from self.filters_main_start()
yield from self.filters_generate()
if self.initial_sync:
yield from self.filters_indexes()
else:
yield from self.filters_vacuum()
yield from self.filters_main_finish()
if self.claims:
if not self.initial_sync:
yield from self.claims_init()
@@ -845,10 +858,69 @@ class EventGenerator:
"data": {"id": file, "done": step}
}
def blocks_indexes(self):
yield from self.generate(
"blockchain.sync.blocks.indexes", ("steps",), 0, None, (2,), (1,)
)
def blocks_vacuum(self):
yield from self.generate(
"blockchain.sync.blocks.vacuum", ("steps",), 0, None, (1,), (1,)
)
def filters_init(self):
yield from self.generate("blockchain.sync.filters.init", ("steps",), 0, None, (2,), (1,))
def filters_main_start(self):
yield {
"event": "blockchain.sync.filters.main",
"data": {
"id": 0, "done": (0,),
"total": ((self.end_height-self.start_height)+1,),
"units": ("blocks",)}
}
@staticmethod
def filters_main_finish():
yield {
"event": "blockchain.sync.filters.main",
"data": {"id": 0, "done": (-1,)}
}
def filters_generate(self):
#yield from self.generate(
# "blockchain.sync.filters.generate", ("blocks",), 0,
# f"generate filters 0-{blocks-1}", (blocks,), (100,)
#)
blocks = (self.end_height-self.start_height)+1
yield {
"event": "blockchain.sync.filters.generate",
"data": {
"id": self.start_height, "done": (0,),
"total": (blocks,),
"units": ("blocks",),
"label": f"generate filters {self.start_height}-{self.end_height}",
}
}
yield {
"event": "blockchain.sync.filters.generate",
"data": {"id": self.start_height, "done": (blocks,)}
}
def filters_indexes(self):
yield from self.generate(
"blockchain.sync.filters.indexes", ("steps",), 0, None, (6,), (1,)
)
def filters_vacuum(self):
yield from self.generate(
"blockchain.sync.filters.vacuum", ("steps",), 0, None, (2,), (1,)
)
def spends_steps(self):
yield from self.generate(
"blockchain.sync.spends.main", ("steps",), 0, None,
(15 if self.initial_sync else 5,),
(17 if self.initial_sync else 5,),
(1,)
)

View file

@@ -372,7 +372,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
generate = not os.path.exists(self.TEST_DATA_CACHE_DIR)
self.db = await self.make_db(self.chain)
self.chain.ledger.conf.spv_address_filters = False
self.sync = BlockchainSync(self.chain, self.db)
if not generate:

View file

@@ -0,0 +1,106 @@
from unittest import TestCase
from lbry.blockchain.sync.filter_builder import (
FilterBuilder as FB, GroupFilter as GF, split_range_into_10k_batches as split
)
class TestFilterGenerationComponents(TestCase):
def test_split_range_into_10k_batches(self):
# single block (same start-end)
self.assertEqual(split(901_123, 901_123), [[901_123, 901_123]])
# spans a 10k split
self.assertEqual(split(901_123, 911_123), [[901_123, 909_999], [910_000, 911_123]])
# starts on last element before split
self.assertEqual(split(909_999, 911_123), [[909_999, 909_999], [910_000, 911_123]])
# starts on first element after split
self.assertEqual(split(910_000, 911_123), [[910_000, 911_123]])
# ends on last element before split
self.assertEqual(split(901_123, 909_999), [[901_123, 909_999]])
# ends on first element after split
self.assertEqual(split(901_123, 910_000), [[901_123, 909_999], [910_000, 910_000]])
# initial sync from 0 onwards
self.assertEqual(split(0, 37645), [
[0, 9_999],
[10_000, 19_999],
[20_000, 29_999],
[30_000, 37645]
])
def test_group_filter_coverage(self):
# single block (same start-end)
self.assertEqual(GF(1893, 1898, 2).coverage, [])
# spans a group split
self.assertEqual(GF(1893, 1905, 2).coverage, [1800])
# starts on last element before split and
self.assertEqual(GF(1799, 1915, 2).coverage, [1700, 1800])
# starts on first element after split
self.assertEqual(GF(1800, 1915, 2).coverage, [1800])
# ends on last element before split
self.assertEqual(GF(1893, 1899, 2).coverage, [1800])
# ends on first element after split
self.assertEqual(GF(1899, 1900, 2).coverage, [1800])
self.assertEqual(GF(1599, 1899, 2).coverage, [1500, 1600, 1700, 1800])
self.assertEqual(GF(1600, 1899, 2).coverage, [1600, 1700, 1800])
def test_group_filter_add_tx(self):
gf = GF(1898, 2002, 2)
gf.add(1798, ['a']) # outside range
gf.add(1800, ['b']) # first element in group 1800
gf.add(1801, ['c'])
gf.add(1898, ['d'])
gf.add(1899, ['e']) # last element in group 1800
gf.add(1900, ['f']) # first element in group 1900
gf.add(1901, ['g'])
gf.add(2001, ['h']) # outside range
self.assertEqual(gf.groups, {
1800: {'b', 'c', 'd', 'e'},
1900: {'f', 'g'}
})
def test_filter_builder_query_heights(self):
self.assertEqual(FB(893, 898).query_heights, (893, 898))
self.assertEqual(FB(893, 899).query_heights, (800, 899))
self.assertEqual(FB(913, 998).query_heights, (913, 998))
self.assertEqual(FB(913, 999).query_heights, (0, 999))
self.assertEqual(FB(1_913, 1_999).query_heights, (1_000, 1_999))
self.assertEqual(FB(9_913, 9_998).query_heights, (9_913, 9_998))
self.assertEqual(FB(9_913, 9_999).query_heights, (0, 9_999))
self.assertEqual(FB(19_913, 19_999).query_heights, (10_000, 19_999))
self.assertEqual(FB(819_913, 819_999).query_heights, (810_000, 819_999))
def test_filter_builder_add(self):
fb = FB(818_813, 819_999)
self.assertEqual(fb.query_heights, (810_000, 819_999))
self.assertEqual(fb.group_filters[0].coverage, [810_000])
self.assertEqual(fb.group_filters[1].coverage, [818_000, 819_000])
self.assertEqual(fb.group_filters[2].coverage, [
818_800, 818_900, 819_000, 819_100, 819_200, 819_300,
819_400, 819_500, 819_600, 819_700, 819_800, 819_900
])
fb.add(b'beef0', 810_000, ['a'])
fb.add(b'beef1', 815_001, ['b'])
fb.add(b'beef2', 818_412, ['c'])
fb.add(b'beef3', 818_812, ['d'])
fb.add(b'beef4', 818_813, ['e'])
fb.add(b'beef5', 819_000, ['f'])
fb.add(b'beef6', 819_999, ['g'])
fb.add(b'beef7', 819_999, ['h'])
fb.add(b'beef8', 820_000, ['i'])
self.assertEqual(fb.group_filters[0].groups, {
810_000: {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'}
})
self.assertEqual(fb.group_filters[1].groups, {
818_000: {'c', 'd', 'e'},
819_000: {'f', 'g', 'h'}
})
self.assertEqual(fb.group_filters[2].groups[818_800], {'d', 'e'})
self.assertEqual(fb.group_filters[2].groups[819_000], {'f'})
self.assertEqual(fb.group_filters[2].groups[819_900], {'g', 'h'})
self.assertEqual(fb.block_filters, {818813: {'e'}, 819000: {'f'}, 819999: {'g', 'h'}})
self.assertEqual(fb.tx_filters, [
(b'beef4', 818813, ['e']),
(b'beef5', 819000, ['f']),
(b'beef6', 819999, ['g']),
(b'beef7', 819999, ['h'])
])