lbry-sdk/lbry/wallet/server/db/prefixes.py
2022-03-15 15:34:07 -03:00

908 lines
28 KiB
Python

import typing
import struct
from typing import Union, Tuple, NamedTuple
from lbry.wallet.server.db import DB_PREFIXES
ACTIVATED_CLAIM_TXO_TYPE = 1
ACTIVATED_SUPPORT_TXO_TYPE = 2
def length_encoded_name(name: str) -> bytes:
encoded = name.encode('utf-8')
return len(encoded).to_bytes(2, byteorder='big') + encoded
class PrefixRow:
prefix: bytes
key_struct: struct.Struct
value_struct: struct.Struct
key_part_lambdas = []
@classmethod
def pack_partial_key(cls, *args) -> bytes:
return cls.prefix + cls.key_part_lambdas[len(args)](*args)
@classmethod
def pack_key(cls, *args) -> bytes:
return cls.prefix + cls.key_struct.pack(*args)
@classmethod
def pack_value(cls, *args) -> bytes:
return cls.value_struct.pack(*args)
@classmethod
def unpack_key(cls, key: bytes):
assert key[:1] == cls.prefix
return cls.key_struct.unpack(key[1:])
@classmethod
def unpack_value(cls, data: bytes):
return cls.value_struct.unpack(data)
@classmethod
def unpack_item(cls, key: bytes, value: bytes):
return cls.unpack_key(key), cls.unpack_value(value)
class ClaimToTXOKey(typing.NamedTuple):
claim_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class ClaimToTXOValue(typing.NamedTuple):
tx_num: int
position: int
root_tx_num: int
root_position: int
amount: int
# activation: int
channel_signature_is_valid: bool
name: str
class TXOToClaimKey(typing.NamedTuple):
tx_num: int
position: int
class TXOToClaimValue(typing.NamedTuple):
claim_hash: bytes
name: str
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, name={self.name})"
class ClaimShortIDKey(typing.NamedTuple):
name: str
claim_hash: bytes
root_tx_num: int
root_position: int
def __str__(self):
return f"{self.__class__.__name__}(name={self.name}, claim_hash={self.claim_hash.hex()}, " \
f"root_tx_num={self.root_tx_num}, root_position={self.root_position})"
class ClaimShortIDValue(typing.NamedTuple):
tx_num: int
position: int
class ClaimToChannelKey(typing.NamedTuple):
claim_hash: bytes
tx_num: int
position: int
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, " \
f"tx_num={self.tx_num}, position={self.position})"
class ClaimToChannelValue(typing.NamedTuple):
signing_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(signing_hash={self.signing_hash.hex()})"
class ChannelToClaimKey(typing.NamedTuple):
signing_hash: bytes
name: str
tx_num: int
position: int
def __str__(self):
return f"{self.__class__.__name__}(signing_hash={self.signing_hash.hex()}, name={self.name}, " \
f"tx_num={self.tx_num}, position={self.position})"
class ChannelToClaimValue(typing.NamedTuple):
claim_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class ClaimToSupportKey(typing.NamedTuple):
claim_hash: bytes
tx_num: int
position: int
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, tx_num={self.tx_num}, " \
f"position={self.position})"
class ClaimToSupportValue(typing.NamedTuple):
amount: int
class SupportToClaimKey(typing.NamedTuple):
tx_num: int
position: int
class SupportToClaimValue(typing.NamedTuple):
claim_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class ClaimExpirationKey(typing.NamedTuple):
expiration: int
tx_num: int
position: int
class ClaimExpirationValue(typing.NamedTuple):
claim_hash: bytes
name: str
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, name={self.name})"
class ClaimTakeoverKey(typing.NamedTuple):
name: str
class ClaimTakeoverValue(typing.NamedTuple):
claim_hash: bytes
height: int
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, height={self.height})"
class PendingActivationKey(typing.NamedTuple):
height: int
txo_type: int
tx_num: int
position: int
@property
def is_support(self) -> bool:
return self.txo_type == ACTIVATED_SUPPORT_TXO_TYPE
@property
def is_claim(self) -> bool:
return self.txo_type == ACTIVATED_CLAIM_TXO_TYPE
class PendingActivationValue(typing.NamedTuple):
claim_hash: bytes
name: str
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, name={self.name})"
class ActivationKey(typing.NamedTuple):
txo_type: int
tx_num: int
position: int
class ActivationValue(typing.NamedTuple):
height: int
claim_hash: bytes
name: str
def __str__(self):
return f"{self.__class__.__name__}(height={self.height}, claim_hash={self.claim_hash.hex()}, name={self.name})"
class ActiveAmountKey(typing.NamedTuple):
claim_hash: bytes
txo_type: int
activation_height: int
tx_num: int
position: int
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()}, txo_type={self.txo_type}, " \
f"activation_height={self.activation_height}, tx_num={self.tx_num}, position={self.position})"
class ActiveAmountValue(typing.NamedTuple):
amount: int
class EffectiveAmountKey(typing.NamedTuple):
name: str
effective_amount: int
tx_num: int
position: int
class EffectiveAmountValue(typing.NamedTuple):
claim_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class RepostKey(typing.NamedTuple):
claim_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class RepostValue(typing.NamedTuple):
reposted_claim_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(reposted_claim_hash={self.reposted_claim_hash.hex()})"
class RepostedKey(typing.NamedTuple):
reposted_claim_hash: bytes
tx_num: int
position: int
def __str__(self):
return f"{self.__class__.__name__}(reposted_claim_hash={self.reposted_claim_hash.hex()}, " \
f"tx_num={self.tx_num}, position={self.position})"
class RepostedValue(typing.NamedTuple):
claim_hash: bytes
def __str__(self):
return f"{self.__class__.__name__}(claim_hash={self.claim_hash.hex()})"
class ActiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.active_amount.value
key_struct = struct.Struct(b'>20sBLLH')
value_struct = struct.Struct(b'>Q')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack,
struct.Struct(b'>20sB').pack,
struct.Struct(b'>20sBL').pack,
struct.Struct(b'>20sBLL').pack,
struct.Struct(b'>20sBLLH').pack
]
@classmethod
def pack_key(cls, claim_hash: bytes, txo_type: int, activation_height: int, tx_num: int, position: int):
return super().pack_key(claim_hash, txo_type, activation_height, tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> ActiveAmountKey:
return ActiveAmountKey(*super().unpack_key(key))
@classmethod
def unpack_value(cls, data: bytes) -> ActiveAmountValue:
return ActiveAmountValue(*super().unpack_value(data))
@classmethod
def pack_value(cls, amount: int) -> bytes:
return cls.value_struct.pack(amount)
@classmethod
def pack_item(cls, claim_hash: bytes, txo_type: int, activation_height: int, tx_num: int, position: int, amount: int):
return cls.pack_key(claim_hash, txo_type, activation_height, tx_num, position), cls.pack_value(amount)
class ClaimToTXOPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_to_txo.value
key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'>LHLHQB')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
]
@classmethod
def pack_key(cls, claim_hash: bytes):
return super().pack_key(
claim_hash
)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimToTXOKey:
assert key[:1] == cls.prefix and len(key) == 21
return ClaimToTXOKey(key[1:])
@classmethod
def unpack_value(cls, data: bytes) -> ClaimToTXOValue:
tx_num, position, root_tx_num, root_position, amount, channel_signature_is_valid = cls.value_struct.unpack(
data[:21]
)
name_len = int.from_bytes(data[21:23], byteorder='big')
name = data[23:23 + name_len].decode()
return ClaimToTXOValue(
tx_num, position, root_tx_num, root_position, amount, bool(channel_signature_is_valid), name
)
@classmethod
def pack_value(cls, tx_num: int, position: int, root_tx_num: int, root_position: int, amount: int,
channel_signature_is_valid: bool, name: str) -> bytes:
return cls.value_struct.pack(
tx_num, position, root_tx_num, root_position, amount, int(channel_signature_is_valid)
) + length_encoded_name(name)
@classmethod
def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, root_tx_num: int, root_position: int,
amount: int, channel_signature_is_valid: bool, name: str):
return cls.pack_key(claim_hash), \
cls.pack_value(tx_num, position, root_tx_num, root_position, amount, channel_signature_is_valid, name)
class TXOToClaimPrefixRow(PrefixRow):
prefix = DB_PREFIXES.txo_to_claim.value
key_struct = struct.Struct(b'>LH')
value_struct = struct.Struct(b'>20s')
@classmethod
def pack_key(cls, tx_num: int, position: int):
return super().pack_key(tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> TXOToClaimKey:
return TXOToClaimKey(*super().unpack_key(key))
@classmethod
def unpack_value(cls, data: bytes) -> TXOToClaimValue:
claim_hash, = cls.value_struct.unpack(data[:20])
name_len = int.from_bytes(data[20:22], byteorder='big')
name = data[22:22 + name_len].decode()
return TXOToClaimValue(claim_hash, name)
@classmethod
def pack_value(cls, claim_hash: bytes, name: str) -> bytes:
return cls.value_struct.pack(claim_hash) + length_encoded_name(name)
@classmethod
def pack_item(cls, tx_num: int, position: int, claim_hash: bytes, name: str):
return cls.pack_key(tx_num, position), \
cls.pack_value(claim_hash, name)
def shortid_key_helper(struct_fmt):
packer = struct.Struct(struct_fmt).pack
def wrapper(name, *args):
return length_encoded_name(name) + packer(*args)
return wrapper
def shortid_key_partial_claim_helper(name: str, partial_claim_hash: bytes):
assert len(partial_claim_hash) <= 20
return length_encoded_name(name) + partial_claim_hash
class ClaimShortIDPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_short_id_prefix.value
key_struct = struct.Struct(b'>20sLH')
value_struct = struct.Struct(b'>LH')
key_part_lambdas = [
lambda: b'',
length_encoded_name,
shortid_key_partial_claim_helper,
shortid_key_helper(b'>20sL'),
shortid_key_helper(b'>20sLH'),
]
@classmethod
def pack_key(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int):
return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(claim_hash, root_tx_num, root_position)
@classmethod
def pack_value(cls, tx_num: int, position: int):
return super().pack_value(tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimShortIDKey:
assert key[:1] == cls.prefix
name_len = int.from_bytes(key[1:3], byteorder='big')
name = key[3:3 + name_len].decode()
return ClaimShortIDKey(name, *cls.key_struct.unpack(key[3 + name_len:]))
@classmethod
def unpack_value(cls, data: bytes) -> ClaimShortIDValue:
return ClaimShortIDValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, name: str, claim_hash: bytes, root_tx_num: int, root_position: int,
tx_num: int, position: int):
return cls.pack_key(name, claim_hash, root_tx_num, root_position), \
cls.pack_value(tx_num, position)
class ClaimToChannelPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_to_channel.value
key_struct = struct.Struct(b'>20sLH')
value_struct = struct.Struct(b'>20s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack,
struct.Struct(b'>20sL').pack,
struct.Struct(b'>20sLH').pack
]
@classmethod
def pack_key(cls, claim_hash: bytes, tx_num: int, position: int):
return super().pack_key(claim_hash, tx_num, position)
@classmethod
def pack_value(cls, signing_hash: bytes):
return super().pack_value(signing_hash)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimToChannelKey:
return ClaimToChannelKey(*super().unpack_key(key))
@classmethod
def unpack_value(cls, data: bytes) -> ClaimToChannelValue:
return ClaimToChannelValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, signing_hash: bytes):
return cls.pack_key(claim_hash, tx_num, position), cls.pack_value(signing_hash)
def channel_to_claim_helper(struct_fmt):
packer = struct.Struct(struct_fmt).pack
def wrapper(signing_hash: bytes, name: str, *args):
return signing_hash + length_encoded_name(name) + packer(*args)
return wrapper
class ChannelToClaimPrefixRow(PrefixRow):
prefix = DB_PREFIXES.channel_to_claim.value
key_struct = struct.Struct(b'>LH')
value_struct = struct.Struct(b'>20s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack,
channel_to_claim_helper(b''),
channel_to_claim_helper(b'>s'),
channel_to_claim_helper(b'>L'),
channel_to_claim_helper(b'>LH'),
]
@classmethod
def pack_key(cls, signing_hash: bytes, name: str, tx_num: int, position: int):
return cls.prefix + signing_hash + length_encoded_name(name) + cls.key_struct.pack(
tx_num, position
)
@classmethod
def unpack_key(cls, key: bytes) -> ChannelToClaimKey:
assert key[:1] == cls.prefix
signing_hash = key[1:21]
name_len = int.from_bytes(key[21:23], byteorder='big')
name = key[23:23 + name_len].decode()
tx_num, position = cls.key_struct.unpack(key[23 + name_len:])
return ChannelToClaimKey(
signing_hash, name, tx_num, position
)
@classmethod
def pack_value(cls, claim_hash: bytes) -> bytes:
return super().pack_value(claim_hash)
@classmethod
def unpack_value(cls, data: bytes) -> ChannelToClaimValue:
return ChannelToClaimValue(*cls.value_struct.unpack(data))
@classmethod
def pack_item(cls, signing_hash: bytes, name: str, tx_num: int, position: int,
claim_hash: bytes):
return cls.pack_key(signing_hash, name, tx_num, position), \
cls.pack_value(claim_hash)
class ClaimToSupportPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_to_support.value
key_struct = struct.Struct(b'>20sLH')
value_struct = struct.Struct(b'>Q')
@classmethod
def pack_key(cls, claim_hash: bytes, tx_num: int, position: int):
return super().pack_key(claim_hash, tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimToSupportKey:
return ClaimToSupportKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, amount: int) -> bytes:
return super().pack_value(amount)
@classmethod
def unpack_value(cls, data: bytes) -> ClaimToSupportValue:
return ClaimToSupportValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, claim_hash: bytes, tx_num: int, position: int, amount: int):
return cls.pack_key(claim_hash, tx_num, position), \
cls.pack_value(amount)
class SupportToClaimPrefixRow(PrefixRow):
prefix = DB_PREFIXES.support_to_claim.value
key_struct = struct.Struct(b'>LH')
value_struct = struct.Struct(b'>20s')
@classmethod
def pack_key(cls, tx_num: int, position: int):
return super().pack_key(tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> SupportToClaimKey:
return SupportToClaimKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, claim_hash: bytes) -> bytes:
return super().pack_value(claim_hash)
@classmethod
def unpack_value(cls, data: bytes) -> SupportToClaimValue:
return SupportToClaimValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, tx_num: int, position: int, claim_hash: bytes):
return cls.pack_key(tx_num, position), \
cls.pack_value(claim_hash)
class ClaimExpirationPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_expiration.value
key_struct = struct.Struct(b'>LLH')
value_struct = struct.Struct(b'>20s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack,
struct.Struct(b'>LL').pack,
struct.Struct(b'>LLH').pack,
]
@classmethod
def pack_key(cls, expiration: int, tx_num: int, position: int) -> bytes:
return super().pack_key(expiration, tx_num, position)
@classmethod
def pack_value(cls, claim_hash: bytes, name: str) -> bytes:
return cls.value_struct.pack(claim_hash) + length_encoded_name(name)
@classmethod
def pack_item(cls, expiration: int, tx_num: int, position: int, claim_hash: bytes, name: str) -> typing.Tuple[bytes, bytes]:
return cls.pack_key(expiration, tx_num, position), cls.pack_value(claim_hash, name)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimExpirationKey:
return ClaimExpirationKey(*super().unpack_key(key))
@classmethod
def unpack_value(cls, data: bytes) -> ClaimExpirationValue:
name_len = int.from_bytes(data[20:22], byteorder='big')
name = data[22:22 + name_len].decode()
claim_id, = cls.value_struct.unpack(data[:20])
return ClaimExpirationValue(claim_id, name)
@classmethod
def unpack_item(cls, key: bytes, value: bytes) -> typing.Tuple[ClaimExpirationKey, ClaimExpirationValue]:
return cls.unpack_key(key), cls.unpack_value(value)
class ClaimTakeoverPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_takeover.value
value_struct = struct.Struct(b'>20sL')
@classmethod
def pack_key(cls, name: str):
return cls.prefix + length_encoded_name(name)
@classmethod
def pack_value(cls, claim_hash: bytes, takeover_height: int):
return super().pack_value(claim_hash, takeover_height)
@classmethod
def unpack_key(cls, key: bytes) -> ClaimTakeoverKey:
assert key[:1] == cls.prefix
name_len = int.from_bytes(key[1:3], byteorder='big')
name = key[3:3 + name_len].decode()
return ClaimTakeoverKey(name)
@classmethod
def unpack_value(cls, data: bytes) -> ClaimTakeoverValue:
return ClaimTakeoverValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, name: str, claim_hash: bytes, takeover_height: int):
return cls.pack_key(name), cls.pack_value(claim_hash, takeover_height)
class PendingActivationPrefixRow(PrefixRow):
prefix = DB_PREFIXES.pending_activation.value
key_struct = struct.Struct(b'>LBLH')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack,
struct.Struct(b'>LB').pack,
struct.Struct(b'>LBL').pack,
struct.Struct(b'>LBLH').pack
]
@classmethod
def pack_key(cls, height: int, txo_type: int, tx_num: int, position: int):
return super().pack_key(height, txo_type, tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> PendingActivationKey:
return PendingActivationKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, claim_hash: bytes, name: str) -> bytes:
return claim_hash + length_encoded_name(name)
@classmethod
def unpack_value(cls, data: bytes) -> PendingActivationValue:
claim_hash = data[:20]
name_len = int.from_bytes(data[20:22], byteorder='big')
name = data[22:22 + name_len].decode()
return PendingActivationValue(claim_hash, name)
@classmethod
def pack_item(cls, height: int, txo_type: int, tx_num: int, position: int, claim_hash: bytes, name: str):
return cls.pack_key(height, txo_type, tx_num, position), \
cls.pack_value(claim_hash, name)
class ActivatedPrefixRow(PrefixRow):
prefix = DB_PREFIXES.activated_claim_and_support.value
key_struct = struct.Struct(b'>BLH')
value_struct = struct.Struct(b'>L20s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>B').pack,
struct.Struct(b'>BL').pack,
struct.Struct(b'>BLH').pack
]
@classmethod
def pack_key(cls, txo_type: int, tx_num: int, position: int):
return super().pack_key(txo_type, tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> ActivationKey:
return ActivationKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, height: int, claim_hash: bytes, name: str) -> bytes:
return cls.value_struct.pack(height, claim_hash) + length_encoded_name(name)
@classmethod
def unpack_value(cls, data: bytes) -> ActivationValue:
height, claim_hash = cls.value_struct.unpack(data[:24])
name_len = int.from_bytes(data[24:26], byteorder='big')
name = data[26:26 + name_len].decode()
return ActivationValue(height, claim_hash, name)
@classmethod
def pack_item(cls, txo_type: int, tx_num: int, position: int, height: int, claim_hash: bytes, name: str):
return cls.pack_key(txo_type, tx_num, position), \
cls.pack_value(height, claim_hash, name)
def effective_amount_helper(struct_fmt):
packer = struct.Struct(struct_fmt).pack
def wrapper(name, *args):
if not args:
return length_encoded_name(name)
if len(args) == 1:
return length_encoded_name(name) + packer(0xffffffffffffffff - args[0])
return length_encoded_name(name) + packer(0xffffffffffffffff - args[0], *args[1:])
return wrapper
class EffectiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.claim_effective_amount_prefix.value
key_struct = struct.Struct(b'>QLH')
value_struct = struct.Struct(b'>20s')
key_part_lambdas = [
lambda: b'',
length_encoded_name,
shortid_key_helper(b'>Q'),
shortid_key_helper(b'>QL'),
shortid_key_helper(b'>QLH'),
]
@classmethod
def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int):
return cls.prefix + length_encoded_name(name) + cls.key_struct.pack(
0xffffffffffffffff - effective_amount, tx_num, position
)
@classmethod
def unpack_key(cls, key: bytes) -> EffectiveAmountKey:
assert key[:1] == cls.prefix
name_len = int.from_bytes(key[1:3], byteorder='big')
name = key[3:3 + name_len].decode()
ones_comp_effective_amount, tx_num, position = cls.key_struct.unpack(key[3 + name_len:])
return EffectiveAmountKey(name, 0xffffffffffffffff - ones_comp_effective_amount, tx_num, position)
@classmethod
def unpack_value(cls, data: bytes) -> EffectiveAmountValue:
return EffectiveAmountValue(*super().unpack_value(data))
@classmethod
def pack_value(cls, claim_hash: bytes) -> bytes:
return super().pack_value(claim_hash)
@classmethod
def pack_item(cls, name: str, effective_amount: int, tx_num: int, position: int, claim_hash: bytes):
return cls.pack_key(name, effective_amount, tx_num, position), cls.pack_value(claim_hash)
class RepostPrefixRow(PrefixRow):
prefix = DB_PREFIXES.repost.value
@classmethod
def pack_key(cls, claim_hash: bytes):
return cls.prefix + claim_hash
@classmethod
def unpack_key(cls, key: bytes) -> RepostKey:
assert key[0] == cls.prefix
assert len(key) == 21
return RepostKey[1:]
@classmethod
def pack_value(cls, reposted_claim_hash: bytes) -> bytes:
return reposted_claim_hash
@classmethod
def unpack_value(cls, data: bytes) -> RepostValue:
return RepostValue(data)
@classmethod
def pack_item(cls, claim_hash: bytes, reposted_claim_hash: bytes):
return cls.pack_key(claim_hash), cls.pack_value(reposted_claim_hash)
class RepostedPrefixRow(PrefixRow):
prefix = DB_PREFIXES.reposted_claim.value
key_struct = struct.Struct(b'>20sLH')
value_struct = struct.Struct(b'>20s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack,
struct.Struct(b'>20sL').pack,
struct.Struct(b'>20sLH').pack
]
@classmethod
def pack_key(cls, reposted_claim_hash: bytes, tx_num: int, position: int):
return super().pack_key(reposted_claim_hash, tx_num, position)
@classmethod
def unpack_key(cls, key: bytes) -> RepostedKey:
return RepostedKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, claim_hash: bytes) -> bytes:
return super().pack_value(claim_hash)
@classmethod
def unpack_value(cls, data: bytes) -> RepostedValue:
return RepostedValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, reposted_claim_hash: bytes, tx_num: int, position: int, claim_hash: bytes):
return cls.pack_key(reposted_claim_hash, tx_num, position), cls.pack_value(claim_hash)
class UndoPrefixRow(PrefixRow):
prefix = DB_PREFIXES.undo_claimtrie.value
key_struct = struct.Struct(b'>Q')
@classmethod
def pack_key(cls, height: int):
return super().pack_key(height)
@classmethod
def unpack_key(cls, key: bytes) -> int:
assert key[:1] == cls.prefix
height, = cls.key_struct.unpack(key[1:])
return height
@classmethod
def pack_value(cls, undo_ops: bytes) -> bytes:
return undo_ops
@classmethod
def unpack_value(cls, data: bytes) -> bytes:
return data
@classmethod
def pack_item(cls, height: int, undo_ops: bytes):
return cls.pack_key(height), cls.pack_value(undo_ops)
class Prefixes:
claim_to_support = ClaimToSupportPrefixRow
support_to_claim = SupportToClaimPrefixRow
claim_to_txo = ClaimToTXOPrefixRow
txo_to_claim = TXOToClaimPrefixRow
claim_to_channel = ClaimToChannelPrefixRow
channel_to_claim = ChannelToClaimPrefixRow
claim_short_id = ClaimShortIDPrefixRow
claim_expiration = ClaimExpirationPrefixRow
claim_takeover = ClaimTakeoverPrefixRow
pending_activation = PendingActivationPrefixRow
activated = ActivatedPrefixRow
active_amount = ActiveAmountPrefixRow
effective_amount = EffectiveAmountPrefixRow
repost = RepostPrefixRow
reposted_claim = RepostedPrefixRow
undo = UndoPrefixRow
ROW_TYPES = {
Prefixes.claim_to_support.prefix: Prefixes.claim_to_support,
Prefixes.support_to_claim.prefix: Prefixes.support_to_claim,
Prefixes.claim_to_txo.prefix: Prefixes.claim_to_txo,
Prefixes.txo_to_claim.prefix: Prefixes.txo_to_claim,
Prefixes.claim_to_channel.prefix: Prefixes.claim_to_channel,
Prefixes.channel_to_claim.prefix: Prefixes.channel_to_claim,
Prefixes.claim_short_id.prefix: Prefixes.claim_short_id,
Prefixes.claim_expiration.prefix: Prefixes.claim_expiration,
Prefixes.claim_takeover.prefix: Prefixes.claim_takeover,
Prefixes.pending_activation.prefix: Prefixes.pending_activation,
Prefixes.activated.prefix: Prefixes.activated,
Prefixes.active_amount.prefix: Prefixes.active_amount,
Prefixes.effective_amount.prefix: Prefixes.effective_amount,
Prefixes.repost.prefix: Prefixes.repost,
Prefixes.reposted_claim.prefix: Prefixes.reposted_claim,
Prefixes.undo.prefix: Prefixes.undo
}
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
try:
return ROW_TYPES[key[:1]].unpack_item(key, value)
except KeyError:
return key, value