lbry-sdk/lbry/blockchain/database.py
2020-05-01 09:28:51 -04:00

123 lines
4 KiB
Python

import os.path
import asyncio
import sqlite3
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
FILES = [
'block_index',
'claims'
]
class BlockchainDB:
def __init__(self, directory: str):
self.directory = directory
self.connection: Optional[sqlite3.Connection] = None
self.executor: Optional[ThreadPoolExecutor] = None
async def run_in_executor(self, *args):
return await asyncio.get_event_loop().run_in_executor(self.executor, *args)
def sync_open(self):
self.connection = sqlite3.connect(
os.path.join(self.directory, FILES[0]+'.sqlite'),
timeout=60.0 * 5
)
for file in FILES[1:]:
self.connection.execute(
f"ATTACH DATABASE '{os.path.join(self.directory, file+'.sqlite')}' AS {file}"
)
self.connection.row_factory = sqlite3.Row
async def open(self):
assert self.executor is None, "Database is already open."
self.executor = ThreadPoolExecutor(max_workers=1)
return await self.run_in_executor(self.sync_open)
def sync_close(self):
self.connection.close()
self.connection = None
async def close(self):
if self.executor is not None:
if self.connection is not None:
await self.run_in_executor(self.sync_close)
self.executor.shutdown()
self.executor = None
def sync_execute(self, sql: str, *args):
return self.connection.execute(sql, *args)
async def execute(self, sql, *args):
return await self.run_in_executor(self.sync_execute, sql, *args)
def sync_execute_fetchall(self, sql: str, *args):
return list(self.connection.execute(sql, *args).fetchall())
async def execute_fetchall(self, sql: str, *args):
return await self.run_in_executor(self.sync_execute_fetchall, sql, *args)
def sync_get_block_files(self):
return self.sync_execute_fetchall(
"""
SELECT file as file_number, COUNT(hash) as blocks, SUM(txcount) as txs
FROM block_info GROUP BY file ORDER BY file ASC;
"""
)
async def get_block_files(self):
return await self.run_in_executor(self.sync_get_block_files)
def sync_get_file_details(self, block_file):
return self.sync_execute_fetchall(
"""
SELECT datapos as data_offset, height, hash as block_hash, txCount as txs
FROM block_info WHERE file = ? ORDER BY datapos ASC;
""", (block_file,)
)
async def get_file_details(self, block_file):
return await self.run_in_executor(self.sync_get_file_details, block_file)
def sync_get_claimtrie(self):
return self.sync_execute_fetchall(
"""
SELECT
takeover.name AS normalized,
takeover.claimID AS claim_hash,
takeover.height AS last_take_over_height,
originalHeight AS original_height,
updateHeight AS update_height,
validHeight AS valid_height,
activationHeight AS activation_height,
expirationHeight AS expiration_height
FROM takeover JOIN claim USING (claimID)
GROUP BY takeover.name HAVING MAX(height);
"""
)
async def get_claimtrie(self):
return await self.run_in_executor(self.sync_get_claimtrie)
def sync_get_claims(self):
return self.sync_execute_fetchall(
"""
SELECT
claimID AS claim_hash,
txID AS tx_hash,
txN AS position,
amount,
originalHeight AS original_height,
updateHeight AS update_height,
validHeight AS valid_height,
activationHeight AS activation_height,
expirationHeight AS expiration_height
FROM claims.claim
"""
)
async def get_claims(self):
return await self.run_in_executor(self.sync_get_claims)