From 25e16c3565930883808293bd8c52dc3ad4a6f92a Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Tue, 15 Jun 2021 15:05:53 -0400 Subject: [PATCH] dropping apsw --- Makefile | 6 +-- lbry/wallet/server/block_processor.py | 5 --- lbry/wallet/server/db/canonical.py | 6 +-- .../server/db/trending/variable_decay.py | 3 +- lbry/wallet/server/db/trending/zscore.py | 8 +--- lbry/wallet/server/db/writer.py | 41 ++++++++----------- 6 files changed, 23 insertions(+), 46 deletions(-) diff --git a/Makefile b/Makefile index 1fe9d7033..0a2bf7c15 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,7 @@ .PHONY: install tools lint test test-unit test-unit-coverage test-integration idea install: - pip install https://s3.amazonaws.com/files.lbry.io/python_libtorrent-1.2.4-py3-none-any.whl - CFLAGS="-DSQLITE_MAX_VARIABLE_NUMBER=2500000" pip install -U https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \ - --global-option=fetch \ - --global-option=--version --global-option=3.30.1 --global-option=--all \ - --global-option=build --global-option=--enable --global-option=fts5 + pip install lbry-libtorrent pip install -e . tools: diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 4391736d0..8e38aa9a2 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -798,11 +798,6 @@ class LBRYBlockProcessor(BlockProcessor): finally: if self.sql: self.sql.commit() - if self.sql and self.db.first_sync and self.height == self.daemon.cached_height(): - self.timer.run(self.sql.execute, self.sql.SEARCH_INDEXES, timer_name='executing SEARCH_INDEXES') - if self.env.individual_tag_indexes: - self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES') - self.timer.run(self.sql.execute, self.sql.LANGUAGE_INDEXES, timer_name='executing LANGUAGE_INDEXES') def advance_txs(self, height, txs, header, block_hash): timer = self.timer.sub_timers['advance_blocks'] diff --git a/lbry/wallet/server/db/canonical.py b/lbry/wallet/server/db/canonical.py index a85fc8369..1b0edacba 100644 --- a/lbry/wallet/server/db/canonical.py +++ b/lbry/wallet/server/db/canonical.py @@ -17,10 +17,6 @@ class FindShortestID: if self.short_id: return '#'+self.short_id - @classmethod - def factory(cls): - return cls(), cls.step, cls.finalize - def register_canonical_functions(connection): - connection.createaggregatefunction("shortest_id", FindShortestID.factory, 2) + connection.create_aggregate("shortest_id", 2, FindShortestID) diff --git a/lbry/wallet/server/db/trending/variable_decay.py b/lbry/wallet/server/db/trending/variable_decay.py index 0f6fa71be..30410e666 100644 --- a/lbry/wallet/server/db/trending/variable_decay.py +++ b/lbry/wallet/server/db/trending/variable_decay.py @@ -5,7 +5,6 @@ decay rate for high valued claims. import math import time -import apsw # Half life in blocks *for lower LBC claims* (it's shorter for whale claims) HALF_LIFE = 200 @@ -350,7 +349,7 @@ class TrendingDB: # The "global" instance to work with # pylint: disable=C0103 -trending_data = TrendingDB() +#trending_data = TrendingDB() def spike_mass(x, x_old): """ diff --git a/lbry/wallet/server/db/trending/zscore.py b/lbry/wallet/server/db/trending/zscore.py index bc0987d96..ff442fdec 100644 --- a/lbry/wallet/server/db/trending/zscore.py +++ b/lbry/wallet/server/db/trending/zscore.py @@ -47,14 +47,10 @@ class ZScore: return self.last return (self.last - self.mean) / (self.standard_deviation or 1) - @classmethod - def factory(cls): - return cls(), cls.step, cls.finalize - def install(connection): - connection.createaggregatefunction("zscore", ZScore.factory, 1) - connection.cursor().execute(CREATE_TREND_TABLE) + connection.create_aggregate("zscore", 1, ZScore) + connection.executescript(CREATE_TREND_TABLE) def run(db, height, final_height, affected_claims): diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index febde66c6..9984ea326 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -1,6 +1,6 @@ import os -import apsw +import sqlite3 from typing import Union, Tuple, Set, List from itertools import chain from decimal import Decimal @@ -21,6 +21,7 @@ from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES from lbry.wallet.server.db.elasticsearch import SearchIndex ATTRIBUTE_ARRAY_MAX_LENGTH = 100 +sqlite3.enable_callback_tracebacks(True) class SQLDB: @@ -233,21 +234,14 @@ class SQLDB: self.pending_deletes = set() def open(self): - self.db = apsw.Connection( - self._db_path, - flags=( - apsw.SQLITE_OPEN_READWRITE | - apsw.SQLITE_OPEN_CREATE | - apsw.SQLITE_OPEN_URI - ) - ) - def exec_factory(cursor, statement, bindings): - tpl = namedtuple('row', (d[0] for d in cursor.getdescription())) - cursor.setrowtrace(lambda cursor, row: tpl(*row)) - return True - self.db.setexectrace(exec_factory) - self.execute(self.PRAGMAS) - self.execute(self.CREATE_TABLES_QUERY) + self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False, uri=True) + + def namedtuple_factory(cursor, row): + Row = namedtuple('Row', (d[0] for d in cursor.description)) + return Row(*row) + self.db.row_factory = namedtuple_factory + self.db.executescript(self.PRAGMAS) + self.db.executescript(self.CREATE_TABLES_QUERY) register_canonical_functions(self.db) self.blocked_streams = {} self.blocked_channels = {} @@ -319,10 +313,10 @@ class SQLDB: return f"DELETE FROM {table} WHERE {where}", values def execute(self, *args): - return self.db.cursor().execute(*args) + return self.db.execute(*args) def executemany(self, *args): - return self.db.cursor().executemany(*args) + return self.db.executemany(*args) def begin(self): self.execute('begin;') @@ -620,7 +614,7 @@ class SQLDB: channel_hash IN ({','.join('?' for _ in changed_channel_keys)}) AND signature IS NOT NULL """ - for affected_claim in self.execute(sql, changed_channel_keys.keys()): + for affected_claim in self.execute(sql, list(changed_channel_keys.keys())): if affected_claim.claim_hash not in signables: claim_updates.append({ 'claim_hash': affected_claim.claim_hash, @@ -677,7 +671,7 @@ class SQLDB: signature_valid=CASE WHEN signature IS NOT NULL THEN 0 END, channel_join=NULL, canonical_url=NULL WHERE channel_hash IN ({','.join('?' for _ in spent_claims)}) - """, spent_claims + """, list(spent_claims) ) sub_timer.stop() @@ -802,12 +796,13 @@ class SQLDB: def update_claimtrie(self, height, changed_claim_hashes, deleted_names, timer): r = timer.run + binary_claim_hashes = list(changed_claim_hashes) r(self._calculate_activation_height, height) - r(self._update_support_amount, changed_claim_hashes) + r(self._update_support_amount, binary_claim_hashes) - r(self._update_effective_amount, height, changed_claim_hashes) - r(self._perform_overtake, height, changed_claim_hashes, list(deleted_names)) + r(self._update_effective_amount, height, binary_claim_hashes) + r(self._perform_overtake, height, binary_claim_hashes, list(deleted_names)) r(self._update_effective_amount, height) r(self._perform_overtake, height, [], [])