reproduce issue with lbrycrd -reindex

Lex Berezhny 2020-02-14 12:19:55 -05:00
parent fd3448ffb8
commit 4c0fbb84d6
7 changed files with 516 additions and 1 deletion

1
.gitignore vendored

@@ -14,4 +14,5 @@ _trial_temp/
 /tests/integration/blockchain/files
 /tests/.coverage.*
+/lbry/blockchain/bin
 /lbry/wallet/bin

2
lbry/blockchain/__init__.py Normal file

@@ -0,0 +1,2 @@
from .sync import BlockSync
from .lbrycrd import Lbrycrd

47
lbry/blockchain/block.py Normal file

@@ -0,0 +1,47 @@
import struct

from lbry.crypto.hash import double_sha256
from lbry.wallet.transaction import Transaction
from lbry.wallet.bcd_data_stream import BCDataStream

ZERO_BLOCK = bytes((0,)*32)


class Block:

    __slots__ = (
        'version', 'block_hash', 'prev_block_hash',
        'merkle_root', 'claim_trie_root', 'timestamp',
        'bits', 'nonce', 'txs'
    )

    def __init__(self, stream):
        stream.read_uint32()  # block size
        header = stream.data.read(112)  # lbrycrd headers are 112 bytes (extra claim trie root)
        version, = struct.unpack('<I', header[:4])
        timestamp, bits, nonce = struct.unpack('<III', header[100:112])
        self.version = version
        self.block_hash = double_sha256(header)
        self.prev_block_hash = header[4:36]
        self.merkle_root = header[36:68]
        self.claim_trie_root = header[68:100][::-1]
        self.timestamp = timestamp
        self.bits = bits
        self.nonce = nonce
        tx_count = stream.read_compact_size()
        self.txs = [
            Transaction(position=i)._deserialize(stream)
            for i in range(tx_count)
        ]

    @property
    def is_first_block(self):
        return self.prev_block_hash == ZERO_BLOCK


def read_blocks(block_file):
    with open(block_file, 'rb') as fp:
        stream = BCDataStream(fp=fp)
        # 4054508794 is the main net block file magic (0xf1aae4fa);
        # 3517637882 is the regtest magic (0xd1aae4fa)
        #while stream.read_uint32() == 4054508794:
        while stream.read_uint32() == 3517637882:
            yield Block(stream)
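
For orientation, a minimal sketch of driving read_blocks() by hand against an existing regtest block file (the path below is illustrative, not part of this commit):

import os
from binascii import hexlify
from lbry.blockchain.block import read_blocks

# assumes lbrycrdd has already written ~/.lbrycrd/regtest/blocks/blk00000.dat
block_file = os.path.expanduser('~/.lbrycrd/regtest/blocks/blk00000.dat')
for block in read_blocks(block_file):
    # block_hash is raw bytes; hexlify it for display
    print(hexlify(block.block_hash).decode(), f'{len(block.txs)} txs')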

124
lbry/blockchain/db.py Normal file

@@ -0,0 +1,124 @@
import os
import asyncio
from collections import namedtuple

import apsw

DDL = """
pragma journal_mode=WAL;

create table if not exists block (
    block_hash bytes not null,
    previous_hash bytes not null,
    height int
);

create table if not exists tx (
    block_hash bytes not null,
    position integer not null,
    tx_hash bytes not null
);

create table if not exists txi (
    block_hash bytes not null,
    tx_hash bytes not null,
    txo_hash bytes not null
);

create table if not exists claim (
    txo_hash bytes not null,
    claim_hash bytes not null,
    claim_name text not null,
    amount integer not null,
    height integer
);

create table if not exists claim_history (
    block_hash bytes not null,
    tx_hash bytes not null,
    tx_position integer not null,
    txo_hash bytes not null,
    claim_hash bytes not null,
    claim_name text not null,
    action integer not null,
    amount integer not null,
    height integer,
    is_spent bool
);

create table if not exists support (
    block_hash bytes not null,
    tx_hash bytes not null,
    txo_hash bytes not null,
    claim_hash bytes not null,
    amount integer not null
);
"""


class BlockchainDB:

    __slots__ = 'db', 'directory'

    def __init__(self, path: str):
        self.db = None
        self.directory = path

    def open(self):
        self.db = apsw.Connection(
            os.path.join(self.directory, 'blockchain.db'),
            flags=(
                apsw.SQLITE_OPEN_READWRITE |
                apsw.SQLITE_OPEN_CREATE |
                apsw.SQLITE_OPEN_URI
            )
        )

        def exec_factory(cursor, statement, bindings):
            tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
            cursor.setrowtrace(lambda cursor, row: tpl(*row))
            return True

        self.db.setexectrace(exec_factory)
        self.execute(DDL)
        self.execute(f"ATTACH '{os.path.join(self.directory, 'block_index.sqlite')}' AS block_index")

    def close(self):
        if self.db is not None:
            self.db.close()

    def execute(self, *args):
        return self.db.cursor().execute(*args)

    def executemany(self, *args):
        return self.db.cursor().executemany(*args)

    def begin(self):
        self.execute('begin;')

    def commit(self):
        self.execute('commit;')

    def get_blocks(self):
        pass


class AsyncBlockchainDB:

    __slots__ = 'db',

    def __init__(self, db: BlockchainDB):
        self.db = db

    @classmethod
    def from_path(cls, path: str):
        return cls(BlockchainDB(path))

    @staticmethod
    async def run_in_executor(func, *args):
        return await asyncio.get_running_loop().run_in_executor(
            None, func, *args
        )

    async def open(self):
        return await self.run_in_executor(self.db.open)

    async def close(self):
        return await self.run_in_executor(self.db.close)

    async def get_blocks(self):
        return await self.run_in_executor(self.db.get_blocks)
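
A minimal sketch of the intended call pattern for AsyncBlockchainDB (the path is illustrative; get_blocks() is still a stub at this point in the commit):

import asyncio
from lbry.blockchain.db import AsyncBlockchainDB

async def inspect():
    db = AsyncBlockchainDB.from_path('/tmp/regtest')
    await db.open()  # creates blockchain.db and attaches block_index.sqlite
    try:
        await db.get_blocks()
    finally:
        await db.close()

asyncio.run(inspect())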

208
lbry/blockchain/lbrycrd.py Normal file

@@ -0,0 +1,208 @@
import os
import struct
import shutil
import asyncio
import logging
import zipfile
import tempfile
from typing import Optional
from binascii import hexlify

import aiohttp
import zmq
import zmq.asyncio

from lbry.wallet.stream import StreamController

log = logging.getLogger(__name__)

download_url = (
    # 'https://github.com/lbryio/lbrycrd/releases/download/v0.17.4.2/lbrycrd-linux-1742.zip'
    'https://build.lbry.io/lbrycrd/fix_flush_to_not_corrupt/lbrycrd-linux.zip'
)


class Process(asyncio.SubprocessProtocol):

    IGNORE_OUTPUT = [
        b'keypool keep',
        b'keypool reserve',
        b'keypool return',
    ]

    def __init__(self):
        self.ready = asyncio.Event()
        self.stopped = asyncio.Event()
        self.log = log.getChild('blockchain')

    def pipe_data_received(self, fd, data):
        if self.log and not any(ignore in data for ignore in self.IGNORE_OUTPUT):
            if b'Error:' in data:
                self.log.error(data.decode())
            else:
                self.log.info(data.decode())
        if b'Error:' in data:
            self.ready.set()
            raise SystemError(data.decode())
        if b'Done loading' in data:
            self.ready.set()

    def process_exited(self):
        self.stopped.set()
        self.ready.set()


class Lbrycrd:

    def __init__(self, path=None):
        self.data_path = path
        self.bin_dir = os.path.join(os.path.dirname(__file__), 'bin')
        self.daemon_bin = os.path.join(self.bin_dir, 'lbrycrdd')
        self.cli_bin = os.path.join(self.bin_dir, 'lbrycrd-cli')
        self.protocol = None
        self.transport = None
        self.hostname = 'localhost'
        self.peerport = 9246 + 2  # avoid conflict with default peer port
        self.rpcport = 9245 + 2  # avoid conflict with default rpc port
        self.rpcuser = 'rpcuser'
        self.rpcpassword = 'rpcpassword'
        self.session: Optional[aiohttp.ClientSession] = None
        self.subscribed = False
        self.subscription: Optional[asyncio.Task] = None
        self._on_block_controller = StreamController()
        self.on_block = self._on_block_controller.stream
        self.on_block.listen(lambda e: log.info('%s %s', hexlify(e['hash']), e['msg']))

    @classmethod
    def regtest(cls):
        return cls(tempfile.mkdtemp())

    @property
    def rpc_url(self):
        return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'

    @property
    def exists(self):
        return (
            os.path.exists(self.cli_bin) and
            os.path.exists(self.daemon_bin)
        )

    async def download(self):
        downloaded_file = os.path.join(
            self.bin_dir, download_url[download_url.rfind('/')+1:]
        )
        if not os.path.exists(self.bin_dir):
            os.mkdir(self.bin_dir)
        if not os.path.exists(downloaded_file):
            log.info('Downloading: %s', download_url)
            async with aiohttp.ClientSession() as session:
                async with session.get(download_url) as response:
                    with open(downloaded_file, 'wb') as out_file:
                        while True:
                            chunk = await response.content.read(4096)
                            if not chunk:
                                break
                            out_file.write(chunk)
        log.info('Extracting: %s', downloaded_file)
        with zipfile.ZipFile(downloaded_file) as dotzip:
            dotzip.extractall(self.bin_dir)
            # zipfile bug https://bugs.python.org/issue15795
            os.chmod(self.cli_bin, 0o755)
            os.chmod(self.daemon_bin, 0o755)
        return self.exists

    async def ensure(self):
        return self.exists or await self.download()

    async def start(self, *args):
        loop = asyncio.get_event_loop()
        command = [
            self.daemon_bin,
            f'-datadir={self.data_path}', '-printtoconsole', '-regtest', '-server',
            f'-rpcuser={self.rpcuser}', f'-rpcpassword={self.rpcpassword}', f'-rpcport={self.rpcport}',
            f'-port={self.peerport}', '-zmqpubhashblock=tcp://127.0.0.1:29000', *args
        ]
        log.info(' '.join(command))
        self.transport, self.protocol = await loop.subprocess_exec(
            Process, *command
        )
        await self.protocol.ready.wait()
        assert not self.protocol.stopped.is_set()
        self.session = aiohttp.ClientSession()

    async def stop(self, cleanup=True):
        try:
            await self.session.close()
            self.transport.terminate()
            await self.protocol.stopped.wait()
            self.transport.close()
        finally:
            if cleanup:
                await self.cleanup()

    async def cleanup(self):
        await asyncio.get_running_loop().run_in_executor(
            None, shutil.rmtree, self.data_path, True
        )

    def subscribe(self):
        if not self.subscribed:
            self.subscribed = True
            ctx = zmq.asyncio.Context.instance()
            sock = ctx.socket(zmq.SUB)
            sock.connect("tcp://127.0.0.1:29000")
            sock.subscribe("hashblock")
            self.subscription = asyncio.create_task(self.subscription_handler(sock))

    async def subscription_handler(self, sock):
        try:
            while self.subscribed:
                msg = await sock.recv_multipart()
                self._on_block_controller.add({
                    'hash': msg[1],
                    'msg': struct.unpack('<I', msg[2])[0]
                })
        except asyncio.CancelledError:
            sock.close()
            raise

    def unsubscribe(self):
        if self.subscribed:
            self.subscribed = False
            self.subscription.cancel()
            self.subscription = None

    async def rpc(self, method, params=None):
        message = {
            "jsonrpc": "1.0",
            "id": "1",
            "method": method,
            "params": params or []
        }
        async with self.session.post(self.rpc_url, json=message) as resp:
            try:
                result = await resp.json()
            except aiohttp.ContentTypeError as e:
                raise Exception(await resp.text()) from e
            if not result['error']:
                return result['result']
            else:
                result['error'].update(method=method, params=params)
                raise Exception(result['error'])

    async def generate(self, blocks):
        return await self.rpc("generate", [blocks])

    async def claim_name(self, name, data, amount):
        return await self.rpc("claimname", [name, data, amount])

109
lbry/blockchain/sync.py Normal file

@@ -0,0 +1,109 @@
import os
import time
import asyncio
import logging
from glob import glob
from concurrent.futures import ProcessPoolExecutor

import apsw

from .lbrycrd import Lbrycrd
from .block import read_blocks
from .db import AsyncBlockchainDB, DDL

log = logging.getLogger(__name__)


class BlockSync:

    def __init__(self, chain: Lbrycrd):
        self.chain = chain
        self.db = AsyncBlockchainDB.from_path(os.path.join(self.chain.data_path, 'regtest'))

    async def start(self):
        await self.db.open()

    async def stop(self):
        await self.db.close()

    async def cleanup(self):
        pass


db = None  # per-process connection, set by initializer() in each pool worker


def initializer(path):
    global db
    db = apsw.Connection(path)
    db.cursor().execute(DDL)


def process_file(block_file):
    blocks, txs, claims, supports, spends = [], [], [], [], []
    for block in read_blocks(block_file):
        for tx in block.txs:
            txs.append((block.block_hash, tx.position, tx.hash))
            for txi in tx.inputs:
                if not txi.is_coinbase:
                    spends.append((block.block_hash, tx.hash, txi.txo_ref.hash))
            for output in tx.outputs:
                try:
                    if output.is_support:
                        supports.append((
                            block.block_hash, tx.hash, output.ref.hash, output.claim_hash, output.amount
                        ))
                    elif output.script.is_claim_name:
                        claims.append((
                            block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                            output.claim_name, 1, output.amount, None, None
                        ))
                    elif output.script.is_update_claim:
                        claims.append((
                            block.block_hash, tx.hash, tx.position, output.ref.hash, output.claim_hash,
                            output.claim_name, 2, output.amount, None, None
                        ))
                except Exception:
                    pass  # skip outputs that cannot be parsed as claims
        blocks.append((block.block_hash, block.prev_block_hash, 0 if block.is_first_block else None))
    sql = db.cursor()
    sql.execute('begin;')
    sql.executemany("insert into block values (?, ?, ?)", blocks)
    sql.execute('commit;')
    sql.execute('begin;')
    sql.executemany("insert into tx values (?, ?, ?)", txs)
    sql.execute('commit;')
    sql.execute('begin;')
    sql.executemany("insert into txi values (?, ?, ?)", spends)
    sql.execute('commit;')
    sql.execute('begin;')
    sql.executemany("insert into support values (?, ?, ?, ?, ?)", supports)
    sql.execute('commit;')
    sql.execute('begin;')
    sql.executemany("insert into claim_history values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", claims)
    sql.execute('commit;')
    return len(blocks), len(txs)


async def main():
    lbrycrd = os.path.expanduser('~/.lbrycrd/')
    output_db = '/tmp/fast_sync.db'
    if os.path.exists(output_db):
        os.remove(output_db)
    initializer(output_db)  # create the tables up front, before workers attach
    executor = ProcessPoolExecutor(
        6, initializer=initializer, initargs=(output_db,)
    )
    file_paths = glob(os.path.join(lbrycrd, 'regtest', 'blocks', 'blk*.dat'))
    file_paths.sort()
    total_blocks, total_txs = 0, 0
    start = time.perf_counter()
    for file_path, (blocks, txs) in zip(file_paths, executor.map(process_file, file_paths)):
        print(f"{file_path} {blocks}")
        total_blocks += blocks
        total_txs += txs
    print(f'blocks: {total_blocks} (txs: {total_txs}) in {time.perf_counter()-start}s')
    print('cleaning chain: set block heights and delete forks')
    #clean_chain()
    print(f'done in {time.perf_counter()-start}s')


if __name__ == '__main__':
    asyncio.run(main())
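
BlockSync itself is not exercised by main() above; a minimal sketch of wiring it to a running regtest node (all names are from this commit, though syncing logic is still a stub):

import asyncio
from lbry.blockchain import Lbrycrd, BlockSync

async def run_sync():
    chain = Lbrycrd.regtest()
    await chain.ensure()
    await chain.start()
    sync = BlockSync(chain)
    await sync.start()  # opens blockchain.db under the regtest datadir
    try:
        await chain.generate(10)
    finally:
        await sync.stop()
        await chain.stop()

asyncio.run(run_sync())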


@@ -0,0 +1,24 @@
import asyncio

from lbry.blockchain import Lbrycrd


async def main():
    chain = Lbrycrd.regtest()
    print(f'Generating: {chain.data_path}')
    await chain.ensure()
    await chain.start()
    chain.subscribe()
    await chain.generate(200)
    await chain.on_block.where(lambda e: e['msg'] == 199)
    await chain.claim_name('foo', 'beef' * 4000, '0.001')
    await chain.generate(1)
    await chain.stop(False)
    await asyncio.sleep(3)  # give lbrycrd time to stop
    await chain.start('-reindex')
    await chain.generate(1)
    await chain.stop(False)


asyncio.run(main())