Merge branch 'blocked_search_metadata'

* blocked_search_metadata:
  add index for claim_type and release_time
  fix test
  fix json api generator
  add pagination for claim_search
  regenerate protobufs
  using multiprocessing.Manager to keep blocked content synced between readers
commit 1d5e553f9c
Author: Alex Grintsvayg
Date: 2020-01-20 12:51:02 -05:00
GPG key ID: AEB3F089F86A22B5 (no known key found for this signature in database)
15 changed files with 1677 additions and 1367 deletions

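The headline change: the SQL writer process owns the map of blocked claims, and the query workers see every update through a multiprocessing.Manager proxy dict. A minimal standalone sketch of that pattern (illustrative names, not the SDK's actual API):

from multiprocessing import Manager, Pool

def init_worker(shared_blocked):
    # each worker process keeps a proxy to the manager-backed dict
    global blocked
    blocked = shared_blocked

def is_blocked(claim_hash):
    return claim_hash in blocked  # reads round-trip through the proxy

if __name__ == '__main__':
    manager = Manager()
    blocked = manager.dict()      # owned by the writer side
    pool = Pool(2, initializer=init_worker, initargs=(blocked,))
    blocked[b'claim'] = b'channel'              # update after workers started
    print(pool.apply(is_blocked, (b'claim',)))  # True: workers see the update
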
File diff suppressed because one or more lines are too long


@ -2186,7 +2186,6 @@ class Daemon(metaclass=JSONRPCServerType):
[--claim_id=<claim_id> | --claim_ids=<claim_ids>...]
[--channel=<channel> |
[[--channel_ids=<channel_ids>...] [--not_channel_ids=<not_channel_ids>...]]]
[--blocklist_channel_ids=<blocklist_channel_ids>...]
[--has_channel_signature] [--valid_channel_signature | --invalid_channel_signature]
[--is_controlling] [--release_time=<release_time>] [--public_key_id=<public_key_id>]
[--timestamp=<timestamp>] [--creation_timestamp=<creation_timestamp>]
@ -2227,9 +2226,6 @@ class Daemon(metaclass=JSONRPCServerType):
use in conjunction with --valid_channel_signature
--not_channel_ids=<not_channel_ids>: (list) exclude claims signed by any of these channels
(arguments must be claim ids of the channels)
--blocklist_channel_ids=<blocklist_channel_ids>: (list) channel_ids of channels containing
reposts of claims you want to be blocked from
search results
--has_channel_signature : (bool) claims with a channel signature (valid or invalid)
--valid_channel_signature : (bool) claims with a valid channel signature or no signature,
use in conjunction with --has_channel_signature to
@ -2320,8 +2316,13 @@ class Daemon(metaclass=JSONRPCServerType):
kwargs['signature_valid'] = 0
page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
txos, _, total = await self.ledger.claim_search(wallet.accounts, **kwargs)
result = {"items": txos, "page": page_num, "page_size": page_size}
txos, blocked, _, total = await self.ledger.claim_search(wallet.accounts, **kwargs)
result = {
"items": txos,
"blocked": blocked,
"page": page_num,
"page_size": page_size
}
if not kwargs.pop('no_totals', False):
result['total_pages'] = int((total + (page_size - 1)) / page_size)
result['total_items'] = total
@ -2756,7 +2757,7 @@ class Daemon(metaclass=JSONRPCServerType):
# check that the holding_address hasn't changed since the export was made
holding_address = data['holding_address']
channels, _, _ = await self.ledger.claim_search(
channels, _, _, _ = await self.ledger.claim_search(
wallet.accounts, public_key_id=self.ledger.public_key_to_address(public_key_der)
)
if channels and channels[0].get_address(self.ledger) != holding_address:

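For reference, pagination in the hunk above is plain offset/limit arithmetic, and total_pages is a ceiling division. A worked sketch (DEFAULT_PAGE_SIZE assumed to be 20 here):

DEFAULT_PAGE_SIZE = 20  # assumption for this sketch

def paginate(page=1, page_size=DEFAULT_PAGE_SIZE, total=0):
    page_num, page_size = abs(page), min(abs(page_size), 50)   # clamp as above
    offset, limit = page_size * (page_num - 1), page_size
    total_pages = int((total + (page_size - 1)) / page_size)   # ceil(total / page_size)
    return offset, limit, total_pages

print(paginate(page=3, page_size=10, total=45))   # (20, 10, 5)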

@ -1,5 +1,5 @@
build:
rm types/v2/* -rf
touch types/v2/__init__.py
cd types/v2/ && protoc --python_out=. -I ../../../../../../types/v2/proto/ ../../../../../../types/v2/proto/*.proto
cd types/v2/ && protoc --python_out=. -I ../../../../../types/v2/proto/ ../../../../../types/v2/proto/*.proto
sed -e 's/^import\ \(.*\)_pb2\ /from . import\ \1_pb2\ /g' -i types/v2/*.py

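The sed pass rewrites the absolute imports that protoc emits into package-relative ones, for example:

# emitted by protoc:
import result_pb2 as result__pb2
# after the sed rewrite:
from . import result_pb2 as result__pb2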

@ -7,22 +7,85 @@ from itertools import chain
from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage
class Censor:
def __init__(self, claim_ids: dict = None, channel_ids: set = None, tags: set = None):
self.claim_ids = claim_ids or {}
self.channel_ids = channel_ids or set()
self.tags = tags or set()
self.blocked_claims = {}
self.blocked_channels = {}
self.blocked_tags = {}
self.total = 0
def censor(self, row) -> bool:
censored = False
if row['claim_hash'] in self.claim_ids:
censored = True
channel_id = self.claim_ids[row['claim_hash']]
self.blocked_claims.setdefault(channel_id, 0)
self.blocked_claims[channel_id] += 1
if row['channel_hash'] in self.channel_ids:
censored = True
self.blocked_channels.setdefault(row['channel_hash'], 0)
self.blocked_channels[row['channel_hash']] += 1
if self.tags.intersection(row['tags']):
censored = True
for tag in self.tags:
if tag in row['tags']:
self.blocked_tags.setdefault(tag, 0)
self.blocked_tags[tag] += 1
if censored:
self.total += 1
return censored
def to_message(self, outputs: OutputsMessage):
outputs.blocked_total = self.total
for channel_hash, count in self.blocked_claims.items():
block = outputs.blocked.add()
block.count = count
block.reposted_in_channel = channel_hash
for channel_hash, count in self.blocked_channels.items():
block = outputs.blocked.add()
block.count = count
block.in_channel = channel_hash
for tag, count in self.blocked_tags.items():
block = outputs.blocked.add()
block.count = count
block.has_tag = tag
class Outputs:
__slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total'
__slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total', 'blocked', 'blocked_total'
def __init__(self, txos: List, extra_txos: List, txs: set, offset: int, total: int):
def __init__(self, txos: List, extra_txos: List, txs: set,
offset: int, total: int, blocked: List, blocked_total: int):
self.txos = txos
self.txs = txs
self.extra_txos = extra_txos
self.offset = offset
self.total = total
self.blocked = blocked
self.blocked_total = blocked_total
def inflate(self, txs):
tx_map = {tx.hash: tx for tx in txs}
for txo_message in self.extra_txos:
self.message_to_txo(txo_message, tx_map)
return [self.message_to_txo(txo_message, tx_map) for txo_message in self.txos]
txos = [self.message_to_txo(txo_message, tx_map) for txo_message in self.txos]
return txos, self.inflate_blocked()
def inflate_blocked(self):
result = {"total": self.blocked_total}
for blocked_message in self.blocked:
reason = blocked_message.WhichOneof('reason')
if reason == "has_tag":
key = blocked_message.has_tag
else:
key = hexlify(getattr(blocked_message, reason)[::-1]).decode()
result.setdefault(reason, {})[key] = blocked_message.count
return result
def message_to_txo(self, txo_message, tx_map):
if txo_message.WhichOneof('meta') == 'error':
@ -70,18 +133,24 @@ class Outputs:
if txo_message.WhichOneof('meta') == 'error':
continue
txs.add((hexlify(txo_message.tx_hash[::-1]).decode(), txo_message.height))
return cls(outputs.txos, outputs.extra_txos, txs, outputs.offset, outputs.total)
return cls(
outputs.txos, outputs.extra_txos, txs,
outputs.offset, outputs.total,
outputs.blocked, outputs.blocked_total
)
@classmethod
def to_base64(cls, txo_rows, extra_txo_rows, offset=0, total=None) -> str:
return base64.b64encode(cls.to_bytes(txo_rows, extra_txo_rows, offset, total)).decode()
def to_base64(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked=None) -> str:
return base64.b64encode(cls.to_bytes(txo_rows, extra_txo_rows, offset, total, blocked)).decode()
@classmethod
def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None) -> bytes:
def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked: Censor = None) -> bytes:
page = OutputsMessage()
page.offset = offset
if total is not None:
page.total = total
if blocked is not None:
blocked.to_message(page)
for row in txo_rows:
cls.row_to_message(row, page.txos.add(), extra_txo_rows)
for row in extra_txo_rows:
@ -121,6 +190,10 @@ class Outputs:
cls.set_reference(txo_message, 'channel', txo['channel_hash'], extra_txo_rows)
cls.set_reference(txo_message, 'repost', txo['reposted_claim_hash'], extra_txo_rows)
@staticmethod
def set_blocked(message, blocked):
message.blocked_total = blocked.total
@staticmethod
def set_reference(message, attr, claim_hash, rows):
if claim_hash:

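Taking the Censor class exactly as in the hunk above, a single row can be tallied under several reasons at once; a quick illustration with made-up hashes:

censor = Censor(
    claim_ids={b'claim1': b'chanA'},   # claim_hash -> hash of the blocking channel
    channel_ids={b'chanB'},
    tags={'mature'}
)
row = {'claim_hash': b'claim1', 'channel_hash': b'chanB', 'tags': {'mature', 'art'}}
assert censor.censor(row) is True
assert censor.blocked_claims == {b'chanA': 1}
assert censor.blocked_channels == {b'chanB': 1}
assert censor.blocked_tags == {'mature': 1}
assert censor.total == 1               # counted once overall, once per reason above
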
File diff suppressed because one or more lines are too long


@ -7,6 +7,7 @@ from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@ -18,9 +19,9 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='purchase.proto',
package='pb',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\x0epurchase.proto\x12\x02pb\"\x1e\n\x08Purchase\x12\x12\n\nclaim_hash\x18\x01 \x01(\x0c\x62\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@ -38,14 +39,14 @@ _PURCHASE = _descriptor.Descriptor(
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -56,7 +57,6 @@ _PURCHASE = _descriptor.Descriptor(
)
DESCRIPTOR.message_types_by_name['Purchase'] = _PURCHASE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Purchase = _reflection.GeneratedProtocolMessageType('Purchase', (_message.Message,), dict(
DESCRIPTOR = _PURCHASE,


@ -7,6 +7,7 @@ from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@ -18,9 +19,9 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='result.proto',
package='pb',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\x0cresult.proto\x12\x02pb\"b\n\x07Outputs\x12\x18\n\x04txos\x18\x01 \x03(\x0b\x32\n.pb.Output\x12\x1e\n\nextra_txos\x18\x02 \x03(\x0b\x32\n.pb.Output\x12\r\n\x05total\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\r\"{\n\x06Output\x12\x0f\n\x07tx_hash\x18\x01 \x01(\x0c\x12\x0c\n\x04nout\x18\x02 \x01(\r\x12\x0e\n\x06height\x18\x03 \x01(\r\x12\x1e\n\x05\x63laim\x18\x07 \x01(\x0b\x32\r.pb.ClaimMetaH\x00\x12\x1a\n\x05\x65rror\x18\x0f \x01(\x0b\x32\t.pb.ErrorH\x00\x42\x06\n\x04meta\"\xaf\x03\n\tClaimMeta\x12\x1b\n\x07\x63hannel\x18\x01 \x01(\x0b\x32\n.pb.Output\x12\x1a\n\x06repost\x18\x02 \x01(\x0b\x32\n.pb.Output\x12\x11\n\tshort_url\x18\x03 \x01(\t\x12\x15\n\rcanonical_url\x18\x04 \x01(\t\x12\x16\n\x0eis_controlling\x18\x05 \x01(\x08\x12\x18\n\x10take_over_height\x18\x06 \x01(\r\x12\x17\n\x0f\x63reation_height\x18\x07 \x01(\r\x12\x19\n\x11\x61\x63tivation_height\x18\x08 \x01(\r\x12\x19\n\x11\x65xpiration_height\x18\t \x01(\r\x12\x19\n\x11\x63laims_in_channel\x18\n \x01(\r\x12\x10\n\x08reposted\x18\x0b \x01(\r\x12\x18\n\x10\x65\x66\x66\x65\x63tive_amount\x18\x14 \x01(\x04\x12\x16\n\x0esupport_amount\x18\x15 \x01(\x04\x12\x16\n\x0etrending_group\x18\x16 \x01(\r\x12\x16\n\x0etrending_mixed\x18\x17 \x01(\x02\x12\x16\n\x0etrending_local\x18\x18 \x01(\x02\x12\x17\n\x0ftrending_global\x18\x19 \x01(\x02\"i\n\x05\x45rror\x12\x1c\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x0e.pb.Error.Code\x12\x0c\n\x04text\x18\x02 \x01(\t\"4\n\x04\x43ode\x12\x10\n\x0cUNKNOWN_CODE\x10\x00\x12\r\n\tNOT_FOUND\x10\x01\x12\x0b\n\x07INVALID\x10\x02\x62\x06proto3')
serialized_pb=_b('\n\x0cresult.proto\x12\x02pb\"\x97\x01\n\x07Outputs\x12\x18\n\x04txos\x18\x01 \x03(\x0b\x32\n.pb.Output\x12\x1e\n\nextra_txos\x18\x02 \x03(\x0b\x32\n.pb.Output\x12\r\n\x05total\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\r\x12\x1c\n\x07\x62locked\x18\x05 \x03(\x0b\x32\x0b.pb.Blocked\x12\x15\n\rblocked_total\x18\x06 \x01(\r\"{\n\x06Output\x12\x0f\n\x07tx_hash\x18\x01 \x01(\x0c\x12\x0c\n\x04nout\x18\x02 \x01(\r\x12\x0e\n\x06height\x18\x03 \x01(\r\x12\x1e\n\x05\x63laim\x18\x07 \x01(\x0b\x32\r.pb.ClaimMetaH\x00\x12\x1a\n\x05\x65rror\x18\x0f \x01(\x0b\x32\t.pb.ErrorH\x00\x42\x06\n\x04meta\"\xaf\x03\n\tClaimMeta\x12\x1b\n\x07\x63hannel\x18\x01 \x01(\x0b\x32\n.pb.Output\x12\x1a\n\x06repost\x18\x02 \x01(\x0b\x32\n.pb.Output\x12\x11\n\tshort_url\x18\x03 \x01(\t\x12\x15\n\rcanonical_url\x18\x04 \x01(\t\x12\x16\n\x0eis_controlling\x18\x05 \x01(\x08\x12\x18\n\x10take_over_height\x18\x06 \x01(\r\x12\x17\n\x0f\x63reation_height\x18\x07 \x01(\r\x12\x19\n\x11\x61\x63tivation_height\x18\x08 \x01(\r\x12\x19\n\x11\x65xpiration_height\x18\t \x01(\r\x12\x19\n\x11\x63laims_in_channel\x18\n \x01(\r\x12\x10\n\x08reposted\x18\x0b \x01(\r\x12\x18\n\x10\x65\x66\x66\x65\x63tive_amount\x18\x14 \x01(\x04\x12\x16\n\x0esupport_amount\x18\x15 \x01(\x04\x12\x16\n\x0etrending_group\x18\x16 \x01(\r\x12\x16\n\x0etrending_mixed\x18\x17 \x01(\x02\x12\x16\n\x0etrending_local\x18\x18 \x01(\x02\x12\x17\n\x0ftrending_global\x18\x19 \x01(\x02\"\x94\x01\n\x05\x45rror\x12\x1c\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x0e.pb.Error.Code\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x1c\n\x07\x62locked\x18\x03 \x01(\x0b\x32\x0b.pb.Blocked\"A\n\x04\x43ode\x12\x10\n\x0cUNKNOWN_CODE\x10\x00\x12\r\n\tNOT_FOUND\x10\x01\x12\x0b\n\x07INVALID\x10\x02\x12\x0b\n\x07\x42LOCKED\x10\x03\"j\n\x07\x42locked\x12\r\n\x05\x63ount\x18\x01 \x01(\r\x12\x1d\n\x13reposted_in_channel\x18\x02 \x01(\x0cH\x00\x12\x14\n\nin_channel\x18\x03 \x01(\x0cH\x00\x12\x11\n\x07has_tag\x18\x04 \x01(\tH\x00\x42\x08\n\x06reasonb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@ -32,21 +33,25 @@ _ERROR_CODE = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='UNKNOWN_CODE', index=0, number=0,
serialized_options=None,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='NOT_FOUND', index=1, number=1,
serialized_options=None,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='INVALID', index=2, number=2,
serialized_options=None,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='BLOCKED', index=3, number=3,
options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=732,
serialized_end=784,
options=None,
serialized_start=817,
serialized_end=882,
)
_sym_db.RegisterEnumDescriptor(_ERROR_CODE)
@ -64,42 +69,56 @@ _OUTPUTS = _descriptor.Descriptor(
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='extra_txos', full_name='pb.Outputs.extra_txos', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='total', full_name='pb.Outputs.total', index=2,
number=3, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='offset', full_name='pb.Outputs.offset', index=3,
number=4, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='blocked', full_name='pb.Outputs.blocked', index=4,
number=5, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='blocked_total', full_name='pb.Outputs.blocked_total', index=5,
number=6, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=20,
serialized_end=118,
serialized_start=21,
serialized_end=172,
)
@ -116,42 +135,42 @@ _OUTPUT = _descriptor.Descriptor(
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='nout', full_name='pb.Output.nout', index=1,
number=2, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='height', full_name='pb.Output.height', index=2,
number=3, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='claim', full_name='pb.Output.claim', index=3,
number=7, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='error', full_name='pb.Output.error', index=4,
number=15, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@ -160,8 +179,8 @@ _OUTPUT = _descriptor.Descriptor(
name='meta', full_name='pb.Output.meta',
index=0, containing_type=None, fields=[]),
],
serialized_start=120,
serialized_end=243,
serialized_start=174,
serialized_end=297,
)
@ -178,133 +197,133 @@ _CLAIMMETA = _descriptor.Descriptor(
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='repost', full_name='pb.ClaimMeta.repost', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='short_url', full_name='pb.ClaimMeta.short_url', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='canonical_url', full_name='pb.ClaimMeta.canonical_url', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='is_controlling', full_name='pb.ClaimMeta.is_controlling', index=4,
number=5, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='take_over_height', full_name='pb.ClaimMeta.take_over_height', index=5,
number=6, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='creation_height', full_name='pb.ClaimMeta.creation_height', index=6,
number=7, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='activation_height', full_name='pb.ClaimMeta.activation_height', index=7,
number=8, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='expiration_height', full_name='pb.ClaimMeta.expiration_height', index=8,
number=9, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='claims_in_channel', full_name='pb.ClaimMeta.claims_in_channel', index=9,
number=10, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='reposted', full_name='pb.ClaimMeta.reposted', index=10,
number=11, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='effective_amount', full_name='pb.ClaimMeta.effective_amount', index=11,
number=20, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='support_amount', full_name='pb.ClaimMeta.support_amount', index=12,
number=21, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='trending_group', full_name='pb.ClaimMeta.trending_group', index=13,
number=22, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='trending_mixed', full_name='pb.ClaimMeta.trending_mixed', index=14,
number=23, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='trending_local', full_name='pb.ClaimMeta.trending_local', index=15,
number=24, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='trending_global', full_name='pb.ClaimMeta.trending_global', index=16,
number=25, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=246,
serialized_end=677,
serialized_start=300,
serialized_end=731,
)
@ -321,14 +340,21 @@ _ERROR = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='text', full_name='pb.Error.text', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='blocked', full_name='pb.Error.blocked', index=2,
number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
@ -336,18 +362,74 @@ _ERROR = _descriptor.Descriptor(
enum_types=[
_ERROR_CODE,
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=679,
serialized_end=784,
serialized_start=734,
serialized_end=882,
)
_BLOCKED = _descriptor.Descriptor(
name='Blocked',
full_name='pb.Blocked',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='count', full_name='pb.Blocked.count', index=0,
number=1, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='reposted_in_channel', full_name='pb.Blocked.reposted_in_channel', index=1,
number=2, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='in_channel', full_name='pb.Blocked.in_channel', index=2,
number=3, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='has_tag', full_name='pb.Blocked.has_tag', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
_descriptor.OneofDescriptor(
name='reason', full_name='pb.Blocked.reason',
index=0, containing_type=None, fields=[]),
],
serialized_start=884,
serialized_end=990,
)
_OUTPUTS.fields_by_name['txos'].message_type = _OUTPUT
_OUTPUTS.fields_by_name['extra_txos'].message_type = _OUTPUT
_OUTPUTS.fields_by_name['blocked'].message_type = _BLOCKED
_OUTPUT.fields_by_name['claim'].message_type = _CLAIMMETA
_OUTPUT.fields_by_name['error'].message_type = _ERROR
_OUTPUT.oneofs_by_name['meta'].fields.append(
@ -359,12 +441,22 @@ _OUTPUT.fields_by_name['error'].containing_oneof = _OUTPUT.oneofs_by_name['meta'
_CLAIMMETA.fields_by_name['channel'].message_type = _OUTPUT
_CLAIMMETA.fields_by_name['repost'].message_type = _OUTPUT
_ERROR.fields_by_name['code'].enum_type = _ERROR_CODE
_ERROR.fields_by_name['blocked'].message_type = _BLOCKED
_ERROR_CODE.containing_type = _ERROR
_BLOCKED.oneofs_by_name['reason'].fields.append(
_BLOCKED.fields_by_name['reposted_in_channel'])
_BLOCKED.fields_by_name['reposted_in_channel'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
_BLOCKED.oneofs_by_name['reason'].fields.append(
_BLOCKED.fields_by_name['in_channel'])
_BLOCKED.fields_by_name['in_channel'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
_BLOCKED.oneofs_by_name['reason'].fields.append(
_BLOCKED.fields_by_name['has_tag'])
_BLOCKED.fields_by_name['has_tag'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
DESCRIPTOR.message_types_by_name['Outputs'] = _OUTPUTS
DESCRIPTOR.message_types_by_name['Output'] = _OUTPUT
DESCRIPTOR.message_types_by_name['ClaimMeta'] = _CLAIMMETA
DESCRIPTOR.message_types_by_name['Error'] = _ERROR
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
DESCRIPTOR.message_types_by_name['Blocked'] = _BLOCKED
Outputs = _reflection.GeneratedProtocolMessageType('Outputs', (_message.Message,), dict(
DESCRIPTOR = _OUTPUTS,
@ -394,5 +486,12 @@ Error = _reflection.GeneratedProtocolMessageType('Error', (_message.Message,), d
))
_sym_db.RegisterMessage(Error)
Blocked = _reflection.GeneratedProtocolMessageType('Blocked', (_message.Message,), dict(
DESCRIPTOR = _BLOCKED,
__module__ = 'result_pb2'
# @@protoc_insertion_point(class_scope:pb.Blocked)
))
_sym_db.RegisterMessage(Blocked)
# @@protoc_insertion_point(module_scope)

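With the regenerated module above, the new Blocked message hangs off Outputs and carries its reason in a oneof. A short round-trip, assuming this repo's import path:

from lbry.schema.types.v2.result_pb2 import Outputs

msg = Outputs()
msg.blocked_total = 1
b = msg.blocked.add()          # repeated Blocked field
b.count = 1
b.has_tag = 'mature'           # setting a member selects the 'reason' oneof
print(b.WhichOneof('reason'))  # 'has_tag'
print(Outputs.FromString(msg.SerializeToString()).blocked_total)  # 1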

@ -629,7 +629,7 @@ class Ledger(metaclass=LedgerRegistry):
print(record['history'], addresses, tx.id)
raise asyncio.TimeoutError('Timed out waiting for transaction.')
async def _inflate_outputs(self, query, accounts):
async def _inflate_outputs(self, query, accounts) -> Tuple[List[Output], dict, int, int]:
outputs = Outputs.from_base64(await query)
txs = []
if len(outputs.txs) > 0:
@ -652,7 +652,8 @@ class Ledger(metaclass=LedgerRegistry):
}
for txo in priced_claims:
txo.purchase_receipt = receipts.get(txo.claim_id)
return outputs.inflate(txs), outputs.offset, outputs.total
txos, blocked = outputs.inflate(txs)
return txos, blocked, outputs.offset, outputs.total
async def resolve(self, accounts, urls):
resolve = partial(self.network.retriable_call, self.network.resolve)
@ -669,7 +670,7 @@ class Ledger(metaclass=LedgerRegistry):
result[url] = {'error': f'{url} did not resolve to a claim'}
return result
async def claim_search(self, accounts, **kwargs) -> Tuple[List[Output], int, int]:
async def claim_search(self, accounts, **kwargs) -> Tuple[List[Output], dict, int, int]:
return await self._inflate_outputs(self.network.claim_search(**kwargs), accounts)
async def get_claim_by_claim_id(self, accounts, claim_id) -> Output:
@ -717,7 +718,7 @@ class Ledger(metaclass=LedgerRegistry):
if resolve:
claim_ids = [p.purchased_claim_id for p in purchases]
try:
resolved, _, _ = await self.claim_search([], claim_ids=claim_ids)
resolved, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise
@ -752,7 +753,7 @@ class Ledger(metaclass=LedgerRegistry):
async def resolve_collection(self, collection, offset=0, page_size=1):
claim_ids = collection.claim.collection.claims.ids[offset:page_size+offset]
try:
resolve_results, _, _ = await self.claim_search([], claim_ids=claim_ids)
resolve_results, _, _, _ = await self.claim_search([], claim_ids=claim_ids)
except Exception as err:
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise

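A recurring idiom in these hunks: claim and channel ids display as hex in one byte order but are stored and sent reversed, hence the paired unhexlify(...)[::-1] and hexlify(...[::-1]) calls. For instance:

from binascii import hexlify, unhexlify

channel_id = 'abcd1234'                      # hypothetical hex claim id
channel_hash = unhexlify(channel_id)[::-1]   # database/wire byte order
assert hexlify(channel_hash[::-1]).decode() == channel_id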

@ -12,9 +12,9 @@ from lbry.wallet.server.util import cachedproperty, subclasses
from lbry.wallet.server.hash import Base58, hash160, double_sha256, hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.daemon import Daemon, LBCDaemon
from lbry.wallet.server.script import ScriptPubKey, OpCodes
from lbry.wallet.server.leveldb import DB
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
from lbry.wallet.server.db.writer import LBRYDB
from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.block_processor import LBRYBlockProcessor
@ -41,7 +41,7 @@ class Coin:
DAEMON = Daemon
BLOCK_PROCESSOR = LBRYBlockProcessor
SESSION_MANAGER = LBRYSessionManager
DB = DB
DB = LevelDB
HEADER_VALUES = [
'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce'
]
@ -240,7 +240,7 @@ class LBC(Coin):
BLOCK_PROCESSOR = LBRYBlockProcessor
SESSION_MANAGER = LBRYSessionManager
DESERIALIZER = DeserializerSegWit
DB = LBRYDB
DB = LBRYLevelDB
NAME = "LBRY"
SHORTNAME = "LBC"
NET = "mainnet"


@ -14,7 +14,7 @@ from lbry.wallet.database import query, interpolate
from lbry.schema.url import URL, normalize_name
from lbry.schema.tags import clean_tags
from lbry.schema.result import Outputs
from lbry.schema.result import Outputs, Censor
from lbry.wallet import Ledger, RegTestLedger
from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS
@ -47,7 +47,7 @@ INTEGER_PARAMS = {
SEARCH_PARAMS = {
'name', 'text', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid', 'blocklist_channel_ids',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags', 'reposted_claim_id',
'any_locations', 'all_locations', 'not_locations',
'any_languages', 'all_languages', 'not_languages',
@ -70,6 +70,7 @@ class ReaderState:
ledger: Type[Ledger]
query_timeout: float
log: logging.Logger
blocked_claims: Dict
def close(self):
self.db.close()
@ -92,16 +93,22 @@ class ReaderState:
ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
def initializer(log, _path, _ledger_name, query_timeout, _measure=False):
def row_factory(cursor, row):
return {
k[0]: (set(row[i].split(',')) if k[0] == 'tags' else row[i])
for i, k in enumerate(cursor.getdescription())
}
def initializer(log, _path, _ledger_name, query_timeout, _measure=False, blocked_claims=None):
db = apsw.Connection(_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
def row_factory(cursor, row):
return {k[0]: row[i] for i, k in enumerate(cursor.getdescription())}
db.setrowtrace(row_factory)
ctx.set(
ReaderState(
db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
ledger=Ledger if _ledger_name == 'mainnet' else RegTestLedger,
query_timeout=query_timeout, log=log
query_timeout=query_timeout, log=log,
blocked_claims={} if blocked_claims is None else blocked_claims
)
)
@ -159,11 +166,28 @@ def encode_result(result):
@measure
def execute_query(sql, values) -> List:
def execute_query(sql, values, row_offset: int, row_limit: int, censor: Censor) -> List:
context = ctx.get()
context.set_query_timeout()
try:
return context.db.cursor().execute(sql, values).fetchall()
c = context.db.cursor()
def row_filter(cursor, row):
nonlocal row_offset
row = row_factory(cursor, row)
if len(row) > 1 and censor.censor(row):
return
if row_offset:
row_offset -= 1
return
return row
c.setrowtrace(row_filter)
i, rows = 0, []
for row in c.execute(sql, values):
i += 1
rows.append(row)
if i >= row_limit:
break
return rows
except apsw.Error as err:
plain_sql = interpolate(sql, values)
if context.is_tracking_metrics:
@ -177,8 +201,11 @@ def execute_query(sql, values) -> List:
def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
if 'order_by' in constraints:
order_by_parts = constraints['order_by']
if isinstance(order_by_parts, str):
order_by_parts = [order_by_parts]
sql_order_by = []
for order_by in constraints['order_by']:
for order_by in order_by_parts:
is_asc = order_by.startswith('^')
column = order_by[1:] if is_asc else order_by
if column not in ORDER_FIELDS:
@ -243,34 +270,6 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
constraints['claim.channel_hash__in'] = [
unhexlify(cid)[::-1] for cid in channel_ids
]
if 'not_channel_ids' in constraints:
not_channel_ids = constraints.pop('not_channel_ids')
if not_channel_ids:
not_channel_ids_binary = [
unhexlify(ncid)[::-1] for ncid in not_channel_ids
]
if constraints.get('has_channel_signature', False):
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
else:
constraints['null_or_not_channel__or'] = {
'claim.signature_valid__is_null': True,
'claim.channel_hash__not_in': not_channel_ids_binary
}
if 'blocklist_channel_ids' in constraints:
blocklist_ids = constraints.pop('blocklist_channel_ids')
if blocklist_ids:
blocking_channels = [
unhexlify(channel_id)[::-1] for channel_id in blocklist_ids
]
constraints.update({
f'$blocking_channel{i}': a for i, a in enumerate(blocking_channels)
})
blocklist = ', '.join([
f':$blocking_channel{i}' for i in range(len(blocking_channels))
])
constraints['claim.claim_hash__not_in#blocklist_channel_ids'] = f"""
SELECT reposted_claim_hash FROM claim WHERE channel_hash IN ({blocklist})
"""
if 'signature_valid' in constraints:
has_channel_signature = constraints.pop('has_channel_signature', False)
if has_channel_signature:
@ -319,16 +318,23 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
return query(select, **constraints)
def get_claims(cols, for_count=False, **constraints) -> List:
def get_claims(cols, for_count=False, **constraints) -> Tuple[List, Censor]:
if 'channel' in constraints:
channel_url = constraints.pop('channel')
match = resolve_url(channel_url)
if isinstance(match, dict):
constraints['channel_hash'] = match['claim_hash']
else:
return [{'row_count': 0}] if cols == 'count(*) as row_count' else []
return ([{'row_count': 0}] if cols == 'count(*) as row_count' else []), Censor()
censor = Censor(
ctx.get().blocked_claims,
{unhexlify(ncid)[::-1] for ncid in constraints.pop('not_channel_ids', [])},
set(clean_tags(constraints.pop('not_tags', {})))
)
row_offset = constraints.pop('offset', 0)
row_limit = constraints.pop('limit', 20)
sql, values = _get_claims(cols, for_count, **constraints)
return execute_query(sql, values)
return execute_query(sql, values, row_offset, row_limit, censor), censor
@measure
@ -336,11 +342,11 @@ def get_claims_count(**constraints) -> int:
constraints.pop('offset', None)
constraints.pop('limit', None)
constraints.pop('order_by', None)
count = get_claims('count(*) as row_count', for_count=True, **constraints)
count, _ = get_claims('count(*) as row_count', for_count=True, **constraints)
return count[0]['row_count']
def _search(**constraints):
def _search(**constraints) -> Tuple[List, Censor]:
return get_claims(
"""
claimtrie.claim_hash as is_controlling,
@ -354,7 +360,8 @@ def _search(**constraints):
claim.trending_local, claim.trending_global,
claim.short_url, claim.canonical_url,
claim.channel_hash, claim.reposted_claim_hash,
claim.signature_valid
claim.signature_valid,
COALESCE((SELECT group_concat(tag) FROM tag WHERE tag.claim_hash = claim.claim_hash), "") as tags
""", **constraints
)
@ -365,19 +372,19 @@ def _get_referenced_rows(txo_rows: List[dict]):
reposted_txos = []
if repost_hashes:
reposted_txos = _search(**{'claim.claim_hash__in': repost_hashes})
reposted_txos, _ = _search(**{'claim.claim_hash__in': repost_hashes})
channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos)))
channel_txos = []
if channel_hashes:
channel_txos = _search(**{'claim.claim_hash__in': channel_hashes})
channel_txos, _ = _search(**{'claim.claim_hash__in': channel_hashes})
# channels must come first for client side inflation to work properly
return channel_txos + reposted_txos
@measure
def search(constraints) -> Tuple[List, List, int, int]:
def search(constraints) -> Tuple[List, List, int, int, Censor]:
assert set(constraints).issubset(SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
total = None
@ -387,9 +394,9 @@ def search(constraints) -> Tuple[List, List, int, int]:
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
if 'order_by' not in constraints:
constraints['order_by'] = ["claim_hash"]
txo_rows = _search(**constraints)
txo_rows, censor = _search(**constraints)
extra_txo_rows = _get_referenced_rows(txo_rows)
return txo_rows, extra_txo_rows, constraints['offset'], total
return txo_rows, extra_txo_rows, constraints['offset'], total, censor
@measure
@ -415,7 +422,7 @@ def resolve_url(raw_url):
query['is_controlling'] = True
else:
query['order_by'] = ['^creation_height']
matches = _search(**query, limit=1)
matches, _ = _search(**query, limit=1)
if matches:
channel = matches[0]
else:
@ -433,7 +440,7 @@ def resolve_url(raw_url):
query['signature_valid'] = 1
elif set(query) == {'name'}:
query['is_controlling'] = 1
matches = _search(**query, limit=1)
matches, _ = _search(**query, limit=1)
if matches:
return matches[0]
else:

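Censoring and paging now happen inside the row trace, so blocked rows never consume the offset or the limit, and the cursor stops as soon as a page fills. A standalone sketch of the same mechanics with a toy schema (apsw skips any row for which the tracer returns None):

import apsw

db = apsw.Connection(':memory:')
db.cursor().execute(
    "create table claim (claim_hash, tag);"
    "insert into claim values ('a','good'),('b','mature'),('c','good'),('d','good')"
)

blocked_tags, row_offset, row_limit = {'mature'}, 1, 2

def row_filter(cursor, row):
    global row_offset
    row = {k[0]: row[i] for i, k in enumerate(cursor.getdescription())}
    if row['tag'] in blocked_tags:
        return None              # censored: dropped before paging
    if row_offset:
        row_offset -= 1          # uncensored rows consume the offset
        return None
    return row

cursor = db.cursor()
cursor.setrowtrace(row_filter)
rows = []
for row in cursor.execute("select claim_hash, tag from claim order by claim_hash"):
    rows.append(row)
    if len(rows) >= row_limit:
        break                    # stop pulling rows once the page is full
print([r['claim_hash'] for r in rows])   # ['c', 'd']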

@ -4,8 +4,10 @@ from typing import Union, Tuple, Set, List
from itertools import chain
from decimal import Decimal
from collections import namedtuple
from multiprocessing import Manager
from binascii import unhexlify
from lbry.wallet.server.leveldb import DB
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.util import class_logger
from lbry.wallet.database import query, constraints_to_sql
@ -136,6 +138,8 @@ class SQLDB:
create unique index if not exists filter_fee_amount_order_release_time_idx on claim (fee_amount, release_time, claim_hash);
create unique index if not exists claim_type_trending_idx on claim (claim_type, trending_global, trending_mixed, claim_hash);
create unique index if not exists claim_type_release_idx on claim (claim_type, release_time, claim_hash);
create unique index if not exists claim_type_effective_amount_idx on claim (claim_type, effective_amount, claim_hash);
-- TODO: verify that all indexes below are used
create index if not exists claim_height_normalized_idx on claim (height, normalized asc);
@ -166,13 +170,18 @@ class SQLDB:
CREATE_TAG_TABLE
)
def __init__(self, main, path):
def __init__(self, main, path: str, filtering_channels: list):
self.main = main
self._db_path = path
self.db = None
self.state_manager = None
self.blocked_claims = None
self.logger = class_logger(__name__, self.__class__.__name__)
self.ledger = Ledger if self.main.coin.NET == 'mainnet' else RegTestLedger
self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger
self._fts_synced = False
self.filtering_channel_hashes = {
unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
}
def open(self):
self.db = apsw.Connection(
@ -192,10 +201,27 @@ class SQLDB:
self.execute(self.CREATE_TABLES_QUERY)
register_canonical_functions(self.db)
register_trending_functions(self.db)
self.state_manager = Manager()
self.blocked_claims = self.state_manager.dict()
self.update_blocked_claims()
def close(self):
if self.db is not None:
self.db.close()
if self.state_manager is not None:
self.state_manager.shutdown()
def update_blocked_claims(self):
sql = query(
"SELECT channel_hash, reposted_claim_hash FROM claim",
reposted_claim_hash__is_not_null=1,
channel_hash__in=self.filtering_channel_hashes
)
blocked_claims = {}
for blocked_claim in self.execute(*sql):
blocked_claims[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
self.blocked_claims.clear()
self.blocked_claims.update(blocked_claims)
@staticmethod
def _insert_sql(table: str, data: dict) -> Tuple[str, list]:
@ -585,6 +611,12 @@ class SQLDB:
""", [(channel_hash,) for channel_hash in all_channel_keys.keys()])
sub_timer.stop()
sub_timer = timer.add_timer('update blocked claims list')
sub_timer.start()
if self.filtering_channel_hashes.intersection(all_channel_keys):
self.update_blocked_claims()
sub_timer.stop()
def _update_support_amount(self, claim_hashes):
if claim_hashes:
self.execute(f"""
@ -778,12 +810,13 @@ class SQLDB:
self._fts_synced = True
class LBRYDB(DB):
class LBRYLevelDB(LevelDB):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
path = os.path.join(self.env.db_dir, 'claims.db')
self.sql = SQLDB(self, path)
# space separated list of channel claim ids used for filtering bad content
self.sql = SQLDB(self, path, self.env.default('FILTERING_CHANNELS_IDS', '').split(' '))
def close(self):
super().close()

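update_blocked_claims above rebuilds the map locally and swaps it into the shared proxy with clear() followed by update(). Each proxy call is serialized by the manager, but a reader landing between the two calls can briefly observe an empty map; a sketch:

from multiprocessing import Manager

manager = Manager()
shared = manager.dict({'stale': 1})

fresh = {'claim_hash': 'channel_hash'}   # rebuilt from SQL in the real code
shared.clear()                           # a reader between these two calls
shared.update(fresh)                     # would observe an empty map
print(dict(shared))                      # {'claim_hash': 'channel_hash'}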

@ -47,7 +47,7 @@ class FlushData:
tip = attr.ib()
class DB:
class LevelDB:
"""Simple wrapper of the backend database for querying.
Performs no DB update, though the DB will be cleaned on opening if


@ -23,7 +23,7 @@ from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import lbry
from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYDB
from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.db import reader
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
@ -40,8 +40,6 @@ from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.peers import PeerManager
if typing.TYPE_CHECKING:
from lbry.wallet.server.env import Env
from lbry.wallet.server.leveldb import DB
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.daemon import Daemon
@ -120,7 +118,7 @@ class SessionGroup:
class SessionManager:
"""Holds global state about all sessions."""
def __init__(self, env: 'Env', db: 'DB', bp: 'BlockProcessor', daemon: 'Daemon', mempool: 'MemPool',
def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event):
env.max_send = max(350000, env.max_send)
self.env = env
@ -750,7 +748,7 @@ class LBRYSessionManager(SessionManager):
args = dict(
initializer=reader.initializer,
initargs=(self.logger, path, self.env.coin.NET, self.env.database_query_timeout,
self.env.track_metrics)
self.env.track_metrics, self.db.sql.blocked_claims)
)
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
@ -793,10 +791,7 @@ class LBRYElectrumX(SessionBase):
# fixme: this is a rebase hack, we need to go through ChainState instead later
self.daemon = self.session_mgr.daemon
self.bp: LBRYBlockProcessor = self.session_mgr.bp
self.db: LBRYDB = self.bp.db
# space separated list of channel claim ids used for filtering bad content
filtering_channels = self.env.default('FILTERING_CHANNELS_IDS', '')
self.filtering_channels_ids = list(filter(None, filtering_channels.split(' ')))
self.db: LBRYLevelDB = self.bp.db
@classmethod
def protocol_min_max_strings(cls):
@ -936,7 +931,6 @@ class LBRYElectrumX(SessionBase):
async def claimtrie_search(self, **kwargs):
if kwargs:
kwargs.setdefault('blocklist_channel_ids', []).extend(self.filtering_channels_ids)
return await self.run_and_cache_query('search', reader.search_to_bytes, kwargs)
async def claimtrie_resolve(self, *urls):


@ -751,27 +751,37 @@ class StreamCommands(ClaimTestCase):
self.assertEqual(resolved['newstuff-again']['reposted_claim']['name'], 'newstuff')
async def test_filtering_channels_for_removing_content(self):
await self.channel_create('@badstuff', '1.0')
await self.stream_create('not_bad', '1.1', channel_name='@badstuff')
tx = await self.stream_create('too_bad', '1.1', channel_name='@badstuff')
claim_id = self.get_claim_id(tx)
await self.channel_create('@reposts', '1.0')
await self.stream_repost(claim_id, 'normal_repost', '1.2', channel_name='@reposts')
filtering1 = await self.channel_create('@filtering1', '1.0')
filtering1 = self.get_claim_id(filtering1)
await self.stream_repost(claim_id, 'filter1', '1.1', channel_name='@filtering1')
await self.conductor.spv_node.stop()
await self.conductor.spv_node.start(
self.conductor.blockchain_node, extraconf={'FILTERING_CHANNELS_IDS': filtering1}
await self.channel_create('@some_channel', '1.0')
await self.stream_create('good_content', '1.1', channel_name='@some_channel', tags=['good'])
bad_content_id = self.get_claim_id(
await self.stream_create('bad_content', '1.1', channel_name='@some_channel', tags=['bad'])
)
await self.ledger.stop()
await self.ledger.start()
filtered_claim_search = await self.claim_search(name='too_bad')
self.assertEqual(filtered_claim_search, [])
filtered_claim_search = await self.claim_search(name='not_bad')
self.assertEqual(len(filtered_claim_search), 1)
filtered_claim_search = await self.claim_search(name='normal_repost')
self.assertEqual(len(filtered_claim_search), 1)
blocking_channel_id = self.get_claim_id(
await self.channel_create('@filtering', '1.0')
)
self.conductor.spv_node.server.db.sql.filtering_channel_hashes.add(
unhexlify(blocking_channel_id)[::-1]
)
await self.stream_repost(bad_content_id, 'filter1', '1.1', channel_name='@filtering')
# search for blocked content directly
result = await self.out(self.daemon.jsonrpc_claim_search(name='bad_content'))
self.assertEqual([], result['items'])
self.assertEqual({"reposted_in_channel": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
# search channel containing blocked content
result = await self.out(self.daemon.jsonrpc_claim_search(channel='@some_channel'))
self.assertEqual(1, len(result['items']))
self.assertEqual({"reposted_in_channel": {blocking_channel_id: 1}, "total": 1}, result['blocked'])
# search channel containing blocked content, also block tag
result = await self.out(self.daemon.jsonrpc_claim_search(channel='@some_channel', not_tags=["good", "bad"]))
self.assertEqual(0, len(result['items']))
self.assertEqual({
"reposted_in_channel": {blocking_channel_id: 1},
"has_tag": {"good": 1, "bad": 1},
"total": 2
}, result['blocked'])
async def test_publish_updates_file_list(self):
tx = await self.stream_create(title='created')


@ -3,8 +3,8 @@ import ecdsa
import hashlib
import logging
from binascii import hexlify
from lbry.wallet.constants import COIN, NULL_HASH32
from lbry.wallet.constants import COIN, NULL_HASH32
from lbry.schema.claim import Claim
from lbry.wallet.server.db import reader, writer
from lbry.wallet.server.coin import LBCRegTest
@ -36,10 +36,13 @@ class TestSQLDB(unittest.TestCase):
self.daemon_height = 1
self.coin = LBCRegTest()
db_url = 'file:test_sqldb?mode=memory&cache=shared'
self.sql = writer.SQLDB(self, db_url)
self.sql = writer.SQLDB(self, db_url, [])
self.addCleanup(self.sql.close)
self.sql.open()
reader.initializer(logging.getLogger(__name__), db_url, 'regtest', self.query_timeout)
reader.initializer(
logging.getLogger(__name__), db_url, 'regtest',
self.query_timeout, blocked_claims=self.sql.blocked_claims
)
self.addCleanup(reader.cleanup)
self.timer = Timer('BlockProcessor')
self._current_height = 0
@ -74,9 +77,9 @@ class TestSQLDB(unittest.TestCase):
Input.spend(channel)
)
def get_stream(self, title, amount, name='foo', channel=None):
def get_stream(self, title, amount, name='foo', channel=None, **kwargs):
claim = Claim()
claim.stream.title = title
claim.stream.update(title=title, **kwargs)
result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc'))
if channel:
result[0].outputs[0].sign(channel)
@ -96,6 +99,14 @@ class TestSQLDB(unittest.TestCase):
result[0]._reset()
return result
def get_repost(self, claim_id, amount, channel):
claim = Claim()
claim.repost.reference.claim_id = claim_id
result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, 'repost', claim, b'abc'))
result[0].outputs[0].sign(channel)
result[0]._reset()
return result
def get_abandon(self, tx):
claim = Transaction(tx[0].raw).outputs[0]
return self._make_tx(
@ -319,7 +330,7 @@ class TestClaimtrie(TestSQLDB):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream, self.get_stream_update(stream, 11*COIN)])
self.assertTrue(reader._search())
self.assertTrue(reader._search()[0])
def test_double_updates_in_same_block(self):
advance, state = self.advance, self.state
@ -327,13 +338,13 @@ class TestClaimtrie(TestSQLDB):
advance(10, [stream])
update = self.get_stream_update(stream, 11*COIN)
advance(20, [update, self.get_stream_update(update, 9*COIN)])
self.assertTrue(reader._search())
self.assertTrue(reader._search()[0])
def test_create_and_abandon_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream, self.get_abandon(stream)])
self.assertFalse(reader._search())
self.assertFalse(reader._search()[0])
def test_update_and_abandon_in_same_block(self):
advance, state = self.advance, self.state
@ -341,14 +352,14 @@ class TestClaimtrie(TestSQLDB):
advance(10, [stream])
update = self.get_stream_update(stream, 11*COIN)
advance(20, [update, self.get_abandon(update)])
self.assertFalse(reader._search())
self.assertFalse(reader._search()[0])
def test_create_update_and_delete_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
update = self.get_stream_update(stream, 11*COIN)
advance(10, [stream, update, self.get_abandon(update)])
self.assertFalse(reader._search())
self.assertFalse(reader._search()[0])
def test_support_added_and_removed_in_same_block(self):
advance, state = self.advance, self.state
@ -356,7 +367,7 @@ class TestClaimtrie(TestSQLDB):
advance(10, [stream])
support = self.get_support(stream, COIN)
advance(20, [support, self.get_abandon(support)])
self.assertEqual(reader._search()[0]['support_amount'], 0)
self.assertEqual(reader._search()[0][0]['support_amount'], 0)
@staticmethod
def _get_x_with_claim_id_prefix(getter, prefix, cached_iteration=None, **kwargs):
@ -385,7 +396,7 @@ class TestClaimtrie(TestSQLDB):
txo_chan_ab = tx_chan_ab[0].outputs[0]
advance(1, [tx_chan_a])
advance(2, [tx_chan_ab])
r_ab, r_a = reader._search(order_by=['creation_height'], limit=2)
(r_ab, r_a), _ = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual("@foo#a", r_a['short_url'])
self.assertEqual("@foo#ab", r_ab['short_url'])
self.assertIsNone(r_a['canonical_url'])
@ -398,7 +409,7 @@ class TestClaimtrie(TestSQLDB):
tx_abc = self.get_stream_with_claim_id_prefix('abc', 65)
advance(3, [tx_a])
advance(4, [tx_ab, tx_abc])
r_abc, r_ab, r_a = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
(r_abc, r_ab, r_a), _ = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
self.assertEqual("foo#a", r_a['short_url'])
self.assertEqual("foo#ab", r_ab['short_url'])
self.assertEqual("foo#abc", r_abc['short_url'])
@ -412,51 +423,51 @@ class TestClaimtrie(TestSQLDB):
ab2_claim = tx_ab2[0].outputs[0]
advance(6, [tx_a2])
advance(7, [tx_ab2])
r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
(r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
# change channel public key, invalidating stream claim signatures
advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')])
r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
(r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertIsNone(r_a2['canonical_url'])
self.assertIsNone(r_ab2['canonical_url'])
self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
# reinstate previous channel public key (previous stream claim signatures become valid again)
channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c')
advance(9, [channel_update])
r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
(r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
# change channel of stream
self.assertEqual("@foo#a/foo#ab", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
self.assertEqual("@foo#a/foo#ab", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0][0]['canonical_url'])
tx_ab2 = self.get_stream_update(tx_ab2, COIN, txo_chan_ab)
advance(10, [tx_ab2])
self.assertEqual("@foo#ab/foo#a", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
self.assertEqual("@foo#ab/foo#a", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0][0]['canonical_url'])
# TODO: currently there is a bug where a stream leaving a channel does not update that channel's claims count
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
# TODO: after bug is fixed remove test above and add test below
#self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(1, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
#self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
self.assertEqual(1, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
# claim abandon updates claims_in_channel
advance(11, [self.get_abandon(tx_ab2)])
self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
# delete channel, invalidating stream claim signatures
advance(12, [self.get_abandon(channel_update)])
r_a2, = reader._search(order_by=['creation_height'], limit=1)
(r_a2,), _ = reader._search(order_by=['creation_height'], limit=1)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertIsNone(r_a2['canonical_url'])
@ -514,7 +525,7 @@ class TestTrending(TestSQLDB):
self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN),
self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN),
])
results = reader._search(order_by=['trending_local'])
results, _ = reader._search(order_by=['trending_local'])
self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results])
self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results])
self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])
@ -526,3 +537,90 @@ class TestTrending(TestSQLDB):
self.advance(1, [problematic])
self.advance(TRENDING_WINDOW, [self.get_support(problematic, 53000000000)])
self.advance(TRENDING_WINDOW * 2, [self.get_support(problematic, 500000000)])
class TestContentBlocking(TestSQLDB):
def test_blocking(self):
tx0 = self.get_channel('A Channel', COIN)
a_channel = tx0[0].outputs[0]
tx1 = self.get_stream('Claim One', COIN)
tx2 = self.get_stream('Claim Two', COIN, tags=["mature"], channel=a_channel)
self.advance(1, [tx0, tx1, tx2])
claim1, claim2 = tx1[0].outputs[0], tx2[0].outputs[0]
# nothing blocked
results, censor = reader._search(text='Claim')
self.assertEqual(2, len(results))
self.assertEqual(0, censor.total)
self.assertEqual({}, dict(self.sql.blocked_claims))
# block claim reposted to blocking channel
tx = self.get_channel('Blocking Channel', COIN)
channel = tx[0].outputs[0]
self.sql.filtering_channel_hashes.add(channel.claim_hash)
self.advance(2, [tx])
self.assertEqual({}, dict(self.sql.blocked_claims))
tx = self.get_repost(claim1.claim_id, COIN, channel)
reposting_claim = tx[0].outputs[0]
self.advance(3, [tx])
self.assertEqual(
{reposting_claim.claim.repost.reference.claim_hash: channel.claim_hash},
dict(self.sql.blocked_claims)
)
# claim is blocked from results by repost
results, censor = reader._search(text='Claim')
self.assertEqual(1, len(results))
self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
self.assertEqual(1, censor.total)
self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
self.assertEqual({}, censor.blocked_channels)
self.assertEqual({}, censor.blocked_tags)
# claim is blocked from results by repost and tags
results, censor = reader._search(text='Claim', not_tags=["mature"])
self.assertEqual(0, len(results))
self.assertEqual(2, censor.total)
self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
self.assertEqual({}, censor.blocked_channels)
self.assertEqual({"mature": 1}, censor.blocked_tags)
# claim is blocked from results by repost and channel
results, censor = reader._search(text='Claim', not_channel_ids=[a_channel.claim_id])
self.assertEqual(0, len(results))
self.assertEqual(2, censor.total)
self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
self.assertEqual({a_channel.claim_hash: 1}, censor.blocked_channels)
self.assertEqual({}, censor.blocked_tags)
def test_pagination(self):
one, two, three, four, five, six, seven = (
self.advance(1, [self.get_stream('One', COIN, tags=["mature"])])[0],
self.advance(2, [self.get_stream('Two', COIN, tags=["mature"])])[0],
self.advance(3, [self.get_stream('Three', COIN)])[0],
self.advance(4, [self.get_stream('Four', COIN)])[0],
self.advance(5, [self.get_stream('Five', COIN)])[0],
self.advance(6, [self.get_stream('Six', COIN)])[0],
self.advance(7, [self.get_stream('Seven', COIN)])[0],
)
# nothing blocked
results, censor = reader._search(order_by='^height', offset=1, limit=3)
self.assertEqual(3, len(results))
self.assertEqual(
[two.claim_hash, three.claim_hash, four.claim_hash],
[r['claim_hash'] for r in results]
)
self.assertEqual(0, censor.total)
# tags blocked
results, censor = reader._search(order_by='^height', not_tags=('mature',), offset=1, limit=3)
self.assertEqual(3, len(results))
self.assertEqual(
[four.claim_hash, five.claim_hash, six.claim_hash],
[r['claim_hash'] for r in results]
)
self.assertEqual(2, censor.total)
self.assertEqual({"mature": 2}, censor.blocked_tags)