lbry-sdk/lbry/wallet/server/db/writer.py

import os
import apsw
from typing import Union, Tuple, Set, List
from itertools import chain
from decimal import Decimal
from collections import namedtuple
from multiprocessing import Manager
from binascii import unhexlify, hexlify

from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.util import class_logger
from lbry.wallet.database import query, constraints_to_sql
from lbry.schema.tags import clean_tags
from lbry.schema.mime_types import guess_stream_type
from lbry.wallet import Ledger, RegTestLedger
from lbry.wallet.transaction import Transaction, Output
from lbry.wallet.server.db.canonical import register_canonical_functions
from lbry.wallet.server.db.trending import TRENDING_ALGORITHMS

from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
from lbry.wallet.server.db.elasticsearch import SearchIndex


ATTRIBUTE_ARRAY_MAX_LENGTH = 100

class SQLDB:

    PRAGMAS = """
        pragma journal_mode=WAL;
    """

    CREATE_CLAIM_TABLE = """
        create table if not exists claim (
            claim_hash bytes primary key,
            claim_id text not null,
            claim_name text not null,
            normalized text not null,
            txo_hash bytes not null,
            tx_position integer not null,
            amount integer not null,
            timestamp integer not null, -- last updated timestamp
            creation_timestamp integer not null,
            height integer not null, -- last updated height
            creation_height integer not null,
            activation_height integer,
            expiration_height integer not null,
            release_time integer not null,

            short_url text not null, -- normalized#shortest-unique-claim_id
            canonical_url text, -- channel's-short_url/normalized#shortest-unique-claim_id-within-channel

            title text,
            author text,
            description text,

            claim_type integer,
            has_source bool,
            reposted integer default 0,

            -- streams
            stream_type text,
            media_type text,
            fee_amount integer default 0,
            fee_currency text,
            duration integer,

            -- reposts
            reposted_claim_hash bytes,

            -- claims which are channels
            public_key_bytes bytes,
            public_key_hash bytes,
            claims_in_channel integer,

            -- claims which are inside channels
            channel_hash bytes,
            channel_join integer, -- height at which claim got valid signature / joined channel
            signature bytes,
            signature_digest bytes,
            signature_valid bool,

            effective_amount integer not null default 0,
            support_amount integer not null default 0,
            trending_group integer not null default 0,
            trending_mixed integer not null default 0,
            trending_local integer not null default 0,
            trending_global integer not null default 0
        );

        create index if not exists claim_normalized_idx on claim (normalized, activation_height);
        create index if not exists claim_channel_hash_idx on claim (channel_hash, signature, claim_hash);
        create index if not exists claim_claims_in_channel_idx on claim (signature_valid, channel_hash, normalized);
        create index if not exists claim_txo_hash_idx on claim (txo_hash);
        create index if not exists claim_activation_height_idx on claim (activation_height, claim_hash);
        create index if not exists claim_expiration_height_idx on claim (expiration_height);
        create index if not exists claim_reposted_claim_hash_idx on claim (reposted_claim_hash);
    """

    CREATE_SUPPORT_TABLE = """
        create table if not exists support (
            txo_hash bytes primary key,
            tx_position integer not null,
            height integer not null,
            claim_hash bytes not null,
            amount integer not null
        );
        create index if not exists support_claim_hash_idx on support (claim_hash, height);
    """

    CREATE_TAG_TABLE = """
        create table if not exists tag (
            tag text not null,
            claim_hash bytes not null,
            height integer not null
        );
        create unique index if not exists tag_claim_hash_tag_idx on tag (claim_hash, tag);
    """

    CREATE_LANGUAGE_TABLE = """
        create table if not exists language (
            language text not null,
            claim_hash bytes not null,
            height integer not null
        );
        create unique index if not exists language_claim_hash_language_idx on language (claim_hash, language);
    """

    CREATE_CLAIMTRIE_TABLE = """
        create table if not exists claimtrie (
            normalized text primary key,
            claim_hash bytes not null,
            last_take_over_height integer not null
        );
        create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash);
    """
    CREATE_CHANGELOG_TRIGGER = """
        create table if not exists changelog (
            claim_hash bytes primary key
        );
        create trigger if not exists claim_changelog after update on claim
        begin
            insert or ignore into changelog (claim_hash) values (new.claim_hash);
        end;
        create trigger if not exists claimtrie_changelog after update on claimtrie
        begin
            insert or ignore into changelog (claim_hash) values (new.claim_hash);
            insert or ignore into changelog (claim_hash) values (old.claim_hash);
        end;
    """
    SEARCH_INDEXES = """
        -- used by any tag clouds
        create index if not exists tag_tag_idx on tag (tag, claim_hash);

        -- naked order bys (no filters)
        create unique index if not exists claim_release_idx on claim (release_time, claim_hash);
        create unique index if not exists claim_trending_idx on claim (trending_group, trending_mixed, claim_hash);
        create unique index if not exists claim_effective_amount_idx on claim (effective_amount, claim_hash);

        -- claim_type filter + order by
        create unique index if not exists claim_type_release_idx on claim (release_time, claim_type, claim_hash);
        create unique index if not exists claim_type_trending_idx on claim (trending_group, trending_mixed, claim_type, claim_hash);
        create unique index if not exists claim_type_effective_amount_idx on claim (effective_amount, claim_type, claim_hash);

        -- stream_type filter + order by
        create unique index if not exists stream_type_release_idx on claim (stream_type, release_time, claim_hash);
        create unique index if not exists stream_type_trending_idx on claim (stream_type, trending_group, trending_mixed, claim_hash);
        create unique index if not exists stream_type_effective_amount_idx on claim (stream_type, effective_amount, claim_hash);

        -- channel_hash filter + order by
        create unique index if not exists channel_hash_release_idx on claim (channel_hash, release_time, claim_hash);
        create unique index if not exists channel_hash_trending_idx on claim (channel_hash, trending_group, trending_mixed, claim_hash);
        create unique index if not exists channel_hash_effective_amount_idx on claim (channel_hash, effective_amount, claim_hash);

        -- duration filter + order by
        create unique index if not exists duration_release_idx on claim (duration, release_time, claim_hash);
        create unique index if not exists duration_trending_idx on claim (duration, trending_group, trending_mixed, claim_hash);
        create unique index if not exists duration_effective_amount_idx on claim (duration, effective_amount, claim_hash);

        -- fee_amount filter + order by
        create unique index if not exists fee_amount_release_idx on claim (fee_amount, release_time, claim_hash);
        create unique index if not exists fee_amount_trending_idx on claim (fee_amount, trending_group, trending_mixed, claim_hash);
        create unique index if not exists fee_amount_effective_amount_idx on claim (fee_amount, effective_amount, claim_hash);

        -- TODO: verify that all indexes below are used
        create index if not exists claim_height_normalized_idx on claim (height, normalized asc);
        create index if not exists claim_resolve_idx on claim (normalized, claim_id);
        create index if not exists claim_id_idx on claim (claim_id, claim_hash);
        create index if not exists claim_timestamp_idx on claim (timestamp);
        create index if not exists claim_public_key_hash_idx on claim (public_key_hash);
        create index if not exists claim_signature_valid_idx on claim (signature_valid);
    """
    TAG_INDEXES = '\n'.join(
        f"create unique index if not exists tag_{tag_key}_idx on tag (tag, claim_hash) WHERE tag='{tag_value}';"
        for tag_value, tag_key in COMMON_TAGS.items()
    )

    LANGUAGE_INDEXES = '\n'.join(
        f"create unique index if not exists language_{language}_idx on language (language, claim_hash) WHERE language='{language}';"
        for language in INDEXED_LANGUAGES
    )

    CREATE_TABLES_QUERY = (
        CREATE_CLAIM_TABLE +
        CREATE_SUPPORT_TABLE +
        CREATE_CLAIMTRIE_TABLE +
        CREATE_TAG_TABLE +
        CREATE_CHANGELOG_TRIGGER +
        CREATE_LANGUAGE_TABLE
    )
    def __init__(
            self, main, path: str, blocking_channels: list, filtering_channels: list, trending: list):
        self.main = main
        self._db_path = path
        self.db = None
        self.logger = class_logger(__name__, self.__class__.__name__)
        self.ledger = Ledger if main.coin.NET == 'mainnet' else RegTestLedger
        self.state_manager = None
        self.blocked_streams = None
        self.blocked_channels = None
        self.blocking_channel_hashes = {
            unhexlify(channel_id)[::-1] for channel_id in blocking_channels if channel_id
        }
        self.filtered_streams = None
        self.filtered_channels = None
        self.filtering_channel_hashes = {
            unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
        }
        self.trending = trending
        self.pending_deletes = set()
    def open(self):
        self.db = apsw.Connection(
            self._db_path,
            flags=(
                apsw.SQLITE_OPEN_READWRITE |
                apsw.SQLITE_OPEN_CREATE |
                apsw.SQLITE_OPEN_URI
            )
        )

        def exec_factory(cursor, statement, bindings):
            tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
            cursor.setrowtrace(lambda cursor, row: tpl(*row))
            return True

        self.db.setexectrace(exec_factory)
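        # With the exectrace installed, every SELECT row comes back as a
        # namedtuple keyed by column name (row.claim_hash, row.normalized, ...),
        # which is how the methods below access their query results.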
        self.execute(self.PRAGMAS)
        self.execute(self.CREATE_TABLES_QUERY)
        register_canonical_functions(self.db)
        self.state_manager = Manager()
        self.blocked_streams = self.state_manager.dict()
        self.blocked_channels = self.state_manager.dict()
        self.filtered_streams = self.state_manager.dict()
        self.filtered_channels = self.state_manager.dict()
        self.update_blocked_and_filtered_claims()
        for algorithm in self.trending:
            algorithm.install(self.db)
    def close(self):
        if self.db is not None:
            self.db.close()
        if self.state_manager is not None:
            self.state_manager.shutdown()

    def update_blocked_and_filtered_claims(self):
        self.update_claims_from_channel_hashes(
            self.blocked_streams, self.blocked_channels, self.blocking_channel_hashes
        )
        self.update_claims_from_channel_hashes(
            self.filtered_streams, self.filtered_channels, self.filtering_channel_hashes
        )
        self.filtered_streams.update(self.blocked_streams)
        self.filtered_channels.update(self.blocked_channels)
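
    # Blocking/filtering channels declare the claims they want hidden by
    # reposting them; this query resolves those reposts to the original
    # stream or channel hashes and caches the result in the shared dicts.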
    def update_claims_from_channel_hashes(self, shared_streams, shared_channels, channel_hashes):
        streams, channels = {}, {}
        if channel_hashes:
            sql = query(
                "SELECT repost.channel_hash, repost.reposted_claim_hash, target.claim_type "
                "FROM claim as repost JOIN claim AS target ON (target.claim_hash=repost.reposted_claim_hash)", **{
                    'repost.reposted_claim_hash__is_not_null': 1,
                    'repost.channel_hash__in': channel_hashes
                }
            )
            for blocked_claim in self.execute(*sql):
                if blocked_claim.claim_type == CLAIM_TYPES['stream']:
                    streams[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
                elif blocked_claim.claim_type == CLAIM_TYPES['channel']:
                    channels[blocked_claim.reposted_claim_hash] = blocked_claim.channel_hash
        shared_streams.clear()
        shared_streams.update(streams)
        shared_channels.clear()
        shared_channels.update(channels)
    @staticmethod
    def _insert_sql(table: str, data: dict) -> Tuple[str, list]:
        columns, values = [], []
        for column, value in data.items():
            columns.append(column)
            values.append(value)
        sql = (
            f"INSERT INTO {table} ({', '.join(columns)}) "
            f"VALUES ({', '.join(['?'] * len(values))})"
        )
        return sql, values

    @staticmethod
    def _update_sql(table: str, data: dict, where: str,
                    constraints: Union[list, tuple]) -> Tuple[str, list]:
        columns, values = [], []
        for column, value in data.items():
            columns.append(f"{column} = ?")
            values.append(value)
        values.extend(constraints)
        return f"UPDATE {table} SET {', '.join(columns)} WHERE {where}", values

    @staticmethod
    def _delete_sql(table: str, constraints: dict) -> Tuple[str, dict]:
        where, values = constraints_to_sql(constraints)
        return f"DELETE FROM {table} WHERE {where}", values

    def execute(self, *args):
        return self.db.cursor().execute(*args)

    def executemany(self, *args):
        return self.db.cursor().executemany(*args)

    def begin(self):
        self.execute('begin;')

    def commit(self):
        self.execute('commit;')
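
    # _upsertable_claims() normalizes claim txos into row dicts shared by the
    # INSERT (insert_claims) and UPDATE (update_claims) paths; the tag and
    # language rows are written here directly as a side effect.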
    def _upsertable_claims(self, txos: List[Output], header, clear_first=False):
        claim_hashes, claims, tags, languages = set(), [], {}, {}
        for txo in txos:
            tx = txo.tx_ref.tx

            try:
                assert txo.claim_name
                assert txo.normalized_name
            except Exception:
                # self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.")
                continue

            language = 'none'
            try:
                if txo.claim.is_stream and txo.claim.stream.languages:
                    language = txo.claim.stream.languages[0].language
            except Exception:
                pass

            claim_hash = txo.claim_hash
            claim_hashes.add(claim_hash)
            claim_record = {
                'claim_hash': claim_hash,
                'claim_id': txo.claim_id,
                'claim_name': txo.claim_name,
                'normalized': txo.normalized_name,
                'txo_hash': txo.ref.hash,
                'tx_position': tx.position,
                'amount': txo.amount,
                'timestamp': header['timestamp'],
                'height': tx.height,
                'title': None,
                'description': None,
                'author': None,
                'duration': None,
                'claim_type': None,
                'has_source': False,
                'stream_type': None,
                'media_type': None,
                'release_time': None,
                'fee_currency': None,
                'fee_amount': 0,
                'reposted_claim_hash': None
            }
            claims.append(claim_record)

            try:
                claim = txo.claim
            except Exception:
                # self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.")
                continue

            if claim.is_stream:
                claim_record['claim_type'] = CLAIM_TYPES['stream']
                claim_record['has_source'] = claim.stream.has_source
                claim_record['media_type'] = claim.stream.source.media_type
                claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])]
                claim_record['title'] = claim.stream.title
                claim_record['description'] = claim.stream.description
                claim_record['author'] = claim.stream.author
                if claim.stream.video and claim.stream.video.duration:
                    claim_record['duration'] = claim.stream.video.duration
                if claim.stream.audio and claim.stream.audio.duration:
                    claim_record['duration'] = claim.stream.audio.duration
                if claim.stream.release_time:
                    claim_record['release_time'] = claim.stream.release_time
                if claim.stream.has_fee:
                    fee = claim.stream.fee
                    if isinstance(fee.currency, str):
                        claim_record['fee_currency'] = fee.currency.lower()
                    if isinstance(fee.amount, Decimal):
                        claim_record['fee_amount'] = int(fee.amount*1000)
            elif claim.is_repost:
                claim_record['claim_type'] = CLAIM_TYPES['repost']
                claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash
            elif claim.is_channel:
                claim_record['claim_type'] = CLAIM_TYPES['channel']
            elif claim.is_collection:
                claim_record['claim_type'] = CLAIM_TYPES['collection']

            languages[(language, claim_hash)] = (language, claim_hash, tx.height)

            for tag in clean_tags(claim.message.tags):
                tags[(tag, claim_hash)] = (tag, claim_hash, tx.height)

        if clear_first:
            self._clear_claim_metadata(claim_hashes)

        if tags:
            self.executemany(
                "INSERT OR IGNORE INTO tag (tag, claim_hash, height) VALUES (?, ?, ?)", tags.values()
            )
        if languages:
            self.executemany(
                "INSERT OR IGNORE INTO language (language, claim_hash, height) VALUES (?, ?, ?)", languages.values()
            )

        return claims
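
    # insert_claims() computes several columns in SQL itself:
    #   * creation_timestamp/creation_height reuse :timestamp/:height on first insert;
    #   * release_time falls back to the block timestamp when the stream omits it;
    #   * activation_height stays NULL when the name already exists in the
    #     claimtrie (contested), to be set later by _calculate_activation_height();
    #   * expiration_height adds 2102400 blocks for claims made at or after
    #     height 137181 (the expiration hard fork) and 262974 blocks before it;
    #   * short_url appends the shortest unique claim_id prefix, e.g. 'name#a'.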
    def insert_claims(self, txos: List[Output], header):
        claims = self._upsertable_claims(txos, header)
        if claims:
            self.executemany("""
                INSERT OR REPLACE INTO claim (
                    claim_hash, claim_id, claim_name, normalized, txo_hash, tx_position, amount,
                    claim_type, media_type, stream_type, timestamp, creation_timestamp, has_source,
                    fee_currency, fee_amount, title, description, author, duration, height, reposted_claim_hash,
                    creation_height, release_time, activation_height, expiration_height, short_url)
                VALUES (
                    :claim_hash, :claim_id, :claim_name, :normalized, :txo_hash, :tx_position, :amount,
                    :claim_type, :media_type, :stream_type, :timestamp, :timestamp, :has_source,
                    :fee_currency, :fee_amount, :title, :description, :author, :duration, :height, :reposted_claim_hash, :height,
                    CASE WHEN :release_time IS NOT NULL THEN :release_time ELSE :timestamp END,
                    CASE WHEN :normalized NOT IN (SELECT normalized FROM claimtrie) THEN :height END,
                    CASE WHEN :height >= 137181 THEN :height+2102400 ELSE :height+262974 END,
                    :claim_name||COALESCE(
                        (SELECT shortest_id(claim_id, :claim_id) FROM claim WHERE normalized = :normalized),
                        '#'||substr(:claim_id, 1, 1)
                    )
                )""", claims)
    def update_claims(self, txos: List[Output], header):
        claims = self._upsertable_claims(txos, header, clear_first=True)
        if claims:
            self.executemany("""
                UPDATE claim SET
                    txo_hash=:txo_hash, tx_position=:tx_position, amount=:amount, height=:height,
                    claim_type=:claim_type, media_type=:media_type, stream_type=:stream_type,
                    timestamp=:timestamp, fee_amount=:fee_amount, fee_currency=:fee_currency, has_source=:has_source,
                    title=:title, duration=:duration, description=:description, author=:author, reposted_claim_hash=:reposted_claim_hash,
                    release_time=CASE WHEN :release_time IS NOT NULL THEN :release_time ELSE release_time END
                WHERE claim_hash=:claim_hash;
                """, claims)
    def delete_claims(self, claim_hashes: Set[bytes]):
        """ Deletes abandoned claims along with their supports and claimtrie entries. """
        if claim_hashes:
            affected_channels = self.execute(*query(
                "SELECT channel_hash FROM claim", channel_hash__is_not_null=1, claim_hash__in=claim_hashes
            )).fetchall()
            for table in ('claim', 'support', 'claimtrie'):
                self.execute(*self._delete_sql(table, {'claim_hash__in': claim_hashes}))
            self._clear_claim_metadata(claim_hashes)
            return {r.channel_hash for r in affected_channels}
        return set()
    def delete_claims_above_height(self, height: int):
        claim_hashes = [x[0] for x in self.execute(
            "SELECT claim_hash FROM claim WHERE height>?", (height, )
        ).fetchall()]
        while claim_hashes:
            batch = set(claim_hashes[:500])
            claim_hashes = claim_hashes[500:]
            self.delete_claims(batch)

    def _clear_claim_metadata(self, claim_hashes: Set[bytes]):
        if claim_hashes:
            for table in ('tag',):  # 'language', 'location', etc
                self.execute(*self._delete_sql(table, {'claim_hash__in': claim_hashes}))

    def split_inputs_into_claims_supports_and_other(self, txis):
        txo_hashes = {txi.txo_ref.hash for txi in txis}
        claims = self.execute(*query(
            "SELECT txo_hash, claim_hash, normalized FROM claim", txo_hash__in=txo_hashes
        )).fetchall()
        txo_hashes -= {r.txo_hash for r in claims}
        supports = {}
        if txo_hashes:
            supports = self.execute(*query(
                "SELECT txo_hash, claim_hash FROM support", txo_hash__in=txo_hashes
            )).fetchall()
            txo_hashes -= {r.txo_hash for r in supports}
        return claims, supports, txo_hashes
    def insert_supports(self, txos: List[Output]):
        supports = []
        for txo in txos:
            tx = txo.tx_ref.tx
            supports.append((
                txo.ref.hash, tx.position, tx.height,
                txo.claim_hash, txo.amount
            ))
        if supports:
            self.executemany(
                "INSERT OR IGNORE INTO support ("
                "   txo_hash, tx_position, height, claim_hash, amount"
                ") "
                "VALUES (?, ?, ?, ?, ?)", supports
            )

    def delete_supports(self, txo_hashes: Set[bytes]):
        if txo_hashes:
            self.execute(*self._delete_sql('support', {'txo_hash__in': txo_hashes}))
    def calculate_reposts(self, txos: List[Output]):
        targets = set()
        for txo in txos:
            try:
                claim = txo.claim
            except Exception:
                continue
            if claim.is_repost:
                targets.add((claim.repost.reference.claim_hash,))
        if targets:
            self.executemany(
                """
                UPDATE claim SET reposted = (
                    SELECT count(*) FROM claim AS repost WHERE repost.reposted_claim_hash = claim.claim_hash
                )
                WHERE claim_hash = ?
                """, targets
            )
        return {target[0] for target in targets}
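
    # validate_channel_signatures() re-checks signatures whenever claims or
    # channels change: it collects new/updated channel keys, looks up any
    # referenced channels it has not seen yet, verifies signatures against the
    # current keys, and then repairs the dependent columns (channel_join,
    # canonical_url, claims_in_channel) that hang off signature validity.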
    def validate_channel_signatures(self, height, new_claims, updated_claims, spent_claims, affected_channels, timer):
        if not new_claims and not updated_claims and not spent_claims:
            return

        sub_timer = timer.add_timer('segregate channels and signables')
        sub_timer.start()
        channels, new_channel_keys, signables = {}, {}, {}
        for txo in chain(new_claims, updated_claims):
            try:
                claim = txo.claim
            except Exception:
                continue
            if claim.is_channel:
                channels[txo.claim_hash] = txo
                new_channel_keys[txo.claim_hash] = claim.channel.public_key_bytes
            else:
                signables[txo.claim_hash] = txo
        sub_timer.stop()

        sub_timer = timer.add_timer('make list of channels we need to lookup')
        sub_timer.start()
        missing_channel_keys = set()
        for txo in signables.values():
            claim = txo.claim
            if claim.is_signed and claim.signing_channel_hash not in new_channel_keys:
                missing_channel_keys.add(claim.signing_channel_hash)
        sub_timer.stop()

        sub_timer = timer.add_timer('lookup missing channels')
        sub_timer.start()
        all_channel_keys = {}
        if new_channel_keys or missing_channel_keys or affected_channels:
            all_channel_keys = dict(self.execute(*query(
                "SELECT claim_hash, public_key_bytes FROM claim",
                claim_hash__in=set(new_channel_keys) | missing_channel_keys | affected_channels
            )))
        sub_timer.stop()

        sub_timer = timer.add_timer('prepare for updating claims')
        sub_timer.start()
        changed_channel_keys = {}
        for claim_hash, new_key in new_channel_keys.items():
            if claim_hash not in all_channel_keys or all_channel_keys[claim_hash] != new_key:
                all_channel_keys[claim_hash] = new_key
                changed_channel_keys[claim_hash] = new_key
        claim_updates = []
        for claim_hash, txo in signables.items():
            claim = txo.claim
            update = {
                'claim_hash': claim_hash,
                'channel_hash': None,
                'signature': None,
                'signature_digest': None,
                'signature_valid': None
            }
            if claim.is_signed:
                update.update({
                    'channel_hash': claim.signing_channel_hash,
                    'signature': txo.get_encoded_signature(),
                    'signature_digest': txo.get_signature_digest(self.ledger),
                    'signature_valid': 0
                })
            claim_updates.append(update)
        sub_timer.stop()

        sub_timer = timer.add_timer('find claims affected by a change in channel key')
        sub_timer.start()
        if changed_channel_keys:
            sql = f"""
            SELECT * FROM claim WHERE
                channel_hash IN ({','.join('?' for _ in changed_channel_keys)}) AND
                signature IS NOT NULL
            """
            for affected_claim in self.execute(sql, changed_channel_keys.keys()):
                if affected_claim.claim_hash not in signables:
                    claim_updates.append({
                        'claim_hash': affected_claim.claim_hash,
                        'channel_hash': affected_claim.channel_hash,
                        'signature': affected_claim.signature,
                        'signature_digest': affected_claim.signature_digest,
                        'signature_valid': 0
                    })
        sub_timer.stop()

        sub_timer = timer.add_timer('verify signatures')
        sub_timer.start()
        for update in claim_updates:
            channel_pub_key = all_channel_keys.get(update['channel_hash'])
            if channel_pub_key and update['signature']:
                update['signature_valid'] = Output.is_signature_valid(
                    bytes(update['signature']), bytes(update['signature_digest']), channel_pub_key
                )
        sub_timer.stop()

        sub_timer = timer.add_timer('update claims')
        sub_timer.start()
        if claim_updates:
            self.executemany(f"""
                UPDATE claim SET
                    channel_hash=:channel_hash, signature=:signature, signature_digest=:signature_digest,
                    signature_valid=:signature_valid,
                    channel_join=CASE
                        WHEN signature_valid=1 AND :signature_valid=1 AND channel_hash=:channel_hash THEN channel_join
                        WHEN :signature_valid=1 THEN {height}
                    END,
                    canonical_url=CASE
                        WHEN signature_valid=1 AND :signature_valid=1 AND channel_hash=:channel_hash THEN canonical_url
                        WHEN :signature_valid=1 THEN
                            (SELECT short_url FROM claim WHERE claim_hash=:channel_hash)||'/'||
                            claim_name||COALESCE(
                                (SELECT shortest_id(other_claim.claim_id, claim.claim_id) FROM claim AS other_claim
                                 WHERE other_claim.signature_valid = 1 AND
                                       other_claim.channel_hash = :channel_hash AND
                                       other_claim.normalized = claim.normalized),
                                '#'||substr(claim_id, 1, 1)
                            )
                    END
                WHERE claim_hash=:claim_hash;
                """, claim_updates)
        sub_timer.stop()

        sub_timer = timer.add_timer('update claims affected by spent channels')
        sub_timer.start()
        if spent_claims:
            self.execute(
                f"""
                UPDATE claim SET
                    signature_valid=CASE WHEN signature IS NOT NULL THEN 0 END,
                    channel_join=NULL, canonical_url=NULL
                WHERE channel_hash IN ({','.join('?' for _ in spent_claims)})
                """, spent_claims
            )
        sub_timer.stop()

        sub_timer = timer.add_timer('update channels')
        sub_timer.start()
        if channels:
            self.executemany(
                """
                UPDATE claim SET
                    public_key_bytes=:public_key_bytes,
                    public_key_hash=:public_key_hash
                WHERE claim_hash=:claim_hash""", [{
                    'claim_hash': claim_hash,
                    'public_key_bytes': txo.claim.channel.public_key_bytes,
                    'public_key_hash': self.ledger.address_to_hash160(
                        self.ledger.public_key_to_address(txo.claim.channel.public_key_bytes)
                    )
                } for claim_hash, txo in channels.items()]
            )
        sub_timer.stop()

        sub_timer = timer.add_timer('update claims_in_channel counts')
        sub_timer.start()
        if all_channel_keys:
            self.executemany(f"""
                UPDATE claim SET
                    claims_in_channel=(
                        SELECT COUNT(*) FROM claim AS claim_in_channel
                        WHERE claim_in_channel.signature_valid=1 AND
                              claim_in_channel.channel_hash=claim.claim_hash
                    )
                WHERE claim_hash = ?
                """, [(channel_hash,) for channel_hash in all_channel_keys])
        sub_timer.stop()

        sub_timer = timer.add_timer('update blocked claims list')
        sub_timer.start()
        if (self.blocking_channel_hashes.intersection(all_channel_keys) or
                self.filtering_channel_hashes.intersection(all_channel_keys)):
            self.update_blocked_and_filtered_claims()
        sub_timer.stop()
    def _update_support_amount(self, claim_hashes):
        if claim_hashes:
            self.execute(f"""
                UPDATE claim SET
                    support_amount = COALESCE(
                        (SELECT SUM(amount) FROM support WHERE support.claim_hash=claim.claim_hash), 0
                    )
                WHERE claim_hash IN ({','.join('?' for _ in claim_hashes)})
            """, claim_hashes)

    def _update_effective_amount(self, height, claim_hashes=None):
        self.execute(
            f"UPDATE claim SET effective_amount = amount + support_amount "
            f"WHERE activation_height = {height}"
        )
        if claim_hashes:
            self.execute(
                f"UPDATE claim SET effective_amount = amount + support_amount "
                f"WHERE activation_height < {height} "
                f"  AND claim_hash IN ({','.join('?' for _ in claim_hashes)})",
                claim_hashes
            )
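
    # Per the claimtrie consensus rules reflected in the formula below, a
    # pending claim activates after a delay of 1/32 of the time since the
    # name's last takeover, capped at 4032 blocks (roughly seven days at
    # 2.5-minute blocks); names with no claimtrie entry activate immediately.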
    def _calculate_activation_height(self, height):
        last_take_over_height = f"""COALESCE(
            (SELECT last_take_over_height FROM claimtrie
             WHERE claimtrie.normalized=claim.normalized),
            {height}
        )
        """
        self.execute(f"""
            UPDATE claim SET activation_height =
                {height} + min(4032, cast(({height} - {last_take_over_height}) / 32 AS INT))
            WHERE activation_height IS NULL
        """)
    def _perform_overtake(self, height, changed_claim_hashes, deleted_names):
        deleted_names_sql = claim_hashes_sql = ""
        if changed_claim_hashes:
            claim_hashes_sql = f"OR claim_hash IN ({','.join('?' for _ in changed_claim_hashes)})"
        if deleted_names:
            deleted_names_sql = f"OR normalized IN ({','.join('?' for _ in deleted_names)})"
        overtakes = self.execute(f"""
            SELECT winner.normalized, winner.claim_hash,
                   claimtrie.claim_hash AS current_winner,
                   MAX(winner.effective_amount) AS max_winner_effective_amount
            FROM (
                SELECT normalized, claim_hash, effective_amount FROM claim
                WHERE normalized IN (
                    SELECT normalized FROM claim WHERE activation_height={height} {claim_hashes_sql}
                ) {deleted_names_sql}
                ORDER BY effective_amount DESC, height ASC, tx_position ASC
            ) AS winner LEFT JOIN claimtrie USING (normalized)
            GROUP BY winner.normalized
            HAVING current_winner IS NULL OR current_winner <> winner.claim_hash
        """, list(changed_claim_hashes)+deleted_names)
        for overtake in overtakes:
            if overtake.current_winner:
                self.execute(
                    f"UPDATE claimtrie SET claim_hash = ?, last_take_over_height = {height} "
                    f"WHERE normalized = ?",
                    (overtake.claim_hash, overtake.normalized)
                )
            else:
                self.execute(
                    f"INSERT INTO claimtrie (claim_hash, normalized, last_take_over_height) "
                    f"VALUES (?, ?, {height})",
                    (overtake.claim_hash, overtake.normalized)
                )
            self.execute(
                f"UPDATE claim SET activation_height = {height} WHERE normalized = ? "
                f"AND (activation_height IS NULL OR activation_height > {height})",
                (overtake.normalized,)
            )
    def _copy(self, height):
        if height > 50:
            self.execute(f"DROP TABLE claimtrie{height-50}")
        self.execute(f"CREATE TABLE claimtrie{height} AS SELECT * FROM claimtrie")

    def update_claimtrie(self, height, changed_claim_hashes, deleted_names, timer):
        r = timer.run
        r(self._calculate_activation_height, height)
        r(self._update_support_amount, changed_claim_hashes)

        r(self._update_effective_amount, height, changed_claim_hashes)
        r(self._perform_overtake, height, changed_claim_hashes, list(deleted_names))

        r(self._update_effective_amount, height)
        r(self._perform_overtake, height, [], [])

    def get_expiring(self, height):
        return self.execute(
            f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}"
        )
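
    # enqueue_changes() drains the changelog: for each changed claim it joins
    # in the claimtrie status, tags and languages, resolves repost overrides,
    # and stamps the blocking/filtering verdict (censor_type 2 for blocked
    # claims, 1 for filtered claims) before yielding the row to the ES sync.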
    def enqueue_changes(self):
        for claim in self.execute("""
            SELECT claimtrie.claim_hash as is_controlling,
                   claimtrie.last_take_over_height,
                   (select group_concat(tag, ',,') from tag where tag.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as tags,
                   (select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages,
                   (select cr.has_source from claim cr where cr.claim_hash = claim.reposted_claim_hash) as reposted_has_source,
                   claim.*
            FROM claim LEFT JOIN claimtrie USING (claim_hash)
            WHERE claim.claim_hash in (SELECT claim_hash FROM changelog)
        """):
            claim = claim._asdict()
            id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash'])))
            claim['censor_type'] = 0
            claim['censoring_channel_hash'] = None
            claim['has_source'] = bool(claim.pop('reposted_has_source') or claim['has_source'])
            for reason_id in id_set:
                if reason_id in self.blocked_streams:
                    claim['censor_type'] = 2
                    claim['censoring_channel_hash'] = self.blocked_streams.get(reason_id)
                elif reason_id in self.blocked_channels:
                    claim['censor_type'] = 2
                    claim['censoring_channel_hash'] = self.blocked_channels.get(reason_id)
                elif reason_id in self.filtered_streams:
                    claim['censor_type'] = 1
                    claim['censoring_channel_hash'] = self.filtered_streams.get(reason_id)
                elif reason_id in self.filtered_channels:
                    claim['censor_type'] = 1
                    claim['censoring_channel_hash'] = self.filtered_channels.get(reason_id)
            claim['tags'] = claim['tags'].split(',,') if claim['tags'] else []
            claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
            yield 'update', claim

    def clear_changelog(self):
        self.execute("delete from changelog;")

    def claim_producer(self):
        while self.pending_deletes:
            claim_hash = self.pending_deletes.pop()
            yield 'delete', hexlify(claim_hash[::-1]).decode()
        for claim in self.enqueue_changes():
            yield claim
        self.clear_changelog()
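
    # advance_txs() is the per-block entry point: it classifies every txo in
    # the block's transactions as a claim, update, support or spend, prunes
    # operations cancelled within the same block, then applies deletes,
    # inserts, updates, signature validation, claimtrie takeovers and the
    # trending algorithms in order, timing each phase.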
    def advance_txs(self, height, all_txs, header, daemon_height, timer):
        insert_claims = []
        update_claims = []
        update_claim_hashes = set()
        delete_claim_hashes = self.pending_deletes
        insert_supports = []
        delete_support_txo_hashes = set()
        recalculate_claim_hashes = set()  # added/deleted supports, added/updated claim
        deleted_claim_names = set()
        delete_others = set()
        body_timer = timer.add_timer('body')
        for position, (etx, txid) in enumerate(all_txs):
            tx = timer.run(
                Transaction, etx.raw, height=height, position=position
            )
            # Inputs
            spent_claims, spent_supports, spent_others = timer.run(
                self.split_inputs_into_claims_supports_and_other, tx.inputs
            )
            body_timer.start()
            delete_claim_hashes.update({r.claim_hash for r in spent_claims})
            deleted_claim_names.update({r.normalized for r in spent_claims})
            delete_support_txo_hashes.update({r.txo_hash for r in spent_supports})
            recalculate_claim_hashes.update({r.claim_hash for r in spent_supports})
            delete_others.update(spent_others)
            # Outputs
            for output in tx.outputs:
                if output.is_support:
                    insert_supports.append(output)
                    recalculate_claim_hashes.add(output.claim_hash)
                elif output.script.is_claim_name:
                    insert_claims.append(output)
                    recalculate_claim_hashes.add(output.claim_hash)
                elif output.script.is_update_claim:
                    claim_hash = output.claim_hash
                    update_claims.append(output)
                    recalculate_claim_hashes.add(claim_hash)
            body_timer.stop()

        skip_update_claim_timer = timer.add_timer('skip update of abandoned claims')
        skip_update_claim_timer.start()
        for updated_claim in list(update_claims):
            if updated_claim.ref.hash in delete_others:
                update_claims.remove(updated_claim)
        for updated_claim in update_claims:
            claim_hash = updated_claim.claim_hash
            delete_claim_hashes.discard(claim_hash)
            update_claim_hashes.add(claim_hash)
        skip_update_claim_timer.stop()

        skip_insert_claim_timer = timer.add_timer('skip insertion of abandoned claims')
        skip_insert_claim_timer.start()
        for new_claim in list(insert_claims):
            if new_claim.ref.hash in delete_others:
                if new_claim.claim_hash not in update_claim_hashes:
                    insert_claims.remove(new_claim)
        skip_insert_claim_timer.stop()

        skip_insert_support_timer = timer.add_timer('skip insertion of abandoned supports')
        skip_insert_support_timer.start()
        for new_support in list(insert_supports):
            if new_support.ref.hash in delete_others:
                insert_supports.remove(new_support)
        skip_insert_support_timer.stop()

        expire_timer = timer.add_timer('recording expired claims')
        expire_timer.start()
        for expired in self.get_expiring(height):
            delete_claim_hashes.add(expired.claim_hash)
            deleted_claim_names.add(expired.normalized)
        expire_timer.stop()

        r = timer.run
        affected_channels = r(self.delete_claims, delete_claim_hashes)
        r(self.delete_supports, delete_support_txo_hashes)
        r(self.insert_claims, insert_claims, header)
        r(self.calculate_reposts, insert_claims)
        r(self.update_claims, update_claims, header)
        r(self.validate_channel_signatures, height, insert_claims,
          update_claims, delete_claim_hashes, affected_channels, forward_timer=True)
        r(self.insert_supports, insert_supports)
        r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True)
        for algorithm in self.trending:
            r(algorithm.run, self.db.cursor(), height, daemon_height, recalculate_claim_hashes)

class LBRYLevelDB(LevelDB):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        path = os.path.join(self.env.db_dir, 'claims.db')
        trending = []
        for algorithm_name in self.env.trending_algorithms:
            if algorithm_name in TRENDING_ALGORITHMS:
                trending.append(TRENDING_ALGORITHMS[algorithm_name])
        if self.env.es_mode == 'reader':
            self.logger.info('Index mode: reader')
            self.sql = None
        else:
            self.logger.info('Index mode: writer. Using SQLite db to sync ES')
            self.sql = SQLDB(
                self, path,
                self.env.default('BLOCKING_CHANNEL_IDS', '').split(' '),
                self.env.default('FILTERING_CHANNEL_IDS', '').split(' '),
                trending
            )

        # Search index
        self.search_index = SearchIndex(
            self.env.es_index_prefix, self.env.database_query_timeout, self.env.elastic_host, self.env.elastic_port
        )

    def close(self):
        super().close()
        if self.sql:
            self.sql.close()

    async def _open_dbs(self, *args, **kwargs):
        await self.search_index.start()
        await super()._open_dbs(*args, **kwargs)
        if self.sql:
            self.sql.open()