lbry-sdk/lbrynet/extras/wallet/server/db.py
2018-11-09 16:52:46 -05:00

212 lines
9.7 KiB
Python

import msgpack
import struct
import time
from torba.server.hash import hash_to_hex_str
from torba.server.db import DB
from lbrynet.extras.wallet.server.model import ClaimInfo
class LBRYDB(DB):
def __init__(self, *args, **kwargs):
self.claim_cache = {}
self.claims_for_name_cache = {}
self.claims_signed_by_cert_cache = {}
self.outpoint_to_claim_id_cache = {}
self.claims_db = self.names_db = self.signatures_db = self.outpoint_to_claim_id_db = self.claim_undo_db = None
# stores deletes not yet flushed to disk
self.pending_abandons = {}
super().__init__(*args, **kwargs)
def shutdown(self):
self.batched_flush_claims()
self.claims_db.close()
self.names_db.close()
self.signatures_db.close()
self.outpoint_to_claim_id_db.close()
self.claim_undo_db.close()
self.utxo_db.close()
# electrumx ones
self.utxo_db.close()
self.history.close_db()
async def _open_dbs(self, for_sync, compacting):
await super()._open_dbs(for_sync=for_sync, compacting=compacting)
def log_reason(message, is_for_sync):
reason = 'sync' if is_for_sync else 'serving'
self.logger.info('{} for {}'.format(message, reason))
if self.claims_db:
if self.claims_db.for_sync == for_sync:
return
log_reason('closing claim DBs to re-open', for_sync)
self.claims_db.close()
self.names_db.close()
self.signatures_db.close()
self.outpoint_to_claim_id_db.close()
self.claim_undo_db.close()
self.claims_db = self.db_class('claims', for_sync)
self.names_db = self.db_class('names', for_sync)
self.signatures_db = self.db_class('signatures', for_sync)
self.outpoint_to_claim_id_db = self.db_class('outpoint_claim_id', for_sync)
self.claim_undo_db = self.db_class('claim_undo', for_sync)
log_reason('opened claim DBs', self.claims_db.for_sync)
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
# flush claims together with utxos as they are parsed together
self.batched_flush_claims()
return super().flush_dbs(flush_data, flush_utxos, estimate_txs_remaining)
def batched_flush_claims(self):
with self.claims_db.write_batch() as claims_batch:
with self.names_db.write_batch() as names_batch:
with self.signatures_db.write_batch() as signed_claims_batch:
with self.outpoint_to_claim_id_db.write_batch() as outpoint_batch:
self.flush_claims(claims_batch, names_batch, signed_claims_batch,
outpoint_batch)
def flush_claims(self, batch, names_batch, signed_claims_batch, outpoint_batch):
flush_start = time.time()
write_claim, write_name, write_cert = batch.put, names_batch.put, signed_claims_batch.put
write_outpoint = outpoint_batch.put
delete_claim, delete_outpoint, delete_name = batch.delete, outpoint_batch.delete, names_batch.delete
delete_cert = signed_claims_batch.delete
for claim_id, outpoints in self.pending_abandons.items():
claim = self.get_claim_info(claim_id)
self.remove_claim_for_name(claim.name, claim_id)
if claim.cert_id:
self.remove_claim_from_certificate_claims(claim.cert_id, claim_id)
self.remove_certificate(claim_id)
self.claim_cache[claim_id] = None
for txid, tx_index in outpoints:
self.put_claim_id_for_outpoint(txid, tx_index, None)
for key, claim in self.claim_cache.items():
if claim:
write_claim(key, claim)
else:
delete_claim(key)
for name, claims in self.claims_for_name_cache.items():
if not claims:
delete_name(name)
else:
write_name(name, msgpack.dumps(claims))
for cert_id, claims in self.claims_signed_by_cert_cache.items():
if not claims:
delete_cert(cert_id)
else:
write_cert(cert_id, msgpack.dumps(claims))
for key, claim_id in self.outpoint_to_claim_id_cache.items():
if claim_id:
write_outpoint(key, claim_id)
else:
delete_outpoint(key)
self.logger.info('flushed at height {:,d} with {:,d} claims, {:,d} outpoints, {:,d} names '
'and {:,d} certificates added while {:,d} were abandoned in {:.1f}s, committing...'
.format(self.db_height,
len(self.claim_cache), len(self.outpoint_to_claim_id_cache),
len(self.claims_for_name_cache),
len(self.claims_signed_by_cert_cache), len(self.pending_abandons),
time.time() - flush_start))
self.claim_cache = {}
self.claims_for_name_cache = {}
self.claims_signed_by_cert_cache = {}
self.outpoint_to_claim_id_cache = {}
self.pending_abandons = {}
def assert_flushed(self, flush_data):
super().assert_flushed(flush_data)
assert not self.claim_cache
assert not self.claims_for_name_cache
assert not self.claims_signed_by_cert_cache
assert not self.outpoint_to_claim_id_cache
assert not self.pending_abandons
def abandon_spent(self, tx_hash, tx_idx):
claim_id = self.get_claim_id_from_outpoint(tx_hash, tx_idx)
if claim_id:
self.logger.info("[!] Abandon: {}".format(hash_to_hex_str(claim_id)))
self.pending_abandons.setdefault(claim_id, []).append((tx_hash, tx_idx,))
return claim_id
def put_claim_id_for_outpoint(self, tx_hash, tx_idx, claim_id):
self.logger.info("[+] Adding outpoint: {}:{} for {}.".format(hash_to_hex_str(tx_hash), tx_idx,
hash_to_hex_str(claim_id) if claim_id else None))
self.outpoint_to_claim_id_cache[tx_hash + struct.pack('>I', tx_idx)] = claim_id
def remove_claim_id_for_outpoint(self, tx_hash, tx_idx):
self.logger.info("[-] Remove outpoint: {}:{}.".format(hash_to_hex_str(tx_hash), tx_idx))
self.outpoint_to_claim_id_cache[tx_hash + struct.pack('>I', tx_idx)] = None
def get_claim_id_from_outpoint(self, tx_hash, tx_idx):
key = tx_hash + struct.pack('>I', tx_idx)
return self.outpoint_to_claim_id_cache.get(key) or self.outpoint_to_claim_id_db.get(key)
def get_claims_for_name(self, name):
if name in self.claims_for_name_cache:
return self.claims_for_name_cache[name]
db_claims = self.names_db.get(name)
return msgpack.loads(db_claims) if db_claims else {}
def put_claim_for_name(self, name, claim_id):
self.logger.info("[+] Adding claim {} for name {}.".format(hash_to_hex_str(claim_id), name))
claims = self.get_claims_for_name(name)
claims.setdefault(claim_id, max(claims.values() or [0]) + 1)
self.claims_for_name_cache[name] = claims
def remove_claim_for_name(self, name, claim_id):
self.logger.info("[-] Removing claim from name: {} - {}".format(hash_to_hex_str(claim_id), name))
claims = self.get_claims_for_name(name)
claim_n = claims.pop(claim_id)
for _claim_id, number in claims.items():
if number > claim_n:
claims[_claim_id] = number - 1
self.claims_for_name_cache[name] = claims
def get_signed_claim_ids_by_cert_id(self, cert_id):
if cert_id in self.claims_signed_by_cert_cache:
return self.claims_signed_by_cert_cache[cert_id]
db_claims = self.signatures_db.get(cert_id)
return msgpack.loads(db_claims, use_list=True) if db_claims else []
def put_claim_id_signed_by_cert_id(self, cert_id, claim_id):
self.logger.info("[+] Adding signature: {} - {}".format(hash_to_hex_str(claim_id), hash_to_hex_str(cert_id)))
certs = self.get_signed_claim_ids_by_cert_id(cert_id)
certs.append(claim_id)
self.claims_signed_by_cert_cache[cert_id] = certs
def remove_certificate(self, cert_id):
self.logger.info("[-] Removing certificate: {}".format(hash_to_hex_str(cert_id)))
self.claims_signed_by_cert_cache[cert_id] = []
def remove_claim_from_certificate_claims(self, cert_id, claim_id):
self.logger.info("[-] Removing signature: {} - {}".format(hash_to_hex_str(claim_id), hash_to_hex_str(cert_id)))
certs = self.get_signed_claim_ids_by_cert_id(cert_id)
if claim_id in certs:
certs.remove(claim_id)
self.claims_signed_by_cert_cache[cert_id] = certs
def get_claim_info(self, claim_id):
serialized = self.claim_cache.get(claim_id) or self.claims_db.get(claim_id)
return ClaimInfo.from_serialized(serialized) if serialized else None
def put_claim_info(self, claim_id, claim_info):
self.logger.info("[+] Adding claim info for: {}".format(hash_to_hex_str(claim_id)))
self.claim_cache[claim_id] = claim_info.serialized
def get_update_input(self, claim, inputs):
claim_id = claim.claim_id
claim_info = self.get_claim_info(claim_id)
if not claim_info:
return False
for input in inputs:
if input.prev_hash == claim_info.txid and input.prev_idx == claim_info.nout:
return input
return False
def write_undo(self, pending_undo):
with self.claim_undo_db.write_batch() as writer:
for height, undo_info in pending_undo:
writer.put(struct.pack(">I", height), msgpack.dumps(undo_info))