using multiprocessing.Manager to keep blocked content synced between readers

Lex Berezhny 2020-01-10 10:47:57 -05:00 committed by Alex Grintsvayg
parent 59a5bacb2e
commit 86cedfe8b2
14 changed files with 1464 additions and 1195 deletions
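The heart of the change, sketched below: the wallet server's writer process keeps the map of blocked claims in a multiprocessing.Manager dict, and each query worker receives the proxy through its pool initializer, so updates become visible to readers without restarting them. A minimal, self-contained sketch of the pattern (hypothetical names, not the commit's code):

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

_blocked = None  # set per worker by the initializer

def init_reader(blocked_claims):
    global _blocked
    _blocked = blocked_claims  # a live DictProxy, not a pickled snapshot

def read_blocked():
    return dict(_blocked)

if __name__ == '__main__':
    manager = Manager()
    blocked = manager.dict()
    pool = ProcessPoolExecutor(max_workers=2, initializer=init_reader, initargs=(blocked,))
    blocked[b'reposted-claim-hash'] = b'filtering-channel-hash'  # writer-side update
    assert pool.submit(read_blocked).result() == {b'reposted-claim-hash': b'filtering-channel-hash'}
    pool.shutdown()
    manager.shutdown()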

View file

@@ -2320,7 +2320,7 @@ class Daemon(metaclass=JSONRPCServerType):
kwargs['signature_valid'] = 0
page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
txos, _, total = await self.ledger.claim_search(wallet.accounts, **kwargs)
txos, blocked, _, total = await self.ledger.claim_search(wallet.accounts, **kwargs)
result = {"items": txos, "page": page_num, "page_size": page_size}
if not kwargs.pop('no_totals', False):
result['total_pages'] = int((total + (page_size - 1)) / page_size)

View file

@@ -1,5 +1,5 @@
build:
rm types/v2/* -rf
touch types/v2/__init__.py
cd types/v2/ && protoc --python_out=. -I ../../../../../../types/v2/proto/ ../../../../../../types/v2/proto/*.proto
cd types/v2/ && protoc --python_out=. -I ../../../../../types/v2/proto/ ../../../../../types/v2/proto/*.proto
sed -e 's/^import\ \(.*\)_pb2\ /from . import\ \1_pb2\ /g' -i types/v2/*.py

View file

@@ -7,22 +7,74 @@ from itertools import chain
from lbry.schema.types.v2.result_pb2 import Outputs as OutputsMessage
class Censor:
def __init__(self, claim_ids: dict = None, channel_ids: set = None, tags: set = None):
self.claim_ids = claim_ids or {}
self.channel_ids = channel_ids or set()
self.tags = tags or set()
self.blocked_claims = {}
self.blocked_channels = {}
self.blocked_tags = {}
self.total = 0
def censor(self, row) -> bool:
censored = False
if row['claim_hash'] in self.claim_ids:
censored = True
channel_id = self.claim_ids[row['claim_hash']]
self.blocked_claims.setdefault(channel_id, 0)
self.blocked_claims[channel_id] += 1
if row['channel_hash'] in self.channel_ids:
censored = True
self.blocked_channels.setdefault(row['channel_hash'], 0)
self.blocked_channels[row['channel_hash']] += 1
if self.tags.intersection(row['tags']):
censored = True
for tag in self.tags:
if tag in row['tags']:
self.blocked_tags.setdefault(tag, 0)
self.blocked_tags[tag] += 1
if censored:
self.total += 1
return censored
def to_message(self, outputs: OutputsMessage):
outputs.blocked_total = self.total
for channel_hash, count in self.blocked_claims.items():
block = outputs.blocked.add()
block.count = count
block.reposted_in_channel = channel_hash
for channel_hash, count in self.blocked_channels.items():
block = outputs.blocked.add()
block.count = count
block.in_channel = channel_hash
for tag, count in self.blocked_tags.items():
block = outputs.blocked.add()
block.count = count
block.has_tag = tag
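For orientation, a hedged usage sketch of the Censor class above (all hashes invented):

censor = Censor(
    claim_ids={b'claim1': b'blocking-channel'},  # claim_hash -> channel that blocked it via repost
    channel_ids={b'bad-channel'},
    tags={'mature'},
)
row = {'claim_hash': b'claim1', 'channel_hash': b'ok-channel', 'tags': {'fun'}}
assert censor.censor(row) is True  # matched via claim_ids
assert censor.total == 1
assert censor.blocked_claims == {b'blocking-channel': 1}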
class Outputs:
__slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total'
__slots__ = 'txos', 'extra_txos', 'txs', 'offset', 'total', 'blocked', 'blocked_total'
def __init__(self, txos: List, extra_txos: List, txs: set, offset: int, total: int):
def __init__(self, txos: List, extra_txos: List, txs: set,
offset: int, total: int, blocked: List, blocked_total: int):
self.txos = txos
self.txs = txs
self.extra_txos = extra_txos
self.offset = offset
self.total = total
self.blocked = blocked
self.blocked_total = blocked_total
def inflate(self, txs):
tx_map = {tx.hash: tx for tx in txs}
for txo_message in self.extra_txos:
self.message_to_txo(txo_message, tx_map)
return [self.message_to_txo(txo_message, tx_map) for txo_message in self.txos]
txos = [self.message_to_txo(txo_message, tx_map) for txo_message in self.txos]
return txos, self.inflate_blocked(txs)
def message_to_txo(self, txo_message, tx_map):
if txo_message.WhichOneof('meta') == 'error':
@@ -57,6 +109,41 @@ class Outputs:
pass
return txo
def inflate_blocked(self, txs):
blocked = {}
return blocked
if txo_message.WhichOneof('meta') == 'error':
return None
txo = tx_map[txo_message.tx_hash].outputs[txo_message.nout]
if txo_message.WhichOneof('meta') == 'claim':
claim = txo_message.claim
txo.meta = {
'short_url': f'lbry://{claim.short_url}',
'canonical_url': f'lbry://{claim.canonical_url or claim.short_url}',
'reposted': claim.reposted,
'is_controlling': claim.is_controlling,
'take_over_height': claim.take_over_height,
'creation_height': claim.creation_height,
'activation_height': claim.activation_height,
'expiration_height': claim.expiration_height,
'effective_amount': claim.effective_amount,
'support_amount': claim.support_amount,
'trending_group': claim.trending_group,
'trending_mixed': claim.trending_mixed,
'trending_local': claim.trending_local,
'trending_global': claim.trending_global,
}
if claim.HasField('channel'):
txo.channel = tx_map[claim.channel.tx_hash].outputs[claim.channel.nout]
if claim.HasField('repost'):
txo.reposted_claim = tx_map[claim.repost.tx_hash].outputs[claim.repost.nout]
try:
if txo.claim.is_channel:
txo.meta['claims_in_channel'] = claim.claims_in_channel
except:
pass
return txo
@classmethod
def from_base64(cls, data: str) -> 'Outputs':
return cls.from_bytes(base64.b64decode(data))
@@ -70,18 +157,24 @@ class Outputs:
if txo_message.WhichOneof('meta') == 'error':
continue
txs.add((hexlify(txo_message.tx_hash[::-1]).decode(), txo_message.height))
return cls(outputs.txos, outputs.extra_txos, txs, outputs.offset, outputs.total)
return cls(
outputs.txos, outputs.extra_txos, txs,
outputs.offset, outputs.total,
outputs.blocked, outputs.blocked_total
)
@classmethod
def to_base64(cls, txo_rows, extra_txo_rows, offset=0, total=None) -> str:
return base64.b64encode(cls.to_bytes(txo_rows, extra_txo_rows, offset, total)).decode()
def to_base64(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked=None) -> str:
return base64.b64encode(cls.to_bytes(txo_rows, extra_txo_rows, offset, total, blocked)).decode()
@classmethod
def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None) -> bytes:
def to_bytes(cls, txo_rows, extra_txo_rows, offset=0, total=None, blocked: Censor = None) -> bytes:
page = OutputsMessage()
page.offset = offset
if total is not None:
page.total = total
if blocked is not None:
blocked.to_message(page)
for row in txo_rows:
cls.row_to_message(row, page.txos.add(), extra_txo_rows)
for row in extra_txo_rows:
@@ -121,6 +214,10 @@ class Outputs:
cls.set_reference(txo_message, 'channel', txo['channel_hash'], extra_txo_rows)
cls.set_reference(txo_message, 'repost', txo['reposted_claim_hash'], extra_txo_rows)
@staticmethod
def set_blocked(message, blocked):
message.blocked_total = blocked.total
@staticmethod
def set_reference(message, attr, claim_hash, rows):
if claim_hash:
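Putting the pieces together, the censor tallies ride inside the protobuf page and survive a round trip; a sketch using the classes above (rows invented, empty page for brevity):

censor = Censor(channel_ids={b'bad-channel'})
censor.censor({'claim_hash': b'x', 'channel_hash': b'bad-channel', 'tags': set()})
encoded = Outputs.to_base64([], [], offset=0, total=0, blocked=censor)
assert Outputs.from_base64(encoded).blocked_total == 1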

File diff suppressed because one or more lines are too long

View file

@@ -7,6 +7,7 @@ from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@@ -18,9 +19,9 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='purchase.proto',
package='pb',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\x0epurchase.proto\x12\x02pb\"\x1e\n\x08Purchase\x12\x12\n\nclaim_hash\x18\x01 \x01(\x0c\x62\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -38,14 +39,14 @@ _PURCHASE = _descriptor.Descriptor(
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@@ -56,7 +57,6 @@ _PURCHASE = _descriptor.Descriptor(
)
DESCRIPTOR.message_types_by_name['Purchase'] = _PURCHASE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Purchase = _reflection.GeneratedProtocolMessageType('Purchase', (_message.Message,), dict(
DESCRIPTOR = _PURCHASE,

View file

@@ -7,6 +7,7 @@ from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
@@ -18,9 +19,9 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='result.proto',
package='pb',
syntax='proto3',
serialized_options=None,
serialized_pb=_b('\n\x0cresult.proto\x12\x02pb\"b\n\x07Outputs\x12\x18\n\x04txos\x18\x01 \x03(\x0b\x32\n.pb.Output\x12\x1e\n\nextra_txos\x18\x02 \x03(\x0b\x32\n.pb.Output\x12\r\n\x05total\x18\x03 \x01(\r\x12\x0e\n\x06offset\x18\x04 \x01(\r\"{\n\x06Output\x12\x0f\n\x07tx_hash\x18\x01 \x01(\x0c\x12\x0c\n\x04nout\x18\x02 \x01(\r\x12\x0e\n\x06height\x18\x03 \x01(\r\x12\x1e\n\x05\x63laim\x18\x07 \x01(\x0b\x32\r.pb.ClaimMetaH\x00\x12\x1a\n\x05\x65rror\x18\x0f \x01(\x0b\x32\t.pb.ErrorH\x00\x42\x06\n\x04meta\"\xaf\x03\n\tClaimMeta\x12\x1b\n\x07\x63hannel\x18\x01 \x01(\x0b\x32\n.pb.Output\x12\x1a\n\x06repost\x18\x02 \x01(\x0b\x32\n.pb.Output\x12\x11\n\tshort_url\x18\x03 \x01(\t\x12\x15\n\rcanonical_url\x18\x04 \x01(\t\x12\x16\n\x0eis_controlling\x18\x05 \x01(\x08\x12\x18\n\x10take_over_height\x18\x06 \x01(\r\x12\x17\n\x0f\x63reation_height\x18\x07 \x01(\r\x12\x19\n\x11\x61\x63tivation_height\x18\x08 \x01(\r\x12\x19\n\x11\x65xpiration_height\x18\t \x01(\r\x12\x19\n\x11\x63laims_in_channel\x18\n \x01(\r\x12\x10\n\x08reposted\x18\x0b \x01(\r\x12\x18\n\x10\x65\x66\x66\x65\x63tive_amount\x18\x14 \x01(\x04\x12\x16\n\x0esupport_amount\x18\x15 \x01(\x04\x12\x16\n\x0etrending_group\x18\x16 \x01(\r\x12\x16\n\x0etrending_mixed\x18\x17 \x01(\x02\x12\x16\n\x0etrending_local\x18\x18 \x01(\x02\x12\x17\n\x0ftrending_global\x18\x19 \x01(\x02\"i\n\x05\x45rror\x12\x1c\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x0e.pb.Error.Code\x12\x0c\n\x04text\x18\x02 \x01(\t\"4\n\x04\x43ode\x12\x10\n\x0cUNKNOWN_CODE\x10\x00\x12\r\n\tNOT_FOUND\x10\x01\x12\x0b\n\x07INVALID\x10\x02\x62\x06proto3')
serialized_pb=_b('\n\x0cresult.proto\x12\x02pb\"\x97\x01\n\x07Outputs\x12\x18\n\x04txos\x18\x01 \x03(\x0b\x32\n.pb.Output\x12\x1e\n\nextra_txos\x18\x02 \x03(\x0b\x32\n.pb.Output\x12\x1c\n\x07\x62locked\x18\x03 \x03(\x0b\x32\x0b.pb.Blocked\x12\r\n\x05total\x18\x04 \x01(\r\x12\x0e\n\x06offset\x18\x05 \x01(\r\x12\x15\n\rblocked_total\x18\x06 \x01(\r\"{\n\x06Output\x12\x0f\n\x07tx_hash\x18\x01 \x01(\x0c\x12\x0c\n\x04nout\x18\x02 \x01(\r\x12\x0e\n\x06height\x18\x03 \x01(\r\x12\x1e\n\x05\x63laim\x18\x07 \x01(\x0b\x32\r.pb.ClaimMetaH\x00\x12\x1a\n\x05\x65rror\x18\x0f \x01(\x0b\x32\t.pb.ErrorH\x00\x42\x06\n\x04meta\"\xaf\x03\n\tClaimMeta\x12\x1b\n\x07\x63hannel\x18\x01 \x01(\x0b\x32\n.pb.Output\x12\x1a\n\x06repost\x18\x02 \x01(\x0b\x32\n.pb.Output\x12\x11\n\tshort_url\x18\x03 \x01(\t\x12\x15\n\rcanonical_url\x18\x04 \x01(\t\x12\x16\n\x0eis_controlling\x18\x05 \x01(\x08\x12\x18\n\x10take_over_height\x18\x06 \x01(\r\x12\x17\n\x0f\x63reation_height\x18\x07 \x01(\r\x12\x19\n\x11\x61\x63tivation_height\x18\x08 \x01(\r\x12\x19\n\x11\x65xpiration_height\x18\t \x01(\r\x12\x19\n\x11\x63laims_in_channel\x18\n \x01(\r\x12\x10\n\x08reposted\x18\x0b \x01(\r\x12\x18\n\x10\x65\x66\x66\x65\x63tive_amount\x18\x14 \x01(\x04\x12\x16\n\x0esupport_amount\x18\x15 \x01(\x04\x12\x16\n\x0etrending_group\x18\x16 \x01(\r\x12\x16\n\x0etrending_mixed\x18\x17 \x01(\x02\x12\x16\n\x0etrending_local\x18\x18 \x01(\x02\x12\x17\n\x0ftrending_global\x18\x19 \x01(\x02\"\x94\x01\n\x05\x45rror\x12\x1c\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x0e.pb.Error.Code\x12\x0c\n\x04text\x18\x02 \x01(\t\x12\x1c\n\x07\x62locked\x18\x03 \x01(\x0b\x32\x0b.pb.Blocked\"A\n\x04\x43ode\x12\x10\n\x0cUNKNOWN_CODE\x10\x00\x12\r\n\tNOT_FOUND\x10\x01\x12\x0b\n\x07INVALID\x10\x02\x12\x0b\n\x07\x42LOCKED\x10\x03\"j\n\x07\x42locked\x12\r\n\x05\x63ount\x18\x01 \x01(\r\x12\x1d\n\x13reposted_in_channel\x18\x02 \x01(\x0cH\x00\x12\x14\n\nin_channel\x18\x03 \x01(\x0cH\x00\x12\x11\n\x07has_tag\x18\x04 \x01(\tH\x00\x42\x08\n\x06reasonb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -32,21 +33,25 @@ _ERROR_CODE = _descriptor.EnumDescriptor(
values=[
_descriptor.EnumValueDescriptor(
name='UNKNOWN_CODE', index=0, number=0,
serialized_options=None,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='NOT_FOUND', index=1, number=1,
serialized_options=None,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='INVALID', index=2, number=2,
serialized_options=None,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='BLOCKED', index=3, number=3,
options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=732,
serialized_end=784,
options=None,
serialized_start=817,
serialized_end=882,
)
_sym_db.RegisterEnumDescriptor(_ERROR_CODE)
@@ -64,42 +69,56 @@ _OUTPUTS = _descriptor.Descriptor(
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='extra_txos', full_name='pb.Outputs.extra_txos', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='total', full_name='pb.Outputs.total', index=2,
number=3, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
name='blocked', full_name='pb.Outputs.blocked', index=2,
number=3, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='offset', full_name='pb.Outputs.offset', index=3,
name='total', full_name='pb.Outputs.total', index=3,
number=4, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='offset', full_name='pb.Outputs.offset', index=4,
number=5, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='blocked_total', full_name='pb.Outputs.blocked_total', index=5,
number=6, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=20,
serialized_end=118,
serialized_start=21,
serialized_end=172,
)
@@ -116,42 +135,42 @@ _OUTPUT = _descriptor.Descriptor(
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='nout', full_name='pb.Output.nout', index=1,
number=2, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='height', full_name='pb.Output.height', index=2,
number=3, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='claim', full_name='pb.Output.claim', index=3,
number=7, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='error', full_name='pb.Output.error', index=4,
number=15, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
@@ -160,8 +179,8 @@ _OUTPUT = _descriptor.Descriptor(
name='meta', full_name='pb.Output.meta',
index=0, containing_type=None, fields=[]),
],
serialized_start=120,
serialized_end=243,
serialized_start=174,
serialized_end=297,
)
@@ -178,133 +197,133 @@ _CLAIMMETA = _descriptor.Descriptor(
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='repost', full_name='pb.ClaimMeta.repost', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='short_url', full_name='pb.ClaimMeta.short_url', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='canonical_url', full_name='pb.ClaimMeta.canonical_url', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='is_controlling', full_name='pb.ClaimMeta.is_controlling', index=4,
number=5, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='take_over_height', full_name='pb.ClaimMeta.take_over_height', index=5,
number=6, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='creation_height', full_name='pb.ClaimMeta.creation_height', index=6,
number=7, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='activation_height', full_name='pb.ClaimMeta.activation_height', index=7,
number=8, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='expiration_height', full_name='pb.ClaimMeta.expiration_height', index=8,
number=9, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='claims_in_channel', full_name='pb.ClaimMeta.claims_in_channel', index=9,
number=10, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='reposted', full_name='pb.ClaimMeta.reposted', index=10,
number=11, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='effective_amount', full_name='pb.ClaimMeta.effective_amount', index=11,
number=20, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='support_amount', full_name='pb.ClaimMeta.support_amount', index=12,
number=21, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='trending_group', full_name='pb.ClaimMeta.trending_group', index=13,
number=22, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='trending_mixed', full_name='pb.ClaimMeta.trending_mixed', index=14,
number=23, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='trending_local', full_name='pb.ClaimMeta.trending_local', index=15,
number=24, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='trending_global', full_name='pb.ClaimMeta.trending_global', index=16,
number=25, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=246,
serialized_end=677,
serialized_start=300,
serialized_end=731,
)
@@ -321,14 +340,21 @@ _ERROR = _descriptor.Descriptor(
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='text', full_name='pb.Error.text', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
options=None),
_descriptor.FieldDescriptor(
name='blocked', full_name='pb.Error.blocked', index=2,
number=3, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
@@ -336,18 +362,74 @@ _ERROR = _descriptor.Descriptor(
enum_types=[
_ERROR_CODE,
],
serialized_options=None,
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=679,
serialized_end=784,
serialized_start=734,
serialized_end=882,
)
_BLOCKED = _descriptor.Descriptor(
name='Blocked',
full_name='pb.Blocked',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='count', full_name='pb.Blocked.count', index=0,
number=1, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='reposted_in_channel', full_name='pb.Blocked.reposted_in_channel', index=1,
number=2, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='in_channel', full_name='pb.Blocked.in_channel', index=2,
number=3, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='has_tag', full_name='pb.Blocked.has_tag', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
_descriptor.OneofDescriptor(
name='reason', full_name='pb.Blocked.reason',
index=0, containing_type=None, fields=[]),
],
serialized_start=884,
serialized_end=990,
)
_OUTPUTS.fields_by_name['txos'].message_type = _OUTPUT
_OUTPUTS.fields_by_name['extra_txos'].message_type = _OUTPUT
_OUTPUTS.fields_by_name['blocked'].message_type = _BLOCKED
_OUTPUT.fields_by_name['claim'].message_type = _CLAIMMETA
_OUTPUT.fields_by_name['error'].message_type = _ERROR
_OUTPUT.oneofs_by_name['meta'].fields.append(
@@ -359,12 +441,22 @@ _OUTPUT.fields_by_name['error'].containing_oneof = _OUTPUT.oneofs_by_name['meta'
_CLAIMMETA.fields_by_name['channel'].message_type = _OUTPUT
_CLAIMMETA.fields_by_name['repost'].message_type = _OUTPUT
_ERROR.fields_by_name['code'].enum_type = _ERROR_CODE
_ERROR.fields_by_name['blocked'].message_type = _BLOCKED
_ERROR_CODE.containing_type = _ERROR
_BLOCKED.oneofs_by_name['reason'].fields.append(
_BLOCKED.fields_by_name['reposted_in_channel'])
_BLOCKED.fields_by_name['reposted_in_channel'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
_BLOCKED.oneofs_by_name['reason'].fields.append(
_BLOCKED.fields_by_name['in_channel'])
_BLOCKED.fields_by_name['in_channel'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
_BLOCKED.oneofs_by_name['reason'].fields.append(
_BLOCKED.fields_by_name['has_tag'])
_BLOCKED.fields_by_name['has_tag'].containing_oneof = _BLOCKED.oneofs_by_name['reason']
DESCRIPTOR.message_types_by_name['Outputs'] = _OUTPUTS
DESCRIPTOR.message_types_by_name['Output'] = _OUTPUT
DESCRIPTOR.message_types_by_name['ClaimMeta'] = _CLAIMMETA
DESCRIPTOR.message_types_by_name['Error'] = _ERROR
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
DESCRIPTOR.message_types_by_name['Blocked'] = _BLOCKED
Outputs = _reflection.GeneratedProtocolMessageType('Outputs', (_message.Message,), dict(
DESCRIPTOR = _OUTPUTS,
@@ -394,5 +486,12 @@ Error = _reflection.GeneratedProtocolMessageType('Error', (_message.Message,), d
))
_sym_db.RegisterMessage(Error)
Blocked = _reflection.GeneratedProtocolMessageType('Blocked', (_message.Message,), dict(
DESCRIPTOR = _BLOCKED,
__module__ = 'result_pb2'
# @@protoc_insertion_point(class_scope:pb.Blocked)
))
_sym_db.RegisterMessage(Blocked)
# @@protoc_insertion_point(module_scope)

View file

@@ -629,7 +629,7 @@ class Ledger(metaclass=LedgerRegistry):
print(record['history'], addresses, tx.id)
raise asyncio.TimeoutError('Timed out waiting for transaction.')
async def _inflate_outputs(self, query, accounts):
async def _inflate_outputs(self, query, accounts) -> Tuple[List[Output], dict, int, int]:
outputs = Outputs.from_base64(await query)
txs = []
if len(outputs.txs) > 0:
@@ -652,7 +652,8 @@ class Ledger(metaclass=LedgerRegistry):
}
for txo in priced_claims:
txo.purchase_receipt = receipts.get(txo.claim_id)
return outputs.inflate(txs), outputs.offset, outputs.total
txos, blocked = outputs.inflate(txs)
return txos, blocked, outputs.offset, outputs.total
async def resolve(self, accounts, urls):
resolve = partial(self.network.retriable_call, self.network.resolve)
@@ -669,7 +670,7 @@ class Ledger(metaclass=LedgerRegistry):
result[url] = {'error': f'{url} did not resolve to a claim'}
return result
async def claim_search(self, accounts, **kwargs) -> Tuple[List[Output], int, int]:
async def claim_search(self, accounts, **kwargs) -> Tuple[List[Output], dict, int, int]:
return await self._inflate_outputs(self.network.claim_search(**kwargs), accounts)
async def get_claim_by_claim_id(self, accounts, claim_id) -> Output:
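Callers of claim_search now unpack a 4-tuple, as the daemon hunk above does; a sketch (ledger and wallet assumed supplied by the surrounding app):

async def demo_search(ledger, wallet):
    # `blocked` carries whatever inflate_blocked produced for this page
    txos, blocked, offset, total = await ledger.claim_search(wallet.accounts, name='foo')
    return txos, total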

View file

@@ -12,9 +12,9 @@ from lbry.wallet.server.util import cachedproperty, subclasses
from lbry.wallet.server.hash import Base58, hash160, double_sha256, hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.daemon import Daemon, LBCDaemon
from lbry.wallet.server.script import ScriptPubKey, OpCodes
from lbry.wallet.server.leveldb import DB
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
from lbry.wallet.server.db.writer import LBRYDB
from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.block_processor import LBRYBlockProcessor
@@ -41,7 +41,7 @@ class Coin:
DAEMON = Daemon
BLOCK_PROCESSOR = LBRYBlockProcessor
SESSION_MANAGER = LBRYSessionManager
DB = DB
DB = LevelDB
HEADER_VALUES = [
'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce'
]
@@ -240,7 +240,7 @@ class LBC(Coin):
BLOCK_PROCESSOR = LBRYBlockProcessor
SESSION_MANAGER = LBRYSessionManager
DESERIALIZER = DeserializerSegWit
DB = LBRYDB
DB = LBRYLevelDB
NAME = "LBRY"
SHORTNAME = "LBC"
NET = "mainnet"

View file

@@ -14,7 +14,7 @@ from lbry.wallet.database import query, interpolate
from lbry.schema.url import URL, normalize_name
from lbry.schema.tags import clean_tags
from lbry.schema.result import Outputs
from lbry.schema.result import Outputs, Censor
from lbry.wallet import Ledger, RegTestLedger
from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS
@@ -47,7 +47,7 @@ INTEGER_PARAMS = {
SEARCH_PARAMS = {
'name', 'text', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid', 'blocklist_channel_ids',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags', 'reposted_claim_id',
'any_locations', 'all_locations', 'not_locations',
'any_languages', 'all_languages', 'not_languages',
@@ -70,6 +70,7 @@ class ReaderState:
ledger: Type[Ledger]
query_timeout: float
log: logging.Logger
blocked_claims: Dict
def close(self):
self.db.close()
@@ -92,16 +93,22 @@ class ReaderState:
ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
def initializer(log, _path, _ledger_name, query_timeout, _measure=False):
def row_factory(cursor, row):
return {
k[0]: (set(row[i].split(',')) if k[0] == 'tags' else row[i])
for i, k in enumerate(cursor.getdescription())
}
def initializer(log, _path, _ledger_name, query_timeout, _measure=False, blocked_claims=None):
db = apsw.Connection(_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI)
def row_factory(cursor, row):
return {k[0]: row[i] for i, k in enumerate(cursor.getdescription())}
db.setrowtrace(row_factory)
ctx.set(
ReaderState(
db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
ledger=Ledger if _ledger_name == 'mainnet' else RegTestLedger,
query_timeout=query_timeout, log=log
query_timeout=query_timeout, log=log,
blocked_claims={} if blocked_claims is None else blocked_claims
)
)
@@ -159,11 +166,24 @@ def encode_result(result):
@measure
def execute_query(sql, values) -> List:
def execute_query(sql, values, row_limit, censor) -> List:
context = ctx.get()
context.set_query_timeout()
try:
return context.db.cursor().execute(sql, values).fetchall()
c = context.db.cursor()
def row_filter(cursor, row):
row = row_factory(cursor, row)
if len(row) > 1 and censor.censor(row):
return
return row
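# apsw skips any row for which a row tracer returns None, so censored
# rows are tallied by `censor` but never reach the returned page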
c.setrowtrace(row_filter)
i, rows = 0, []
for row in c.execute(sql, values):
i += 1
rows.append(row)
if i >= row_limit:
break
return rows
except apsw.Error as err:
plain_sql = interpolate(sql, values)
if context.is_tracking_metrics:
@@ -243,34 +263,6 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
constraints['claim.channel_hash__in'] = [
unhexlify(cid)[::-1] for cid in channel_ids
]
if 'not_channel_ids' in constraints:
not_channel_ids = constraints.pop('not_channel_ids')
if not_channel_ids:
not_channel_ids_binary = [
unhexlify(ncid)[::-1] for ncid in not_channel_ids
]
if constraints.get('has_channel_signature', False):
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
else:
constraints['null_or_not_channel__or'] = {
'claim.signature_valid__is_null': True,
'claim.channel_hash__not_in': not_channel_ids_binary
}
if 'blocklist_channel_ids' in constraints:
blocklist_ids = constraints.pop('blocklist_channel_ids')
if blocklist_ids:
blocking_channels = [
unhexlify(channel_id)[::-1] for channel_id in blocklist_ids
]
constraints.update({
f'$blocking_channel{i}': a for i, a in enumerate(blocking_channels)
})
blocklist = ', '.join([
f':$blocking_channel{i}' for i in range(len(blocking_channels))
])
constraints['claim.claim_hash__not_in#blocklist_channel_ids'] = f"""
SELECT reposted_claim_hash FROM claim WHERE channel_hash IN ({blocklist})
"""
if 'signature_valid' in constraints:
has_channel_signature = constraints.pop('has_channel_signature', False)
if has_channel_signature:
@@ -319,16 +311,23 @@ def _get_claims(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
return query(select, **constraints)
def get_claims(cols, for_count=False, **constraints) -> List:
def get_claims(cols, for_count=False, **constraints) -> Tuple[List, Censor]:
if 'channel' in constraints:
channel_url = constraints.pop('channel')
match = resolve_url(channel_url)
if isinstance(match, dict):
constraints['channel_hash'] = match['claim_hash']
else:
return [{'row_count': 0}] if cols == 'count(*) as row_count' else []
return ([{'row_count': 0}] if cols == 'count(*) as row_count' else []), Censor()
censor = Censor(
ctx.get().blocked_claims,
{unhexlify(ncid)[::-1] for ncid in constraints.pop('not_channel_ids', [])},
set(constraints.pop('not_tags', {}))
)
row_limit = constraints.pop('limit', 20)
constraints['limit'] = 1000
sql, values = _get_claims(cols, for_count, **constraints)
return execute_query(sql, values)
return execute_query(sql, values, row_limit, censor), censor
@measure
@@ -336,11 +335,11 @@ def get_claims_count(**constraints) -> int:
constraints.pop('offset', None)
constraints.pop('limit', None)
constraints.pop('order_by', None)
count = get_claims('count(*) as row_count', for_count=True, **constraints)
count, _ = get_claims('count(*) as row_count', for_count=True, **constraints)
return count[0]['row_count']
def _search(**constraints):
def _search(**constraints) -> Tuple[List, Censor]:
return get_claims(
"""
claimtrie.claim_hash as is_controlling,
@@ -354,7 +353,11 @@ def _search(**constraints):
claim.trending_local, claim.trending_global,
claim.short_url, claim.canonical_url,
claim.channel_hash, claim.reposted_claim_hash,
claim.signature_valid
claim.signature_valid,
COALESCE(
(SELECT group_concat(tag) FROM tag WHERE tag.claim_hash = claim.claim_hash),
""
) as tags
""", **constraints
)
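The new tags column arrives as a comma separated string from group_concat; the reader's row_factory splits it into a set, which is what Censor.censor intersects against. A small sketch with invented values:

raw_row = {'claim_hash': b'x', 'channel_hash': None, 'tags': 'mature,gaming'}
raw_row['tags'] = set(raw_row['tags'].split(','))  # what row_factory does for the 'tags' key
assert Censor(tags={'mature'}).censor(raw_row)  # tag intersection triggers the block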
@@ -365,19 +368,19 @@ def _get_referenced_rows(txo_rows: List[dict]):
reposted_txos = []
if repost_hashes:
reposted_txos = _search(**{'claim.claim_hash__in': repost_hashes})
reposted_txos, _ = _search(**{'claim.claim_hash__in': repost_hashes})
channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos)))
channel_txos = []
if channel_hashes:
channel_txos = _search(**{'claim.claim_hash__in': channel_hashes})
channel_txos, _ = _search(**{'claim.claim_hash__in': channel_hashes})
# channels must come first for client side inflation to work properly
return channel_txos + reposted_txos
@measure
def search(constraints) -> Tuple[List, List, int, int]:
def search(constraints) -> Tuple[List, List, int, int, Censor]:
assert set(constraints).issubset(SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
total = None
@@ -387,9 +390,9 @@ def search(constraints) -> Tuple[List, List, int, int]:
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
if 'order_by' not in constraints:
constraints['order_by'] = ["claim_hash"]
txo_rows = _search(**constraints)
txo_rows, censor = _search(**constraints)
extra_txo_rows = _get_referenced_rows(txo_rows)
return txo_rows, extra_txo_rows, constraints['offset'], total
return txo_rows, extra_txo_rows, constraints['offset'], total, censor
@measure
@@ -415,7 +418,7 @@ def resolve_url(raw_url):
query['is_controlling'] = True
else:
query['order_by'] = ['^creation_height']
matches = _search(**query, limit=1)
matches, _ = _search(**query, limit=1)
if matches:
channel = matches[0]
else:
@@ -433,7 +436,7 @@ def resolve_url(raw_url):
query['signature_valid'] = 1
elif set(query) == {'name'}:
query['is_controlling'] = 1
matches = _search(**query, limit=1)
matches, _ = _search(**query, limit=1)
if matches:
return matches[0]
else:
@@ -445,10 +448,6 @@ def resolve_url(raw_url):
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
all_items = {item for item in all_items if item not in not_items}
any_items = {item for item in any_items if item not in not_items}
any_queries = {}
@@ -526,23 +525,3 @@ def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_coun
AND {attr} IN ({values})
)
"""
if not_items:
constraints.update({
f'$not_{attr}{i}': item for i, item in enumerate(not_items)
})
values = ', '.join(
f':$not_{attr}{i}' for i in range(len(not_items))
)
if for_count:
constraints[f'claim.claim_hash__not_in#_not_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
"""
else:
constraints[f'#_not_{attr}'] = f"""
NOT EXISTS(
SELECT 1 FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""

View file

@@ -4,8 +4,10 @@ from typing import Union, Tuple, Set, List
from itertools import chain
from decimal import Decimal
from collections import namedtuple
from multiprocessing import Manager
from binascii import unhexlify
from lbry.wallet.server.leveldb import DB
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.util import class_logger
from lbry.wallet.database import query, constraints_to_sql
@@ -166,13 +168,18 @@ class SQLDB:
CREATE_TAG_TABLE
)
def __init__(self, main, path):
def __init__(self, main, path: str, filtering_channels: list):
self.main = main
self._db_path = path
self.db = None
self.state_manager = None
self.blocked_claims = None
self.logger = class_logger(__name__, self.__class__.__name__)
self.ledger = Ledger if self.main.coin.NET == 'mainnet' else RegTestLedger
self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger
self._fts_synced = False
self.filtering_channel_hashes = {
unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
}
def open(self):
self.db = apsw.Connection(
@@ -192,10 +199,27 @@ class SQLDB:
self.execute(self.CREATE_TABLES_QUERY)
register_canonical_functions(self.db)
register_trending_functions(self.db)
self.state_manager = Manager()
self.blocked_claims = self.state_manager.dict()
self.update_blocked_claims()
def close(self):
if self.db is not None:
self.db.close()
if self.state_manager is not None:
self.state_manager.shutdown()
def update_blocked_claims(self):
sql = query(
"SELECT channel_hash, reposted_claim_hash FROM claim",
reposted_claim_hash__is_not_null=1,
channel_hash__in=self.filtering_channel_hashes
)
blocked_claims = {}
for blocked_claim in self.execute(*sql):
blocked_claims[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
self.blocked_claims.clear()
self.blocked_claims.update(blocked_claims)
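One caveat worth noting: DictProxy has no atomic replace, so the clear() followed by update() above leaves a brief window where a concurrent reader sees an empty map; for content filtering that costs at most one under-censored query, which appears to be the accepted trade-off.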
@staticmethod
def _insert_sql(table: str, data: dict) -> Tuple[str, list]:
@@ -585,6 +609,12 @@ class SQLDB:
""", [(channel_hash,) for channel_hash in all_channel_keys.keys()])
sub_timer.stop()
sub_timer = timer.add_timer('update blocked claims list')
sub_timer.start()
if self.filtering_channel_hashes.intersection(all_channel_keys):
self.update_blocked_claims()
sub_timer.stop()
def _update_support_amount(self, claim_hashes):
if claim_hashes:
self.execute(f"""
@@ -778,12 +808,13 @@ class SQLDB:
self._fts_synced = True
class LBRYDB(DB):
class LBRYLevelDB(LevelDB):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
path = os.path.join(self.env.db_dir, 'claims.db')
self.sql = SQLDB(self, path)
# space separated list of channel URIs used for filtering bad content
self.sql = SQLDB(self, path, self.env.default('FILTERING_CHANNELS_IDS', '').split(' '))
def close(self):
super().close()

View file

@@ -47,7 +47,7 @@ class FlushData:
tip = attr.ib()
class DB:
class LevelDB:
"""Simple wrapper of the backend database for querying.
Performs no DB update, though the DB will be cleaned on opening if

View file

@@ -23,7 +23,7 @@ from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import lbry
from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYDB
from lbry.wallet.server.db.writer import LBRYLevelDB
from lbry.wallet.server.db import reader
from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics
@@ -40,8 +40,6 @@ from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.peers import PeerManager
if typing.TYPE_CHECKING:
from lbry.wallet.server.env import Env
from lbry.wallet.server.leveldb import DB
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.daemon import Daemon
@@ -120,7 +118,7 @@ class SessionGroup:
class SessionManager:
"""Holds global state about all sessions."""
def __init__(self, env: 'Env', db: 'DB', bp: 'BlockProcessor', daemon: 'Daemon', mempool: 'MemPool',
def __init__(self, env: 'Env', db: LBRYLevelDB, bp: LBRYBlockProcessor, daemon: 'Daemon', mempool: 'MemPool',
shutdown_event: asyncio.Event):
env.max_send = max(350000, env.max_send)
self.env = env
@@ -750,7 +748,7 @@ class LBRYSessionManager(SessionManager):
args = dict(
initializer=reader.initializer,
initargs=(self.logger, path, self.env.coin.NET, self.env.database_query_timeout,
self.env.track_metrics)
self.env.track_metrics, self.db.sql.blocked_claims)
)
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
@@ -793,10 +791,7 @@ class LBRYElectrumX(SessionBase):
# fixme: this is a rebase hack, we need to go through ChainState instead later
self.daemon = self.session_mgr.daemon
self.bp: LBRYBlockProcessor = self.session_mgr.bp
self.db: LBRYDB = self.bp.db
# space separated list of channel URIs used for filtering bad content
filtering_channels = self.env.default('FILTERING_CHANNELS_IDS', '')
self.filtering_channels_ids = list(filter(None, filtering_channels.split(' ')))
self.db: LBRYLevelDB = self.bp.db
@classmethod
def protocol_min_max_strings(cls):
@@ -936,7 +931,6 @@ class LBRYElectrumX(SessionBase):
async def claimtrie_search(self, **kwargs):
if kwargs:
kwargs.setdefault('blocklist_channel_ids', []).extend(self.filtering_channels_ids)
return await self.run_and_cache_query('search', reader.search_to_bytes, kwargs)
async def claimtrie_resolve(self, *urls):

View file

@@ -766,7 +766,7 @@ class StreamCommands(ClaimTestCase):
)
await self.ledger.stop()
await self.ledger.start()
filtered_claim_search = await self.claim_search(name='too_bad')
filtered_claim_search = await self.out(self.daemon.jsonrpc_claim_search(name='too_bad'))
self.assertEqual(filtered_claim_search, [])
filtered_claim_search = await self.claim_search(name='not_bad')
self.assertEqual(len(filtered_claim_search), 1)

View file

@@ -3,8 +3,8 @@ import ecdsa
import hashlib
import logging
from binascii import hexlify
from lbry.wallet.constants import COIN, NULL_HASH32
from lbry.wallet.constants import COIN, NULL_HASH32
from lbry.schema.claim import Claim
from lbry.wallet.server.db import reader, writer
from lbry.wallet.server.coin import LBCRegTest
@@ -36,10 +36,13 @@ class TestSQLDB(unittest.TestCase):
self.daemon_height = 1
self.coin = LBCRegTest()
db_url = 'file:test_sqldb?mode=memory&cache=shared'
self.sql = writer.SQLDB(self, db_url)
self.sql = writer.SQLDB(self, db_url, [])
self.addCleanup(self.sql.close)
self.sql.open()
reader.initializer(logging.getLogger(__name__), db_url, 'regtest', self.query_timeout)
reader.initializer(
logging.getLogger(__name__), db_url, 'regtest',
self.query_timeout, blocked_claims=self.sql.blocked_claims
)
self.addCleanup(reader.cleanup)
self.timer = Timer('BlockProcessor')
self._current_height = 0
@@ -74,9 +77,9 @@ class TestSQLDB(unittest.TestCase):
Input.spend(channel)
)
def get_stream(self, title, amount, name='foo', channel=None):
def get_stream(self, title, amount, name='foo', channel=None, **kwargs):
claim = Claim()
claim.stream.title = title
claim.stream.update(title=title, **kwargs)
result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc'))
if channel:
result[0].outputs[0].sign(channel)
@@ -96,6 +99,14 @@ class TestSQLDB(unittest.TestCase):
result[0]._reset()
return result
def get_repost(self, claim_id, amount, channel):
claim = Claim()
claim.repost.reference.claim_id = claim_id
result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, 'repost', claim, b'abc'))
result[0].outputs[0].sign(channel)
result[0]._reset()
return result
def get_abandon(self, tx):
claim = Transaction(tx[0].raw).outputs[0]
return self._make_tx(
@@ -319,7 +330,7 @@ class TestClaimtrie(TestSQLDB):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream, self.get_stream_update(stream, 11*COIN)])
self.assertTrue(reader._search())
self.assertTrue(reader._search()[0])
def test_double_updates_in_same_block(self):
advance, state = self.advance, self.state
@@ -327,13 +338,13 @@ class TestClaimtrie(TestSQLDB):
advance(10, [stream])
update = self.get_stream_update(stream, 11*COIN)
advance(20, [update, self.get_stream_update(update, 9*COIN)])
self.assertTrue(reader._search())
self.assertTrue(reader._search()[0])
def test_create_and_abandon_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
advance(10, [stream, self.get_abandon(stream)])
self.assertFalse(reader._search())
self.assertFalse(reader._search()[0])
def test_update_and_abandon_in_same_block(self):
advance, state = self.advance, self.state
@@ -341,14 +352,14 @@ class TestClaimtrie(TestSQLDB):
advance(10, [stream])
update = self.get_stream_update(stream, 11*COIN)
advance(20, [update, self.get_abandon(update)])
self.assertFalse(reader._search())
self.assertFalse(reader._search()[0])
def test_create_update_and_delete_in_same_block(self):
advance, state = self.advance, self.state
stream = self.get_stream('Claim A', 10*COIN)
update = self.get_stream_update(stream, 11*COIN)
advance(10, [stream, update, self.get_abandon(update)])
self.assertFalse(reader._search())
self.assertFalse(reader._search()[0])
def test_support_added_and_removed_in_same_block(self):
advance, state = self.advance, self.state
@@ -356,7 +367,7 @@ class TestClaimtrie(TestSQLDB):
advance(10, [stream])
support = self.get_support(stream, COIN)
advance(20, [support, self.get_abandon(support)])
self.assertEqual(reader._search()[0]['support_amount'], 0)
self.assertEqual(reader._search()[0][0]['support_amount'], 0)
@staticmethod
def _get_x_with_claim_id_prefix(getter, prefix, cached_iteration=None, **kwargs):
@@ -385,7 +396,7 @@ class TestClaimtrie(TestSQLDB):
txo_chan_ab = tx_chan_ab[0].outputs[0]
advance(1, [tx_chan_a])
advance(2, [tx_chan_ab])
r_ab, r_a = reader._search(order_by=['creation_height'], limit=2)
(r_ab, r_a), _ = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual("@foo#a", r_a['short_url'])
self.assertEqual("@foo#ab", r_ab['short_url'])
self.assertIsNone(r_a['canonical_url'])
@@ -398,7 +409,7 @@ class TestClaimtrie(TestSQLDB):
tx_abc = self.get_stream_with_claim_id_prefix('abc', 65)
advance(3, [tx_a])
advance(4, [tx_ab, tx_abc])
r_abc, r_ab, r_a = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
(r_abc, r_ab, r_a), _ = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
self.assertEqual("foo#a", r_a['short_url'])
self.assertEqual("foo#ab", r_ab['short_url'])
self.assertEqual("foo#abc", r_abc['short_url'])
@@ -412,51 +423,51 @@ class TestClaimtrie(TestSQLDB):
ab2_claim = tx_ab2[0].outputs[0]
advance(6, [tx_a2])
advance(7, [tx_ab2])
r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
(r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
# change channel public key, invaliding stream claim signatures
advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')])
r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
(r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertIsNone(r_a2['canonical_url'])
self.assertIsNone(r_ab2['canonical_url'])
self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
# reinstate previous channel public key (previous stream claim signatures become valid again)
channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c')
advance(9, [channel_update])
r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
(r_ab2, r_a2), _ = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url'])
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
# change channel of stream
self.assertEqual("@foo#a/foo#ab", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
self.assertEqual("@foo#a/foo#ab", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0][0]['canonical_url'])
tx_ab2 = self.get_stream_update(tx_ab2, COIN, txo_chan_ab)
advance(10, [tx_ab2])
self.assertEqual("@foo#ab/foo#a", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url'])
self.assertEqual("@foo#ab/foo#a", reader._search(claim_id=ab2_claim.claim_id, limit=1)[0][0]['canonical_url'])
# TODO: currently there is a bug where stream leaving a channel does not update that channels claims count
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
# TODO: after bug is fixed remove test above and add test below
#self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(1, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
#self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0][0]['claims_in_channel'])
self.assertEqual(1, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
# claim abandon updates claims_in_channel
advance(11, [self.get_abandon(tx_ab2)])
self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel'])
self.assertEqual(0, reader._search(claim_id=txo_chan_ab.claim_id, limit=1)[0][0]['claims_in_channel'])
# delete channel, invaliding stream claim signatures
advance(12, [self.get_abandon(channel_update)])
r_a2, = reader._search(order_by=['creation_height'], limit=1)
(r_a2,), _ = reader._search(order_by=['creation_height'], limit=1)
self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url'])
self.assertIsNone(r_a2['canonical_url'])
@@ -514,7 +525,7 @@ class TestTrending(TestSQLDB):
self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN),
self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN),
])
results = reader._search(order_by=['trending_local'])
results, _ = reader._search(order_by=['trending_local'])
self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results])
self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results])
self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])
@@ -526,3 +537,60 @@ class TestTrending(TestSQLDB):
self.advance(1, [problematic])
self.advance(TRENDING_WINDOW, [self.get_support(problematic, 53000000000)])
self.advance(TRENDING_WINDOW * 2, [self.get_support(problematic, 500000000)])
class TestContentBlocking(TestSQLDB):
def test_blocking(self):
tx0 = self.get_channel('A Channel', COIN)
a_channel = tx0[0].outputs[0]
tx1 = self.get_stream('Claim One', COIN)
tx2 = self.get_stream('Claim Two', COIN, tags=["mature"], channel=a_channel)
self.advance(1, [tx0, tx1, tx2])
claim1, claim2 = tx1[0].outputs[0], tx2[0].outputs[0]
# nothing blocked
results, censor = reader._search(text='Claim')
self.assertEqual(2, len(results))
self.assertEqual(0, censor.total)
self.assertEqual({}, dict(self.sql.blocked_claims))
# block claim reposted to blocking channel
tx = self.get_channel('Blocking Channel', COIN)
channel = tx[0].outputs[0]
self.sql.filtering_channel_hashes.add(channel.claim_hash)
self.advance(2, [tx])
self.assertEqual({}, dict(self.sql.blocked_claims))
tx = self.get_repost(claim1.claim_id, COIN, channel)
reposting_claim = tx[0].outputs[0]
self.advance(3, [tx])
self.assertEqual(
{reposting_claim.claim.repost.reference.claim_hash: channel.claim_hash},
dict(self.sql.blocked_claims)
)
# claim is blocked from results by repost
results, censor = reader._search(text='Claim')
self.assertEqual(1, len(results))
self.assertEqual(claim2.claim_hash, results[0]['claim_hash'])
self.assertEqual(1, censor.total)
self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
self.assertEqual({}, censor.blocked_channels)
self.assertEqual({}, censor.blocked_tags)
# claim is blocked from results by repost and tags
results, censor = reader._search(text='Claim', not_tags=["mature"])
self.assertEqual(0, len(results))
self.assertEqual(2, censor.total)
self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
self.assertEqual({}, censor.blocked_channels)
self.assertEqual({"mature": 1}, censor.blocked_tags)
# claim is blocked from results by repost and channel
results, censor = reader._search(text='Claim', not_channel_ids=[a_channel.claim_id])
self.assertEqual(0, len(results))
self.assertEqual(2, censor.total)
self.assertEqual({channel.claim_hash: 1}, censor.blocked_claims)
self.assertEqual({a_channel.claim_hash: 1}, censor.blocked_channels)
self.assertEqual({}, censor.blocked_tags)