dropping apsw

This commit is contained in:
Lex Berezhny 2021-06-15 15:05:53 -04:00
parent 7b39527863
commit 25e16c3565
6 changed files with 23 additions and 46 deletions

View file

@ -1,11 +1,7 @@
.PHONY: install tools lint test test-unit test-unit-coverage test-integration idea
install:
pip install https://s3.amazonaws.com/files.lbry.io/python_libtorrent-1.2.4-py3-none-any.whl
CFLAGS="-DSQLITE_MAX_VARIABLE_NUMBER=2500000" pip install -U https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \
--global-option=fetch \
--global-option=--version --global-option=3.30.1 --global-option=--all \
--global-option=build --global-option=--enable --global-option=fts5
pip install lbry-libtorrent
pip install -e .
tools:

View file

@ -798,11 +798,6 @@ class LBRYBlockProcessor(BlockProcessor):
finally:
if self.sql:
self.sql.commit()
if self.sql and self.db.first_sync and self.height == self.daemon.cached_height():
self.timer.run(self.sql.execute, self.sql.SEARCH_INDEXES, timer_name='executing SEARCH_INDEXES')
if self.env.individual_tag_indexes:
self.timer.run(self.sql.execute, self.sql.TAG_INDEXES, timer_name='executing TAG_INDEXES')
self.timer.run(self.sql.execute, self.sql.LANGUAGE_INDEXES, timer_name='executing LANGUAGE_INDEXES')
def advance_txs(self, height, txs, header, block_hash):
timer = self.timer.sub_timers['advance_blocks']

View file

@ -17,10 +17,6 @@ class FindShortestID:
if self.short_id:
return '#'+self.short_id
@classmethod
def factory(cls):
return cls(), cls.step, cls.finalize
def register_canonical_functions(connection):
connection.createaggregatefunction("shortest_id", FindShortestID.factory, 2)
connection.create_aggregate("shortest_id", 2, FindShortestID)

View file

@ -5,7 +5,6 @@ decay rate for high valued claims.
import math
import time
import apsw
# Half life in blocks *for lower LBC claims* (it's shorter for whale claims)
HALF_LIFE = 200
@ -350,7 +349,7 @@ class TrendingDB:
# The "global" instance to work with
# pylint: disable=C0103
trending_data = TrendingDB()
#trending_data = TrendingDB()
def spike_mass(x, x_old):
"""

View file

@ -47,14 +47,10 @@ class ZScore:
return self.last
return (self.last - self.mean) / (self.standard_deviation or 1)
@classmethod
def factory(cls):
return cls(), cls.step, cls.finalize
def install(connection):
connection.createaggregatefunction("zscore", ZScore.factory, 1)
connection.cursor().execute(CREATE_TREND_TABLE)
connection.create_aggregate("zscore", 1, ZScore)
connection.executescript(CREATE_TREND_TABLE)
def run(db, height, final_height, affected_claims):

View file

@ -1,6 +1,6 @@
import os
import apsw
import sqlite3
from typing import Union, Tuple, Set, List
from itertools import chain
from decimal import Decimal
@ -21,6 +21,7 @@ from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES
from lbry.wallet.server.db.elasticsearch import SearchIndex
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
sqlite3.enable_callback_tracebacks(True)
class SQLDB:
@ -233,21 +234,14 @@ class SQLDB:
self.pending_deletes = set()
def open(self):
self.db = apsw.Connection(
self._db_path,
flags=(
apsw.SQLITE_OPEN_READWRITE |
apsw.SQLITE_OPEN_CREATE |
apsw.SQLITE_OPEN_URI
)
)
def exec_factory(cursor, statement, bindings):
tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
cursor.setrowtrace(lambda cursor, row: tpl(*row))
return True
self.db.setexectrace(exec_factory)
self.execute(self.PRAGMAS)
self.execute(self.CREATE_TABLES_QUERY)
self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False, uri=True)
def namedtuple_factory(cursor, row):
Row = namedtuple('Row', (d[0] for d in cursor.description))
return Row(*row)
self.db.row_factory = namedtuple_factory
self.db.executescript(self.PRAGMAS)
self.db.executescript(self.CREATE_TABLES_QUERY)
register_canonical_functions(self.db)
self.blocked_streams = {}
self.blocked_channels = {}
@ -319,10 +313,10 @@ class SQLDB:
return f"DELETE FROM {table} WHERE {where}", values
def execute(self, *args):
return self.db.cursor().execute(*args)
return self.db.execute(*args)
def executemany(self, *args):
return self.db.cursor().executemany(*args)
return self.db.executemany(*args)
def begin(self):
self.execute('begin;')
@ -620,7 +614,7 @@ class SQLDB:
channel_hash IN ({','.join('?' for _ in changed_channel_keys)}) AND
signature IS NOT NULL
"""
for affected_claim in self.execute(sql, changed_channel_keys.keys()):
for affected_claim in self.execute(sql, list(changed_channel_keys.keys())):
if affected_claim.claim_hash not in signables:
claim_updates.append({
'claim_hash': affected_claim.claim_hash,
@ -677,7 +671,7 @@ class SQLDB:
signature_valid=CASE WHEN signature IS NOT NULL THEN 0 END,
channel_join=NULL, canonical_url=NULL
WHERE channel_hash IN ({','.join('?' for _ in spent_claims)})
""", spent_claims
""", list(spent_claims)
)
sub_timer.stop()
@ -802,12 +796,13 @@ class SQLDB:
def update_claimtrie(self, height, changed_claim_hashes, deleted_names, timer):
r = timer.run
binary_claim_hashes = list(changed_claim_hashes)
r(self._calculate_activation_height, height)
r(self._update_support_amount, changed_claim_hashes)
r(self._update_support_amount, binary_claim_hashes)
r(self._update_effective_amount, height, changed_claim_hashes)
r(self._perform_overtake, height, changed_claim_hashes, list(deleted_names))
r(self._update_effective_amount, height, binary_claim_hashes)
r(self._perform_overtake, height, binary_claim_hashes, list(deleted_names))
r(self._update_effective_amount, height)
r(self._perform_overtake, height, [], [])