update BlobManager to use SQLiteStorage, remove old database functions

-remove blob upload/download history
Jack Robison 2018-02-12 13:43:36 -05:00
parent db7061ce92
commit e671005c3c
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 21 additions and 191 deletions

View file

@@ -1,21 +1,17 @@
 import logging
 import os
-import time
-import sqlite3
+from sqlite3 import IntegrityError
 from twisted.internet import threads, defer, reactor
-from twisted.enterprise import adbapi
 from lbrynet import conf
 from lbrynet.blob.blob_file import BlobFile
 from lbrynet.blob.creator import BlobFileCreator
 from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
-from lbrynet.core.sqlite_helpers import rerun_if_locked

 log = logging.getLogger(__name__)


 class DiskBlobManager(DHTHashSupplier):
-    def __init__(self, hash_announcer, blob_dir, db_dir):
+    def __init__(self, hash_announcer, blob_dir, storage):
         """
         This class stores blobs on the hard disk,
@@ -24,27 +20,19 @@ class DiskBlobManager(DHTHashSupplier):
         """
         DHTHashSupplier.__init__(self, hash_announcer)
+        self.storage = storage
         self.announce_head_blobs_only = conf.settings['announce_head_blobs_only']
         self.blob_dir = blob_dir
-        self.db_file = os.path.join(db_dir, "blobs.db")
-        self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False)
         self.blob_creator_type = BlobFileCreator
         # TODO: consider using an LRU for blobs as there could potentially
         # be thousands of blobs loaded up, many stale
         self.blobs = {}
         self.blob_hashes_to_delete = {}  # {blob_hash: being_deleted (True/False)}

-    @defer.inlineCallbacks
     def setup(self):
-        log.info("Starting disk blob manager. blob_dir: %s, db_file: %s", str(self.blob_dir),
-                 str(self.db_file))
-        yield self._open_db()
+        return defer.succeed(True)

     def stop(self):
-        log.info("Stopping disk blob manager.")
-        self.db_conn.close()
         return defer.succeed(True)

     def get_blob(self, blob_hash, length=None):
@@ -75,8 +63,9 @@ class DiskBlobManager(DHTHashSupplier):
     def blob_completed(self, blob, next_announce_time=None, should_announce=True):
         if next_announce_time is None:
             next_announce_time = self.get_next_announce_time()
-        yield self._add_completed_blob(blob.blob_hash, blob.length,
-                                       next_announce_time, should_announce)
+        yield self.storage.add_completed_blob(
+            blob.blob_hash, blob.length, next_announce_time, should_announce
+        )
         # we announce all blobs immediately, if announce_head_blobs_only is False
         # otherwise, announce only if marked as should_announce
         if not self.announce_head_blobs_only or should_announce:
@@ -86,22 +75,22 @@ class DiskBlobManager(DHTHashSupplier):
         return self._completed_blobs(blobhashes_to_check)

     def hashes_to_announce(self):
-        return self._get_blobs_to_announce()
+        return self.storage.get_blobs_to_announce(self.hash_announcer)

     def count_should_announce_blobs(self):
-        return self._count_should_announce_blobs()
+        return self.storage.count_should_announce_blobs()

     def set_should_announce(self, blob_hash, should_announce):
         if blob_hash in self.blobs:
             blob = self.blobs[blob_hash]
             if blob.get_is_verified():
-                return self._set_should_announce(blob_hash,
-                                                 self.get_next_announce_time(),
-                                                 should_announce)
+                return self.storage.set_should_announce(
+                    blob_hash, self.get_next_announce_time(), should_announce
+                )
         return defer.succeed(False)

     def get_should_announce(self, blob_hash):
-        return self._should_announce(blob_hash)
+        return self.storage.should_announce(blob_hash)

     def creator_finished(self, blob_creator, should_announce):
         log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
@@ -114,8 +103,7 @@ class DiskBlobManager(DHTHashSupplier):
         new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
         self.blobs[blob_creator.blob_hash] = new_blob
         next_announce_time = self.get_next_announce_time()
-        d = self.blob_completed(new_blob, next_announce_time, should_announce)
-        return d
+        return self.blob_completed(new_blob, next_announce_time, should_announce)

     def immediate_announce_all_blobs(self):
         d = self._get_all_verified_blob_hashes()
@@ -127,24 +115,6 @@ class DiskBlobManager(DHTHashSupplier):
         d.addCallback(self.completed_blobs)
         return d

-    def add_blob_to_download_history(self, blob_hash, host, rate):
-        d = self._add_blob_to_download_history(blob_hash, host, rate)
-        return d
-
-    @defer.inlineCallbacks
-    def get_host_downloaded_from(self, blob_hash):
-        query_str = "SELECT host FROM download WHERE blob=? ORDER BY ts DESC LIMIT 1"
-        host = yield self.db_conn.runQuery(query_str, (blob_hash,))
-        if host:
-            result = host[0][0]
-        else:
-            result = None
-        defer.returnValue(result)
-
-    def add_blob_to_upload_history(self, blob_hash, host, rate):
-        d = self._add_blob_to_upload_history(blob_hash, host, rate)
-        return d
-
     @defer.inlineCallbacks
     def delete_blobs(self, blob_hashes):
         bh_to_delete_from_db = []
@@ -156,74 +126,11 @@ class DiskBlobManager(DHTHashSupplier):
                 del self.blobs[blob_hash]
             except Exception as e:
                 log.warning("Failed to delete blob file. Reason: %s", e)
-        yield self._delete_blobs_from_db(bh_to_delete_from_db)
-
-    ######### database calls #########
-
-    def _open_db(self):
-        # check_same_thread=False is solely to quiet a spurious error that appears to be due
-        # to a bug in twisted, where the connection is closed by a different thread than the
-        # one that opened it. The individual connections in the pool are not used in multiple
-        # threads.
-        def create_tables(transaction):
-            transaction.execute('PRAGMA journal_mode=WAL')
-            transaction.execute("create table if not exists blobs (" +
-                                "    blob_hash text primary key, " +
-                                "    blob_length integer, " +
-                                "    last_verified_time real, " +
-                                "    next_announce_time real, " +
-                                "    should_announce integer)")
-            transaction.execute("create table if not exists download (" +
-                                "    id integer primary key autoincrement, " +
-                                "    blob text, " +
-                                "    host text, " +
-                                "    rate float, " +
-                                "    ts integer)")
-            transaction.execute("create table if not exists upload (" +
-                                "    id integer primary key autoincrement, " +
-                                "    blob text, " +
-                                "    host text, " +
-                                "    rate float, " +
-                                "    ts integer)")
-        return self.db_conn.runInteraction(create_tables)
-
-    @rerun_if_locked
-    def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce):
-        log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
-        should_announce = 1 if should_announce else 0
-        d = self.db_conn.runQuery("insert into blobs (blob_hash, blob_length, next_announce_time, "
-                                  "should_announce) values (?, ?, ?, ?)",
-                                  (blob_hash, length, next_announce_time, should_announce))
-        # TODO: why is this here?
-        d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
-        return d
-
-    @rerun_if_locked
-    @defer.inlineCallbacks
-    def _set_should_announce(self, blob_hash, next_announce_time, should_announce):
-        yield self.db_conn.runOperation("update blobs set next_announce_time=?, should_announce=? "
-                                        "where blob_hash=?",
-                                        (next_announce_time, should_announce, blob_hash))
-        defer.returnValue(True)
-
-    @rerun_if_locked
-    @defer.inlineCallbacks
-    def _should_announce(self, blob_hash):
-        result = yield self.db_conn.runQuery("select should_announce from blobs where blob_hash=?",
-                                             (blob_hash,))
-        defer.returnValue(result[0][0])
-
-    @rerun_if_locked
-    @defer.inlineCallbacks
-    def _count_should_announce_blobs(self):
-        result = yield self.db_conn.runQuery("select count(*) from blobs where should_announce=1")
-        defer.returnValue(result[0][0])
+        try:
+            yield self.storage.delete_blobs_from_db(bh_to_delete_from_db)
+        except IntegrityError as err:
+            if err.message != "FOREIGN KEY constraint failed":
+                raise err

     @defer.inlineCallbacks
     def _completed_blobs(self, blobhashes_to_check):
@@ -232,65 +139,12 @@ class DiskBlobManager(DHTHashSupplier):
         blob_hashes = [b.blob_hash for success, b in blobs if success and b.verified]
         defer.returnValue(blob_hashes)

-    @rerun_if_locked
-    def _update_blob_verified_timestamp(self, blob, timestamp):
-        return self.db_conn.runQuery("update blobs set last_verified_time = ? where blob_hash = ?",
-                                     (blob, timestamp))
-
-    @rerun_if_locked
-    def _get_blobs_to_announce(self):
-        def get_and_update(transaction):
-            timestamp = time.time()
-            if self.announce_head_blobs_only is True:
-                r = transaction.execute("select blob_hash from blobs " +
-                                        "where next_announce_time < ? and blob_hash is not null " +
-                                        "and should_announce = 1",
-                                        (timestamp,))
-            else:
-                r = transaction.execute("select blob_hash from blobs " +
-                                        "where next_announce_time < ? and blob_hash is not null",
-                                        (timestamp,))
-            blobs = [b for b, in r.fetchall()]
-            next_announce_time = self.get_next_announce_time(len(blobs))
-            transaction.execute(
-                "update blobs set next_announce_time = ? where next_announce_time < ?",
-                (next_announce_time, timestamp))
-            log.debug("Got %s blobs to announce, next announce time is in %s seconds",
-                      len(blobs), next_announce_time - time.time())
-            return blobs
-        return self.db_conn.runInteraction(get_and_update)
-
-    @rerun_if_locked
-    def _delete_blobs_from_db(self, blob_hashes):
-        def delete_blobs(transaction):
-            for b in blob_hashes:
-                transaction.execute("delete from blobs where blob_hash = ?", (b,))
-        return self.db_conn.runInteraction(delete_blobs)
-
-    @rerun_if_locked
-    def _get_all_blob_hashes(self):
-        d = self.db_conn.runQuery("select blob_hash from blobs")
-        return d
-
-    @rerun_if_locked
-    @defer.inlineCallbacks
-    def _get_all_should_announce_blob_hashes(self):
-        # return a list of blob hashes where should_announce is True
-        blob_hashes = yield self.db_conn.runQuery(
-            "select blob_hash from blobs where should_announce = 1")
-        defer.returnValue([d[0] for d in blob_hashes])
-
-    @rerun_if_locked
     def _get_all_verified_blob_hashes(self):
-        d = self._get_all_blob_hashes()
+        d = self.storage.get_all_blob_hashes()

         def get_verified_blobs(blobs):
             verified_blobs = []
-            for blob_hash, in blobs:
+            for blob_hash in blobs:
                 file_path = os.path.join(self.blob_dir, blob_hash)
                 if os.path.isfile(file_path):
                     verified_blobs.append(blob_hash)
@@ -298,19 +152,3 @@ class DiskBlobManager(DHTHashSupplier):
         d.addCallback(lambda blobs: threads.deferToThread(get_verified_blobs, blobs))
         return d
-
-    @rerun_if_locked
-    def _add_blob_to_download_history(self, blob_hash, host, rate):
-        ts = int(time.time())
-        d = self.db_conn.runQuery(
-            "insert into download values (null, ?, ?, ?, ?) ",
-            (blob_hash, str(host), float(rate), ts))
-        return d
-
-    @rerun_if_locked
-    def _add_blob_to_upload_history(self, blob_hash, host, rate):
-        ts = int(time.time())
-        d = self.db_conn.runQuery(
-            "insert into upload values (null, ?, ?, ?, ?) ",
-            (blob_hash, str(host), float(rate), ts))
-        return d
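
The SQLiteStorage class these calls land in is not part of this diff. The following is a rough sketch of the interface the updated DiskBlobManager assumes: the method names and signatures are taken from the calls above, while the table layout and adbapi wiring are guesses, not lbrynet's actual implementation.

    # Hypothetical sketch only: the real SQLiteStorage ships separately from
    # this commit. Method names/signatures mirror the calls made by the new
    # DiskBlobManager code; table and column names are assumptions.
    from twisted.enterprise import adbapi
    from twisted.internet import defer


    class SQLiteStorage(object):
        def __init__(self, db_path):
            self.db = adbapi.ConnectionPool('sqlite3', db_path, check_same_thread=False)

        def add_completed_blob(self, blob_hash, length, next_announce_time, should_announce):
            return self.db.runOperation(
                "insert or replace into blob values (?, ?, ?, ?)",
                (blob_hash, length, next_announce_time, int(bool(should_announce))))

        def set_should_announce(self, blob_hash, next_announce_time, should_announce):
            return self.db.runOperation(
                "update blob set next_announce_time=?, should_announce=? where blob_hash=?",
                (next_announce_time, int(bool(should_announce)), blob_hash))

        @defer.inlineCallbacks
        def should_announce(self, blob_hash):
            result = yield self.db.runQuery(
                "select should_announce from blob where blob_hash=?", (blob_hash,))
            defer.returnValue(bool(result[0][0]) if result else False)

        @defer.inlineCallbacks
        def count_should_announce_blobs(self):
            result = yield self.db.runQuery(
                "select count(*) from blob where should_announce=1")
            defer.returnValue(result[0][0])

        def get_all_blob_hashes(self):
            # returns a flat list of hashes rather than rows of 1-tuples
            d = self.db.runQuery("select blob_hash from blob")
            d.addCallback(lambda rows: [r[0] for r in rows])
            return d

        # get_blobs_to_announce(hash_announcer) is also assumed but omitted here;
        # it would mirror the old _get_blobs_to_announce transaction.

        def delete_blobs_from_db(self, blob_hashes):
            def do_delete(transaction):
                for blob_hash in blob_hashes:
                    transaction.execute("delete from blob where blob_hash=?", (blob_hash,))
            return self.db.runInteraction(do_delete)

Note that get_all_blob_hashes is sketched as returning a flat list, which is consistent with the loop in _get_all_verified_blob_hashes changing from "for blob_hash, in blobs" to "for blob_hash in blobs".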

View file

@@ -566,8 +566,6 @@ class DownloadRequest(RequestHelper):
         self.peer.update_score(5.0)
         should_announce = blob.blob_hash == self.head_blob_hash
         d = self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce)
-        d.addCallback(lambda _: self.requestor.blob_manager.add_blob_to_download_history(
-            blob.blob_hash, self.peer.host, self.protocol_prices[self.protocol]))
         d.addCallback(lambda _: arg)
         return d

View file

@@ -152,7 +152,6 @@ class BlobRequestHandler(object):
                 response_fields['blob_hash'] = blob.blob_hash
                 response_fields['length'] = blob.length
                 response['incoming_blob'] = response_fields
-                d.addCallback(lambda _: self.record_transaction(blob))
                 d.addCallback(lambda _: response)
                 return d
             log.debug("We can not send %s", str(blob))
@@ -160,11 +159,6 @@ class BlobRequestHandler(object):
             d.addCallback(lambda _: response)
             return d

-    def record_transaction(self, blob):
-        d = self.blob_manager.add_blob_to_upload_history(
-            blob.blob_hash, self.peer.host, self.blob_data_payment_rate)
-        return d
-
     def _reply_to_send_request(self, response, incoming):
         response_fields = {}
         response['incoming_blob'] = response_fields
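
One behavioral note on the delete_blobs change in the first file: the new code tolerates an IntegrityError whose message is "FOREIGN KEY constraint failed", which implies SQLiteStorage keeps blob rows that other tables reference via foreign keys. A minimal standalone demonstration of that sqlite3 behavior follows; the schema here is made up for illustration and is not lbrynet's.

    # Deleting a parent row that a child table references via a foreign key
    # raises IntegrityError("FOREIGN KEY constraint failed") once enforcement
    # is enabled. Table names below are hypothetical.
    import sqlite3

    db = sqlite3.connect(":memory:")
    db.execute("pragma foreign_keys=on")  # sqlite enforces FKs only when enabled
    db.execute("create table blob (blob_hash text primary key)")
    db.execute("create table stream_blob (blob_hash text references blob(blob_hash))")
    db.execute("insert into blob values ('abc')")
    db.execute("insert into stream_blob values ('abc')")

    try:
        db.execute("delete from blob where blob_hash='abc'")
    except sqlite3.IntegrityError as err:
        print(str(err))  # prints: FOREIGN KEY constraint failed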