lbry-sdk/lbry/blockchain/database.py

238 lines
9.6 KiB
Python
Raw Normal View History

2020-04-12 02:15:04 +02:00
import os.path
2020-05-01 15:28:51 +02:00
import asyncio
2020-04-12 02:15:04 +02:00
import sqlite3
2020-06-19 20:28:34 +02:00
from typing import List, Optional
2020-05-01 15:28:51 +02:00
from concurrent.futures import ThreadPoolExecutor
2020-06-19 20:28:34 +02:00
from lbry.schema.url import normalize_name
from .bcd_data_stream import BCDataStream
2020-04-12 02:15:04 +02:00
2020-05-01 15:28:51 +02:00
FILES = [
'block_index',
'claims'
]
2020-04-12 02:15:04 +02:00
2020-06-19 20:28:34 +02:00
class FindShortestID:
__slots__ = 'short_id', 'new_id'
def __init__(self):
self.short_id = ''
self.new_id = None
def step(self, other_id, new_id):
other_id = other_id[::-1].hex()
if self.new_id is None:
self.new_id = new_id[::-1].hex()
for i in range(len(self.new_id)):
if other_id[i] != self.new_id[i]:
if i > len(self.short_id)-1:
self.short_id = self.new_id[:i+1]
break
def finalize(self):
return self.short_id
2020-05-01 15:28:51 +02:00
class BlockchainDB:
2020-04-12 02:15:04 +02:00
def __init__(self, directory: str):
2020-05-01 15:28:51 +02:00
self.directory = directory
self.connection: Optional[sqlite3.Connection] = None
self.executor: Optional[ThreadPoolExecutor] = None
async def run_in_executor(self, *args):
return await asyncio.get_event_loop().run_in_executor(self.executor, *args)
def sync_open(self):
self.connection = sqlite3.connect(
os.path.join(self.directory, FILES[0]+'.sqlite'),
timeout=60.0 * 5
)
for file in FILES[1:]:
self.connection.execute(
f"ATTACH DATABASE '{os.path.join(self.directory, file+'.sqlite')}' AS {file}"
)
2020-06-19 20:28:34 +02:00
self.connection.create_aggregate("find_shortest_id", 2, FindShortestID)
#self.connection.execute(
2020-06-22 01:50:53 +02:00
# "CREATE INDEX IF NOT EXISTS claim_originalheight_claimid ON claim (originalheight, claimid);"
2020-06-19 20:28:34 +02:00
#)
2020-05-01 15:28:51 +02:00
self.connection.row_factory = sqlite3.Row
2020-04-12 02:15:04 +02:00
2020-05-01 15:28:51 +02:00
async def open(self):
assert self.executor is None, "Database is already open."
self.executor = ThreadPoolExecutor(max_workers=1)
return await self.run_in_executor(self.sync_open)
2020-04-12 02:15:04 +02:00
2020-05-01 15:28:51 +02:00
def sync_close(self):
self.connection.close()
self.connection = None
2020-04-12 02:15:04 +02:00
2020-05-01 15:28:51 +02:00
async def close(self):
if self.executor is not None:
if self.connection is not None:
await self.run_in_executor(self.sync_close)
self.executor.shutdown()
self.executor = None
2020-06-19 20:28:34 +02:00
async def commit(self):
await self.run_in_executor(self.connection.commit)
2020-05-01 15:28:51 +02:00
def sync_execute(self, sql: str, *args):
return self.connection.execute(sql, *args)
2020-06-19 20:28:34 +02:00
async def execute(self, sql: str, *args):
2020-05-01 15:28:51 +02:00
return await self.run_in_executor(self.sync_execute, sql, *args)
2020-06-19 20:28:34 +02:00
def sync_execute_fetchall(self, sql: str, *args) -> List[dict]:
return [dict(r) for r in self.connection.execute(sql, *args).fetchall()]
2020-05-01 15:28:51 +02:00
2020-06-19 20:28:34 +02:00
async def execute_fetchall(self, sql: str, *args) -> List[dict]:
2020-05-01 15:28:51 +02:00
return await self.run_in_executor(self.sync_execute_fetchall, sql, *args)
2020-06-19 20:28:34 +02:00
def sync_get_best_height(self) -> int:
sql = "SELECT MAX(height) FROM block_info"
return self.connection.execute(sql).fetchone()[0]
async def get_best_height(self) -> int:
return await self.run_in_executor(self.sync_get_best_height)
def sync_get_block_files(self, file_number: int = None, start_height: int = None) -> List[dict]:
2020-06-05 06:35:22 +02:00
sql = """
2020-05-20 23:54:38 +02:00
SELECT
file as file_number,
COUNT(hash) as blocks,
SUM(txcount) as txs,
2020-06-19 20:28:34 +02:00
MAX(height) as best_height
2020-06-05 06:35:22 +02:00
FROM block_info
WHERE status&1 AND status&4
"""
args = ()
2020-06-19 20:28:34 +02:00
if file_number is not None and start_height is not None:
sql += "AND file = ? AND height >= ?"
args = (file_number, start_height)
2020-06-05 06:35:22 +02:00
return self.sync_execute_fetchall(sql + " GROUP BY file ORDER BY file ASC;", args)
2020-06-19 20:28:34 +02:00
async def get_block_files(self, file_number: int = None, start_height: int = None) -> List[dict]:
2020-06-05 06:35:22 +02:00
return await self.run_in_executor(
2020-06-19 20:28:34 +02:00
self.sync_get_block_files, file_number, start_height
2020-04-12 02:15:04 +02:00
)
2020-06-19 20:28:34 +02:00
def sync_get_blocks_in_file(self, block_file: int, start_height=0) -> List[dict]:
2020-05-01 15:28:51 +02:00
return self.sync_execute_fetchall(
2020-04-12 02:15:04 +02:00
"""
SELECT datapos as data_offset, height, hash as block_hash, txCount as txs
2020-05-18 14:22:23 +02:00
FROM block_info
2020-06-19 20:28:34 +02:00
WHERE file = ? AND height >= ? AND status&1 AND status&4
2020-05-18 14:22:23 +02:00
ORDER BY datapos ASC;
2020-06-19 20:28:34 +02:00
""", (block_file, start_height)
2020-04-12 02:15:04 +02:00
)
2020-05-01 15:28:51 +02:00
2020-06-19 20:28:34 +02:00
async def get_blocks_in_file(self, block_file: int, start_height=0) -> List[dict]:
return await self.run_in_executor(self.sync_get_blocks_in_file, block_file, start_height)
def sync_get_claim_support_txo_hashes(self, at_height: int) -> set:
return {
r['txID'] + BCDataStream.uint32.pack(r['txN'])
for r in self.connection.execute(
"""
SELECT txID, txN FROM claim WHERE updateHeight = ?
UNION
SELECT txID, txN FROM support WHERE blockHeight = ?
""", (at_height, at_height)
).fetchall()
}
def sync_get_takeover_count(self, start_height: int, end_height: int) -> int:
sql = "SELECT COUNT(*) FROM takeover WHERE height BETWEEN ? AND ?"
return self.connection.execute(sql, (start_height, end_height)).fetchone()[0]
async def get_takeover_count(self, start_height: int, end_height: int) -> int:
return await self.run_in_executor(self.sync_get_takeover_count, start_height, end_height)
def sync_get_takeovers(self, start_height: int, end_height: int) -> List[dict]:
2020-05-01 15:28:51 +02:00
return self.sync_execute_fetchall(
"""
SELECT
2020-06-19 20:28:34 +02:00
takeover.name,
2020-05-01 15:28:51 +02:00
takeover.claimID AS claim_hash,
2020-06-19 20:28:34 +02:00
takeover.height
FROM takeover
WHERE height BETWEEN ? AND ?
ORDER BY height, name
""", (start_height, end_height)
2020-05-01 15:28:51 +02:00
)
2020-06-19 20:28:34 +02:00
async def get_takeovers(self, start_height: int, end_height: int) -> List[dict]:
return await self.run_in_executor(self.sync_get_takeovers, start_height, end_height)
def sync_get_claim_metadata_count(self, start_height: int, end_height: int) -> int:
sql = "SELECT COUNT(*) FROM claim WHERE originalHeight BETWEEN ? AND ?"
return self.connection.execute(sql, (start_height, end_height)).fetchone()[0]
async def get_claim_metadata_count(self, start_height: int, end_height: int) -> int:
return await self.run_in_executor(self.sync_get_claim_metadata_count, start_height, end_height)
def sync_get_claim_metadata(self, start_height: int, end_height: int) -> List[dict]:
return [{
"name": r["name"],
"claim_hash_": r["claimID"],
"activation_height": r["activationHeight"],
"expiration_height": r["expirationHeight"],
"takeover_height": r["takeoverHeight"],
"is_controlling": r["isControlling"],
"short_url": f'{normalize_name(r["name"].decode())}#{r["shortestID"] or r["claimID"][::-1].hex()[0]}',
"short_url_": f'{normalize_name(r["name"].decode())}#{r["shortestID"] or r["claimID"][::-1].hex()[0]}',
} for r in self.sync_execute_fetchall(
2020-05-01 15:28:51 +02:00
"""
SELECT
2020-06-19 20:28:34 +02:00
name, claimID, activationHeight, expirationHeight,
(SELECT
CASE WHEN takeover.claimID = claim.claimID THEN takeover.height END
FROM takeover WHERE takeover.name = claim.name
ORDER BY height DESC LIMIT 1
) AS takeoverHeight,
(SELECT CASE WHEN takeover.claimID = claim.claimID THEN 1 ELSE 0 END
FROM takeover WHERE takeover.name = claim.name
ORDER BY height DESC LIMIT 1
) AS isControlling,
(SELECT find_shortest_id(c.claimid, claim.claimid) FROM claim AS c
WHERE
c.nodename = claim.nodename AND
c.originalheight <= claim.originalheight AND
c.claimid != claim.claimid
) AS shortestID
FROM claim
WHERE originalHeight BETWEEN ? AND ?
2020-06-22 01:50:53 +02:00
ORDER BY originalHeight, claimid
2020-06-19 20:28:34 +02:00
""", (start_height, end_height)
)]
async def get_claim_metadata(self, start_height: int, end_height: int) -> List[dict]:
return await self.run_in_executor(self.sync_get_claim_metadata, start_height, end_height)
def sync_get_support_metadata_count(self, start_height: int, end_height: int) -> int:
sql = "SELECT COUNT(*) FROM support WHERE blockHeight BETWEEN ? AND ?"
return self.connection.execute(sql, (start_height, end_height)).fetchone()[0]
async def get_support_metadata_count(self, start_height: int, end_height: int) -> int:
return await self.run_in_executor(self.sync_get_support_metadata_count, start_height, end_height)
def sync_get_support_metadata(self, start_height: int, end_height: int) -> List[dict]:
return [{
"name": r['name'],
"txo_hash_pk": r['txID'] + BCDataStream.uint32.pack(r['txN']),
"activation_height": r['activationHeight'],
"expiration_height": r['expirationHeight'],
} for r in self.sync_execute_fetchall(
2020-05-01 15:28:51 +02:00
"""
2020-06-19 20:28:34 +02:00
SELECT name, txid, txn, activationHeight, expirationHeight
FROM support WHERE blockHeight BETWEEN ? AND ?
""", (start_height, end_height)
)
]
2020-05-01 15:28:51 +02:00
2020-06-19 20:28:34 +02:00
async def get_support_metadata(self, start_height: int, end_height: int) -> List[dict]:
return await self.run_in_executor(self.sync_get_support_metadata, start_height, end_height)