forked from LBRYCommunity/lbry-sdk
remove leveldb, use sqlite and unqlite. create migrator tool
This commit is contained in:
parent
192ac6959a
commit
c7758506ac
19 changed files with 1108 additions and 440 deletions
|
@ -1,14 +1,16 @@
|
|||
import logging
|
||||
import os
|
||||
import leveldb
|
||||
import time
|
||||
import json
|
||||
import sqlite3
|
||||
from twisted.internet import threads, defer, reactor, task
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.enterprise import adbapi
|
||||
from lbrynet.core.HashBlob import BlobFile, TempBlob, BlobFileCreator, TempBlobCreator
|
||||
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
|
||||
from lbrynet.core.utils import is_valid_blobhash
|
||||
from lbrynet.core.cryptoutils import get_lbry_hash_obj
|
||||
from lbrynet.core.Error import NoSuchBlobError
|
||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||
|
||||
|
||||
class BlobManager(DHTHashSupplier):
|
||||
|
@ -68,8 +70,8 @@ class DiskBlobManager(BlobManager):
|
|||
def __init__(self, hash_announcer, blob_dir, db_dir):
|
||||
BlobManager.__init__(self, hash_announcer)
|
||||
self.blob_dir = blob_dir
|
||||
self.db_dir = db_dir
|
||||
self.db = None
|
||||
self.db_file = os.path.join(db_dir, "blobs.db")
|
||||
self.db_conn = None
|
||||
self.blob_type = BlobFile
|
||||
self.blob_creator_type = BlobFileCreator
|
||||
self.blobs = {}
|
||||
|
@ -77,7 +79,7 @@ class DiskBlobManager(BlobManager):
|
|||
self._next_manage_call = None
|
||||
|
||||
def setup(self):
|
||||
d = threads.deferToThread(self._open_db)
|
||||
d = self._open_db()
|
||||
d.addCallback(lambda _: self._manage())
|
||||
return d
|
||||
|
||||
|
@ -85,7 +87,8 @@ class DiskBlobManager(BlobManager):
|
|||
if self._next_manage_call is not None and self._next_manage_call.active():
|
||||
self._next_manage_call.cancel()
|
||||
self._next_manage_call = None
|
||||
self.db = None
|
||||
#d = self.db_conn.close()
|
||||
self.db_conn = None
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_blob(self, blob_hash, upload_allowed, length=None):
|
||||
|
@ -101,7 +104,7 @@ class DiskBlobManager(BlobManager):
|
|||
def _make_new_blob(self, blob_hash, upload_allowed, length=None):
|
||||
blob = self.blob_type(self.blob_dir, blob_hash, upload_allowed, length)
|
||||
self.blobs[blob_hash] = blob
|
||||
d = threads.deferToThread(self._completed_blobs, [blob_hash])
|
||||
d = self._completed_blobs([blob_hash])
|
||||
|
||||
def check_completed(completed_blobs):
|
||||
|
||||
|
@ -110,7 +113,7 @@ class DiskBlobManager(BlobManager):
|
|||
|
||||
if len(completed_blobs) == 1 and completed_blobs[0] == blob_hash:
|
||||
blob.verified = True
|
||||
inner_d = threads.deferToThread(self._get_blob_length, blob_hash)
|
||||
inner_d = self._get_blob_length(blob_hash)
|
||||
inner_d.addCallback(set_length)
|
||||
inner_d.addCallback(lambda _: blob)
|
||||
else:
|
||||
|
@ -123,15 +126,15 @@ class DiskBlobManager(BlobManager):
|
|||
def blob_completed(self, blob, next_announce_time=None):
|
||||
if next_announce_time is None:
|
||||
next_announce_time = time.time()
|
||||
return threads.deferToThread(self._add_completed_blob, blob.blob_hash, blob.length,
|
||||
time.time(), next_announce_time)
|
||||
return self._add_completed_blob(blob.blob_hash, blob.length,
|
||||
time.time(), next_announce_time)
|
||||
|
||||
def completed_blobs(self, blobs_to_check):
|
||||
return threads.deferToThread(self._completed_blobs, blobs_to_check)
|
||||
return self._completed_blobs(blobs_to_check)
|
||||
|
||||
def hashes_to_announce(self):
|
||||
next_announce_time = time.time() + self.hash_reannounce_time
|
||||
return threads.deferToThread(self._get_blobs_to_announce, next_announce_time)
|
||||
return self._get_blobs_to_announce(next_announce_time)
|
||||
|
||||
def creator_finished(self, blob_creator):
|
||||
logging.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
|
||||
|
@ -155,18 +158,18 @@ class DiskBlobManager(BlobManager):
|
|||
self.blob_hashes_to_delete[blob_hash] = False
|
||||
|
||||
def update_all_last_verified_dates(self, timestamp):
|
||||
return threads.deferToThread(self._update_all_last_verified_dates, timestamp)
|
||||
return self._update_all_last_verified_dates(timestamp)
|
||||
|
||||
def immediate_announce_all_blobs(self):
|
||||
d = threads.deferToThread(self._get_all_verified_blob_hashes)
|
||||
d = self._get_all_verified_blob_hashes()
|
||||
d.addCallback(self.hash_announcer.immediate_announce)
|
||||
return d
|
||||
|
||||
def get_blob_length(self, blob_hash):
|
||||
return threads.deferToThread(self._get_blob_length, blob_hash)
|
||||
return self._get_blob_length(blob_hash)
|
||||
|
||||
def check_consistency(self):
|
||||
return threads.deferToThread(self._check_consistency)
|
||||
return self._check_consistency()
|
||||
|
||||
def _manage(self):
|
||||
from twisted.internet import reactor
|
||||
|
@ -192,7 +195,7 @@ class DiskBlobManager(BlobManager):
|
|||
def delete_from_db(result):
|
||||
b_hs = [r[1] for r in result if r[0] is True]
|
||||
if b_hs:
|
||||
d = threads.deferToThread(self._delete_blobs_from_db, b_hs)
|
||||
d = self._delete_blobs_from_db(b_hs)
|
||||
else:
|
||||
d = defer.succeed(True)
|
||||
|
||||
|
@ -221,72 +224,127 @@ class DiskBlobManager(BlobManager):
|
|||
######### database calls #########
|
||||
|
||||
def _open_db(self):
|
||||
self.db = leveldb.LevelDB(os.path.join(self.db_dir, "blobs.db"))
|
||||
# check_same_thread=False is solely to quiet a spurious error that appears to be due
|
||||
# to a bug in twisted, where the connection is closed by a different thread than the
|
||||
# one that opened it. The individual connections in the pool are not used in multiple
|
||||
# threads.
|
||||
self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False)
|
||||
return self.db_conn.runQuery("create table if not exists blobs (" +
|
||||
" blob_hash text primary key, " +
|
||||
" blob_length integer, " +
|
||||
" last_verified_time real, " +
|
||||
" next_announce_time real"
|
||||
")")
|
||||
|
||||
@rerun_if_locked
|
||||
def _add_completed_blob(self, blob_hash, length, timestamp, next_announce_time=None):
|
||||
logging.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length))
|
||||
if next_announce_time is None:
|
||||
next_announce_time = timestamp
|
||||
self.db.Put(blob_hash, json.dumps((length, timestamp, next_announce_time)), sync=True)
|
||||
d = self.db_conn.runQuery("insert into blobs values (?, ?, ?, ?)",
|
||||
(blob_hash, length, timestamp, next_announce_time))
|
||||
d.addErrback(lambda err: err.trap(sqlite3.IntegrityError))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _completed_blobs(self, blobs_to_check):
|
||||
blobs = []
|
||||
for b in blobs_to_check:
|
||||
if is_valid_blobhash(b):
|
||||
try:
|
||||
length, verified_time, next_announce_time = json.loads(self.db.Get(b))
|
||||
except KeyError:
|
||||
continue
|
||||
blobs_to_check = filter(is_valid_blobhash, blobs_to_check)
|
||||
|
||||
def get_blobs_in_db(db_transaction):
|
||||
blobs_in_db = [] # [(blob_hash, last_verified_time)]
|
||||
for b in blobs_to_check:
|
||||
result = db_transaction.execute("select last_verified_time from blobs where blob_hash = ?",
|
||||
(b,))
|
||||
row = result.fetchone()
|
||||
if row is not None:
|
||||
blobs_in_db.append((b, row[0]))
|
||||
return blobs_in_db
|
||||
|
||||
def get_valid_blobs(blobs_in_db):
|
||||
|
||||
def check_blob_verified_date(b, verified_time):
|
||||
file_path = os.path.join(self.blob_dir, b)
|
||||
if os.path.isfile(file_path):
|
||||
if verified_time > os.path.getctime(file_path):
|
||||
blobs.append(b)
|
||||
return blobs
|
||||
return True
|
||||
return False
|
||||
|
||||
def return_valid_blobs(results):
|
||||
valid_blobs = []
|
||||
for (b, verified_date), (success, result) in zip(blobs_in_db, results):
|
||||
if success is True and result is True:
|
||||
valid_blobs.append(b)
|
||||
return valid_blobs
|
||||
|
||||
ds = []
|
||||
for b, verified_date in blobs_in_db:
|
||||
ds.append(threads.deferToThread(check_blob_verified_date, b, verified_date))
|
||||
dl = defer.DeferredList(ds)
|
||||
dl.addCallback(return_valid_blobs)
|
||||
return dl
|
||||
|
||||
d = self.db_conn.runInteraction(get_blobs_in_db)
|
||||
d.addCallback(get_valid_blobs)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_blob_length(self, blob):
|
||||
length, verified_time, next_announce_time = json.loads(self.db.Get(blob))
|
||||
return length
|
||||
d = self.db_conn.runQuery("select blob_length from blobs where blob_hash = ?", (blob,))
|
||||
d.addCallback(lambda r: r[0] if len(r) else Failure(NoSuchBlobError(blob)))
|
||||
return d
|
||||
|
||||
#length, verified_time, next_announce_time = json.loads(self.db.Get(blob))
|
||||
#return length
|
||||
|
||||
@rerun_if_locked
|
||||
def _update_blob_verified_timestamp(self, blob, timestamp):
|
||||
length, old_verified_time, next_announce_time = json.loads(self.db.Get(blob))
|
||||
self.db.Put(blob, json.dumps((length, timestamp, next_announce_time)), sync=True)
|
||||
return self.db_conn.runQuery("update blobs set last_verified_time = ? where blob_hash = ?",
|
||||
(blob, timestamp))
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_blobs_to_announce(self, next_announce_time):
|
||||
# TODO: See if the following would be better for handling announce times:
|
||||
# TODO: Have a separate db for them, and read the whole thing into memory
|
||||
# TODO: on startup, and then write changes to db when they happen
|
||||
blobs = []
|
||||
batch = leveldb.WriteBatch()
|
||||
current_time = time.time()
|
||||
for blob_hash, blob_info in self.db.RangeIter():
|
||||
length, verified_time, announce_time = json.loads(blob_info)
|
||||
if announce_time < current_time:
|
||||
batch.Put(blob_hash, json.dumps((length, verified_time, next_announce_time)))
|
||||
blobs.append(blob_hash)
|
||||
self.db.Write(batch, sync=True)
|
||||
return blobs
|
||||
|
||||
def get_and_update(transaction):
|
||||
timestamp = time.time()
|
||||
r = transaction.execute("select blob_hash from blobs " +
|
||||
"where next_announce_time < ? and blob_hash is not null",
|
||||
(timestamp,))
|
||||
blobs = [b for b, in r.fetchall()]
|
||||
transaction.execute("update blobs set next_announce_time = ? where next_announce_time < ?",
|
||||
(next_announce_time, timestamp))
|
||||
return blobs
|
||||
|
||||
return self.db_conn.runInteraction(get_and_update)
|
||||
|
||||
@rerun_if_locked
|
||||
def _update_all_last_verified_dates(self, timestamp):
|
||||
batch = leveldb.WriteBatch()
|
||||
for blob_hash, blob_info in self.db.RangeIter():
|
||||
length, verified_time, announce_time = json.loads(blob_info)
|
||||
batch.Put(blob_hash, json.dumps((length, timestamp, announce_time)))
|
||||
self.db.Write(batch, sync=True)
|
||||
return self.db_conn.runQuery("update blobs set last_verified_date = ?", (timestamp,))
|
||||
|
||||
@rerun_if_locked
|
||||
def _delete_blobs_from_db(self, blob_hashes):
|
||||
batch = leveldb.WriteBatch()
|
||||
for blob_hash in blob_hashes:
|
||||
batch.Delete(blob_hash)
|
||||
self.db.Write(batch, sync=True)
|
||||
|
||||
def delete_blobs(transaction):
|
||||
for b in blob_hashes:
|
||||
transaction.execute("delete from blobs where blob_hash = ?", (b,))
|
||||
|
||||
return self.db_conn.runInteraction(delete_blobs)
|
||||
|
||||
@rerun_if_locked
|
||||
def _check_consistency(self):
|
||||
batch = leveldb.WriteBatch()
|
||||
|
||||
ALREADY_VERIFIED = 1
|
||||
NEWLY_VERIFIED = 2
|
||||
INVALID = 3
|
||||
|
||||
current_time = time.time()
|
||||
for blob_hash, blob_info in self.db.RangeIter():
|
||||
length, verified_time, announce_time = json.loads(blob_info)
|
||||
d = self.db_conn.runQuery("select blob_hash, blob_length, last_verified_time from blobs")
|
||||
|
||||
def check_blob(blob_hash, blob_length, verified_time):
|
||||
file_path = os.path.join(self.blob_dir, blob_hash)
|
||||
if os.path.isfile(file_path):
|
||||
if verified_time < os.path.getctime(file_path):
|
||||
if verified_time >= os.path.getctime(file_path):
|
||||
return ALREADY_VERIFIED
|
||||
else:
|
||||
h = get_lbry_hash_obj()
|
||||
len_so_far = 0
|
||||
f = open(file_path)
|
||||
|
@ -296,19 +354,63 @@ class DiskBlobManager(BlobManager):
|
|||
break
|
||||
h.update(data)
|
||||
len_so_far += len(data)
|
||||
if len_so_far == length and h.hexdigest() == blob_hash:
|
||||
batch.Put(blob_hash, json.dumps((length, current_time, announce_time)))
|
||||
self.db.Write(batch, sync=True)
|
||||
if len_so_far == blob_length and h.hexdigest() == blob_hash:
|
||||
return NEWLY_VERIFIED
|
||||
return INVALID
|
||||
|
||||
def do_check(blobs):
|
||||
already_verified = []
|
||||
newly_verified = []
|
||||
invalid = []
|
||||
for blob_hash, blob_length, verified_time in blobs:
|
||||
status = check_blob(blob_hash, blob_length, verified_time)
|
||||
if status == ALREADY_VERIFIED:
|
||||
already_verified.append(blob_hash)
|
||||
elif status == NEWLY_VERIFIED:
|
||||
newly_verified.append(blob_hash)
|
||||
else:
|
||||
invalid.append(blob_hash)
|
||||
return already_verified, newly_verified, invalid
|
||||
|
||||
def update_newly_verified(transaction, blobs):
|
||||
for b in blobs:
|
||||
transaction.execute("update blobs set last_verified_time = ? where blob_hash = ?",
|
||||
(current_time, b))
|
||||
|
||||
def check_blobs(blobs):
|
||||
|
||||
@rerun_if_locked
|
||||
def update_and_return(status_lists):
|
||||
|
||||
already_verified, newly_verified, invalid = status_lists
|
||||
|
||||
d = self.db_conn.runInteraction(update_newly_verified, newly_verified)
|
||||
d.addCallback(lambda _: status_lists)
|
||||
return d
|
||||
|
||||
d = threads.deferToThread(do_check, blobs)
|
||||
|
||||
d.addCallback(update_and_return)
|
||||
return d
|
||||
|
||||
d.addCallback(check_blobs)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_verified_blob_hashes(self):
|
||||
blob_hashes = []
|
||||
for blob_hash, blob_info in self.db.RangeIter():
|
||||
length, verified_time, announce_time = json.loads(blob_info)
|
||||
file_path = os.path.join(self.blob_dir, blob_hash)
|
||||
if os.path.isfile(file_path):
|
||||
if verified_time > os.path.getctime(file_path):
|
||||
blob_hashes.append(blob_hash)
|
||||
return blob_hashes
|
||||
d = self.db_conn.runQuery("select blob_hash, last_verified_time from blobs")
|
||||
|
||||
def get_verified_blobs(blobs):
|
||||
verified_blobs = []
|
||||
for blob_hash, verified_time in blobs:
|
||||
file_path = os.path.join(self.blob_dir, blob_hash)
|
||||
if os.path.isfile(file_path):
|
||||
if verified_time > os.path.getctime(file_path):
|
||||
verified_blobs.append(blob_hash)
|
||||
return verified_blobs
|
||||
|
||||
d.addCallback(lambda blobs: threads.deferToThread(get_verified_blobs, blobs))
|
||||
return d
|
||||
|
||||
|
||||
class TempBlobManager(BlobManager):
|
||||
|
@ -389,7 +491,7 @@ class TempBlobManager(BlobManager):
|
|||
if blob_hash in self.blobs:
|
||||
if self.blobs[blob_hash].length is not None:
|
||||
return defer.succeed(self.blobs[blob_hash].length)
|
||||
return defer.fail(ValueError("No such blob hash is known"))
|
||||
return defer.fail(NoSuchBlobError(blob_hash))
|
||||
|
||||
def immediate_announce_all_blobs(self):
|
||||
return self.hash_announcer.immediate_announce(self.blobs.iterkeys())
|
||||
|
@ -432,7 +534,7 @@ class TempBlobManager(BlobManager):
|
|||
ds.append(d)
|
||||
else:
|
||||
remove_from_list(blob_hash)
|
||||
d = defer.fail(Failure(ValueError("No such blob known")))
|
||||
d = defer.fail(Failure(NoSuchBlobError(blob_hash)))
|
||||
logging.warning("Blob %s cannot be deleted because it is unknown")
|
||||
ds.append(d)
|
||||
return defer.DeferredList(ds)
|
|
@ -63,4 +63,12 @@ class NoResponseError(MisbehavingPeerError):
|
|||
|
||||
|
||||
class InvalidResponseError(MisbehavingPeerError):
|
||||
pass
|
||||
|
||||
|
||||
class NoSuchBlobError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NoSuchStreamHashError(Exception):
|
||||
pass
|
|
@ -1,7 +1,7 @@
|
|||
from collections import defaultdict
|
||||
import logging
|
||||
import leveldb
|
||||
import os
|
||||
import unqlite
|
||||
import time
|
||||
from Crypto.Hash import SHA512
|
||||
from Crypto.PublicKey import RSA
|
||||
|
@ -71,7 +71,7 @@ class PTCWallet(object):
|
|||
|
||||
def save_key(success, private_key):
|
||||
if success is True:
|
||||
threads.deferToThread(self.save_private_key, private_key.exportKey())
|
||||
self._save_private_key(private_key.exportKey())
|
||||
return True
|
||||
return False
|
||||
|
||||
|
@ -95,8 +95,8 @@ class PTCWallet(object):
|
|||
def start_manage():
|
||||
self.manage()
|
||||
return True
|
||||
d = threads.deferToThread(self._open_db)
|
||||
d.addCallback(lambda _: threads.deferToThread(self.get_wallet_private_key))
|
||||
d = self._open_db()
|
||||
d.addCallback(lambda _: self._get_wallet_private_key())
|
||||
d.addCallback(ensure_private_key_exists)
|
||||
d.addCallback(lambda _: start_manage())
|
||||
return d
|
||||
|
@ -211,16 +211,21 @@ class PTCWallet(object):
|
|||
str(peer), self.peer_pub_keys[peer], str(min_expected_balance), str(received_balance))
|
||||
|
||||
def _open_db(self):
|
||||
self.db = leveldb.LevelDB(os.path.join(self.db_dir, "ptcwallet.db"))
|
||||
def open_db():
|
||||
self.db = unqlite.UnQLite(os.path.join(self.db_dir, "ptcwallet.db"))
|
||||
return threads.deferToThread(open_db)
|
||||
|
||||
def save_private_key(self, private_key):
|
||||
self.db.Put("private_key", private_key)
|
||||
def _save_private_key(self, private_key):
|
||||
def save_key():
|
||||
self.db['private_key'] = private_key
|
||||
return threads.deferToThread(save_key)
|
||||
|
||||
def get_wallet_private_key(self):
|
||||
try:
|
||||
return self.db.Get("private_key")
|
||||
except KeyError:
|
||||
def _get_wallet_private_key(self):
|
||||
def get_key():
|
||||
if 'private_key' in self.db:
|
||||
return self.db['private_key']
|
||||
return None
|
||||
return threads.deferToThread(get_key)
|
||||
|
||||
|
||||
class PointTraderKeyExchanger(object):
|
||||
|
|
20
lbrynet/core/sqlite_helpers.py
Normal file
20
lbrynet/core/sqlite_helpers.py
Normal file
|
@ -0,0 +1,20 @@
|
|||
import sqlite3
|
||||
from twisted.internet import task, reactor
|
||||
import logging
|
||||
|
||||
|
||||
def rerun_if_locked(f):
|
||||
|
||||
def rerun(err, *args, **kwargs):
|
||||
if err.check(sqlite3.OperationalError) and err.value.message == "database is locked":
|
||||
logging.warning("database was locked. rerunning %s with args %s, kwargs %s",
|
||||
str(f), str(args), str(kwargs))
|
||||
return task.deferLater(reactor, 0, wrapper, *args, **kwargs)
|
||||
return err
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
d = f(*args, **kwargs)
|
||||
d.addErrback(rerun, *args, **kwargs)
|
||||
return d
|
||||
|
||||
return wrapper
|
0
lbrynet/db_migrator/__init__.py
Normal file
0
lbrynet/db_migrator/__init__.py
Normal file
14
lbrynet/db_migrator/dbmigrator.py
Normal file
14
lbrynet/db_migrator/dbmigrator.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
def migrate_db(db_dir, start, end):
|
||||
current = start
|
||||
old_dirs = []
|
||||
while current < end:
|
||||
if current == 0:
|
||||
from lbrynet.db_migrator.migrate0to1 import do_migration
|
||||
old_dirs.append(do_migration(db_dir))
|
||||
current += 1
|
||||
return old_dirs
|
||||
|
||||
|
||||
def run_migration_script():
|
||||
import sys
|
||||
migrate_db(sys.argv[1], sys.argv[2], sys.argv[3])
|
304
lbrynet/db_migrator/migrate0to1.py
Normal file
304
lbrynet/db_migrator/migrate0to1.py
Normal file
|
@ -0,0 +1,304 @@
|
|||
import sqlite3
|
||||
import unqlite
|
||||
import leveldb
|
||||
import shutil
|
||||
import os
|
||||
import logging
|
||||
import json
|
||||
|
||||
|
||||
known_dbs = ['lbryfile_desc.db', 'lbryfiles.db', 'valuable_blobs.db', 'blobs.db',
|
||||
'lbryfile_blob.db', 'lbryfile_info.db', 'settings.db', 'blind_settings.db',
|
||||
'blind_peers.db', 'blind_info.db', 'lbryfile_info.db', 'lbryfile_manager.db',
|
||||
'live_stream.db', 'stream_info.db', 'stream_blob.db', 'stream_desc.db']
|
||||
|
||||
|
||||
def do_move(from_dir, to_dir):
|
||||
for known_db in known_dbs:
|
||||
known_db_path = os.path.join(from_dir, known_db)
|
||||
if os.path.exists(known_db_path):
|
||||
logging.debug("Moving %s to %s",
|
||||
os.path.abspath(known_db_path),
|
||||
os.path.abspath(os.path.join(to_dir, known_db)))
|
||||
shutil.move(known_db_path, os.path.join(to_dir, known_db))
|
||||
else:
|
||||
logging.debug("Did not find %s", os.path.abspath(known_db_path))
|
||||
|
||||
|
||||
def do_migration(db_dir):
|
||||
old_dir = os.path.join(db_dir, "_0_to_1_old")
|
||||
new_dir = os.path.join(db_dir, "_0_to_1_new")
|
||||
try:
|
||||
logging.info("Moving dbs from the real directory to %s", os.path.abspath(old_dir))
|
||||
os.makedirs(old_dir)
|
||||
do_move(db_dir, old_dir)
|
||||
except:
|
||||
logging.error("An error occurred moving the old db files.")
|
||||
raise
|
||||
try:
|
||||
logging.info("Creating the new directory in %s", os.path.abspath(new_dir))
|
||||
os.makedirs(new_dir)
|
||||
|
||||
except:
|
||||
logging.error("An error occurred creating the new directory.")
|
||||
raise
|
||||
try:
|
||||
logging.info("Doing the migration")
|
||||
migrate_blob_db(old_dir, new_dir)
|
||||
migrate_lbryfile_db(old_dir, new_dir)
|
||||
migrate_livestream_db(old_dir, new_dir)
|
||||
migrate_ptc_db(old_dir, new_dir)
|
||||
migrate_lbryfile_manager_db(old_dir, new_dir)
|
||||
migrate_settings_db(old_dir, new_dir)
|
||||
migrate_repeater_db(old_dir, new_dir)
|
||||
logging.info("Migration succeeded")
|
||||
except:
|
||||
logging.error("An error occurred during the migration. Restoring.")
|
||||
do_move(old_dir, db_dir)
|
||||
raise
|
||||
try:
|
||||
logging.info("Moving dbs in the new directory to the real directory")
|
||||
do_move(new_dir, db_dir)
|
||||
db_revision = open(os.path.join(db_dir, 'db_revision'), mode='w+')
|
||||
db_revision.write("1")
|
||||
db_revision.close()
|
||||
os.rmdir(new_dir)
|
||||
except:
|
||||
logging.error("An error occurred moving the new db files.")
|
||||
raise
|
||||
return old_dir
|
||||
|
||||
|
||||
def migrate_blob_db(old_db_dir, new_db_dir):
|
||||
old_blob_db_path = os.path.join(old_db_dir, "blobs.db")
|
||||
if not os.path.exists(old_blob_db_path):
|
||||
return True
|
||||
|
||||
old_db = leveldb.LevelDB(old_blob_db_path)
|
||||
new_db_conn = sqlite3.connect(os.path.join(new_db_dir, "blobs.db"))
|
||||
c = new_db_conn.cursor()
|
||||
c.execute("create table if not exists blobs (" +
|
||||
" blob_hash text primary key, " +
|
||||
" blob_length integer, " +
|
||||
" last_verified_time real, " +
|
||||
" next_announce_time real"
|
||||
")")
|
||||
new_db_conn.commit()
|
||||
c = new_db_conn.cursor()
|
||||
for blob_hash, blob_info in old_db.RangeIter():
|
||||
blob_length, verified_time, announce_time = json.loads(blob_info)
|
||||
c.execute("insert into blobs values (?, ?, ?, ?)",
|
||||
(blob_hash, blob_length, verified_time, announce_time))
|
||||
new_db_conn.commit()
|
||||
new_db_conn.close()
|
||||
|
||||
|
||||
def migrate_lbryfile_db(old_db_dir, new_db_dir):
|
||||
old_lbryfile_db_path = os.path.join(old_db_dir, "lbryfiles.db")
|
||||
if not os.path.exists(old_lbryfile_db_path):
|
||||
return True
|
||||
|
||||
stream_info_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_info.db"))
|
||||
stream_blob_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_blob.db"))
|
||||
stream_desc_db = leveldb.LevelDB(os.path.join(old_db_dir, "lbryfile_desc.db"))
|
||||
|
||||
db_conn = sqlite3.connect(os.path.join(new_db_dir, "lbryfile_info.db"))
|
||||
c = db_conn.cursor()
|
||||
c.execute("create table if not exists lbry_files (" +
|
||||
" stream_hash text primary key, " +
|
||||
" key text, " +
|
||||
" stream_name text, " +
|
||||
" suggested_file_name text" +
|
||||
")")
|
||||
c.execute("create table if not exists lbry_file_blobs (" +
|
||||
" blob_hash text, " +
|
||||
" stream_hash text, " +
|
||||
" position integer, " +
|
||||
" iv text, " +
|
||||
" length integer, " +
|
||||
" foreign key(stream_hash) references lbry_files(stream_hash)" +
|
||||
")")
|
||||
c.execute("create table if not exists lbry_file_descriptors (" +
|
||||
" sd_blob_hash TEXT PRIMARY KEY, " +
|
||||
" stream_hash TEXT, " +
|
||||
" foreign key(stream_hash) references lbry_files(stream_hash)" +
|
||||
")")
|
||||
db_conn.commit()
|
||||
c = db_conn.cursor()
|
||||
for stream_hash, stream_info in stream_info_db.RangeIter():
|
||||
key, name, suggested_file_name = json.loads(stream_info)
|
||||
c.execute("insert into lbry_files values (?, ?, ?, ?)",
|
||||
(stream_hash, key, name, suggested_file_name))
|
||||
db_conn.commit()
|
||||
c = db_conn.cursor()
|
||||
for blob_hash_stream_hash, blob_info in stream_blob_db.RangeIter():
|
||||
b_h, s_h = json.loads(blob_hash_stream_hash)
|
||||
position, iv, length = json.loads(blob_info)
|
||||
c.execute("insert into lbry_file_blobs values (?, ?, ?, ?, ?)",
|
||||
(b_h, s_h, position, iv, length))
|
||||
db_conn.commit()
|
||||
c = db_conn.cursor()
|
||||
for sd_blob_hash, stream_hash in stream_desc_db.RangeIter():
|
||||
c.execute("insert into lbry_file_descriptors values (?, ?)",
|
||||
(sd_blob_hash, stream_hash))
|
||||
db_conn.commit()
|
||||
db_conn.close()
|
||||
|
||||
|
||||
def migrate_livestream_db(old_db_dir, new_db_dir):
|
||||
old_db_path = os.path.join(old_db_dir, "stream_info.db")
|
||||
if not os.path.exists(old_db_path):
|
||||
return True
|
||||
stream_info_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_info.db"))
|
||||
stream_blob_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_blob.db"))
|
||||
stream_desc_db = leveldb.LevelDB(os.path.join(old_db_dir, "stream_desc.db"))
|
||||
|
||||
db_conn = sqlite3.connect(os.path.join(new_db_dir, "live_stream.db"))
|
||||
|
||||
c = db_conn.cursor()
|
||||
|
||||
c.execute("create table if not exists live_streams (" +
|
||||
" stream_hash text primary key, " +
|
||||
" public_key text, " +
|
||||
" key text, " +
|
||||
" stream_name text, " +
|
||||
" next_announce_time real" +
|
||||
")")
|
||||
c.execute("create table if not exists live_stream_blobs (" +
|
||||
" blob_hash text, " +
|
||||
" stream_hash text, " +
|
||||
" position integer, " +
|
||||
" revision integer, " +
|
||||
" iv text, " +
|
||||
" length integer, " +
|
||||
" signature text, " +
|
||||
" foreign key(stream_hash) references live_streams(stream_hash)" +
|
||||
")")
|
||||
c.execute("create table if not exists live_stream_descriptors (" +
|
||||
" sd_blob_hash TEXT PRIMARY KEY, " +
|
||||
" stream_hash TEXT, " +
|
||||
" foreign key(stream_hash) references live_streams(stream_hash)" +
|
||||
")")
|
||||
|
||||
db_conn.commit()
|
||||
|
||||
c = db_conn.cursor()
|
||||
for stream_hash, stream_info in stream_info_db.RangeIter():
|
||||
public_key, key, name, next_announce_time = json.loads(stream_info)
|
||||
c.execute("insert into live_streams values (?, ?, ?, ?, ?)",
|
||||
(stream_hash, public_key, key, name, next_announce_time))
|
||||
db_conn.commit()
|
||||
c = db_conn.cursor()
|
||||
for blob_hash_stream_hash, blob_info in stream_blob_db.RangeIter():
|
||||
b_h, s_h = json.loads(blob_hash_stream_hash)
|
||||
position, revision, iv, length, signature = json.loads(blob_info)
|
||||
c.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)",
|
||||
(b_h, s_h, position, revision, iv, length, signature))
|
||||
db_conn.commit()
|
||||
c = db_conn.cursor()
|
||||
for sd_blob_hash, stream_hash in stream_desc_db.RangeIter():
|
||||
c.execute("insert into live_stream_descriptors values (?, ?)",
|
||||
(sd_blob_hash, stream_hash))
|
||||
db_conn.commit()
|
||||
db_conn.close()
|
||||
|
||||
|
||||
def migrate_ptc_db(old_db_dir, new_db_dir):
|
||||
old_db_path = os.path.join(old_db_dir, "ptcwallet.db")
|
||||
if not os.path.exists(old_db_path):
|
||||
return True
|
||||
old_db = leveldb.LevelDB(old_db_path)
|
||||
try:
|
||||
p_key = old_db.Get("private_key")
|
||||
new_db = unqlite.UnQLite(os.path.join(new_db_dir, "ptcwallet.db"))
|
||||
new_db['private_key'] = p_key
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
|
||||
def migrate_lbryfile_manager_db(old_db_dir, new_db_dir):
|
||||
old_db_path = os.path.join(old_db_dir, "lbryfiles.db")
|
||||
if not os.path.exists(old_db_path):
|
||||
return True
|
||||
old_db = leveldb.LevelDB(old_db_path)
|
||||
new_db = sqlite3.connect(os.path.join(new_db_dir, "lbryfile_info.db"))
|
||||
c = new_db.cursor()
|
||||
c.execute("create table if not exists lbry_file_options (" +
|
||||
" blob_data_rate real, " +
|
||||
" status text," +
|
||||
" stream_hash text,"
|
||||
" foreign key(stream_hash) references lbry_files(stream_hash)" +
|
||||
")")
|
||||
new_db.commit()
|
||||
LBRYFILE_STATUS = "t"
|
||||
LBRYFILE_OPTIONS = "o"
|
||||
c = new_db.cursor()
|
||||
for k, v in old_db.RangeIter():
|
||||
key_type, stream_hash = json.loads(k)
|
||||
if key_type == LBRYFILE_STATUS:
|
||||
try:
|
||||
rate = json.loads(old_db.Get(json.dumps((LBRYFILE_OPTIONS, stream_hash))))[0]
|
||||
except KeyError:
|
||||
rate = None
|
||||
c.execute("insert into lbry_file_options values (?, ?, ?)",
|
||||
(rate, v, stream_hash))
|
||||
new_db.commit()
|
||||
new_db.close()
|
||||
|
||||
|
||||
def migrate_settings_db(old_db_dir, new_db_dir):
|
||||
old_settings_db_path = os.path.join(old_db_dir, "settings.db")
|
||||
if not os.path.exists(old_settings_db_path):
|
||||
return True
|
||||
old_db = leveldb.LevelDB(old_settings_db_path)
|
||||
new_db = unqlite.UnQLite(os.path.join(new_db_dir, "settings.db"))
|
||||
for k, v in old_db.RangeIter():
|
||||
new_db[k] = v
|
||||
|
||||
|
||||
def migrate_repeater_db(old_db_dir, new_db_dir):
|
||||
old_repeater_db_path = os.path.join(old_db_dir, "valuable_blobs.db")
|
||||
if not os.path.exists(old_repeater_db_path):
|
||||
return True
|
||||
old_db = leveldb.LevelDB(old_repeater_db_path)
|
||||
info_db = sqlite3.connect(os.path.join(new_db_dir, "blind_info.db"))
|
||||
peer_db = sqlite3.connect(os.path.join(new_db_dir, "blind_peers.db"))
|
||||
unql_db = unqlite.UnQLite(os.path.join(new_db_dir, "blind_settings.db"))
|
||||
BLOB_INFO_TYPE = 'b'
|
||||
SETTING_TYPE = 's'
|
||||
PEER_TYPE = 'p'
|
||||
info_c = info_db.cursor()
|
||||
info_c.execute("create table if not exists valuable_blobs (" +
|
||||
" blob_hash text primary key, " +
|
||||
" blob_length integer, " +
|
||||
" reference text, " +
|
||||
" peer_host text, " +
|
||||
" peer_port integer, " +
|
||||
" peer_score text" +
|
||||
")")
|
||||
info_db.commit()
|
||||
peer_c = peer_db.cursor()
|
||||
peer_c.execute("create table if not exists approved_peers (" +
|
||||
" ip_address text, " +
|
||||
" port integer" +
|
||||
")")
|
||||
peer_db.commit()
|
||||
info_c = info_db.cursor()
|
||||
peer_c = peer_db.cursor()
|
||||
for k, v in old_db.RangeIter():
|
||||
key_type, key_rest = json.loads(k)
|
||||
if key_type == PEER_TYPE:
|
||||
host, port = key_rest
|
||||
peer_c.execute("insert into approved_peers values (?, ?)",
|
||||
(host, port))
|
||||
elif key_type == SETTING_TYPE:
|
||||
unql_db[key_rest] = v
|
||||
elif key_type == BLOB_INFO_TYPE:
|
||||
blob_hash = key_rest
|
||||
length, reference, peer_host, peer_port, peer_score = json.loads(v)
|
||||
info_c.execute("insert into valuable_blobs values (?, ?, ?, ?, ?, ?)",
|
||||
(blob_hash, length, reference, peer_host, peer_port, peer_score))
|
||||
info_db.commit()
|
||||
peer_db.commit()
|
||||
info_db.close()
|
||||
peer_db.close()
|
|
@ -1,9 +1,11 @@
|
|||
import logging
|
||||
import leveldb
|
||||
import json
|
||||
import sqlite3
|
||||
import os
|
||||
from twisted.internet import threads, defer
|
||||
from lbrynet.core.Error import DuplicateStreamHashError
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.enterprise import adbapi
|
||||
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError
|
||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||
|
||||
|
||||
class DBLBRYFileMetadataManager(object):
|
||||
|
@ -16,163 +18,217 @@ class DBLBRYFileMetadataManager(object):
|
|||
self.stream_desc_db = None
|
||||
|
||||
def setup(self):
|
||||
return threads.deferToThread(self._open_db)
|
||||
return self._open_db()
|
||||
|
||||
def stop(self):
|
||||
self.stream_info_db = None
|
||||
self.stream_blob_db = None
|
||||
self.stream_desc_db = None
|
||||
self.db_conn = None
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_all_streams(self):
|
||||
return threads.deferToThread(self._get_all_streams)
|
||||
return self._get_all_streams()
|
||||
|
||||
def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs):
|
||||
d = threads.deferToThread(self._store_stream, stream_hash, file_name, key, suggested_file_name)
|
||||
d = self._store_stream(stream_hash, file_name, key, suggested_file_name)
|
||||
d.addCallback(lambda _: self.add_blobs_to_stream(stream_hash, blobs))
|
||||
return d
|
||||
|
||||
def get_stream_info(self, stream_hash):
|
||||
return threads.deferToThread(self._get_stream_info, stream_hash)
|
||||
return self._get_stream_info(stream_hash)
|
||||
|
||||
def check_if_stream_exists(self, stream_hash):
|
||||
return threads.deferToThread(self._check_if_stream_exists, stream_hash)
|
||||
return self._check_if_stream_exists(stream_hash)
|
||||
|
||||
def delete_stream(self, stream_hash):
|
||||
return threads.deferToThread(self._delete_stream, stream_hash)
|
||||
return self._delete_stream(stream_hash)
|
||||
|
||||
def add_blobs_to_stream(self, stream_hash, blobs):
|
||||
|
||||
def add_blobs():
|
||||
self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
|
||||
|
||||
return threads.deferToThread(add_blobs)
|
||||
return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
|
||||
|
||||
def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
|
||||
logging.info("Getting blobs for a stream. Count is %s", str(count))
|
||||
|
||||
def get_positions_of_start_and_end():
|
||||
if start_blob is not None:
|
||||
start_num = self._get_blob_num_by_hash(stream_hash, start_blob)
|
||||
d1 = self._get_blob_num_by_hash(stream_hash, start_blob)
|
||||
else:
|
||||
start_num = None
|
||||
d1 = defer.succeed(None)
|
||||
if end_blob is not None:
|
||||
end_num = self._get_blob_num_by_hash(stream_hash, end_blob)
|
||||
d2 = self._get_blob_num_by_hash(stream_hash, end_blob)
|
||||
else:
|
||||
d2 = defer.succeed(None)
|
||||
|
||||
dl = defer.DeferredList([d1, d2])
|
||||
|
||||
def get_positions(results):
|
||||
start_num = None
|
||||
end_num = None
|
||||
return start_num, end_num
|
||||
if results[0][0] is True:
|
||||
start_num = results[0][1]
|
||||
if results[1][0] is True:
|
||||
end_num = results[1][1]
|
||||
return start_num, end_num
|
||||
|
||||
dl.addCallback(get_positions)
|
||||
return dl
|
||||
|
||||
def get_blob_infos(nums):
|
||||
start_num, end_num = nums
|
||||
return threads.deferToThread(self._get_further_blob_infos, stream_hash, start_num, end_num,
|
||||
count, reverse)
|
||||
return self._get_further_blob_infos(stream_hash, start_num, end_num,
|
||||
count, reverse)
|
||||
|
||||
d = threads.deferToThread(get_positions_of_start_and_end)
|
||||
d = get_positions_of_start_and_end()
|
||||
d.addCallback(get_blob_infos)
|
||||
return d
|
||||
|
||||
def get_stream_of_blob(self, blob_hash):
|
||||
return threads.deferToThread(self._get_stream_of_blobhash, blob_hash)
|
||||
return self._get_stream_of_blobhash(blob_hash)
|
||||
|
||||
def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
|
||||
return threads.deferToThread(self._save_sd_blob_hash_to_stream, stream_hash, sd_blob_hash)
|
||||
return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
|
||||
|
||||
def get_sd_blob_hashes_for_stream(self, stream_hash):
|
||||
return threads.deferToThread(self._get_sd_blob_hashes_for_stream, stream_hash)
|
||||
return self._get_sd_blob_hashes_for_stream(stream_hash)
|
||||
|
||||
def _open_db(self):
|
||||
self.stream_info_db = leveldb.LevelDB(os.path.join(self.db_dir, "lbryfile_info.db"))
|
||||
self.stream_blob_db = leveldb.LevelDB(os.path.join(self.db_dir, "lbryfile_blob.db"))
|
||||
self.stream_desc_db = leveldb.LevelDB(os.path.join(self.db_dir, "lbryfile_desc.db"))
|
||||
# check_same_thread=False is solely to quiet a spurious error that appears to be due
|
||||
# to a bug in twisted, where the connection is closed by a different thread than the
|
||||
# one that opened it. The individual connections in the pool are not used in multiple
|
||||
# threads.
|
||||
self.db_conn = adbapi.ConnectionPool("sqlite3", (os.path.join(self.db_dir, "lbryfile_info.db")),
|
||||
check_same_thread=False)
|
||||
|
||||
def create_tables(transaction):
|
||||
transaction.execute("create table if not exists lbry_files (" +
|
||||
" stream_hash text primary key, " +
|
||||
" key text, " +
|
||||
" stream_name text, " +
|
||||
" suggested_file_name text" +
|
||||
")")
|
||||
transaction.execute("create table if not exists lbry_file_blobs (" +
|
||||
" blob_hash text, " +
|
||||
" stream_hash text, " +
|
||||
" position integer, " +
|
||||
" iv text, " +
|
||||
" length integer, " +
|
||||
" foreign key(stream_hash) references lbry_files(stream_hash)" +
|
||||
")")
|
||||
transaction.execute("create table if not exists lbry_file_descriptors (" +
|
||||
" sd_blob_hash TEXT PRIMARY KEY, " +
|
||||
" stream_hash TEXT, " +
|
||||
" foreign key(stream_hash) references lbry_files(stream_hash)" +
|
||||
")")
|
||||
|
||||
return self.db_conn.runInteraction(create_tables)
|
||||
|
||||
@rerun_if_locked
|
||||
def _delete_stream(self, stream_hash):
|
||||
desc_batch = leveldb.WriteBatch()
|
||||
for sd_blob_hash, s_h in self.stream_desc_db.RangeIter():
|
||||
if stream_hash == s_h:
|
||||
desc_batch.Delete(sd_blob_hash)
|
||||
self.stream_desc_db.Write(desc_batch, sync=True)
|
||||
d = self.db_conn.runQuery("select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
|
||||
d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash)))
|
||||
|
||||
blob_batch = leveldb.WriteBatch()
|
||||
for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter():
|
||||
b_h, s_h = json.loads(blob_hash_stream_hash)
|
||||
if stream_hash == s_h:
|
||||
blob_batch.Delete(blob_hash_stream_hash)
|
||||
self.stream_blob_db.Write(blob_batch, sync=True)
|
||||
def do_delete(transaction, s_h):
|
||||
transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,))
|
||||
transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,))
|
||||
transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,))
|
||||
|
||||
stream_batch = leveldb.WriteBatch()
|
||||
for s_h, stream_info in self.stream_info_db.RangeIter():
|
||||
if stream_hash == s_h:
|
||||
stream_batch.Delete(s_h)
|
||||
self.stream_info_db.Write(stream_batch, sync=True)
|
||||
d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _store_stream(self, stream_hash, name, key, suggested_file_name):
|
||||
try:
|
||||
self.stream_info_db.Get(stream_hash)
|
||||
raise DuplicateStreamHashError("Stream hash %s already exists" % stream_hash)
|
||||
except KeyError:
|
||||
pass
|
||||
self.stream_info_db.Put(stream_hash, json.dumps((key, name, suggested_file_name)), sync=True)
|
||||
d = self.db_conn.runQuery("insert into lbry_files values (?, ?, ?, ?)",
|
||||
(stream_hash, key, name, suggested_file_name))
|
||||
|
||||
def check_duplicate(err):
|
||||
if err.check(sqlite3.IntegrityError):
|
||||
raise DuplicateStreamHashError(stream_hash)
|
||||
return err
|
||||
|
||||
d.addErrback(check_duplicate)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_streams(self):
|
||||
return [stream_hash for stream_hash, stream_info in self.stream_info_db.RangeIter()]
|
||||
d = self.db_conn.runQuery("select stream_hash from lbry_files")
|
||||
d.addCallback(lambda results: [r[0] for r in results])
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_stream_info(self, stream_hash):
|
||||
return json.loads(self.stream_info_db.Get(stream_hash))[:3]
|
||||
d = self.db_conn.runQuery("select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash)))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _check_if_stream_exists(self, stream_hash):
|
||||
try:
|
||||
self.stream_info_db.Get(stream_hash)
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
d = self.db_conn.runQuery("select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
|
||||
d.addCallback(lambda r: True if len(r) else False)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_blob_num_by_hash(self, stream_hash, blob_hash):
|
||||
blob_hash_stream_hash = json.dumps((blob_hash, stream_hash))
|
||||
return json.loads(self.stream_blob_db.Get(blob_hash_stream_hash))[0]
|
||||
d = self.db_conn.runQuery("select position from lbry_file_blobs where stream_hash = ? and blob_hash = ?",
|
||||
(stream_hash, blob_hash))
|
||||
d.addCallback(lambda r: r[0][0] if len(r) else None)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
|
||||
blob_infos = []
|
||||
for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter():
|
||||
b_h, s_h = json.loads(blob_hash_stream_hash)
|
||||
if stream_hash == s_h:
|
||||
position, iv, length = json.loads(blob_info)
|
||||
if (start_num is None) or (position > start_num):
|
||||
if (end_num is None) or (position < end_num):
|
||||
blob_infos.append((b_h, position, iv, length))
|
||||
blob_infos.sort(key=lambda i: i[1], reverse=reverse)
|
||||
params = []
|
||||
q_string = "select * from ("
|
||||
q_string += " select blob_hash, position, iv, length from lbry_file_blobs "
|
||||
q_string += " where stream_hash = ? "
|
||||
params.append(stream_hash)
|
||||
if start_num is not None:
|
||||
q_string += " and position > ? "
|
||||
params.append(start_num)
|
||||
if end_num is not None:
|
||||
q_string += " and position < ? "
|
||||
params.append(end_num)
|
||||
q_string += " order by position "
|
||||
if reverse is True:
|
||||
q_string += " DESC "
|
||||
if count is not None:
|
||||
blob_infos = blob_infos[:count]
|
||||
return blob_infos
|
||||
q_string += " limit ? "
|
||||
params.append(count)
|
||||
q_string += ") order by position"
|
||||
# Order by position is done twice so that it always returns them from lowest position to
|
||||
# greatest, but the limit by clause can select the 'count' greatest or 'count' least
|
||||
return self.db_conn.runQuery(q_string, tuple(params))
|
||||
|
||||
@rerun_if_locked
|
||||
def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False):
|
||||
batch = leveldb.WriteBatch()
|
||||
for blob_info in blob_infos:
|
||||
blob_hash_stream_hash = json.dumps((blob_info.blob_hash, stream_hash))
|
||||
try:
|
||||
self.stream_blob_db.Get(blob_hash_stream_hash)
|
||||
if ignore_duplicate_error is False:
|
||||
raise KeyError() # TODO: change this to DuplicateStreamBlobError?
|
||||
continue
|
||||
except KeyError:
|
||||
pass
|
||||
batch.Put(blob_hash_stream_hash,
|
||||
json.dumps((blob_info.blob_num,
|
||||
blob_info.iv,
|
||||
blob_info.length)))
|
||||
self.stream_blob_db.Write(batch, sync=True)
|
||||
|
||||
def add_blobs(transaction):
|
||||
for blob_info in blob_infos:
|
||||
try:
|
||||
transaction.execute("insert into lbry_file_blobs values (?, ?, ?, ?, ?)",
|
||||
(blob_info.blob_hash, stream_hash, blob_info.blob_num,
|
||||
blob_info.iv, blob_info.length))
|
||||
except sqlite3.IntegrityError:
|
||||
if ignore_duplicate_error is False:
|
||||
raise
|
||||
|
||||
return self.db_conn.runInteraction(add_blobs)
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_stream_of_blobhash(self, blob_hash):
|
||||
for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter():
|
||||
b_h, s_h = json.loads(blob_hash_stream_hash)
|
||||
if blob_hash == b_h:
|
||||
return s_h
|
||||
return None
|
||||
d = self.db_conn.runQuery("select stream_hash from lbry_file_blobs where blob_hash = ?",
|
||||
(blob_hash,))
|
||||
d.addCallback(lambda r: r[0][0] if len(r) else None)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
|
||||
self.stream_desc_db.Put(sd_blob_hash, stream_hash)
|
||||
return self.db_conn.runQuery("insert into lbry_file_descriptors values (?, ?)",
|
||||
(sd_blob_hash, stream_hash))
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_sd_blob_hashes_for_stream(self, stream_hash):
|
||||
return [sd_blob_hash for sd_blob_hash, s_h in self.stream_desc_db.RangeIter() if stream_hash == s_h]
|
||||
d = self.db_conn.runQuery("select sd_blob_hash from lbry_file_descriptors where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda results: [r[0] for r in results])
|
||||
return d
|
||||
|
||||
|
||||
class TempLBRYFileMetadataManager(object):
|
||||
|
|
|
@ -5,7 +5,7 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi
|
|||
import logging
|
||||
import json
|
||||
|
||||
import leveldb
|
||||
from twisted.enterprise import adbapi
|
||||
|
||||
import os
|
||||
from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader
|
||||
|
@ -15,32 +15,30 @@ from lbrynet.core.PaymentRateManager import PaymentRateManager
|
|||
from twisted.internet import threads, defer, task, reactor
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError, CurrentlyStoppingError
|
||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||
|
||||
|
||||
class LBRYFileManager(object):
|
||||
"""
|
||||
Keeps track of currently opened LBRY Files, their options, and their LBRY File specific metadata.
|
||||
"""
|
||||
SETTING = "s"
|
||||
LBRYFILE_STATUS = "t"
|
||||
LBRYFILE_OPTIONS = "o"
|
||||
|
||||
def __init__(self, session, stream_info_manager, sd_identifier):
|
||||
self.session = session
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.sd_identifier = sd_identifier
|
||||
self.lbry_files = []
|
||||
self.db = None
|
||||
self.sql_db = None
|
||||
self.download_directory = os.getcwd()
|
||||
|
||||
def setup(self):
|
||||
d = threads.deferToThread(self._open_db)
|
||||
d = self._open_db()
|
||||
d.addCallback(lambda _: self._add_to_sd_identifier())
|
||||
d.addCallback(lambda _: self._start_lbry_files())
|
||||
return d
|
||||
|
||||
def get_all_lbry_file_stream_hashes_and_options(self):
|
||||
d = threads.deferToThread(self._get_all_lbry_file_stream_hashes)
|
||||
d = self._get_all_lbry_file_stream_hashes()
|
||||
|
||||
def get_options(stream_hashes):
|
||||
ds = []
|
||||
|
@ -60,26 +58,20 @@ class LBRYFileManager(object):
|
|||
return d
|
||||
|
||||
def get_lbry_file_status(self, stream_hash):
|
||||
return threads.deferToThread(self._get_lbry_file_status, stream_hash)
|
||||
|
||||
def save_lbry_file_options(self, stream_hash, blob_data_rate):
|
||||
return threads.deferToThread(self._save_lbry_file_options, stream_hash, blob_data_rate)
|
||||
return self._get_lbry_file_status(stream_hash)
|
||||
|
||||
def get_lbry_file_options(self, stream_hash):
|
||||
return threads.deferToThread(self._get_lbry_file_options, stream_hash)
|
||||
return self._get_lbry_file_options(stream_hash)
|
||||
|
||||
def delete_lbry_file_options(self, stream_hash):
|
||||
return threads.deferToThread(self._delete_lbry_file_options, stream_hash)
|
||||
return self._delete_lbry_file_options(stream_hash)
|
||||
|
||||
def set_lbry_file_data_payment_rate(self, stream_hash, new_rate):
|
||||
return threads.deferToThread(self._set_lbry_file_payment_rate, stream_hash, new_rate)
|
||||
return self._set_lbry_file_payment_rate(stream_hash, new_rate)
|
||||
|
||||
def change_lbry_file_status(self, stream_hash, status):
|
||||
logging.debug("Changing status of %s to %s", stream_hash, status)
|
||||
return threads.deferToThread(self._change_file_status, stream_hash, status)
|
||||
|
||||
def delete_lbry_file_status(self, stream_hash):
|
||||
return threads.deferToThread(self._delete_lbry_file_status, stream_hash)
|
||||
return self._change_file_status(stream_hash, status)
|
||||
|
||||
def get_lbry_file_status_reports(self):
|
||||
ds = []
|
||||
|
@ -129,7 +121,7 @@ class LBRYFileManager(object):
|
|||
self.download_directory,
|
||||
upload_allowed)
|
||||
self.lbry_files.append(lbry_file_downloader)
|
||||
d = self.save_lbry_file_options(stream_hash, blob_data_rate)
|
||||
d = self.set_lbry_file_data_payment_rate(stream_hash, blob_data_rate)
|
||||
d.addCallback(lambda _: lbry_file_downloader.set_stream_info())
|
||||
d.addCallback(lambda _: lbry_file_downloader)
|
||||
return d
|
||||
|
@ -161,7 +153,6 @@ class LBRYFileManager(object):
|
|||
|
||||
d.addCallback(lambda _: remove_from_list())
|
||||
d.addCallback(lambda _: self.delete_lbry_file_options(stream_hash))
|
||||
d.addCallback(lambda _: self.delete_lbry_file_status(stream_hash))
|
||||
return d
|
||||
|
||||
def toggle_lbry_file_running(self, stream_hash):
|
||||
|
@ -207,47 +198,52 @@ class LBRYFileManager(object):
|
|||
######### database calls #########
|
||||
|
||||
def _open_db(self):
|
||||
self.db = leveldb.LevelDB(os.path.join(self.session.db_dir, "lbryfiles.db"))
|
||||
# check_same_thread=False is solely to quiet a spurious error that appears to be due
|
||||
# to a bug in twisted, where the connection is closed by a different thread than the
|
||||
# one that opened it. The individual connections in the pool are not used in multiple
|
||||
# threads.
|
||||
self.sql_db = adbapi.ConnectionPool("sqlite3", os.path.join(self.session.db_dir, "lbryfile_info.db"),
|
||||
check_same_thread=False)
|
||||
#self.unql_db = unqlite.UnQLite(os.path.join(self.session.db_dir, "lbryfile_manager.db"))
|
||||
|
||||
def _save_payment_rate(self, rate_type, rate):
|
||||
if rate is not None:
|
||||
self.db.Put(json.dumps((self.SETTING, rate_type)), json.dumps(rate), sync=True)
|
||||
else:
|
||||
self.db.Delete(json.dumps((self.SETTING, rate_type)), sync=True)
|
||||
|
||||
def _save_lbry_file_options(self, stream_hash, blob_data_rate):
|
||||
self.db.Put(json.dumps((self.LBRYFILE_OPTIONS, stream_hash)), json.dumps((blob_data_rate,)),
|
||||
sync=True)
|
||||
return self.sql_db.runQuery("create table if not exists lbry_file_options (" +
|
||||
" blob_data_rate real, " +
|
||||
" status text," +
|
||||
" stream_hash text,"
|
||||
" foreign key(stream_hash) references lbry_files(stream_hash)" +
|
||||
")")
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_lbry_file_options(self, stream_hash):
|
||||
try:
|
||||
return json.loads(self.db.Get(json.dumps((self.LBRYFILE_OPTIONS, stream_hash))))
|
||||
except KeyError:
|
||||
return None, None
|
||||
d = self.sql_db.runQuery("select blob_data_rate from lbry_file_options where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda result: result[0] if len(result) else (None, ))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _delete_lbry_file_options(self, stream_hash):
|
||||
self.db.Delete(json.dumps((self.LBRYFILE_OPTIONS, stream_hash)), sync=True)
|
||||
return self.sql_db.runQuery("delete from lbry_file_options where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
|
||||
@rerun_if_locked
|
||||
def _set_lbry_file_payment_rate(self, stream_hash, new_rate):
|
||||
return self.sql_db.runQuery("update lbry_file_options set blob_data_rate = ? where stream_hash = ?",
|
||||
(new_rate, stream_hash))
|
||||
|
||||
self.db.Put(json.dumps((self.LBRYFILE_OPTIONS, stream_hash)), json.dumps((new_rate, )), sync=True)
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_lbry_file_stream_hashes(self):
|
||||
hashes = []
|
||||
for k, v in self.db.RangeIter():
|
||||
key_type, stream_hash = json.loads(k)
|
||||
if key_type == self.LBRYFILE_STATUS:
|
||||
hashes.append(stream_hash)
|
||||
return hashes
|
||||
d = self.sql_db.runQuery("select stream_hash from lbry_file_options")
|
||||
d.addCallback(lambda results: [r[0] for r in results])
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _change_file_status(self, stream_hash, new_status):
|
||||
self.db.Put(json.dumps((self.LBRYFILE_STATUS, stream_hash)), new_status, sync=True)
|
||||
return self.sql_db.runQuery("update lbry_file_options set status = ? where stream_hash = ?",
|
||||
(new_status, stream_hash))
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_lbry_file_status(self, stream_hash):
|
||||
try:
|
||||
return self.db.Get(json.dumps((self.LBRYFILE_STATUS, stream_hash)))
|
||||
except KeyError:
|
||||
return ManagedLBRYFileDownloader.STATUS_STOPPED
|
||||
|
||||
def _delete_lbry_file_status(self, stream_hash):
|
||||
self.db.Delete(json.dumps((self.LBRYFILE_STATUS, stream_hash)), sync=True)
|
||||
d = self.sql_db.runQuery("select status from lbry_file_options where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda r: r[0][0] if len(r) else ManagedLBRYFileDownloader.STATUS_STOPPED)
|
||||
return d
|
|
@ -1,11 +1,13 @@
|
|||
import time
|
||||
import logging
|
||||
import leveldb
|
||||
import json
|
||||
from twisted.enterprise import adbapi
|
||||
import os
|
||||
import sqlite3
|
||||
from twisted.internet import threads, defer
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier
|
||||
from lbrynet.core.Error import DuplicateStreamHashError
|
||||
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHashError
|
||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||
|
||||
|
||||
class DBLiveStreamMetadataManager(DHTHashSupplier):
|
||||
|
@ -14,26 +16,22 @@ class DBLiveStreamMetadataManager(DHTHashSupplier):
|
|||
def __init__(self, db_dir, hash_announcer):
|
||||
DHTHashSupplier.__init__(self, hash_announcer)
|
||||
self.db_dir = db_dir
|
||||
self.stream_info_db = None
|
||||
self.stream_blob_db = None
|
||||
self.stream_desc_db = None
|
||||
self.db_conn = None
|
||||
|
||||
def setup(self):
|
||||
return threads.deferToThread(self._open_db)
|
||||
return self._open_db()
|
||||
|
||||
def stop(self):
|
||||
self.stream_info_db = None
|
||||
self.stream_blob_db = None
|
||||
self.stream_desc_db = None
|
||||
self.db_conn = None
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_all_streams(self):
|
||||
return threads.deferToThread(self._get_all_streams)
|
||||
return self._get_all_streams()
|
||||
|
||||
def save_stream(self, stream_hash, pub_key, file_name, key, blobs):
|
||||
next_announce_time = time.time() + self.hash_reannounce_time
|
||||
d = threads.deferToThread(self._store_stream, stream_hash, pub_key, file_name, key,
|
||||
next_announce_time=next_announce_time)
|
||||
d = self._store_stream(stream_hash, pub_key, file_name, key,
|
||||
next_announce_time=next_announce_time)
|
||||
|
||||
def save_blobs():
|
||||
return self.add_blobs_to_stream(stream_hash, blobs)
|
||||
|
@ -48,169 +46,229 @@ class DBLiveStreamMetadataManager(DHTHashSupplier):
|
|||
return d
|
||||
|
||||
def get_stream_info(self, stream_hash):
|
||||
return threads.deferToThread(self._get_stream_info, stream_hash)
|
||||
return self._get_stream_info(stream_hash)
|
||||
|
||||
def check_if_stream_exists(self, stream_hash):
|
||||
return threads.deferToThread(self._check_if_stream_exists, stream_hash)
|
||||
return self._check_if_stream_exists(stream_hash)
|
||||
|
||||
def delete_stream(self, stream_hash):
|
||||
return threads.deferToThread(self._delete_stream, stream_hash)
|
||||
return self._delete_stream(stream_hash)
|
||||
|
||||
def add_blobs_to_stream(self, stream_hash, blobs):
|
||||
|
||||
def add_blobs():
|
||||
self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
|
||||
|
||||
return threads.deferToThread(add_blobs)
|
||||
return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
|
||||
|
||||
def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False):
|
||||
logging.info("Getting blobs for a stream. Count is %s", str(count))
|
||||
|
||||
def get_positions_of_start_and_end():
|
||||
if start_blob is not None:
|
||||
start_num = self._get_blob_num_by_hash(stream_hash, start_blob)
|
||||
d1 = self._get_blob_num_by_hash(stream_hash, start_blob)
|
||||
else:
|
||||
start_num = None
|
||||
d1 = defer.succeed(None)
|
||||
if end_blob is not None:
|
||||
end_num = self._get_blob_num_by_hash(stream_hash, end_blob)
|
||||
d2 = self._get_blob_num_by_hash(stream_hash, end_blob)
|
||||
else:
|
||||
d2 = defer.succeed(None)
|
||||
|
||||
dl = defer.DeferredList([d1, d2])
|
||||
|
||||
def get_positions(results):
|
||||
start_num = None
|
||||
end_num = None
|
||||
return start_num, end_num
|
||||
if results[0][0] is True:
|
||||
start_num = results[0][1]
|
||||
if results[1][0] is True:
|
||||
end_num = results[1][1]
|
||||
return start_num, end_num
|
||||
|
||||
dl.addCallback(get_positions)
|
||||
return dl
|
||||
|
||||
def get_blob_infos(nums):
|
||||
start_num, end_num = nums
|
||||
return threads.deferToThread(self._get_further_blob_infos, stream_hash, start_num, end_num,
|
||||
count, reverse)
|
||||
return self._get_further_blob_infos(stream_hash, start_num, end_num,
|
||||
count, reverse)
|
||||
|
||||
d = threads.deferToThread(get_positions_of_start_and_end)
|
||||
d = get_positions_of_start_and_end()
|
||||
d.addCallback(get_blob_infos)
|
||||
return d
|
||||
|
||||
def get_stream_of_blob(self, blob_hash):
|
||||
return threads.deferToThread(self._get_stream_of_blobhash, blob_hash)
|
||||
return self._get_stream_of_blobhash(blob_hash)
|
||||
|
||||
def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
|
||||
return threads.deferToThread(self._save_sd_blob_hash_to_stream, stream_hash, sd_blob_hash)
|
||||
return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
|
||||
|
||||
def get_sd_blob_hashes_for_stream(self, stream_hash):
|
||||
return threads.deferToThread(self._get_sd_blob_hashes_for_stream, stream_hash)
|
||||
return self._get_sd_blob_hashes_for_stream(stream_hash)
|
||||
|
||||
def hashes_to_announce(self):
|
||||
next_announce_time = time.time() + self.hash_reannounce_time
|
||||
return threads.deferToThread(self._get_streams_to_announce, next_announce_time)
|
||||
return self._get_streams_to_announce(next_announce_time)
|
||||
|
||||
######### database calls #########
|
||||
|
||||
def _open_db(self):
|
||||
self.stream_info_db = leveldb.LevelDB(os.path.join(self.db_dir, "stream_info.db"))
|
||||
self.stream_blob_db = leveldb.LevelDB(os.path.join(self.db_dir, "stream_blob.db"))
|
||||
self.stream_desc_db = leveldb.LevelDB(os.path.join(self.db_dir, "stream_desc.db"))
|
||||
# check_same_thread=False is solely to quiet a spurious error that appears to be due
|
||||
# to a bug in twisted, where the connection is closed by a different thread than the
|
||||
# one that opened it. The individual connections in the pool are not used in multiple
|
||||
# threads.
|
||||
self.db_conn = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "live_stream.db"),
|
||||
check_same_thread=False)
|
||||
|
||||
def create_tables(transaction):
|
||||
transaction.execute("create table if not exists live_streams (" +
|
||||
" stream_hash text primary key, " +
|
||||
" public_key text, " +
|
||||
" key text, " +
|
||||
" stream_name text, " +
|
||||
" next_announce_time real" +
|
||||
")")
|
||||
transaction.execute("create table if not exists live_stream_blobs (" +
|
||||
" blob_hash text, " +
|
||||
" stream_hash text, " +
|
||||
" position integer, " +
|
||||
" revision integer, " +
|
||||
" iv text, " +
|
||||
" length integer, " +
|
||||
" signature text, " +
|
||||
" foreign key(stream_hash) references live_streams(stream_hash)" +
|
||||
")")
|
||||
transaction.execute("create table if not exists live_stream_descriptors (" +
|
||||
" sd_blob_hash TEXT PRIMARY KEY, " +
|
||||
" stream_hash TEXT, " +
|
||||
" foreign key(stream_hash) references live_streams(stream_hash)" +
|
||||
")")
|
||||
|
||||
return self.db_conn.runInteraction(create_tables)
|
||||
|
||||
@rerun_if_locked
|
||||
def _delete_stream(self, stream_hash):
|
||||
desc_batch = leveldb.WriteBatch()
|
||||
for sd_blob_hash, s_h in self.stream_desc_db.RangeIter():
|
||||
if stream_hash == s_h:
|
||||
desc_batch.Delete(sd_blob_hash)
|
||||
self.stream_desc_db.Write(desc_batch, sync=True)
|
||||
|
||||
blob_batch = leveldb.WriteBatch()
|
||||
for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter():
|
||||
b_h, s_h = json.loads(blob_hash_stream_hash)
|
||||
if stream_hash == s_h:
|
||||
blob_batch.Delete(blob_hash_stream_hash)
|
||||
self.stream_blob_db.Write(blob_batch, sync=True)
|
||||
d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,))
|
||||
d.addCallback(lambda result: result[0][0] if len(result) else Failure(NoSuchStreamHashError(stream_hash)))
|
||||
|
||||
stream_batch = leveldb.WriteBatch()
|
||||
for s_h, stream_info in self.stream_info_db.RangeIter():
|
||||
if stream_hash == s_h:
|
||||
stream_batch.Delete(s_h)
|
||||
self.stream_info_db.Write(stream_batch, sync=True)
|
||||
def do_delete(transaction, s_h):
|
||||
transaction.execute("delete from live_streams where stream_hash = ?", (s_h,))
|
||||
transaction.execute("delete from live_stream_blobs where stream_hash = ?", (s_h,))
|
||||
transaction.execute("delete from live_stream_descriptors where stream_hash = ?", (s_h,))
|
||||
|
||||
d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _store_stream(self, stream_hash, public_key, name, key, next_announce_time=None):
|
||||
try:
|
||||
self.stream_info_db.Get(stream_hash)
|
||||
raise DuplicateStreamHashError("Stream hash %s already exists" % stream_hash)
|
||||
except KeyError:
|
||||
pass
|
||||
self.stream_info_db.Put(stream_hash, json.dumps((public_key, key, name, next_announce_time)), sync=True)
|
||||
d = self.db_conn.runQuery("insert into live_streams values (?, ?, ?, ?, ?)",
|
||||
(stream_hash, public_key, key, name, next_announce_time))
|
||||
|
||||
def check_duplicate(err):
|
||||
if err.check(sqlite3.IntegrityError):
|
||||
raise DuplicateStreamHashError(stream_hash)
|
||||
return err
|
||||
|
||||
d.addErrback(check_duplicate)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_streams(self):
|
||||
return [stream_hash for stream_hash, stream_info in self.stream_info_db.RangeIter()]
|
||||
d = self.db_conn.runQuery("select stream_hash from live_streams")
|
||||
d.addCallback(lambda results: [r[0] for r in results])
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_stream_info(self, stream_hash):
|
||||
return json.loads(self.stream_info_db.Get(stream_hash))[:3]
|
||||
d = self.db_conn.runQuery("select public_key, key, stream_name from live_streams where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda result: result[0] if len(result) else Failure(NoSuchStreamHashError(stream_hash)))
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _check_if_stream_exists(self, stream_hash):
|
||||
try:
|
||||
self.stream_info_db.Get(stream_hash)
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
d = self.db_conn.runQuery("select stream_hash from live_streams where stream_hash = ?", (stream_hash,))
|
||||
d.addCallback(lambda r: True if len(r) else False)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_streams_to_announce(self, next_announce_time):
|
||||
# TODO: See if the following would be better for handling announce times:
|
||||
# TODO: Have a separate db for them, and read the whole thing into memory
|
||||
# TODO: on startup, and then write changes to db when they happen
|
||||
stream_hashes = []
|
||||
batch = leveldb.WriteBatch()
|
||||
current_time = time.time()
|
||||
for stream_hash, stream_info in self.stream_info_db.RangeIter():
|
||||
public_key, key, name, announce_time = json.loads(stream_info)
|
||||
if announce_time < current_time:
|
||||
batch.Put(stream_hash, json.dumps((public_key, key, name, next_announce_time)))
|
||||
stream_hashes.append(stream_hash)
|
||||
self.stream_info_db.Write(batch, sync=True)
|
||||
return stream_hashes
|
||||
|
||||
def get_and_update(transaction):
|
||||
timestamp = time.time()
|
||||
r = transaction.execute("select stream_hash from live_streams where" +
|
||||
" (next_announce_time is null or next_announce_time < ?) " +
|
||||
" and stream_hash is not null", (timestamp, ))
|
||||
s_hs = [s_h for s_h, in r.fetchall()]
|
||||
transaction.execute("update live_streams set next_announce_time = ? where " +
|
||||
" (next_announce_time is null or next_announce_time < ?)",
|
||||
(next_announce_time, timestamp))
|
||||
return s_hs
|
||||
|
||||
return self.db_conn.runInteraction(get_and_update)
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_blob_num_by_hash(self, stream_hash, blob_hash):
|
||||
blob_hash_stream_hash = json.dumps((blob_hash, stream_hash))
|
||||
return json.loads(self.stream_blob_db.Get(blob_hash_stream_hash))[0]
|
||||
d = self.db_conn.runQuery("select position from live_stream_blobs where stream_hash = ? and blob_hash = ?",
|
||||
(stream_hash, blob_hash))
|
||||
d.addCallback(lambda r: r[0][0] if len(r) else None)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
|
||||
blob_infos = []
|
||||
for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter():
|
||||
b_h, s_h = json.loads(blob_hash_stream_hash)
|
||||
if stream_hash == s_h:
|
||||
position, revision, iv, length, signature = json.loads(blob_info)
|
||||
if (start_num is None) or (position > start_num):
|
||||
if (end_num is None) or (position < end_num):
|
||||
blob_infos.append((b_h, position, revision, iv, length, signature))
|
||||
blob_infos.sort(key=lambda i: i[1], reverse=reverse)
|
||||
params = []
|
||||
q_string = "select * from ("
|
||||
q_string += " select blob_hash, position, revision, iv, length, signature from live_stream_blobs "
|
||||
q_string += " where stream_hash = ? "
|
||||
params.append(stream_hash)
|
||||
if start_num is not None:
|
||||
q_string += " and position > ? "
|
||||
params.append(start_num)
|
||||
if end_num is not None:
|
||||
q_string += " and position < ? "
|
||||
params.append(end_num)
|
||||
q_string += " order by position "
|
||||
if reverse is True:
|
||||
q_string += " DESC "
|
||||
if count is not None:
|
||||
blob_infos = blob_infos[:count]
|
||||
return blob_infos
|
||||
q_string += " limit ? "
|
||||
params.append(count)
|
||||
q_string += ") order by position"
|
||||
# Order by position is done twice so that it always returns them from lowest position to
|
||||
# greatest, but the limit by clause can select the 'count' greatest or 'count' least
|
||||
return self.db_conn.runQuery(q_string, tuple(params))
|
||||
|
||||
@rerun_if_locked
|
||||
def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False):
|
||||
batch = leveldb.WriteBatch()
|
||||
for blob_info in blob_infos:
|
||||
blob_hash_stream_hash = json.dumps((blob_info.blob_hash, stream_hash))
|
||||
try:
|
||||
self.stream_blob_db.Get(blob_hash_stream_hash)
|
||||
if ignore_duplicate_error is False:
|
||||
raise KeyError() # TODO: change this to DuplicateStreamBlobError?
|
||||
continue
|
||||
except KeyError:
|
||||
pass
|
||||
batch.Put(blob_hash_stream_hash,
|
||||
json.dumps((blob_info.blob_num,
|
||||
blob_info.revision,
|
||||
blob_info.iv,
|
||||
blob_info.length,
|
||||
blob_info.signature)))
|
||||
self.stream_blob_db.Write(batch, sync=True)
|
||||
|
||||
def add_blobs(transaction):
|
||||
for blob_info in blob_infos:
|
||||
try:
|
||||
transaction.execute("insert into live_stream_blobs values (?, ?, ?, ?, ?, ?, ?)",
|
||||
(blob_info.blob_hash, stream_hash, blob_info.blob_num,
|
||||
blob_info.revision, blob_info.iv, blob_info.length,
|
||||
blob_info.signature))
|
||||
except sqlite3.IntegrityError:
|
||||
if ignore_duplicate_error is False:
|
||||
raise
|
||||
|
||||
return self.db_conn.runInteraction(add_blobs)
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_stream_of_blobhash(self, blob_hash):
|
||||
for blob_hash_stream_hash, blob_info in self.stream_blob_db.RangeIter():
|
||||
b_h, s_h = json.loads(blob_hash_stream_hash)
|
||||
if blob_hash == b_h:
|
||||
return s_h
|
||||
return None
|
||||
d = self.db_conn.runQuery("select stream_hash from live_stream_blobs where blob_hash = ?",
|
||||
(blob_hash,))
|
||||
d.addCallback(lambda r: r[0][0] if len(r) else None)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
|
||||
self.stream_desc_db.Put(sd_blob_hash, stream_hash)
|
||||
return self.db_conn.runQuery("insert into live_stream_descriptors values (?, ?)",
|
||||
(sd_blob_hash, stream_hash))
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_sd_blob_hashes_for_stream(self, stream_hash):
|
||||
return [sd_blob_hash for sd_blob_hash, s_h in self.stream_desc_db.RangeIter() if stream_hash == s_h]
|
||||
d = self.db_conn.runQuery("select sd_blob_hash from live_stream_descriptors where stream_hash = ?",
|
||||
(stream_hash,))
|
||||
d.addCallback(lambda results: [r[0] for r in results])
|
||||
return d
|
||||
|
||||
|
||||
class TempLiveStreamMetadataManager(DHTHashSupplier):
|
||||
|
|
|
@ -59,6 +59,8 @@ class LBRYConsole():
|
|||
self.lbry_file_metadata_manager = None
|
||||
self.lbry_file_manager = None
|
||||
self.conf_dir = conf_dir
|
||||
self.created_db_dir = False
|
||||
self.current_db_revision = 1
|
||||
self.data_dir = data_dir
|
||||
self.plugin_manager = PluginManager()
|
||||
self.plugin_manager.setPluginPlaces([
|
||||
|
@ -72,10 +74,13 @@ class LBRYConsole():
|
|||
self.blob_request_payment_rate_manager = None
|
||||
self.lbryid = None
|
||||
self.sd_identifier = StreamDescriptorIdentifier()
|
||||
self.plugin_objects = []
|
||||
self.db_migration_revisions = None
|
||||
|
||||
def start(self):
|
||||
"""Initialize the session and restore everything to its saved state"""
|
||||
d = threads.deferToThread(self._create_directory)
|
||||
d.addCallback(lambda _: self._check_db_migration())
|
||||
d.addCallback(lambda _: self._get_settings())
|
||||
d.addCallback(lambda _: self._get_session())
|
||||
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
|
||||
|
@ -95,7 +100,10 @@ class LBRYConsole():
|
|||
|
||||
def shut_down(self):
|
||||
"""Stop the session, all currently running streams, and stop the server"""
|
||||
d = self.session.shut_down()
|
||||
if self.session is not None:
|
||||
d = self.session.shut_down()
|
||||
else:
|
||||
d = defer.succeed(True)
|
||||
d.addCallback(lambda _: self._shut_down())
|
||||
return d
|
||||
|
||||
|
@ -121,11 +129,38 @@ class LBRYConsole():
|
|||
def _create_directory(self):
|
||||
if not os.path.exists(self.conf_dir):
|
||||
os.makedirs(self.conf_dir)
|
||||
db_revision = open(os.path.join(self.conf_dir, "db_revision"), mode='w')
|
||||
db_revision.write(str(self.current_db_revision))
|
||||
db_revision.close()
|
||||
logging.debug("Created the configuration directory: %s", str(self.conf_dir))
|
||||
if not os.path.exists(self.data_dir):
|
||||
os.makedirs(self.data_dir)
|
||||
logging.debug("Created the data directory: %s", str(self.data_dir))
|
||||
|
||||
def _check_db_migration(self):
|
||||
old_revision = 0
|
||||
db_revision_file = os.path.join(self.conf_dir, "db_revision")
|
||||
if os.path.exists(db_revision_file):
|
||||
old_revision = int(open(db_revision_file).read().strip())
|
||||
if old_revision < self.current_db_revision:
|
||||
from lbrynet.db_migrator import dbmigrator
|
||||
print "Upgrading your databases..."
|
||||
d = threads.deferToThread(dbmigrator.migrate_db, self.conf_dir, old_revision, self.current_db_revision)
|
||||
|
||||
def print_success(old_dirs):
|
||||
success_string = "Finished upgrading the databases. It is now safe to delete the"
|
||||
success_string += " following directories, if you feel like it. It won't make any"
|
||||
success_string += " difference.\nAnyway here they are: "
|
||||
for i, old_dir in enumerate(old_dirs):
|
||||
success_string += old_dir
|
||||
if i + 1 < len(old_dir):
|
||||
success_string += ", "
|
||||
print success_string
|
||||
|
||||
d.addCallback(print_success)
|
||||
return d
|
||||
return defer.succeed(True)
|
||||
|
||||
def _get_settings(self):
|
||||
d = self.settings.start()
|
||||
d.addCallback(lambda _: self.settings.get_lbryid())
|
||||
|
@ -312,12 +347,19 @@ class LBRYConsole():
|
|||
def setup_plugins():
|
||||
ds = []
|
||||
for plugin in self.plugin_manager.getAllPlugins():
|
||||
self.plugin_objects.append(plugin.plugin_object)
|
||||
ds.append(plugin.plugin_object.setup(self))
|
||||
return defer.DeferredList(ds)
|
||||
|
||||
d.addCallback(lambda _: setup_plugins())
|
||||
return d
|
||||
|
||||
def _stop_plugins(self):
|
||||
ds = []
|
||||
for plugin_object in self.plugin_objects:
|
||||
ds.append(defer.maybeDeferred(plugin_object.stop))
|
||||
return defer.DeferredList(ds)
|
||||
|
||||
def _setup_server(self):
|
||||
|
||||
def restore_running_status(running):
|
||||
|
@ -359,6 +401,7 @@ class LBRYConsole():
|
|||
d.addCallback(lambda _: self.lbry_file_manager.stop())
|
||||
ds.append(d)
|
||||
ds.append(self.stop_server())
|
||||
ds.append(self._stop_plugins())
|
||||
dl = defer.DeferredList(ds)
|
||||
return dl
|
||||
|
||||
|
|
|
@ -7,4 +7,7 @@ class LBRYPlugin(IPlugin):
|
|||
IPlugin.__init__(self)
|
||||
|
||||
def setup(self, lbry_console):
|
||||
raise NotImplementedError
|
||||
|
||||
def stop(self):
|
||||
raise NotImplementedError
|
|
@ -1,6 +1,6 @@
|
|||
import binascii
|
||||
import json
|
||||
import leveldb
|
||||
import unqlite
|
||||
import logging
|
||||
import os
|
||||
from twisted.internet import threads, defer
|
||||
|
@ -12,7 +12,7 @@ class LBRYSettings(object):
|
|||
self.db = None
|
||||
|
||||
def start(self):
|
||||
return threads.deferToThread(self._open_db)
|
||||
return self._open_db()
|
||||
|
||||
def stop(self):
|
||||
self.db = None
|
||||
|
@ -20,21 +20,22 @@ class LBRYSettings(object):
|
|||
|
||||
def _open_db(self):
|
||||
logging.debug("Opening %s as the settings database", str(os.path.join(self.db_dir, "settings.db")))
|
||||
self.db = leveldb.LevelDB(os.path.join(self.db_dir, "settings.db"))
|
||||
self.db = unqlite.UnQLite(os.path.join(self.db_dir, "settings.db"))
|
||||
return defer.succeed(True)
|
||||
|
||||
def save_lbryid(self, lbryid):
|
||||
|
||||
def save_lbryid():
|
||||
self.db.Put("lbryid", binascii.hexlify(lbryid), sync=True)
|
||||
self.db['lbryid'] = binascii.hexlify(lbryid)
|
||||
|
||||
return threads.deferToThread(save_lbryid)
|
||||
|
||||
def get_lbryid(self):
|
||||
|
||||
def get_lbryid():
|
||||
try:
|
||||
return binascii.unhexlify(self.db.Get("lbryid"))
|
||||
except KeyError:
|
||||
if 'lbryid' in self.db:
|
||||
return binascii.unhexlify(self.db['lbryid'])
|
||||
else:
|
||||
return None
|
||||
|
||||
return threads.deferToThread(get_lbryid)
|
||||
|
@ -42,9 +43,9 @@ class LBRYSettings(object):
|
|||
def get_server_running_status(self):
|
||||
|
||||
def get_status():
|
||||
try:
|
||||
return json.loads(self.db.Get("server_running"))
|
||||
except KeyError:
|
||||
if 'server_running' in self.db:
|
||||
return json.loads(self.db['server_running'])
|
||||
else:
|
||||
return True
|
||||
|
||||
return threads.deferToThread(get_status)
|
||||
|
@ -52,7 +53,7 @@ class LBRYSettings(object):
|
|||
def save_server_running_status(self, running):
|
||||
|
||||
def save_status():
|
||||
self.db.Put("server_running", json.dumps(running), sync=True)
|
||||
self.db['server_running'] = json.dumps(running)
|
||||
|
||||
return threads.deferToThread(save_status)
|
||||
|
||||
|
@ -77,9 +78,9 @@ class LBRYSettings(object):
|
|||
def _get_payment_rate(self, rate_type):
|
||||
|
||||
def get_rate():
|
||||
try:
|
||||
return json.loads(self.db.Get(rate_type))
|
||||
except KeyError:
|
||||
if rate_type in self.db:
|
||||
return json.loads(self.db['rate_type'])
|
||||
else:
|
||||
return None
|
||||
|
||||
return threads.deferToThread(get_rate)
|
||||
|
@ -88,18 +89,18 @@ class LBRYSettings(object):
|
|||
|
||||
def save_rate():
|
||||
if rate is not None:
|
||||
self.db.Put(rate_type, json.dumps(rate), sync=True)
|
||||
else:
|
||||
self.db.Delete(rate_type, sync=True)
|
||||
self.db[rate_type] = json.dumps(rate)
|
||||
elif rate_type in self.db:
|
||||
del self.db[rate_type]
|
||||
|
||||
return threads.deferToThread(save_rate)
|
||||
|
||||
def get_query_handler_status(self, query_identifier):
|
||||
|
||||
def get_status():
|
||||
try:
|
||||
return json.loads(self.db.Get(json.dumps(('q_h', query_identifier))))
|
||||
except KeyError:
|
||||
if json.dumps(('q_h', query_identifier)) in self.db:
|
||||
return json.loads(self.db[(json.dumps(('q_h', query_identifier)))])
|
||||
else:
|
||||
return True
|
||||
|
||||
return threads.deferToThread(get_status)
|
||||
|
@ -112,5 +113,5 @@ class LBRYSettings(object):
|
|||
|
||||
def _set_query_handler_status(self, query_identifier, status):
|
||||
def set_status():
|
||||
self.db.Put(json.dumps(('q_h', query_identifier)), json.dumps(status), sync=True)
|
||||
self.db[json.dumps(('q_h', query_identifier))] = json.dumps(status)
|
||||
return threads.deferToThread(set_status)
|
|
@ -1,25 +1,43 @@
|
|||
from twisted.internet import threads, defer
|
||||
from twisted.internet import defer
|
||||
from ValuableBlobInfo import ValuableBlobInfo
|
||||
from db_keys import BLOB_INFO_TYPE
|
||||
import json
|
||||
import leveldb
|
||||
import os
|
||||
import sqlite3
|
||||
from twisted.enterprise import adbapi
|
||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||
|
||||
|
||||
class BlindInfoManager(object):
|
||||
|
||||
def __init__(self, db, peer_manager):
|
||||
self.db = db
|
||||
def __init__(self, db_dir, peer_manager):
|
||||
self.db_dir = db_dir
|
||||
self.db_conn = None
|
||||
self.peer_manager = peer_manager
|
||||
|
||||
def setup(self):
|
||||
return defer.succeed(True)
|
||||
# check_same_thread=False is solely to quiet a spurious error that appears to be due
|
||||
# to a bug in twisted, where the connection is closed by a different thread than the
|
||||
# one that opened it. The individual connections in the pool are not used in multiple
|
||||
# threads.
|
||||
self.db_conn = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blind_info.db"),
|
||||
check_same_thread=False)
|
||||
|
||||
def set_up_table(transaction):
|
||||
transaction.execute("create table if not exists valuable_blobs (" +
|
||||
" blob_hash text primary key, " +
|
||||
" blob_length integer, " +
|
||||
" reference text, " +
|
||||
" peer_host text, " +
|
||||
" peer_port integer, " +
|
||||
" peer_score text" +
|
||||
")")
|
||||
return self.db_conn.runInteraction(set_up_table)
|
||||
|
||||
def stop(self):
|
||||
self.db = None
|
||||
return defer.succeed(True)
|
||||
|
||||
def get_all_blob_infos(self):
|
||||
d = threads.deferToThread(self._get_all_blob_infos)
|
||||
d = self._get_all_blob_infos()
|
||||
|
||||
def make_blob_infos(blob_data):
|
||||
blob_infos = []
|
||||
|
@ -42,21 +60,19 @@ class BlindInfoManager(object):
|
|||
peer_port = blob_info.peer.port
|
||||
peer_score = blob_info.peer_score
|
||||
blobs.append((blob_hash, length, reference, peer_host, peer_port, peer_score))
|
||||
return threads.deferToThread(self._save_blob_infos, blobs)
|
||||
return self._save_blob_infos(blobs)
|
||||
|
||||
@rerun_if_locked
|
||||
def _get_all_blob_infos(self):
|
||||
blob_infos = []
|
||||
for key, blob_info in self.db.RangeIter():
|
||||
key_type, blob_hash = json.loads(key)
|
||||
if key_type == BLOB_INFO_TYPE:
|
||||
blob_infos.append([blob_hash] + json.loads(blob_info))
|
||||
return blob_infos
|
||||
return self.db_conn.runQuery("select * from valuable_blobs")
|
||||
|
||||
@rerun_if_locked
|
||||
def _save_blob_infos(self, blobs):
|
||||
batch = leveldb.WriteBatch()
|
||||
for blob in blobs:
|
||||
try:
|
||||
self.db.Get(json.dumps((BLOB_INFO_TYPE, blob[0])))
|
||||
except KeyError:
|
||||
batch.Put(json.dumps((BLOB_INFO_TYPE, blob[0])), json.dumps(blob[1:]))
|
||||
self.db.Write(batch, sync=True)
|
||||
def save_infos(transaction):
|
||||
for blob in blobs:
|
||||
try:
|
||||
transaction.execute("insert into valuable_blobs values (?, ?, ?, ?, ?, ?)",
|
||||
blob)
|
||||
except sqlite3.IntegrityError:
|
||||
pass
|
||||
return self.db_conn.runInteraction(save_infos)
|
|
@ -1,67 +1,79 @@
|
|||
from db_keys import SETTING_TYPE, PEER_TYPE
|
||||
from twisted.internet import threads
|
||||
from twisted.internet import threads, defer
|
||||
import json
|
||||
|
||||
import unqlite
|
||||
import os
|
||||
from twisted.enterprise import adbapi
|
||||
from lbrynet.core.sqlite_helpers import rerun_if_locked
|
||||
|
||||
class BlindRepeaterSettings(object):
|
||||
|
||||
def __init__(self, db):
|
||||
self.db = db
|
||||
def __init__(self, db_dir):
|
||||
self.db_dir = db_dir
|
||||
self.unq_db = None
|
||||
self.sql_db = None
|
||||
|
||||
def setup(self):
|
||||
self.unq_db = unqlite.UnQLite(os.path.join(self.db_dir, "blind_settings.db"))
|
||||
# check_same_thread=False is solely to quiet a spurious error that appears to be due
|
||||
# to a bug in twisted, where the connection is closed by a different thread than the
|
||||
# one that opened it. The individual connections in the pool are not used in multiple
|
||||
# threads.
|
||||
self.sql_db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blind_peers.db"),
|
||||
check_same_thread=False)
|
||||
|
||||
return self.sql_db.runQuery("create table if not exists approved_peers (" +
|
||||
" ip_address text, " +
|
||||
" port integer" +
|
||||
")")
|
||||
|
||||
def stop(self):
|
||||
self.unq_db = None
|
||||
self.sql_db = None
|
||||
return defer.succeed(True)
|
||||
|
||||
def save_repeater_status(self, running):
|
||||
def save_status():
|
||||
self.db.Put(json.dumps((SETTING_TYPE, "running")), json.dumps(running), sync=True)
|
||||
self.unq_db["running"] = json.dumps(running)
|
||||
|
||||
return threads.deferToThread(save_status)
|
||||
|
||||
def get_repeater_saved_status(self):
|
||||
def get_status():
|
||||
try:
|
||||
return json.loads(self.db.Get(json.dumps((SETTING_TYPE, "running"))))
|
||||
except KeyError:
|
||||
if "running" in self.unq_db:
|
||||
return json.loads(self.unq_db['running'])
|
||||
else:
|
||||
return False
|
||||
|
||||
return threads.deferToThread(get_status)
|
||||
|
||||
def save_max_space(self, max_space):
|
||||
def save_space():
|
||||
self.db.Put(json.dumps((SETTING_TYPE, "max_space")), str(max_space), sync=True)
|
||||
self.unq_db['max_space'] = json.dumps(max_space)
|
||||
|
||||
return threads.deferToThread(save_space)
|
||||
|
||||
def get_saved_max_space(self):
|
||||
def get_space():
|
||||
try:
|
||||
return int(self.db.Get(json.dumps((SETTING_TYPE, "max_space"))))
|
||||
except KeyError:
|
||||
if 'max_space' in self.unq_db:
|
||||
return json.loads(self.unq_db['max_space'])
|
||||
else:
|
||||
return 0
|
||||
|
||||
return threads.deferToThread(get_space)
|
||||
|
||||
@rerun_if_locked
|
||||
def save_approved_peer(self, host, port):
|
||||
def add_peer():
|
||||
peer_string = json.dumps((PEER_TYPE, (host, port)))
|
||||
self.db.Put(peer_string, "", sync=True)
|
||||
|
||||
return threads.deferToThread(add_peer)
|
||||
return self.sql_db.runQuery("insert into approved_peers values (?, ?)",
|
||||
(host, port))
|
||||
|
||||
@rerun_if_locked
|
||||
def remove_approved_peer(self, host, port):
|
||||
def remove_peer():
|
||||
peer_string = json.dumps((PEER_TYPE, (host, port)))
|
||||
self.db.Delete(peer_string, sync=True)
|
||||
|
||||
return threads.deferToThread(remove_peer)
|
||||
return self.sql_db.runQuery("delete from approved_peers where ip_address = ? and port = ?",
|
||||
(host, port))
|
||||
|
||||
@rerun_if_locked
|
||||
def get_approved_peers(self):
|
||||
def get_peers():
|
||||
peers = []
|
||||
for k, v in self.db.RangeIter():
|
||||
key_type, peer_info = json.loads(k)
|
||||
if key_type == PEER_TYPE:
|
||||
peers.append(peer_info)
|
||||
return peers
|
||||
|
||||
return threads.deferToThread(get_peers)
|
||||
return self.sql_db.runQuery("select * from approved_peers")
|
||||
|
||||
def get_data_payment_rate(self):
|
||||
return threads.deferToThread(self._get_rate, "data_payment_rate")
|
||||
|
@ -82,13 +94,13 @@ class BlindRepeaterSettings(object):
|
|||
return threads.deferToThread(self._save_rate, "valuable_hash_rate", rate)
|
||||
|
||||
def _get_rate(self, rate_type):
|
||||
try:
|
||||
return json.loads(self.db.Get(json.dumps((SETTING_TYPE, rate_type))))
|
||||
except KeyError:
|
||||
if rate_type in self.unq_db:
|
||||
return json.loads(self.unq_db[rate_type])
|
||||
else:
|
||||
return None
|
||||
|
||||
def _save_rate(self, rate_type, rate):
|
||||
if rate is not None:
|
||||
self.db.Put(json.dumps((SETTING_TYPE, rate_type)), json.dumps(rate), sync=True)
|
||||
else:
|
||||
self.db.Delete(json.dumps((SETTING_TYPE, rate_type)), sync=True)
|
||||
self.unq_db[rate_type] = json.dumps(rate)
|
||||
elif rate_type in self.unq_db:
|
||||
del self.unq_db[rate_type]
|
|
@ -1,7 +1,5 @@
|
|||
import leveldb
|
||||
import os
|
||||
from lbrynet.lbrynet_console import LBRYPlugin
|
||||
from twisted.internet import defer, threads
|
||||
from twisted.internet import defer
|
||||
from lbrynet.conf import MIN_VALUABLE_BLOB_HASH_PAYMENT_RATE, MIN_VALUABLE_BLOB_INFO_PAYMENT_RATE
|
||||
from BlindRepeater import BlindRepeater
|
||||
from BlindInfoManager import BlindInfoManager
|
||||
|
@ -26,14 +24,12 @@ class BlindRepeaterPlugin(LBRYPlugin.LBRYPlugin):
|
|||
self.control_handlers = None
|
||||
self.payment_rate_manager = None
|
||||
self.settings = None
|
||||
self.db = None
|
||||
|
||||
def setup(self, lbry_console):
|
||||
lbry_session = lbry_console.session
|
||||
d = threads.deferToThread(self._setup_db, lbry_session.db_dir)
|
||||
d.addCallback(lambda _: self._setup_settings())
|
||||
d = self._setup_settings(lbry_session.db_dir)
|
||||
d.addCallback(lambda _: self._get_payment_rate_manager(lbry_session.base_payment_rate_manager))
|
||||
d.addCallback(lambda _: self._setup_blind_info_manager(lbry_session.peer_manager))
|
||||
d.addCallback(lambda _: self._setup_blind_info_manager(lbry_session.peer_manager, lbry_session.db_dir))
|
||||
d.addCallback(lambda _: self._setup_blind_repeater(lbry_session))
|
||||
d.addCallback(lambda _: self._setup_valuable_blob_query_handler(lbry_session))
|
||||
d.addCallback(lambda _: self._create_control_handlers(lbry_session))
|
||||
|
@ -41,11 +37,12 @@ class BlindRepeaterPlugin(LBRYPlugin.LBRYPlugin):
|
|||
d.addCallback(lambda _: self._add_to_lbry_console(lbry_console))
|
||||
return d
|
||||
|
||||
def _setup_db(self, db_dir):
|
||||
self.db = leveldb.LevelDB(os.path.join(db_dir, "valuable_blobs.db"))
|
||||
def stop(self):
|
||||
return self.settings.stop()
|
||||
|
||||
def _setup_settings(self):
|
||||
self.settings = BlindRepeaterSettings(self.db)
|
||||
def _setup_settings(self, db_dir):
|
||||
self.settings = BlindRepeaterSettings(db_dir)
|
||||
return self.settings.setup()
|
||||
|
||||
def _get_payment_rate_manager(self, default_payment_rate_manager):
|
||||
d1 = self.settings.get_data_payment_rate()
|
||||
|
@ -67,8 +64,8 @@ class BlindRepeaterPlugin(LBRYPlugin.LBRYPlugin):
|
|||
dl.addCallback(get_payment_rate_manager)
|
||||
return dl
|
||||
|
||||
def _setup_blind_info_manager(self, peer_manager):
|
||||
self.blind_info_manager = BlindInfoManager(self.db, peer_manager)
|
||||
def _setup_blind_info_manager(self, peer_manager, db_dir):
|
||||
self.blind_info_manager = BlindInfoManager(db_dir, peer_manager)
|
||||
return self.blind_info_manager.setup()
|
||||
|
||||
def _setup_valuable_blob_query_handler(self, lbry_session):
|
||||
|
|
|
@ -1,3 +0,0 @@
|
|||
BLOB_INFO_TYPE = 'b'
|
||||
SETTING_TYPE = 's'
|
||||
PEER_TYPE = 'p'
|
|
@ -34,6 +34,7 @@ class LBRYDownloader(object):
|
|||
self.dht_node_port = 4444
|
||||
self.run_server = True
|
||||
self.first_run = False
|
||||
self.current_db_revision = 1
|
||||
if os.name == "nt":
|
||||
from lbrynet.winhelpers.knownpaths import get_path, FOLDERID, UserHandle
|
||||
self.download_directory = get_path(FOLDERID.Downloads, UserHandle.current)
|
||||
|
@ -54,6 +55,7 @@ class LBRYDownloader(object):
|
|||
def start(self):
|
||||
d = self._load_configuration_file()
|
||||
d.addCallback(lambda _: threads.deferToThread(self._create_directory))
|
||||
d.addCallback(lambda _: self._check_db_migration())
|
||||
d.addCallback(lambda _: self._get_session())
|
||||
d.addCallback(lambda _: self._setup_stream_info_manager())
|
||||
d.addCallback(lambda _: self._setup_stream_identifier())
|
||||
|
@ -72,6 +74,37 @@ class LBRYDownloader(object):
|
|||
def get_new_address(self):
|
||||
return self.session.wallet.get_new_address()
|
||||
|
||||
def _check_db_migration(self):
|
||||
old_revision = 0
|
||||
db_revision_file = os.path.join(self.conf_dir, "db_revision")
|
||||
if os.path.exists(db_revision_file):
|
||||
old_revision = int(open(db_revision_file).read().strip())
|
||||
if old_revision < self.current_db_revision:
|
||||
if os.name == "nt":
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
def run_migrator():
|
||||
migrator_exe = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])),
|
||||
"dmigrator", "migrator.exe")
|
||||
print "trying to find the migrator at", migrator_exe
|
||||
si = subprocess.STARTUPINFO
|
||||
si.dwFlags = subprocess.STARTF_USESHOWWINDOW
|
||||
si.wShowWindow = subprocess.SW_HIDE
|
||||
print "trying to run the migrator"
|
||||
migrator_proc = subprocess.Popen([migrator_exe, self.conf_dir, old_revision,
|
||||
self.current_db_revision], startupinfo=si)
|
||||
print "started the migrator"
|
||||
migrator_proc.wait()
|
||||
print "migrator has returned"
|
||||
|
||||
return threads.deferToThread(run_migrator)
|
||||
else:
|
||||
from lbrynet.db_migrator import dbmigrator
|
||||
return threads.deferToThread(dbmigrator.migrate_db, self.conf_dir, old_revision,
|
||||
self.current_db_revision)
|
||||
return defer.succeed(True)
|
||||
|
||||
def _load_configuration_file(self):
|
||||
|
||||
def get_configuration():
|
||||
|
@ -194,6 +227,9 @@ class LBRYDownloader(object):
|
|||
def _create_directory(self):
|
||||
if not os.path.exists(self.conf_dir):
|
||||
os.makedirs(self.conf_dir)
|
||||
db_revision = open(os.path.join(self.conf_dir, "db_revision"), mode='w')
|
||||
db_revision.write(str(self.current_db_revision))
|
||||
db_revision.close()
|
||||
logging.debug("Created the configuration directory: %s", str(self.conf_dir))
|
||||
if not os.path.exists(self.data_dir):
|
||||
os.makedirs(self.data_dir)
|
||||
|
|
2
setup.py
2
setup.py
|
@ -8,7 +8,7 @@ from setuptools import setup, find_packages
|
|||
setup(name='lbrynet',
|
||||
version='0.0.4',
|
||||
packages=find_packages(),
|
||||
install_requires=['pycrypto', 'twisted', 'miniupnpc', 'yapsy', 'seccure', 'python-bitcoinrpc', 'leveldb', 'txJSON-RPC', 'requests'],
|
||||
install_requires=['pycrypto', 'twisted', 'miniupnpc', 'yapsy', 'seccure', 'python-bitcoinrpc', 'txJSON-RPC', 'requests', 'unqlite', 'leveldb'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'lbrynet-console = lbrynet.lbrynet_console.LBRYConsole:launch_lbry_console',
|
||||
|
|
Loading…
Add table
Reference in a new issue