Indexing of extensions and basic query support.

Jonathan Moody 2022-11-15 15:40:14 -06:00
parent df57bf1bf1
commit 1991513e15
5 changed files with 349 additions and 17 deletions
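At a high level: stream claims can now carry free-form extension metadata, the sync service indexes it in Elasticsearch as a nested object, and searches can filter on it. A hedged sketch of the intended search shape, using the expand_query() helper changed in the first file below ('vendor' and 'sku' are illustrative names, not part of any schema):

# Hypothetical: compile a search constrained on extension metadata.
query = expand_query(claim_type='stream', extensions={'vendor': {'sku': 'ABC-123'}})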

View file

@@ -764,7 +764,8 @@ INDEX_DEFAULT_SETTINGS = {
"claim_type": {"type": "byte"},
"censor_type": {"type": "byte"},
"trending_score": {"type": "double"},
"release_time": {"type": "long"}
"release_time": {"type": "long"},
"extensions": {"type": "object"},
}
}
}
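For context: an "object" mapping makes Elasticsearch flatten nested JSON into dotted field names at index time, which is what the query expansion below matches against. A hypothetical document fragment and the fields derived from it:

doc = {'extensions': {'vendor': {'sku': 'ABC-123', 'rating': 5}}}  # hypothetical source
# Fields ES derives from the object mapping:
#   extensions.vendor.sku    -> 'ABC-123'
#   extensions.vendor.rating -> 5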
@@ -788,7 +789,7 @@ FIELDS = {
'reposted_claim_id', 'repost_count', 'sd_hash',
'trending_score', 'tx_num',
'channel_tx_id', 'channel_tx_position', 'channel_height', 'reposted_tx_id',
-    'reposted_tx_position', 'reposted_height',
+    'reposted_tx_position', 'reposted_height', 'extensions',
}
TEXT_FIELDS = {
@@ -807,7 +808,11 @@ RANGE_FIELDS = {
'channel_tx_position', 'channel_height',
}
-ALL_FIELDS = RANGE_FIELDS | TEXT_FIELDS | FIELDS
+OBJECT_FIELDS = {
+    'extensions',
+}
+ALL_FIELDS = OBJECT_FIELDS | RANGE_FIELDS | TEXT_FIELDS | FIELDS
REPLACEMENTS = {
'claim_name': 'normalized_name',
@@ -826,6 +831,7 @@ REPLACEMENTS = {
def expand_query(**kwargs):
#print(f'expand_query: >>> {kwargs}')
if "amount_order" in kwargs:
kwargs["limit"] = 1
kwargs["order_by"] = "effective_amount"
@@ -849,6 +855,7 @@ def expand_query(**kwargs):
if value is None or isinstance(value, list) and len(value) == 0:
continue
key = REPLACEMENTS.get(key, key)
#print(f'expand_query: *** {key} = {value}')
if key in FIELDS:
partial_id = False
if key == 'claim_type':
@@ -911,6 +918,26 @@ def expand_query(**kwargs):
]}
}
)
elif key in OBJECT_FIELDS:
def flatten(field, d):
if isinstance(d, dict) and len(d) > 0:
for k, v in d.items():
subfield = f'{field}.{k}' if field else k
yield from flatten(subfield, v)
elif isinstance(d, dict):
# require <field> be present
yield {"exists": {"field": field}}
elif isinstance(d, list):
# require <field> match all values <d>
yield {"bool": {"must": [{"match": {field: {"query": e}}} for e in d]}}
else:
# require <field> match value <d>
yield {"match": {field: {"query": d}}}
#yield {"term": {field: {"value": d}}}
query['must'].append(
{"exists": {"field": key}},
)
query['must'].extend(flatten(key, value))
elif many:
query['must'].append({"terms": {key: value}})
else:
@@ -1022,6 +1049,7 @@ def expand_query(**kwargs):
"sort": query["sort"]
}
}
#print(f'expand_query: <<< {query}')
return query
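To make the OBJECT_FIELDS branch above concrete, here is a standalone, runnable sketch of the same flatten logic and the clauses it yields for a hypothetical extensions value (the code above additionally appends an exists clause for the top-level field):

def flatten(field, d):
    # walk nested dicts, yielding one Elasticsearch clause per leaf
    if isinstance(d, dict) and len(d) > 0:
        for k, v in d.items():
            subfield = f'{field}.{k}' if field else k
            yield from flatten(subfield, v)
    elif isinstance(d, dict):
        yield {"exists": {"field": field}}   # empty dict: only require presence
    elif isinstance(d, list):
        yield {"bool": {"must": [{"match": {field: {"query": e}}} for e in d]}}
    else:
        yield {"match": {field: {"query": d}}}

# 'vendor', 'sku' and 'tags' are illustrative names
print(list(flatten('extensions', {'vendor': {'sku': 'ABC-123', 'tags': ['a', 'b']}})))
# [{'match': {'extensions.vendor.sku': {'query': 'ABC-123'}}},
#  {'bool': {'must': [{'match': {'extensions.vendor.tags': {'query': 'a'}}},
#                     {'match': {'extensions.vendor.tags': {'query': 'b'}}}]}}]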

View file

@@ -1,6 +1,6 @@
from typing import Optional, Set, Dict, List
from concurrent.futures.thread import ThreadPoolExecutor
-from hub.schema.claim import guess_stream_type
+from hub.schema.claim import guess_stream_type, Claim
from hub.schema.result import Censor
from hub.common import hash160, STREAM_TYPES, CLAIM_TYPES, LRUCache
from hub.db import SecondaryDB
@@ -43,9 +43,11 @@ class ElasticSyncDB(SecondaryDB):
metadatas.update(await self.get_claim_metadatas(list(needed_txos)))
for claim_hash, claim in claims.items():
assert isinstance(claim, ResolveResult)
metadata = metadatas.get((claim.tx_hash, claim.position))
if not metadata:
continue
assert isinstance(metadata, Claim)
if not metadata.is_stream or not metadata.stream.has_fee:
fee_amount = 0
else:
@@ -98,16 +100,24 @@ class ElasticSyncDB(SecondaryDB):
if reposted_metadata.is_stream and \
(reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration):
reposted_duration = reposted_metadata.stream.video.duration or reposted_metadata.stream.audio.duration
extensions = None
if metadata.is_stream:
meta = metadata.stream
extensions = meta.extensions.to_dict()
elif metadata.is_channel:
meta = metadata.channel
elif metadata.is_collection:
meta = metadata.collection
elif metadata.is_repost:
meta = metadata.repost
modified = meta.reference.apply(reposted_metadata)
modified = getattr(modified, modified.claim_type)
if hasattr(modified, 'extensions'):
extensions = modified.extensions.to_dict()
else:
continue
claim_tags = [tag for tag in meta.tags]
claim_languages = [lang.language or 'none' for lang in meta.languages] or ['none']
tags = list(set(claim_tags).union(set(reposted_tags)))
@@ -181,6 +191,7 @@ class ElasticSyncDB(SecondaryDB):
'channel_tx_id': None if not claim.channel_tx_hash else claim.channel_tx_hash[::-1].hex(),
'channel_tx_position': claim.channel_tx_position,
'channel_height': claim.channel_height,
'extensions': extensions
}
if metadata.is_repost and reposted_duration is not None:
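The net effect on the indexed document: stream claims (and reposts of streams, via reference.apply) now carry a raw extensions dict alongside the existing fields. A hypothetical fragment of the claim document assembled above:

doc = {
    'claim_id': '9cb713f01bf247a0e03a44a434b1f0b3',   # hypothetical
    'claim_type': CLAIM_TYPES['stream'],
    'extensions': {'vendor': {'sku': 'ABC-123'}},     # metadata.stream.extensions.to_dict()
}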

View file

@@ -134,6 +134,8 @@ class ElasticSyncService(BlockchainReaderService):
index_version = await self.get_index_version()
res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
if 'error' in res:
self.log.warning("es index create failed: %s", res)
acked = res.get('acknowledged', False)
if acked:
@@ -202,13 +204,13 @@
@staticmethod
def _upsert_claim_query(index, claim):
-        return {
-            'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS},
+        doc = {key: value for key, value in claim.items() if key in ALL_FIELDS}
+        doc.update({
            '_id': claim['claim_id'],
            '_index': index,
-            '_op_type': 'update',
-            'doc_as_upsert': True
-        }
+            '_op_type': 'index',
+        })
+        return doc
@staticmethod
def _delete_claim_query(index, claim_hash: bytes):
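The change above swaps a partial 'update' (doc_as_upsert) for a full 'index' of each document; since 'index' replaces the stored document wholesale, keys dropped from a claim (a cleared extension, for example) do not linger from the previous version. A sketch of one resulting bulk action, with hypothetical values:

action = {
    '_id': '9cb713f01bf247a0e03a44a434b1f0b3',    # claim_id, hypothetical
    '_index': 'claims',                           # hypothetical index name
    '_op_type': 'index',
    'claim_id': '9cb713f01bf247a0e03a44a434b1f0b3',
    'extensions': {'vendor': {'sku': 'ABC-123'}},
}

The elasticsearch-py bulk helper treats keys such as '_id', '_index' and '_op_type' as action metadata and the remaining keys as the document source.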

View file

@@ -2,23 +2,27 @@ import json
import logging
import os.path
import hashlib
from collections.abc import Mapping, Iterable
from typing import Tuple, List
from string import ascii_letters
from decimal import Decimal, ROUND_UP
-from google.protobuf.json_format import MessageToDict
+from google.protobuf.json_format import MessageToDict, ParseDict, ParseError
from hub.schema.base58 import Base58, b58_encode
from hub.error import MissingPublishedFileError, EmptyPublishedFileError
import hub.schema.claim as claim
from hub.schema.mime_types import guess_media_type
from hub.schema.base import Metadata, BaseMessageList
from hub.schema.tags import normalize_tag
+from google.protobuf.message import Message as ProtobufMessage
from hub.schema.types.v2.claim_pb2 import (
    Claim as ClaimMessage,
    Fee as FeeMessage,
    Location as LocationMessage,
-    Language as LanguageMessage
+    Language as LanguageMessage,
)
+from hub.schema.types.v2.extension_pb2 import Extension as ExtensionMessage
log = logging.getLogger(__name__)
@@ -371,6 +375,86 @@ class ClaimReference(Metadata):
def claim_hash(self, claim_hash: bytes):
self.message.claim_hash = claim_hash
class ModifyingClaimReference(ClaimReference):
__slots__ = ()
@property
def modification_type(self) -> str:
return self.message.WhichOneof('type')
@modification_type.setter
def modification_type(self, claim_type: str):
"""Select the appropriate member (stream, channel, repost, or collection)"""
old_type = self.message.WhichOneof('type')
if old_type == claim_type:
return
if old_type and claim_type is None:
self.message.ClearField(old_type)
return
member = getattr(self.message, claim_type)
member.SetInParent()
def update(self, claim_type: str, **kwargs) -> dict:
"""
Store updates to modifiable fields in deletions/edits.
Currently, only the "extensions" field (StreamExtensionMap)
of a stream claim may be modified. Returns a dict containing
the unhandled portion of "kwargs".
"""
if claim_type != 'stream':
return kwargs
clr_exts = kwargs.pop('clear_extensions', None)
set_exts = kwargs.pop('extensions', None)
if clr_exts is None and set_exts is None:
return kwargs
self.modification_type = claim_type
if not self.modification_type == 'stream':
return kwargs
mods = getattr(self.message, self.modification_type)
if clr_exts is not None:
deletions = StreamModifiable(mods.deletions)
if isinstance(clr_exts, str) and clr_exts.startswith('{'):
clr_exts = json.loads(clr_exts)
deletions.extensions.merge(clr_exts)
if set_exts is not None:
edits = StreamModifiable(mods.edits)
if isinstance(set_exts, str) and set_exts.startswith('{'):
set_exts = json.loads(set_exts)
edits.extensions.merge(set_exts)
return kwargs
def apply(self, reposted: 'claim.Claim') -> 'claim.Claim':
"""
Given a reposted claim, apply the stored deletions/edits, and return
the modified claim. Returns the original claim if the claim type has
changed such that the modifications are not relevant.
"""
if not self.modification_type or self.modification_type != reposted.claim_type:
return reposted
if not reposted.claim_type == 'stream':
return reposted
m = ClaimMessage()
m.CopyFrom(reposted.message)
result = claim.Claim(m)
# only stream claims, and only stream extensions are handled
stream = getattr(result, result.claim_type)
exts = getattr(stream, 'extensions')
mods = getattr(self.message, self.modification_type)
# apply deletions
exts.merge(StreamModifiable(mods.deletions).extensions, delete=True)
# apply edits
exts.merge(StreamModifiable(mods.edits).extensions)
return result
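A hedged usage sketch of the class above, assuming the Repost wiring added at the end of this commit and the usual Claim() wrapper construction (all values illustrative):

from hub.schema.claim import Claim

reposted = Claim()
reposted.stream.update(title='Example')     # the stream being reposted

repost = Claim().repost
leftover = repost.reference.update(
    'stream', extensions={'vendor': {'sku': 'ABC-123'}})   # -> {} once consumed
modified = repost.apply(reposted)
print(modified.stream.extensions.to_dict())                # {'vendor': {'sku': 'ABC-123'}}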
class ClaimList(BaseMessageList[ClaimReference]):
@@ -571,3 +655,169 @@ class TagList(BaseMessageList[str]):
tag = normalize_tag(tag)
if tag and tag not in self.message:
self.message.append(tag)
class StreamExtension(Metadata):
__slots__ = Metadata.__slots__ + ('extension_schema',)
def __init__(self, schema, message):
super().__init__(message)
self.extension_schema = schema
def to_dict(self, include_schema=True):
attrs = self.unpacked.to_dict()
return { f'{self.schema}': attrs } if include_schema else attrs
def from_value(self, value):
schema = self.schema
# If incoming is an extension, we have an Extension message.
if isinstance(value, StreamExtension):
schema = value.schema or schema
# Translate str -> (JSON) dict.
if isinstance(value, str) and value.startswith('{'):
value = json.loads(value)
# Check for 1-element dictionary at top level: {<schema>: <attrs>}.
if isinstance(value, dict) and len(value) == 1:
k = next(iter(value.keys()))
if self.schema is None or self.schema == k:
# Schema is determined. Extract dict containing attrs.
schema = k
value = value[schema]
# Try to decode attrs dict -> Extension message containing protobuf.Struct.
if isinstance(value, dict):
try:
ext = StreamExtension(schema, ExtensionMessage())
ParseDict(value, ext.message.struct)
value = ext
except ParseError:
pass
# Either we have an Extension message or decoding failed.
if isinstance(value, StreamExtension):
self.extension_schema = value.schema or schema
self.message.CopyFrom(value.message)
else:
log.info('Could not parse StreamExtension value: %s type: %s', value, type(value))
raise ValueError(f'Could not parse StreamExtension value: {value}')
@property
def schema(self):
return self.extension_schema
@property
def unpacked(self):
return Struct(self.message.struct)
def merge(self, ext: 'StreamExtension', delete: bool = False) -> 'StreamExtension':
self.unpacked.merge(ext.unpacked, delete=delete)
return self
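A hedged sketch of the input shapes from_value() accepts ('vendor' is an illustrative schema name):

from hub.schema.attrs import StreamExtension
from hub.schema.types.v2.extension_pb2 import Extension as ExtensionMessage

ext = StreamExtension('vendor', ExtensionMessage())
ext.from_value({'sku': 'ABC-123'})                    # bare attrs dict
ext.from_value('{"vendor": {"sku": "ABC-123"}}')      # JSON string keyed by schema
print(ext.to_dict())                                  # {'vendor': {'sku': 'ABC-123'}}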
class Struct(Metadata, Mapping, Iterable):
__slots__ = ()
def to_dict(self) -> dict:
return MessageToDict(self.message)
def merge(self, other: 'Struct', delete: bool = False) -> 'Struct':
for k, v in other.message.fields.items():
if k not in self.message.fields:
if not delete:
self.message.fields[k].CopyFrom(v)
continue
my_value = self.message.fields[k]
my_kind = my_value.WhichOneof('kind')
kind = v.WhichOneof('kind')
if kind != my_kind:
continue
if kind == 'struct_value':
if len(v.struct_value.fields) > 0:
                    Struct(my_value.struct_value).merge(Struct(v.struct_value), delete=delete)
elif delete:
del self.message.fields[k]
            elif kind == 'list_value':
                if len(v.list_value.values) > 0:
                    for o in v.list_value.values:
                        # look for an existing element of ours matching the incoming value
                        for i, mine in enumerate(my_value.list_value.values):
                            if mine == o:
                                if delete:
                                    del my_value.list_value.values[i]
                                break
                        if not delete:
                            if isinstance(o, ProtobufMessage):
                                my_value.list_value.values.add().CopyFrom(o)
                            else:
                                my_value.list_value.values.append(o)
                elif delete:
                    del self.message.fields[k]
            elif delete and getattr(my_value, my_kind) == getattr(v, kind):
                # equal scalar values: remove the field only when deleting
                del self.message.fields[k]
return self
def __getitem__(self, key):
def extract(val):
if not isinstance(val, ProtobufMessage):
return val
kind = val.WhichOneof('kind')
if kind == 'struct_value':
return dict(Struct(val.struct_value))
elif kind == 'list_value':
return list(map(extract, val.list_value.values))
else:
return getattr(val, kind)
if key in self.message.fields:
val = self.message.fields[key]
return extract(val)
raise KeyError(key)
def __iter__(self):
return iter(self.message.fields)
def __len__(self):
return len(self.message.fields)
class StreamExtensionMap(Metadata, Mapping, Iterable):
__slots__ = ()
item_class = StreamExtension
def to_dict(self):
return { k: v.to_dict(include_schema=False) for k, v in self.items() }
def merge(self, exts, delete: bool = False) -> 'StreamExtensionMap':
if isinstance(exts, StreamExtension):
exts = {exts.schema: exts}
if isinstance(exts, str) and exts.startswith('{'):
exts = json.loads(exts)
for schema, ext in exts.items():
obj = StreamExtension(schema, ExtensionMessage())
if isinstance(ext, StreamExtension):
obj.from_value(ext)
else:
obj.from_value({schema: ext})
            if delete and not len(obj.unpacked):
                # an empty attrs dict means: drop the whole extension, if present
                if schema in self.message:
                    del self.message[schema]
                continue
existing = StreamExtension(schema, self.message[schema])
existing.merge(obj, delete=delete)
return self
def __getitem__(self, key):
if key in self.message:
return StreamExtension(key, self.message[key])
raise KeyError(key)
def __iter__(self):
return iter(self.message)
def __len__(self):
return len(self.message)
class StreamModifiable(Metadata):
__slots__ = ()
@property
def extensions(self) -> StreamExtensionMap:
return StreamExtensionMap(self.message.extensions)
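Merge semantics in one hedged, illustrative sketch: edits add or overwrite leaves, deletions remove matching scalars or list elements, and deleting with an empty attrs dict drops the whole extension.

from hub.schema.claim import Claim

ext_map = Claim().stream.extensions     # a StreamExtensionMap
ext_map.merge({'vendor': {'sku': 'ABC-123', 'tags': ['a', 'b']}})
ext_map.merge({'vendor': {'tags': ['b']}}, delete=True)   # remove one list element
print(ext_map.to_dict())    # {'vendor': {'sku': 'ABC-123', 'tags': ['a']}}
ext_map.merge({'vendor': {}}, delete=True)                # drop 'vendor' entirely
print(ext_map.to_dict())    # {}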

View file

@@ -16,7 +16,8 @@ from hub.schema.base import Signable
from hub.schema.mime_types import guess_media_type, guess_stream_type
from hub.schema.attrs import (
Source, Playable, Dimmensional, Fee, Image, Video, Audio,
-    LanguageList, LocationList, ClaimList, ClaimReference, TagList
+    LanguageList, LocationList, ClaimList, ModifyingClaimReference, TagList,
+    StreamExtensionMap
)
from hub.schema.types.v2.claim_pb2 import Claim as ClaimMessage
from hub.error import InputValueIsNoneError
@@ -211,6 +212,8 @@ class Stream(BaseClaim):
fee['address'] = self.fee.address
if 'amount' in fee:
fee['amount'] = str(self.fee.amount)
if 'extensions' in claim:
claim['extensions'] = self.extensions.to_dict()
return claim
def update(self, file_path=None, height=None, width=None, duration=None, **kwargs):
@@ -264,7 +267,24 @@ class Stream(BaseClaim):
media_args['width'] = width
media.update(**media_args)
-        super().update(**kwargs)
clr_exts = kwargs.pop('clear_extensions', None)
if clr_exts:
if isinstance(clr_exts, list):
for e in clr_exts:
self.extensions.merge(e, delete=True)
elif isinstance(clr_exts, (str, dict)):
self.extensions.merge(clr_exts, delete=True)
else:
self.message.ClearField('extensions')
set_exts = kwargs.pop('extensions', None)
if set_exts:
if isinstance(set_exts, list):
for e in set_exts:
self.extensions.merge(e)
else:
self.extensions.merge(set_exts)
return super().update(**kwargs)
@property
def author(self) -> str:
@@ -330,6 +350,10 @@ class Stream(BaseClaim):
def audio(self) -> Audio:
return Audio(self.message.audio)
@property
def extensions(self) -> StreamExtensionMap:
return StreamExtensionMap(self.message.extensions)
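A hedged sketch of the new Stream update kwargs (values illustrative):

from hub.schema.claim import Claim

stream = Claim().stream
stream.update(extensions={'vendor': {'sku': 'ABC-123', 'rating': 5}})
stream.update(clear_extensions={'vendor': {'rating': 5}})   # delete one attribute
print(stream.extensions.to_dict())     # {'vendor': {'sku': 'ABC-123'}}
stream.update(clear_extensions=True)   # drop all extensions
print(stream.extensions.to_dict())     # {}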
class Channel(BaseClaim):
@@ -398,9 +422,26 @@ class Repost(BaseClaim):
claim_type = Claim.REPOST
def to_dict(self):
claim = super().to_dict()
if claim.pop('claim_hash', None):
claim['claim_id'] = self.reference.claim_id
return claim
def update(self, **kwargs):
claim_type = kwargs.pop('claim_type', None)
if claim_type:
# Try to apply updates to ClaimReference.
kwargs = self.reference.update(claim_type, **kwargs)
# Update common fields within BaseClaim.
super().update(**kwargs)
def apply(self, reposted: 'Claim'):
return self.reference.apply(reposted)
@property
-    def reference(self) -> ClaimReference:
-        return ClaimReference(self.message)
+    def reference(self) -> ModifyingClaimReference:
+        return ModifyingClaimReference(self.message)
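With reference now a ModifyingClaimReference, a repost can both point at a claim and carry stream edits; a short hedged sketch (illustrative values):

from hub.schema.claim import Claim

repost = Claim().repost
repost.update(claim_type='stream', extensions={'vendor': {'rating': 5}})
# at indexing time, ElasticSyncDB replays the stored edits:
#   modified = repost.apply(reposted_stream_claim)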
class Collection(BaseClaim):