lbry-sdk/lbry/blockchain/database.py

245 lines
9.8 KiB
Python
Raw Normal View History

2020-04-12 02:15:04 +02:00
import os.path
2020-05-01 15:28:51 +02:00
import asyncio
2020-04-12 02:15:04 +02:00
import sqlite3
2020-06-19 20:28:34 +02:00
from typing import List, Optional
2020-05-01 15:28:51 +02:00
from concurrent.futures import ThreadPoolExecutor
2020-06-19 20:28:34 +02:00
from lbry.schema.url import normalize_name
from .bcd_data_stream import BCDataStream
2020-04-12 02:15:04 +02:00
2020-05-01 15:28:51 +02:00
FILES = [
'claims',
2020-05-01 15:28:51 +02:00
'block_index',
]
2020-04-12 02:15:04 +02:00
2020-07-07 06:20:11 +02:00
def make_short_url(r):
try:
2020-09-23 19:39:24 +02:00
# TODO: we describe it as normalized but the old SDK didnt do that
2020-09-23 07:21:30 +02:00
name = r["name"].decode().replace("\x00", "")
return f'{name}#{r["shortestID"] or r["claimID"][::-1].hex()[0]}'
2020-07-07 06:20:11 +02:00
except UnicodeDecodeError:
# print(f'failed making short url due to name parse error for claim_id: {r["claimID"][::-1].hex()}')
return "INVALID NAME"
2020-07-07 06:20:11 +02:00
2020-06-19 20:28:34 +02:00
class FindShortestID:
__slots__ = 'short_id', 'new_id'
def __init__(self):
self.short_id = ''
self.new_id = None
def step(self, other_id, new_id):
other_id = other_id[::-1].hex()
if self.new_id is None:
self.new_id = new_id[::-1].hex()
for i in range(len(self.new_id)):
if other_id[i] != self.new_id[i]:
if i > len(self.short_id)-1:
self.short_id = self.new_id[:i+1]
break
def finalize(self):
return self.short_id
2020-05-01 15:28:51 +02:00
class BlockchainDB:
2020-04-12 02:15:04 +02:00
def __init__(self, directory: str):
2020-05-01 15:28:51 +02:00
self.directory = directory
self.connection: Optional[sqlite3.Connection] = None
self.executor: Optional[ThreadPoolExecutor] = None
async def run_in_executor(self, *args):
2020-08-20 16:43:44 +02:00
return await asyncio.get_running_loop().run_in_executor(self.executor, *args)
2020-05-01 15:28:51 +02:00
def sync_open(self):
self.connection = sqlite3.connect(
os.path.join(self.directory, FILES[0]+'.sqlite'),
timeout=60.0 * 5
)
for file in FILES[1:]:
self.connection.execute(
f"ATTACH DATABASE '{os.path.join(self.directory, file+'.sqlite')}' AS {file}"
)
2020-06-19 20:28:34 +02:00
self.connection.create_aggregate("find_shortest_id", 2, FindShortestID)
self.connection.execute("CREATE INDEX IF NOT EXISTS claim_originalheight ON claim (originalheight);")
self.connection.execute("CREATE INDEX IF NOT EXISTS claim_updateheight ON claim (updateheight);")
self.connection.execute("create index IF NOT EXISTS support_blockheight on support (blockheight);")
2020-05-01 15:28:51 +02:00
self.connection.row_factory = sqlite3.Row
2020-04-12 02:15:04 +02:00
2020-05-01 15:28:51 +02:00
async def open(self):
assert self.executor is None, "Database is already open."
self.executor = ThreadPoolExecutor(max_workers=1)
return await self.run_in_executor(self.sync_open)
2020-04-12 02:15:04 +02:00
2020-05-01 15:28:51 +02:00
def sync_close(self):
self.connection.close()
self.connection = None
2020-04-12 02:15:04 +02:00
2020-05-01 15:28:51 +02:00
async def close(self):
if self.executor is not None:
if self.connection is not None:
await self.run_in_executor(self.sync_close)
self.executor.shutdown()
self.executor = None
2020-06-19 20:28:34 +02:00
async def commit(self):
await self.run_in_executor(self.connection.commit)
2020-05-01 15:28:51 +02:00
def sync_execute(self, sql: str, *args):
return self.connection.execute(sql, *args)
2020-06-19 20:28:34 +02:00
async def execute(self, sql: str, *args):
2020-05-01 15:28:51 +02:00
return await self.run_in_executor(self.sync_execute, sql, *args)
2020-06-19 20:28:34 +02:00
def sync_execute_fetchall(self, sql: str, *args) -> List[dict]:
2020-07-06 05:03:45 +02:00
return self.connection.execute(sql, *args).fetchall()
2020-05-01 15:28:51 +02:00
2020-06-19 20:28:34 +02:00
async def execute_fetchall(self, sql: str, *args) -> List[dict]:
2020-05-01 15:28:51 +02:00
return await self.run_in_executor(self.sync_execute_fetchall, sql, *args)
2020-06-19 20:28:34 +02:00
def sync_get_best_height(self) -> int:
sql = "SELECT MAX(height) FROM block_info"
return self.connection.execute(sql).fetchone()[0]
async def get_best_height(self) -> int:
return await self.run_in_executor(self.sync_get_best_height)
def sync_get_block_files(self, file_number: int = None, start_height: int = None) -> List[dict]:
2020-06-05 06:35:22 +02:00
sql = """
2020-05-20 23:54:38 +02:00
SELECT
file as file_number,
COUNT(hash) as blocks,
SUM(txcount) as txs,
MAX(height) as best_height,
MIN(height) as start_height
2020-06-05 06:35:22 +02:00
FROM block_info
WHERE status&1 AND status&4
"""
args = ()
2020-06-19 20:28:34 +02:00
if file_number is not None and start_height is not None:
sql += "AND file = ? AND height >= ?"
args = (file_number, start_height)
2020-07-06 05:03:45 +02:00
return [dict(r) for r in self.sync_execute_fetchall(sql + " GROUP BY file ORDER BY file ASC;", args)]
2020-06-05 06:35:22 +02:00
2020-06-19 20:28:34 +02:00
async def get_block_files(self, file_number: int = None, start_height: int = None) -> List[dict]:
2020-06-05 06:35:22 +02:00
return await self.run_in_executor(
2020-06-19 20:28:34 +02:00
self.sync_get_block_files, file_number, start_height
2020-04-12 02:15:04 +02:00
)
2020-06-19 20:28:34 +02:00
def sync_get_blocks_in_file(self, block_file: int, start_height=0) -> List[dict]:
2020-07-06 05:03:45 +02:00
return [dict(r) for r in self.sync_execute_fetchall(
2020-04-12 02:15:04 +02:00
"""
SELECT datapos as data_offset, height, hash as block_hash, txCount as txs
2020-05-18 14:22:23 +02:00
FROM block_info
2020-06-19 20:28:34 +02:00
WHERE file = ? AND height >= ? AND status&1 AND status&4
2020-05-18 14:22:23 +02:00
ORDER BY datapos ASC;
2020-06-19 20:28:34 +02:00
""", (block_file, start_height)
2020-07-06 05:03:45 +02:00
)]
2020-05-01 15:28:51 +02:00
2020-06-19 20:28:34 +02:00
async def get_blocks_in_file(self, block_file: int, start_height=0) -> List[dict]:
return await self.run_in_executor(self.sync_get_blocks_in_file, block_file, start_height)
def sync_get_claim_support_txo_hashes(self, at_height: int) -> set:
return {
r['txID'] + BCDataStream.uint32.pack(r['txN'])
for r in self.connection.execute(
"""
SELECT txID, txN FROM claim WHERE updateHeight = ?
UNION
SELECT txID, txN FROM support WHERE blockHeight = ?
""", (at_height, at_height)
).fetchall()
}
def sync_get_takeover_count(self, start_height: int, end_height: int) -> int:
2020-07-06 05:03:45 +02:00
sql = """
SELECT COUNT(*) FROM claim WHERE name IN (
2020-07-12 00:18:33 +02:00
SELECT name FROM takeover
WHERE name IS NOT NULL AND height BETWEEN ? AND ?
2020-07-06 05:03:45 +02:00
)
""", (start_height, end_height)
return self.connection.execute(*sql).fetchone()[0]
2020-06-19 20:28:34 +02:00
async def get_takeover_count(self, start_height: int, end_height: int) -> int:
return await self.run_in_executor(self.sync_get_takeover_count, start_height, end_height)
def sync_get_takeovers(self, start_height: int, end_height: int) -> List[dict]:
2020-07-06 05:03:45 +02:00
sql = """
SELECT name, claimID, MAX(height) AS height FROM takeover
2020-07-12 00:18:33 +02:00
WHERE name IS NOT NULL AND height BETWEEN ? AND ?
2020-07-06 05:03:45 +02:00
GROUP BY name
""", (start_height, end_height)
return [{
'normalized': normalize_name(r['name'].decode()),
'claim_hash': r['claimID'],
'height': r['height']
} for r in self.sync_execute_fetchall(*sql)]
2020-05-01 15:28:51 +02:00
2020-06-19 20:28:34 +02:00
async def get_takeovers(self, start_height: int, end_height: int) -> List[dict]:
return await self.run_in_executor(self.sync_get_takeovers, start_height, end_height)
def sync_get_claim_metadata_count(self, start_height: int, end_height: int) -> int:
sql = "SELECT COUNT(*) FROM claim WHERE originalHeight BETWEEN ? AND ?"
return self.connection.execute(sql, (start_height, end_height)).fetchone()[0]
async def get_claim_metadata_count(self, start_height: int, end_height: int) -> int:
return await self.run_in_executor(self.sync_get_claim_metadata_count, start_height, end_height)
2020-07-06 05:03:45 +02:00
def sync_get_claim_metadata(self, claim_hashes) -> List[dict]:
sql = f"""
2020-06-22 02:14:14 +02:00
SELECT
2020-07-06 05:03:45 +02:00
name, claimID, activationHeight, expirationHeight, originalHeight,
2020-06-22 02:14:14 +02:00
(SELECT
CASE WHEN takeover.claimID = claim.claimID THEN takeover.height END
FROM takeover WHERE takeover.name = claim.name
ORDER BY height DESC LIMIT 1
) AS takeoverHeight,
(SELECT find_shortest_id(c.claimid, claim.claimid) FROM claim AS c
WHERE
c.nodename = claim.nodename AND
c.originalheight <= claim.originalheight AND
c.claimid != claim.claimid
) AS shortestID
FROM claim
2020-07-06 05:03:45 +02:00
WHERE claimID IN ({','.join(['?' for _ in claim_hashes])})
ORDER BY claimID
""", claim_hashes
2020-06-19 20:28:34 +02:00
return [{
"name": r["name"],
2020-07-06 05:03:45 +02:00
"claim_hash": r["claimID"],
2020-06-19 20:28:34 +02:00
"activation_height": r["activationHeight"],
"expiration_height": r["expirationHeight"],
"takeover_height": r["takeoverHeight"],
2020-07-06 05:03:45 +02:00
"creation_height": r["originalHeight"],
2020-07-07 06:20:11 +02:00
"short_url": make_short_url(r),
2020-06-22 02:14:14 +02:00
} for r in self.sync_execute_fetchall(*sql)]
2020-06-19 20:28:34 +02:00
async def get_claim_metadata(self, start_height: int, end_height: int) -> List[dict]:
return await self.run_in_executor(self.sync_get_claim_metadata, start_height, end_height)
def sync_get_support_metadata_count(self, start_height: int, end_height: int) -> int:
sql = "SELECT COUNT(*) FROM support WHERE blockHeight BETWEEN ? AND ?"
return self.connection.execute(sql, (start_height, end_height)).fetchone()[0]
async def get_support_metadata_count(self, start_height: int, end_height: int) -> int:
return await self.run_in_executor(self.sync_get_support_metadata_count, start_height, end_height)
def sync_get_support_metadata(self, start_height: int, end_height: int) -> List[dict]:
2020-06-22 02:14:14 +02:00
sql = """
SELECT name, txid, txn, activationHeight, expirationHeight
FROM support WHERE blockHeight BETWEEN ? AND ?
""", (start_height, end_height)
2020-06-19 20:28:34 +02:00
return [{
"name": r['name'],
"txo_hash_pk": r['txID'] + BCDataStream.uint32.pack(r['txN']),
"activation_height": r['activationHeight'],
"expiration_height": r['expirationHeight'],
2020-06-22 02:14:14 +02:00
} for r in self.sync_execute_fetchall(*sql)]
2020-05-01 15:28:51 +02:00
2020-06-19 20:28:34 +02:00
async def get_support_metadata(self, start_height: int, end_height: int) -> List[dict]:
return await self.run_in_executor(self.sync_get_support_metadata, start_height, end_height)