reposts

parent: 9ad31008a5
commit: 6ea96e79bd

9 changed files with 260 additions and 89 deletions
@@ -120,10 +120,10 @@ class Outputs:
                 'expiration_height': claim.expiration_height,
                 'effective_amount': claim.effective_amount,
                 'support_amount': claim.support_amount,
-                'trending_group': claim.trending_group,
-                'trending_mixed': claim.trending_mixed,
-                'trending_local': claim.trending_local,
-                'trending_global': claim.trending_global,
+                # 'trending_group': claim.trending_group,
+                # 'trending_mixed': claim.trending_mixed,
+                # 'trending_local': claim.trending_local,
+                # 'trending_global': claim.trending_global,
             }
             if claim.HasField('channel'):
                 txo.channel = tx_map[claim.channel.tx_hash].outputs[claim.channel.nout]
@@ -210,7 +210,7 @@ class Outputs:
         txo_message.nout = resolve_result.position
         txo_message.height = resolve_result.height
         txo_message.claim.short_url = resolve_result.short_url
-        txo_message.claim.reposted = 0
+        txo_message.claim.reposted = resolve_result.reposted
         txo_message.claim.is_controlling = resolve_result.is_controlling
         txo_message.claim.creation_height = resolve_result.creation_height
         txo_message.claim.activation_height = resolve_result.activation_height
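Note: the new `reposted` field travels end to end through resolve: the DB layer computes a count, `ResolveResult` carries it, and the encoder above copies it into the protobuf TXO message. A minimal sketch of that flow, with a dict standing in for the protobuf message:

    import typing

    class ResolveResultSketch(typing.NamedTuple):  # stand-in for the real ResolveResult
        claim_hash: bytes
        reposted: int  # how many claims repost this one

    def encode(result: ResolveResultSketch, message: dict):
        # the real code assigns to txo_message.claim.reposted instead
        message['reposted'] = result.reposted

    msg = {}
    encode(ResolveResultSketch(b'\x00' * 20, 3), msg)
    assert msg['reposted'] == 3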
@@ -9,9 +9,10 @@ from prometheus_client import Gauge, Histogram
 from collections import defaultdict
 import lbry
 from lbry.schema.claim import Claim
+from lbry.schema.mime_types import guess_stream_type
 from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
 from lbry.wallet.constants import TXO_TYPES
-from lbry.wallet.server.db.common import STREAM_TYPES
+from lbry.wallet.server.db.common import STREAM_TYPES, CLAIM_TYPES
 
 from lbry.wallet.transaction import OutputScript, Output
 from lbry.wallet.server.tx import Tx, TxOutput, TxInput

@@ -213,7 +214,7 @@ class BlockProcessor:
         # is consistent with self.height
         self.state_lock = asyncio.Lock()
 
-        self.search_cache = {}
+        # self.search_cache = {}
         self.history_cache = {}
         self.status_server = StatusServer()
 

@@ -251,32 +252,98 @@ class BlockProcessor:
         self.removed_claims_to_send_es = set()
         self.touched_claims_to_send_es = set()
 
+        self.pending_reposted_count = set()
+
     def claim_producer(self):
+        def get_claim_txo(tx_hash, nout):
+            raw = self.db.db.get(
+                DB_PREFIXES.TX_PREFIX.value + tx_hash
+            )
+            try:
+                output: TxOutput = self.coin.transaction(raw).outputs[nout]
+                script = OutputScript(output.pk_script)
+                script.parse()
+                return Claim.from_bytes(script.values['claim'])
+            except:
+                self.logger.exception(
+                    "tx parsing for ES went boom %s %s", tx_hash[::-1].hex(),
+                    raw.hex()
+                )
+                return
+
         if self.db.db_height <= 1:
             return
 
+        to_send_es = set(self.touched_claims_to_send_es)
+        to_send_es.update(self.pending_reposted_count.difference(self.removed_claims_to_send_es))
+
         for claim_hash in self.removed_claims_to_send_es:
             yield 'delete', claim_hash.hex()
-        for claim_hash in self.touched_claims_to_send_es:
+        for claim_hash in to_send_es:
             claim = self.db._fs_get_claim_by_hash(claim_hash)
-            raw_claim_tx = self.db.db.get(DB_PREFIXES.TX_PREFIX.value + claim.tx_hash)
-            try:
-                claim_txo: TxOutput = self.coin.transaction(raw_claim_tx).outputs[claim.position]
-                script = OutputScript(claim_txo.pk_script)
-                script.parse()
-            except:
-                self.logger.exception(
-                    "tx parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex()
-                )
-                continue
-            try:
-                metadata = Claim.from_bytes(script.values['claim'])
-            except:
-                self.logger.exception(
-                    "claim parsing for ES went boom %s %s", claim.tx_hash[::-1].hex(), raw_claim_tx.hex()
-                )
+            metadata = get_claim_txo(claim.tx_hash, claim.position)
+            if not metadata:
                 continue
+            reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1]
+            reposted_claim = None
+            reposted_metadata = None
+            if reposted_claim_hash:
+                reposted_claim = self.db.get_claim_txo(reposted_claim_hash)
+                if not reposted_claim:
+                    continue
+                reposted_metadata = get_claim_txo(
+                    self.db.total_transactions[reposted_claim[0].tx_num], reposted_claim[0].position
+                )
+                if not reposted_metadata:
+                    continue
+            reposted_tags = []
+            reposted_languages = []
+            reposted_has_source = None
+            reposted_claim_type = None
+            if reposted_claim:
+                reposted_tx_hash = self.db.total_transactions[reposted_claim[0].tx_num]
+                raw_reposted_claim_tx = self.db.db.get(
+                    DB_PREFIXES.TX_PREFIX.value + reposted_tx_hash
+                )
+                try:
+                    reposted_claim_txo: TxOutput = self.coin.transaction(
+                        raw_reposted_claim_tx
+                    ).outputs[reposted_claim[0].position]
+                    reposted_script = OutputScript(reposted_claim_txo.pk_script)
+                    reposted_script.parse()
+                except:
+                    self.logger.exception(
+                        "repost tx parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
+                        raw_reposted_claim_tx.hex()
+                    )
+                    continue
+                try:
+                    reposted_metadata = Claim.from_bytes(reposted_script.values['claim'])
+                except:
+                    self.logger.exception(
+                        "reposted claim parsing for ES went boom %s %s", reposted_tx_hash[::-1].hex(),
+                        raw_reposted_claim_tx.hex()
+                    )
+                    continue
+            if reposted_metadata:
+                reposted_tags = [] if not reposted_metadata.is_stream else [tag for tag in reposted_metadata.stream.tags]
+                reposted_languages = [] if not reposted_metadata.is_stream else (
+                    [lang.language or 'none' for lang in reposted_metadata.stream.languages] or ['none']
+                )
+                reposted_has_source = False if not reposted_metadata.is_stream else reposted_metadata.stream.has_source
+                reposted_claim_type = CLAIM_TYPES[reposted_metadata.claim_type]
+            claim_tags = [] if not metadata.is_stream else [tag for tag in metadata.stream.tags]
+            claim_languages = [] if not metadata.is_stream else (
+                [lang.language or 'none' for lang in metadata.stream.languages] or ['none']
+            )
+            tags = list(set(claim_tags).union(set(reposted_tags)))
+            languages = list(set(claim_languages).union(set(reposted_languages)))
+            canonical_url = f'{claim.name}#{claim.claim_hash.hex()}'
+            if metadata.is_signed:
+                channel_txo = self.db.get_claim_txo(metadata.signing_channel_hash[::-1])
+                canonical_url = f'{channel_txo[1].name}#{metadata.signing_channel_hash[::-1].hex()}/{canonical_url}'
-            yield ('update', {
+
+            value = {
                 'claim_hash': claim_hash[::-1],
                 # 'claim_id': claim_hash.hex(),
                 'claim_name': claim.name,

@@ -285,8 +352,8 @@ class BlockProcessor:
                 'tx_num': claim.tx_num,
                 'tx_nout': claim.position,
                 'amount': claim.amount,
-                'timestamp': 0,
-                'creation_timestamp': 0,
+                'timestamp': 0,  # TODO: fix
+                'creation_timestamp': 0,  # TODO: fix
                 'height': claim.height,
                 'creation_height': claim.creation_height,
                 'activation_height': claim.activation_height,

@@ -296,25 +363,24 @@ class BlockProcessor:
                 'is_controlling': claim.is_controlling,
                 'last_take_over_height': claim.last_takeover_height,
 
-                'short_url': '',
-                'canonical_url': '',
+                'short_url': f'{claim.name}#{claim.claim_hash.hex()}',  # TODO: fix
+                'canonical_url': canonical_url,
 
-                'release_time': None if not metadata.is_stream else metadata.stream.release_time,
                 'title': None if not metadata.is_stream else metadata.stream.title,
                 'author': None if not metadata.is_stream else metadata.stream.author,
                 'description': None if not metadata.is_stream else metadata.stream.description,
-                'claim_type': TXO_TYPES[metadata.claim_type],
+                'claim_type': CLAIM_TYPES[metadata.claim_type],
                 'has_source': None if not metadata.is_stream else metadata.stream.has_source,
-                'stream_type': None if not metadata.is_stream else STREAM_TYPES.get(metadata.stream.stream_type, None),
+                'stream_type': None if not metadata.is_stream else STREAM_TYPES[guess_stream_type(metadata.stream.source.media_type)],
                 'media_type': None if not metadata.is_stream else metadata.stream.source.media_type,
-                'fee_amount': None if not metadata.is_stream else metadata.stream.fee.amount,
+                'fee_amount': None if not metadata.is_stream or not metadata.stream.has_fee else int(max(metadata.stream.fee.amount or 0, 0)*1000),
                 'fee_currency': None if not metadata.is_stream else metadata.stream.fee.currency,
-                'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration),
+                # 'duration': None if not metadata.is_stream else (metadata.stream.video.duration or metadata.stream.audio.duration),
 
-                'reposted': 0,
-                'reposted_claim_hash': None,
-                'reposted_claim_type': None,
-                'reposted_has_source': False,
+                'reposted': self.db.get_reposted_count(claim_hash),
+                'reposted_claim_hash': reposted_claim_hash,
+                'reposted_claim_type': reposted_claim_type,
+                'reposted_has_source': reposted_has_source,
 
                 'channel_hash': metadata.signing_channel_hash,
 

@@ -323,21 +389,25 @@ class BlockProcessor:
                     self.ledger.public_key_to_address(metadata.channel.public_key_bytes)
                 ),
                 'signature': metadata.signature,
-                'signature_digest': None,
-                'signature_valid': False,
-                'claims_in_channel': 0,
+                'signature_digest': None,  # TODO: fix
+                'signature_valid': False,  # TODO: fix
+                'claims_in_channel': 0,  # TODO: fix
 
-                'tags': [] if not metadata.is_stream else [tag for tag in metadata.stream.tags],
-                'languages': [] if not metadata.is_stream else (
-                    [lang.language or 'none' for lang in metadata.stream.languages] or ['none']
-                ),
-                'censor_type': 0,
-                'censoring_channel_hash': None,
+                'tags': tags,
+                'languages': languages,
+                'censor_type': 0,  # TODO: fix
+                'censoring_channel_hash': None,  # TODO: fix
                 # 'trending_group': 0,
                 # 'trending_mixed': 0,
                 # 'trending_local': 0,
                 # 'trending_global': 0,
-            })
+            }
+            if metadata.is_stream and (metadata.stream.video.duration or metadata.stream.audio.duration):
+                value['duration'] = metadata.stream.video.duration or metadata.stream.audio.duration
+            if metadata.is_stream and metadata.stream.release_time:
+                value['release_time'] = metadata.stream.release_time
+
+            yield ('update', value)
 
     async def run_in_thread_with_lock(self, func, *args):
         # Run in a thread to prevent blocking. Shielded so that
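Note: `claim_producer` indexes a repost with the union of its own tags and languages and those of the claim it references. A toy illustration of the merge, with plain lists standing in for the Claim protobufs:

    claim_tags = ['music', 'live']        # tags on the repost itself
    reposted_tags = ['music', 'concert']  # tags on the claim it points to
    tags = list(set(claim_tags).union(set(reposted_tags)))
    # searching for either tag set now finds the repost
    assert sorted(tags) == ['concert', 'live', 'music']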
@@ -368,17 +438,20 @@ class BlockProcessor:
         try:
             for block in blocks:
                 await self.run_in_thread_with_lock(self.advance_block, block)
+                # TODO: we shouldnt wait on the search index updating before advancing to the next block
                 await self.db.search_index.claim_consumer(self.claim_producer())
+                self.db.search_index.clear_caches()
                 self.touched_claims_to_send_es.clear()
                 self.removed_claims_to_send_es.clear()
+                self.pending_reposted_count.clear()
                 print("******************\n")
         except:
             self.logger.exception("advance blocks failed")
             raise
         # if self.sql:
 
-        for cache in self.search_cache.values():
-            cache.clear()
+        # for cache in self.search_cache.values():
+        #     cache.clear()
         self.history_cache.clear()  # TODO: is this needed?
         self.notifications.notified_mempool_txs.clear()
 
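Note: the TODO above flags that block advancement still waits on the search index. One possible shape for decoupling them (a sketch only, not something this commit implements) is a queue between the block processor and a dedicated indexing task:

    import asyncio

    async def producer(queue: asyncio.Queue, blocks):
        for block in blocks:
            # advance_block(block) would run here, without awaiting ES
            await queue.put(f'claim updates from block {block}')

    async def indexer(queue: asyncio.Queue):
        while True:
            batch = await queue.get()  # search_index.claim_consumer(batch) would run here
            queue.task_done()

    async def main():
        queue = asyncio.Queue()
        task = asyncio.create_task(indexer(queue))
        await producer(queue, range(3))
        await queue.join()  # wait for the indexer to drain everything
        task.cancel()

    asyncio.run(main())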
@@ -535,11 +608,16 @@ class BlockProcessor:
 
         ops = []
         signing_channel_hash = None
+        reposted_claim_hash = None
+        if txo.claim.is_repost:
+            reposted_claim_hash = txo.claim.repost.reference.claim_hash[::-1]
+            self.pending_reposted_count.add(reposted_claim_hash)
+
         if signable and signable.signing_channel_hash:
             signing_channel_hash = txo.signable.signing_channel_hash[::-1]
-        if txo.script.is_claim_name:
+        if txo.script.is_claim_name:  # it's a root claim
             root_tx_num, root_idx = tx_num, nout
-        else:
+        else:  # it's a claim update
             if claim_hash not in spent_claims:
                 print(f"\tthis is a wonky tx, contains unlinked claim update {claim_hash.hex()}")
                 return []

@@ -561,7 +639,7 @@ class BlockProcessor:
         )
         pending = StagedClaimtrieItem(
             claim_name, claim_hash, txo.amount, self.coin.get_expiration_height(height), tx_num, nout, root_tx_num,
-            root_idx, signing_channel_hash
+            root_idx, signing_channel_hash, reposted_claim_hash
         )
         self.pending_claims[(tx_num, nout)] = pending
         self.pending_claim_txos[claim_hash] = (tx_num, nout)

@@ -625,11 +703,14 @@ class BlockProcessor:
         claim_hash = spent_claim_hash_and_name.claim_hash
         signing_hash = self.db.get_channel_for_claim(claim_hash)
         k, v = self.db.get_claim_txo(claim_hash)
+        reposted_claim_hash = self.db.get_repost(claim_hash)
         spent = StagedClaimtrieItem(
             v.name, claim_hash, v.amount,
             self.coin.get_expiration_height(bisect_right(self.db.tx_counts, txin_num)),
-            txin_num, txin.prev_idx, v.root_tx_num, v.root_position, signing_hash
+            txin_num, txin.prev_idx, v.root_tx_num, v.root_position, signing_hash, reposted_claim_hash
         )
+        if spent.reposted_claim_hash:
+            self.pending_reposted_count.add(spent.reposted_claim_hash)
         spent_claims[spent.claim_hash] = (spent.tx_num, spent.position, spent.name)
         print(f"\tspend lbry://{spent.name}#{spent.claim_hash.hex()}")
         return spent.get_spend_claim_txo_ops()

@@ -646,6 +727,7 @@ class BlockProcessor:
             self.staged_pending_abandoned[pending.claim_hash] = pending
             claim_root_tx_num, claim_root_idx = pending.root_claim_tx_num, pending.root_claim_tx_position
             prev_amount, prev_signing_hash = pending.amount, pending.signing_hash
+            reposted_claim_hash = pending.reposted_claim_hash
             expiration = self.coin.get_expiration_height(self.height)
         else:
             k, v = self.db.get_claim_txo(

@@ -653,10 +735,11 @@ class BlockProcessor:
             )
             claim_root_tx_num, claim_root_idx, prev_amount = v.root_tx_num, v.root_position, v.amount
             prev_signing_hash = self.db.get_channel_for_claim(claim_hash)
+            reposted_claim_hash = self.db.get_repost(claim_hash)
             expiration = self.coin.get_expiration_height(bisect_right(self.db.tx_counts, tx_num))
         self.staged_pending_abandoned[claim_hash] = staged = StagedClaimtrieItem(
             name, claim_hash, prev_amount, expiration, tx_num, nout, claim_root_tx_num,
-            claim_root_idx, prev_signing_hash
+            claim_root_idx, prev_signing_hash, reposted_claim_hash
         )
 
         self.pending_supports[claim_hash].clear()

@@ -1216,8 +1299,8 @@ class BlockProcessor:
         self.possible_future_activated_support.clear()
         self.possible_future_support_txos.clear()
 
-        for cache in self.search_cache.values():
-            cache.clear()
+        # for cache in self.search_cache.values():
+        #     cache.clear()
         self.history_cache.clear()
         self.notifications.notified_mempool_txs.clear()
 
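Note: `pending_reposted_count` acts as a dirty set. Whenever a repost is created or spent, the hash of the claim it points at is recorded, and `claim_producer` re-indexes that claim so its `reposted` count stays current. The bookkeeping, reduced to plain sets:

    pending_reposted_count = set()
    touched = set()   # touched_claims_to_send_es
    removed = set()   # removed_claims_to_send_es

    def on_repost_added_or_spent(reposted_claim_hash: bytes):
        pending_reposted_count.add(reposted_claim_hash)

    def claims_to_reindex():
        # same shape as claim_producer: touched claims plus repost targets,
        # minus anything deleted outright
        return touched | (pending_reposted_count - removed)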
@@ -1,6 +1,7 @@
 import enum
 
 
+@enum.unique
 class DB_PREFIXES(enum.Enum):
     claim_to_support = b'K'
     support_to_claim = b'L'

@@ -20,6 +21,9 @@ class DB_PREFIXES(enum.Enum):
     activated_claim_and_support = b'R'
     active_amount = b'S'
 
+    repost = b'V'
+    reposted_claim = b'W'
+
     undo_claimtrie = b'M'
 
     HISTORY_PREFIX = b'A'
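Note: each DB_PREFIXES value is a single byte prepended to the packed key fields, so the two new column families form a forward and a reverse index (field layout per the RepostPrefixRow/RepostedPrefixRow definitions later in this commit):

    # repost:          b'V' + claim_hash(20)          -> reposted_claim_hash(20)
    # reposted_claim:  b'W' + reposted_claim_hash(20)
    #                       + tx_num(4) + position(2) -> claim_hash(20)
    key = b'V' + b'\x00' * 20
    assert len(key) == 21  # 1 prefix byte + a 20-byte claim hash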
@@ -3,7 +3,7 @@ from typing import Optional
 from lbry.wallet.server.db.revertable import RevertablePut, RevertableDelete, RevertableOp, delete_prefix
 from lbry.wallet.server.db import DB_PREFIXES
 from lbry.wallet.server.db.prefixes import Prefixes, ClaimTakeoverValue, EffectiveAmountPrefixRow
-from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE
+from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, RepostPrefixRow, RepostedPrefixRow
 
 
 def length_encoded_name(name: str) -> bytes:

@@ -137,6 +137,7 @@ class StagedClaimtrieItem(typing.NamedTuple):
     root_claim_tx_num: int
     root_claim_tx_position: int
     signing_hash: Optional[bytes]
+    reposted_claim_hash: Optional[bytes]
 
     @property
     def is_update(self) -> bool:

@@ -191,6 +192,16 @@ class StagedClaimtrieItem(typing.NamedTuple):
                 )
             )
         ])
+        if self.reposted_claim_hash:
+            ops.extend([
+                op(
+                    *RepostPrefixRow.pack_item(self.claim_hash, self.reposted_claim_hash)
+                ),
+                op(
+                    *RepostedPrefixRow.pack_item(self.reposted_claim_hash, self.tx_num, self.position, self.claim_hash)
+                ),
+
+            ])
         return ops
 
     def get_add_claim_utxo_ops(self) -> typing.List[RevertableOp]:

@@ -207,9 +218,8 @@ class StagedClaimtrieItem(typing.NamedTuple):
         ] + delete_prefix(db, DB_PREFIXES.channel_to_claim.value + self.signing_hash)
 
     def get_abandon_ops(self, db) -> typing.List[RevertableOp]:
-        packed_name = length_encoded_name(self.name)
         delete_short_id_ops = delete_prefix(
-            db, DB_PREFIXES.claim_short_id_prefix.value + packed_name + self.claim_hash
+            db, Prefixes.claim_short_id.pack_partial_key(self.name, self.claim_hash)
         )
         delete_claim_ops = delete_prefix(db, DB_PREFIXES.claim_to_txo.value + self.claim_hash)
         delete_supports_ops = delete_prefix(db, DB_PREFIXES.claim_to_support.value + self.claim_hash)
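Note: each staged repost now emits a pair of revertable ops, a forward row (claim -> target) and a reverse row ((target, tx_num, position) -> claim), so a spend or reorg deletes exactly what the add wrote. A toy model of the paired indexes using dicts:

    repost = {}          # claim_hash -> reposted_claim_hash
    reposted_claim = {}  # (reposted_claim_hash, tx_num, position) -> claim_hash

    def add_repost(claim_hash, target, tx_num, position):
        repost[claim_hash] = target
        reposted_claim[(target, tx_num, position)] = claim_hash

    def undo_repost(claim_hash, target, tx_num, position):
        # undo removes exactly the keys the add created
        del repost[claim_hash]
        del reposted_claim[(target, tx_num, position)]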
@@ -438,6 +438,7 @@ class ResolveResult(typing.NamedTuple):
     expiration_height: int
     effective_amount: int
     support_amount: int
+    reposted: int
     last_takeover_height: typing.Optional[int]
     claims_in_channel: typing.Optional[int]
     channel_hash: typing.Optional[bytes]
@@ -211,7 +211,8 @@ class SearchIndex:
                 last_takeover_height=r['last_take_over_height'],
                 claims_in_channel=r['claims_in_channel'],
                 channel_hash=r['channel_hash'],
-                reposted_claim_hash=r['reposted_claim_hash']
+                reposted_claim_hash=r['reposted_claim_hash'],
+                reposted=r['reposted']
             ) for r in response
         ]
         extra = [

@@ -234,7 +235,8 @@ class SearchIndex:
                 last_takeover_height=r['last_take_over_height'],
                 claims_in_channel=r['claims_in_channel'],
                 channel_hash=r['channel_hash'],
-                reposted_claim_hash=r['reposted_claim_hash']
+                reposted_claim_hash=r['reposted_claim_hash'],
+                reposted=r['reposted']
             ) for r in await self._get_referenced_rows(total_referenced)
         ]
         result = Outputs.to_base64(

@@ -471,7 +473,7 @@ class SearchIndex:
     def extract_doc(doc, index):
         doc['claim_id'] = doc.pop('claim_hash')[::-1].hex()
         if doc['reposted_claim_hash'] is not None:
-            doc['reposted_claim_id'] = doc.pop('reposted_claim_hash')[::-1].hex()
+            doc['reposted_claim_id'] = doc.pop('reposted_claim_hash').hex()
         else:
             doc['reposted_claim_id'] = None
         channel_hash = doc.pop('channel_hash')
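Note: dropping the `[::-1]` here matches `claim_producer`, which now reverses `repost.reference.claim_hash` before it reaches the document, so the hash already arrives in claim-id byte order. Claim ids are the reversed hex of the stored 20-byte hash, txid-style:

    raw = bytes(range(20))      # stand-in for a stored claim hash
    claim_id = raw[::-1].hex()  # display order reverses the bytes
    assert bytes.fromhex(claim_id)[::-1] == raw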
@@ -193,6 +193,24 @@ class EffectiveAmountValue(typing.NamedTuple):
     claim_hash: bytes
 
 
+class RepostKey(typing.NamedTuple):
+    claim_hash: bytes
+
+
+class RepostValue(typing.NamedTuple):
+    reposted_claim_hash: bytes
+
+
+class RepostedKey(typing.NamedTuple):
+    reposted_claim_hash: bytes
+    tx_num: int
+    position: int
+
+
+class RepostedValue(typing.NamedTuple):
+    claim_hash: bytes
+
+
 class ActiveAmountPrefixRow(PrefixRow):
     prefix = DB_PREFIXES.active_amount.value
     key_struct = struct.Struct(b'>20sBLLH')
@@ -676,6 +694,64 @@ class EffectiveAmountPrefixRow(PrefixRow):
         return cls.pack_key(name, effective_amount, tx_num, position), cls.pack_value(claim_hash)
 
 
+class RepostPrefixRow(PrefixRow):
+    prefix = DB_PREFIXES.repost.value
+
+    @classmethod
+    def pack_key(cls, claim_hash: bytes):
+        return cls.prefix + claim_hash
+
+    @classmethod
+    def unpack_key(cls, key: bytes) -> RepostKey:
+        assert key[:1] == cls.prefix
+        assert len(key) == 21
+        return RepostKey(key[1:])
+
+    @classmethod
+    def pack_value(cls, reposted_claim_hash: bytes) -> bytes:
+        return reposted_claim_hash
+
+    @classmethod
+    def unpack_value(cls, data: bytes) -> RepostValue:
+        return RepostValue(data)
+
+    @classmethod
+    def pack_item(cls, claim_hash: bytes, reposted_claim_hash: bytes):
+        return cls.pack_key(claim_hash), cls.pack_value(reposted_claim_hash)
+
+
+class RepostedPrefixRow(PrefixRow):
+    prefix = DB_PREFIXES.reposted_claim.value
+    key_struct = struct.Struct(b'>20sLH')
+    value_struct = struct.Struct(b'>20s')
+    key_part_lambdas = [
+        lambda: b'',
+        struct.Struct(b'>20s').pack,
+        struct.Struct(b'>20sL').pack,
+        struct.Struct(b'>20sLH').pack
+    ]
+
+    @classmethod
+    def pack_key(cls, reposted_claim_hash: bytes, tx_num: int, position: int):
+        return super().pack_key(reposted_claim_hash, tx_num, position)
+
+    @classmethod
+    def unpack_key(cls, key: bytes) -> RepostedKey:
+        return RepostedKey(*super().unpack_key(key))
+
+    @classmethod
+    def pack_value(cls, claim_hash: bytes) -> bytes:
+        return super().pack_value(claim_hash)
+
+    @classmethod
+    def unpack_value(cls, data: bytes) -> RepostedValue:
+        return RepostedValue(*super().unpack_value(data))
+
+    @classmethod
+    def pack_item(cls, reposted_claim_hash: bytes, tx_num: int, position: int, claim_hash: bytes):
+        return cls.pack_key(reposted_claim_hash, tx_num, position), cls.pack_value(claim_hash)
+
+
 class Prefixes:
     claim_to_support = ClaimToSupportPrefixRow
     support_to_claim = SupportToClaimPrefixRow

@@ -696,4 +772,7 @@ class Prefixes:
 
     effective_amount = EffectiveAmountPrefixRow
 
+    repost = RepostPrefixRow
+    reposted_claim = RepostedPrefixRow
+
     # undo_claimtrie = b'M'
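Note: a quick round trip through the RepostedPrefixRow key layout; the struct packing below mirrors the class definition above (PrefixRow's inherited pack_key/unpack_key are assumed to apply key_struct after the prefix byte):

    import struct

    prefix = b'W'
    key_struct = struct.Struct(b'>20sLH')

    claim_hash, tx_num, position = b'\x01' * 20, 4200, 7
    key = prefix + key_struct.pack(claim_hash, tx_num, position)
    assert len(key) == 1 + 20 + 4 + 2  # prefix + hash + tx_num + position

    assert key_struct.unpack(key[1:]) == (claim_hash, 4200, 7)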
@@ -40,7 +40,7 @@ from lbry.wallet.server.db.common import ResolveResult
 from lbry.wallet.server.db.prefixes import Prefixes, PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue
 from lbry.wallet.server.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE
 from lbry.wallet.server.db.prefixes import PendingActivationKey, ClaimToTXOKey, TXOToClaimValue
-from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, length_encoded_name
+from lbry.wallet.server.db.claimtrie import length_encoded_name
 from lbry.wallet.server.db.elasticsearch import SearchIndex
 

@@ -58,8 +58,6 @@ TXO_STRUCT_unpack = TXO_STRUCT.unpack
 TXO_STRUCT_pack = TXO_STRUCT.pack
 
-
-
 
 @attr.s(slots=True)
 class FlushData:
     height = attr.ib()

@@ -158,6 +156,18 @@ class LevelDB:
             return
         return Prefixes.txo_to_claim.unpack_value(claim_hash_and_name)
 
+    def get_repost(self, claim_hash) -> Optional[bytes]:
+        repost = self.db.get(Prefixes.repost.pack_key(claim_hash))
+        if repost:
+            return Prefixes.repost.unpack_value(repost).reposted_claim_hash
+        return
+
+    def get_reposted_count(self, claim_hash: bytes) -> int:
+        cnt = 0
+        for _ in self.db.iterator(prefix=Prefixes.reposted_claim.pack_partial_key(claim_hash)):
+            cnt += 1
+        return cnt
+
     def get_activation(self, tx_num, position, is_support=False) -> int:
         activation = self.db.get(
             Prefixes.activated.pack_key(

@@ -208,6 +218,7 @@ class LevelDB:
 
         effective_amount = support_amount + claim_amount
         channel_hash = self.get_channel_for_claim(claim_hash)
+        reposted_claim_hash = self.get_repost(claim_hash)
 
         claims_in_channel = None
         short_url = f'{name}#{claim_hash.hex()}'

@@ -224,7 +235,8 @@ class LevelDB:
             last_takeover_height=last_take_over_height, claims_in_channel=claims_in_channel,
             creation_height=created_height, activation_height=activation_height,
             expiration_height=expiration_height, effective_amount=effective_amount, support_amount=support_amount,
-            channel_hash=channel_hash, reposted_claim_hash=None
+            channel_hash=channel_hash, reposted_claim_hash=reposted_claim_hash,
+            reposted=self.get_reposted_count(claim_hash)
         )
 
     def _resolve(self, normalized_name: str, claim_id: Optional[str] = None,

@@ -339,26 +351,6 @@ class LevelDB:
             self.executor, self._fs_get_claim_by_hash, bytes.fromhex(claim_id)
         )
 
-    def make_staged_claim_item(self, claim_hash: bytes) -> Optional[StagedClaimtrieItem]:
-        claim_info = self.get_claim_txo(claim_hash)
-        k, v = claim_info
-        root_tx_num = v.root_tx_num
-        root_idx = v.root_position
-        value = v.amount
-        name = v.name
-        tx_num = k.tx_num
-        idx = k.position
-        height = bisect_right(self.tx_counts, tx_num)
-        signing_hash = self.get_channel_for_claim(claim_hash)
-        # if signing_hash:
-        #     count = self.get_claims_in_channel_count(signing_hash)
-        # else:
-        #     count = 0
-        return StagedClaimtrieItem(
-            name, claim_hash, value, self.coin.get_expiration_height(height), tx_num, idx,
-            root_tx_num, root_idx, signing_hash
-        )
-
     def get_claim_txo_amount(self, claim_hash: bytes, tx_num: int, position: int) -> Optional[int]:
         v = self.db.get(Prefixes.claim_to_txo.pack_key(claim_hash, tx_num, position))
         if v:
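Note: get_reposted_count answers "how many claims repost X" with a prefix scan over the reverse index, one key per repost, so each call costs O(reposts of that claim) and is recomputed on every resolve. The same pattern over a plain dict in place of LevelDB:

    db = {
        b'W' + b'\x01' * 20 + b'\x00\x00\x00\x01\x00\x01': b'claim-a',
        b'W' + b'\x01' * 20 + b'\x00\x00\x00\x02\x00\x00': b'claim-b',
        b'W' + b'\x02' * 20 + b'\x00\x00\x00\x03\x00\x00': b'claim-c',
    }
    prefix = b'W' + b'\x01' * 20
    reposted_count = sum(1 for key in db if key.startswith(prefix))
    assert reposted_count == 2  # two reposts point at this claim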
@@ -1484,9 +1484,9 @@ class StreamCommands(ClaimTestCase):
         filtering_channel_id = self.get_claim_id(
             await self.channel_create('@filtering', '0.1')
         )
-        self.conductor.spv_node.server.db.sql.filtering_channel_hashes.add(
-            unhexlify(filtering_channel_id)[::-1]
-        )
+        # self.conductor.spv_node.server.db.sql.filtering_channel_hashes.add(
+        #     unhexlify(filtering_channel_id)[::-1]
+        # )
         self.assertEqual(0, len(self.conductor.spv_node.server.db.sql.filtered_streams))
         await self.stream_repost(bad_content_id, 'filter1', '0.1', channel_name='@filtering')
         self.assertEqual(1, len(self.conductor.spv_node.server.db.sql.filtered_streams))