_prepare_claim_for_sync generators
This commit is contained in:
parent
c85648d43b
commit
da4e4ecd23
2 changed files with 184 additions and 150 deletions
|
@ -26,11 +26,11 @@ from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, StagedClaimtrie
|
||||||
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
|
from lbry.wallet.server.db.claimtrie import get_takeover_name_ops, StagedActivation, get_add_effective_amount_ops
|
||||||
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
|
from lbry.wallet.server.db.claimtrie import get_remove_name_ops, get_remove_effective_amount_ops
|
||||||
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
|
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
|
||||||
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue
|
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, Prefixes
|
||||||
from lbry.wallet.server.udp import StatusServer
|
from lbry.wallet.server.udp import StatusServer
|
||||||
|
from lbry.wallet.server.db.revertable import RevertableOp, RevertablePut, RevertableDelete
|
||||||
if typing.TYPE_CHECKING:
|
if typing.TYPE_CHECKING:
|
||||||
from lbry.wallet.server.leveldb import LevelDB
|
from lbry.wallet.server.leveldb import LevelDB
|
||||||
from lbry.wallet.server.db.revertable import RevertableOp
|
|
||||||
|
|
||||||
|
|
||||||
class Prefetcher:
|
class Prefetcher:
|
||||||
|
@ -259,22 +259,6 @@ class BlockProcessor:
|
||||||
self.amount_cache = {}
|
self.amount_cache = {}
|
||||||
|
|
||||||
def claim_producer(self):
|
def claim_producer(self):
|
||||||
def get_claim_txo(tx_hash, nout):
|
|
||||||
raw = self.db.db.get(
|
|
||||||
DB_PREFIXES.TX_PREFIX.value + tx_hash
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
output: TxOutput = self.coin.transaction(raw).outputs[nout]
|
|
||||||
script = OutputScript(output.pk_script)
|
|
||||||
script.parse()
|
|
||||||
return Claim.from_bytes(script.values['claim'])
|
|
||||||
except:
|
|
||||||
self.logger.error(
|
|
||||||
"tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
|
|
||||||
raw.hex()
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
if self.db.db_height <= 1:
|
if self.db.db_height <= 1:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -284,135 +268,8 @@ class BlockProcessor:
|
||||||
|
|
||||||
for claim_hash in self.removed_claims_to_send_es:
|
for claim_hash in self.removed_claims_to_send_es:
|
||||||
yield 'delete', claim_hash.hex()
|
yield 'delete', claim_hash.hex()
|
||||||
for claim_hash in to_send_es:
|
for claim in self.db.claims_producer(to_send_es):
|
||||||
claim = self.db._fs_get_claim_by_hash(claim_hash)
|
yield 'update', claim
|
||||||
metadata = get_claim_txo(claim.tx_hash, claim.position)
|
|
||||||
if not metadata:
|
|
||||||
continue
|
|
||||||
reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1]
|
|
||||||
reposted_claim = None
|
|
||||||
reposted_metadata = None
|
|
||||||
if reposted_claim_hash:
|
|
||||||
reposted_claim = self.db.get_claim_txo(reposted_claim_hash)
|
|
||||||
if not reposted_claim:
|
|
||||||
continue
|
|
||||||
reposted_metadata = get_claim_txo(
|
|
||||||
self.db.total_transactions[reposted_claim.tx_num], reposted_claim.position
|
|
||||||
)
|
|
||||||
if not reposted_metadata:
|
|
||||||
continue
|
|
||||||
reposted_tags = []
|
|
||||||
reposted_languages = []
|
|
||||||
reposted_has_source = None
|
|
||||||
reposted_claim_type = None
|
|
||||||
if reposted_claim:
|
|
||||||
reposted_tx_hash = self.db.total_transactions[reposted_claim.tx_num]
|
|
||||||
raw_reposted_claim_tx = self.db.db.get(
|
|
||||||
DB_PREFIXES.TX_PREFIX.value + reposted_tx_hash
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
reposted_claim_txo: TxOutput = self.coin.transaction(
|
|
||||||
raw_reposted_claim_tx
|
|
||||||
).outputs[reposted_claim.position]
|
|
||||||
reposted_script = OutputScript(reposted_claim_txo.pk_script)
|
|
||||||
reposted_script.parse()
|
|
||||||
except:
|
|
||||||
self.logger.error(
|
|
||||||
"repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
|
|
||||||
raw_reposted_claim_tx.hex()
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
|
|
||||||
except:
|
|
||||||
self.logger.error(
|
|
||||||
"reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
|
|
||||||
raw_reposted_claim_tx.hex()
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
if reposted_metadata:
|
|
||||||
reposted_tags = [] if not reposted_metadata.is_stream else [tag for tag in reposted_metadata.stream.tags]
|
|
||||||
reposted_languages = [] if not reposted_metadata.is_stream else (
|
|
||||||
[lang.language or 'none' for lang in reposted_metadata.stream.languages] or ['none']
|
|
||||||
)
|
|
||||||
reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
|
|
||||||
reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
|
|
||||||
claim_tags = [] if not metadata.is_stream else [tag for tag in metadata.stream.tags]
|
|
||||||
claim_languages = [] if not metadata.is_stream else (
|
|
||||||
[lang.language or 'none' for lang in metadata.stream.languages] or ['none']
|
|
||||||
)
|
|
||||||
tags = list(set(claim_tags).union(set(reposted_tags)))
|
|
||||||
languages = list(set(claim_languages).union(set(reposted_languages)))
|
|
||||||
canonical_url = f'{claim.name}#{claim.claim_hash.hex()}'
|
|
||||||
if metadata.is_signed:
|
|
||||||
channel_name = self._get_pending_claim_name(metadata.signing_channel_hash[::-1])
|
|
||||||
canonical_url = f'{channel_name}#{metadata.signing_channel_hash[::-1].hex()}/{canonical_url}'
|
|
||||||
|
|
||||||
value = {
|
|
||||||
'claim_hash': claim_hash[::-1],
|
|
||||||
# 'claim_id': claim_hash.hex(),
|
|
||||||
'claim_name': claim.name,
|
|
||||||
'normalized': claim.name,
|
|
||||||
'tx_id': claim.tx_hash[::-1].hex(),
|
|
||||||
'tx_num': claim.tx_num,
|
|
||||||
'tx_nout': claim.position,
|
|
||||||
'amount': claim.amount,
|
|
||||||
'timestamp': 0, # TODO: fix
|
|
||||||
'creation_timestamp': 0, # TODO: fix
|
|
||||||
'height': claim.height,
|
|
||||||
'creation_height': claim.creation_height,
|
|
||||||
'activation_height': claim.activation_height,
|
|
||||||
'expiration_height': claim.expiration_height,
|
|
||||||
'effective_amount': claim.effective_amount,
|
|
||||||
'support_amount': claim.support_amount,
|
|
||||||
'is_controlling': claim.is_controlling,
|
|
||||||
'last_take_over_height': claim.last_takeover_height,
|
|
||||||
|
|
||||||
'short_url': f'{claim.name}#{claim.claim_hash.hex()}', # TODO: fix
|
|
||||||
'canonical_url': canonical_url,
|
|
||||||
|
|
||||||
'title': None if not metadata.is_stream else metadata.stream.title,
|
|
||||||
'author': None if not metadata.is_stream else metadata.stream.author,
|
|
||||||
'description': None if not metadata.is_stream else metadata.stream.description,
|
|
||||||
'claim_type': CLAIM_TYPES[metadata.claim_type],
|
|
||||||
'has_source': None if not metadata.is_stream else metadata.stream.has_source,
|
|
||||||
'stream_type': None if not metadata.is_stream else STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)],
|
|
||||||
'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
|
|
||||||
'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(
|
|
||||||
max(metadata.stream.fee.amount or 0, 0)*1000
|
|
||||||
),
|
|
||||||
'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
|
|
||||||
|
|
||||||
'reposted': self.db.get_reposted_count(claim_hash),
|
|
||||||
'reposted_claim_hash': reposted_claim_hash,
|
|
||||||
'reposted_claim_type': reposted_claim_type,
|
|
||||||
'reposted_has_source': reposted_has_source,
|
|
||||||
|
|
||||||
'channel_hash': metadata.signing_channel_hash,
|
|
||||||
|
|
||||||
'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
|
|
||||||
'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
|
|
||||||
self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
|
|
||||||
),
|
|
||||||
'signature': metadata.signature,
|
|
||||||
'signature_digest': None, # TODO: fix
|
|
||||||
'signature_valid': claim.channel_hash is not None, # TODO: fix
|
|
||||||
'tags': tags,
|
|
||||||
'languages': languages,
|
|
||||||
'censor_type': 0, # TODO: fix
|
|
||||||
'censoring_channel_hash': None, # TODO: fix
|
|
||||||
# 'trending_group': 0,
|
|
||||||
# 'trending_mixed': 0,
|
|
||||||
# 'trending_local': 0,
|
|
||||||
# 'trending_global': 0,
|
|
||||||
}
|
|
||||||
if metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
|
|
||||||
value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
|
|
||||||
if metadata.is_stream and metadata.stream.release_time:
|
|
||||||
value['release_time'] = metadata.stream.release_time
|
|
||||||
if metadata.is_channel:
|
|
||||||
value['claims_in_channel'] = self.db.get_claims_in_channel_count(claim_hash)
|
|
||||||
yield 'update', value
|
|
||||||
|
|
||||||
async def run_in_thread_with_lock(self, func, *args):
|
async def run_in_thread_with_lock(self, func, *args):
|
||||||
# Run in a thread to prevent blocking. Shielded so that
|
# Run in a thread to prevent blocking. Shielded so that
|
||||||
|
@ -444,9 +301,9 @@ class BlockProcessor:
|
||||||
for block in blocks:
|
for block in blocks:
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
await self.run_in_thread_with_lock(self.advance_block, block)
|
await self.run_in_thread_with_lock(self.advance_block, block)
|
||||||
self.logger.info("advanced to %i in %ds", self.height, time.perf_counter() - start)
|
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
||||||
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
||||||
# await self.db.search_index.claim_consumer(self.claim_producer())
|
await self.db.search_index.claim_consumer(self.claim_producer())
|
||||||
self.db.search_index.clear_caches()
|
self.db.search_index.clear_caches()
|
||||||
self.touched_claims_to_send_es.clear()
|
self.touched_claims_to_send_es.clear()
|
||||||
self.removed_claims_to_send_es.clear()
|
self.removed_claims_to_send_es.clear()
|
||||||
|
|
|
@ -36,11 +36,14 @@ from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_ui
|
||||||
from lbry.wallet.server.storage import db_class
|
from lbry.wallet.server.storage import db_class
|
||||||
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
|
from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
|
||||||
from lbry.wallet.server.db import DB_PREFIXES
|
from lbry.wallet.server.db import DB_PREFIXES
|
||||||
from lbry.wallet.server.db.common import ResolveResult
|
from lbry.wallet.server.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES
|
||||||
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
|
from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
|
||||||
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
|
from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
|
||||||
from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue
|
from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue
|
||||||
from lbry.wallet.server.db.claimtrie import length_encoded_name
|
from lbry.wallet.server.db.claimtrie import length_encoded_name
|
||||||
|
from lbry.wallet.transaction import OutputScript
|
||||||
|
from lbry.schema.claim import Claim, guess_stream_type
|
||||||
|
from lbry.wallet.ledger import Ledger, RegTestLedger, TestNetLedger
|
||||||
|
|
||||||
from lbry.wallet.server.db.elasticsearch import SearchIndex
|
from lbry.wallet.server.db.elasticsearch import SearchIndex
|
||||||
|
|
||||||
|
@ -152,6 +155,13 @@ class LevelDB:
|
||||||
|
|
||||||
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
|
self.genesis_bytes = bytes.fromhex(self.coin.GENESIS_HASH)
|
||||||
|
|
||||||
|
if env.coin.NET == 'mainnet':
|
||||||
|
self.ledger = Ledger
|
||||||
|
elif env.coin.NET == 'testnet':
|
||||||
|
self.ledger = TestNetLedger
|
||||||
|
else:
|
||||||
|
self.ledger = RegTestLedger
|
||||||
|
|
||||||
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
|
def get_claim_from_txo(self, tx_num: int, tx_idx: int) -> Optional[TXOToClaimValue]:
|
||||||
claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
|
claim_hash_and_name = self.db.get(Prefixes.txo_to_claim.pack_key(tx_num, tx_idx))
|
||||||
if not claim_hash_and_name:
|
if not claim_hash_and_name:
|
||||||
|
@ -429,6 +439,168 @@ class LevelDB:
|
||||||
txos[claim_hash] = tx_num, nout
|
txos[claim_hash] = tx_num, nout
|
||||||
return txos
|
return txos
|
||||||
|
|
||||||
|
def get_claim_output_script(self, tx_hash, nout):
|
||||||
|
raw = self.db.get(
|
||||||
|
DB_PREFIXES.TX_PREFIX.value + tx_hash
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
output = self.coin.transaction(raw).outputs[nout]
|
||||||
|
script = OutputScript(output.pk_script)
|
||||||
|
script.parse()
|
||||||
|
return Claim.from_bytes(script.values['claim'])
|
||||||
|
except:
|
||||||
|
self.logger.error(
|
||||||
|
"tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
|
||||||
|
raw.hex()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
def _prepare_claim_for_sync(self, claim_hash: bytes):
|
||||||
|
claim = self._fs_get_claim_by_hash(claim_hash)
|
||||||
|
if not claim:
|
||||||
|
print("wat")
|
||||||
|
return
|
||||||
|
metadata = self.get_claim_output_script(claim.tx_hash, claim.position)
|
||||||
|
if not metadata:
|
||||||
|
return
|
||||||
|
reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1]
|
||||||
|
reposted_claim = None
|
||||||
|
reposted_metadata = None
|
||||||
|
if reposted_claim_hash:
|
||||||
|
reposted_claim = self.get_claim_txo(reposted_claim_hash)
|
||||||
|
if not reposted_claim:
|
||||||
|
return
|
||||||
|
reposted_metadata = self.get_claim_output_script(
|
||||||
|
self.total_transactions[reposted_claim.tx_num], reposted_claim.position
|
||||||
|
)
|
||||||
|
if not reposted_metadata:
|
||||||
|
return
|
||||||
|
reposted_tags = []
|
||||||
|
reposted_languages = []
|
||||||
|
reposted_has_source = None
|
||||||
|
reposted_claim_type = None
|
||||||
|
if reposted_claim:
|
||||||
|
reposted_tx_hash = self.total_transactions[reposted_claim.tx_num]
|
||||||
|
raw_reposted_claim_tx = self.db.get(
|
||||||
|
DB_PREFIXES.TX_PREFIX.value + reposted_tx_hash
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
reposted_claim_txo = self.coin.transaction(
|
||||||
|
raw_reposted_claim_tx
|
||||||
|
).outputs[reposted_claim.position]
|
||||||
|
reposted_script = OutputScript(reposted_claim_txo.pk_script)
|
||||||
|
reposted_script.parse()
|
||||||
|
except:
|
||||||
|
self.logger.error(
|
||||||
|
"repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
|
||||||
|
raw_reposted_claim_tx.hex()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
|
||||||
|
except:
|
||||||
|
self.logger.error(
|
||||||
|
"reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
|
||||||
|
raw_reposted_claim_tx.hex()
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if reposted_metadata:
|
||||||
|
reposted_tags = [] if not reposted_metadata.is_stream else [tag for tag in reposted_metadata.stream.tags]
|
||||||
|
reposted_languages = [] if not reposted_metadata.is_stream else (
|
||||||
|
[lang.language or 'none' for lang in reposted_metadata.stream.languages] or ['none']
|
||||||
|
)
|
||||||
|
reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
|
||||||
|
reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
|
||||||
|
claim_tags = [] if not metadata.is_stream else [tag for tag in metadata.stream.tags]
|
||||||
|
claim_languages = [] if not metadata.is_stream else (
|
||||||
|
[lang.language or 'none' for lang in metadata.stream.languages] or ['none']
|
||||||
|
)
|
||||||
|
tags = list(set(claim_tags).union(set(reposted_tags)))
|
||||||
|
languages = list(set(claim_languages).union(set(reposted_languages)))
|
||||||
|
canonical_url = f'{claim.name}#{claim.claim_hash.hex()}'
|
||||||
|
if metadata.is_signed:
|
||||||
|
channel = self.get_claim_txo(metadata.signing_channel_hash[::-1])
|
||||||
|
if channel:
|
||||||
|
canonical_url = f'{channel.name}#{metadata.signing_channel_hash[::-1].hex()}/{canonical_url}'
|
||||||
|
|
||||||
|
value = {
|
||||||
|
'claim_hash': claim_hash[::-1],
|
||||||
|
# 'claim_id': claim_hash.hex(),
|
||||||
|
'claim_name': claim.name,
|
||||||
|
'normalized': claim.name,
|
||||||
|
'tx_id': claim.tx_hash[::-1].hex(),
|
||||||
|
'tx_num': claim.tx_num,
|
||||||
|
'tx_nout': claim.position,
|
||||||
|
'amount': claim.amount,
|
||||||
|
'timestamp': 0, # TODO: fix
|
||||||
|
'creation_timestamp': 0, # TODO: fix
|
||||||
|
'height': claim.height,
|
||||||
|
'creation_height': claim.creation_height,
|
||||||
|
'activation_height': claim.activation_height,
|
||||||
|
'expiration_height': claim.expiration_height,
|
||||||
|
'effective_amount': claim.effective_amount,
|
||||||
|
'support_amount': claim.support_amount,
|
||||||
|
'is_controlling': claim.is_controlling,
|
||||||
|
'last_take_over_height': claim.last_takeover_height,
|
||||||
|
|
||||||
|
'short_url': f'{claim.name}#{claim.claim_hash.hex()}', # TODO: fix
|
||||||
|
'canonical_url': canonical_url,
|
||||||
|
|
||||||
|
'title': None if not metadata.is_stream else metadata.stream.title,
|
||||||
|
'author': None if not metadata.is_stream else metadata.stream.author,
|
||||||
|
'description': None if not metadata.is_stream else metadata.stream.description,
|
||||||
|
'claim_type': CLAIM_TYPES[metadata.claim_type],
|
||||||
|
'has_source': None if not metadata.is_stream else metadata.stream.has_source,
|
||||||
|
'stream_type': None if not metadata.is_stream else STREAM_TYPES[
|
||||||
|
guess_stream_type(metadata.stream.source.media_type)],
|
||||||
|
'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
|
||||||
|
'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(
|
||||||
|
max(metadata.stream.fee.amount or 0, 0) * 1000
|
||||||
|
),
|
||||||
|
'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
|
||||||
|
|
||||||
|
'reposted': self.get_reposted_count(claim_hash),
|
||||||
|
'reposted_claim_hash': reposted_claim_hash,
|
||||||
|
'reposted_claim_type': reposted_claim_type,
|
||||||
|
'reposted_has_source': reposted_has_source,
|
||||||
|
|
||||||
|
'channel_hash': metadata.signing_channel_hash,
|
||||||
|
|
||||||
|
'public_key_bytes': None if not metadata.is_channel else metadata.channel.public_key_bytes,
|
||||||
|
'public_key_hash': None if not metadata.is_channel else self.ledger.address_to_hash160(
|
||||||
|
self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
|
||||||
|
),
|
||||||
|
'signature': metadata.signature,
|
||||||
|
'signature_digest': None, # TODO: fix
|
||||||
|
'signature_valid': claim.signature_valid,
|
||||||
|
'tags': tags,
|
||||||
|
'languages': languages,
|
||||||
|
'censor_type': 0, # TODO: fix
|
||||||
|
'censoring_channel_hash': None, # TODO: fix
|
||||||
|
'claims_in_channel': None if not metadata.is_channel else self.get_claims_in_channel_count(claim_hash)
|
||||||
|
# 'trending_group': 0,
|
||||||
|
# 'trending_mixed': 0,
|
||||||
|
# 'trending_local': 0,
|
||||||
|
# 'trending_global': 0,
|
||||||
|
}
|
||||||
|
if metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
|
||||||
|
value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
|
||||||
|
if metadata.is_stream and metadata.stream.release_time:
|
||||||
|
value['release_time'] = metadata.stream.release_time
|
||||||
|
return value
|
||||||
|
|
||||||
|
def all_claims_producer(self):
|
||||||
|
for claim_hash in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix, include_value=False):
|
||||||
|
claim = self._prepare_claim_for_sync(claim_hash[1:])
|
||||||
|
if claim:
|
||||||
|
yield claim
|
||||||
|
|
||||||
|
def claims_producer(self, claim_hashes: Set[bytes]):
|
||||||
|
for claim_hash in claim_hashes:
|
||||||
|
result = self._prepare_claim_for_sync(claim_hash)
|
||||||
|
if result:
|
||||||
|
yield result
|
||||||
|
|
||||||
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
||||||
activated = defaultdict(list)
|
activated = defaultdict(list)
|
||||||
for _k, _v in self.db.iterator(prefix=Prefixes.pending_activation.pack_partial_key(height)):
|
for _k, _v in self.db.iterator(prefix=Prefixes.pending_activation.pack_partial_key(height)):
|
||||||
|
@ -567,6 +739,11 @@ class LevelDB:
|
||||||
if height >= min_height:
|
if height >= min_height:
|
||||||
break
|
break
|
||||||
keys.append(key)
|
keys.append(key)
|
||||||
|
if min_height > 0:
|
||||||
|
for key in self.db.iterator(start=DB_PREFIXES.undo_claimtrie.value,
|
||||||
|
stop=DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(min_height),
|
||||||
|
include_value=False):
|
||||||
|
keys.append(key)
|
||||||
if keys:
|
if keys:
|
||||||
with self.db.write_batch() as batch:
|
with self.db.write_batch() as batch:
|
||||||
for key in keys:
|
for key in keys:
|
||||||
|
|
Loading…
Reference in a new issue