forked from LBRYCommunity/lbry-sdk
Merge remote-tracking branch 'origin/1193'
This commit is contained in:
commit
e3265714ee
8 changed files with 197 additions and 76 deletions
|
@ -30,6 +30,8 @@ at anytime.
|
|||
* changed txrequests for treq
|
||||
* changed cryptography version to 2.2.2
|
||||
* removed pycrypto dependency, replacing all calls to cryptography
|
||||
* full verification of streams only during migration instead of every startup
|
||||
* database batching functions for starting up the file manager
|
||||
* several internal dht functions to use inlineCallbacks
|
||||
* `DHTHashAnnouncer` and `Node` manage functions to use `LoopingCall`s instead of scheduling with `callLater`.
|
||||
* `store` kademlia rpc method to block on the call finishing and to return storing peer information
|
||||
|
|
|
@ -204,7 +204,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
self.connected_to_internet = True
|
||||
self.connection_status_code = None
|
||||
self.platform = None
|
||||
self.current_db_revision = 8
|
||||
self.current_db_revision = 9
|
||||
self.db_revision_file = conf.settings.get_db_revision_filename()
|
||||
self.session = None
|
||||
self._session_id = conf.settings.get_session_id()
|
||||
|
|
|
@ -18,6 +18,8 @@ def migrate_db(db_dir, start, end):
|
|||
from lbrynet.database.migrator.migrate6to7 import do_migration
|
||||
elif current == 7:
|
||||
from lbrynet.database.migrator.migrate7to8 import do_migration
|
||||
elif current == 8:
|
||||
from lbrynet.database.migrator.migrate8to9 import do_migration
|
||||
else:
|
||||
raise Exception("DB migration of version {} to {} is not available".format(current,
|
||||
current+1))
|
||||
|
|
54
lbrynet/database/migrator/migrate8to9.py
Normal file
54
lbrynet/database/migrator/migrate8to9.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
import sqlite3
|
||||
import logging
|
||||
import os
|
||||
|
||||
from lbrynet.core.Error import InvalidStreamDescriptorError
|
||||
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, format_sd_info, format_blobs, validate_descriptor
|
||||
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def do_migration(db_dir):
|
||||
db_path = os.path.join(db_dir, "lbrynet.sqlite")
|
||||
blob_dir = os.path.join(db_dir, "blobfiles")
|
||||
connection = sqlite3.connect(db_path)
|
||||
cursor = connection.cursor()
|
||||
|
||||
query = "select stream_name, stream_key, suggested_filename, sd_hash, stream_hash from stream"
|
||||
streams = cursor.execute(query).fetchall()
|
||||
|
||||
blobs = cursor.execute("select s.stream_hash, s.position, s.iv, b.blob_hash, b.blob_length from stream_blob s "
|
||||
"left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall()
|
||||
blobs_by_stream = {}
|
||||
for stream_hash, position, iv, blob_hash, blob_length in blobs:
|
||||
blobs_by_stream.setdefault(stream_hash, []).append(CryptBlobInfo(blob_hash, position, blob_length or 0, iv))
|
||||
|
||||
for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams:
|
||||
sd_info = format_sd_info(
|
||||
EncryptedFileStreamType, stream_name, stream_key,
|
||||
suggested_filename, stream_hash, format_blobs(blobs_by_stream[stream_hash])
|
||||
)
|
||||
try:
|
||||
validate_descriptor(sd_info)
|
||||
except InvalidStreamDescriptorError as err:
|
||||
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
|
||||
sd_hash, err.message)
|
||||
blob_hashes = [blob.blob_hash for blob in blobs_by_stream[stream_hash]]
|
||||
delete_stream(cursor, stream_hash, sd_hash, blob_hashes, blob_dir)
|
||||
|
||||
connection.commit()
|
||||
connection.close()
|
||||
|
||||
|
||||
def delete_stream(transaction, stream_hash, sd_hash, blob_hashes, blob_dir):
|
||||
transaction.execute("delete from content_claim where stream_hash=? ", (stream_hash,))
|
||||
transaction.execute("delete from file where stream_hash=? ", (stream_hash, ))
|
||||
transaction.execute("delete from stream_blob where stream_hash=?", (stream_hash, ))
|
||||
transaction.execute("delete from stream where stream_hash=? ", (stream_hash, ))
|
||||
transaction.execute("delete from blob where blob_hash=?", (sd_hash, ))
|
||||
for blob_hash in blob_hashes:
|
||||
transaction.execute("delete from blob where blob_hash=?", (blob_hash, ))
|
||||
file_path = os.path.join(blob_dir, blob_hash)
|
||||
if os.path.isfile(file_path):
|
||||
os.unlink(file_path)
|
|
@ -552,7 +552,7 @@ class SQLiteStorage(object):
|
|||
)
|
||||
return self.db.runInteraction(_save_support)
|
||||
|
||||
def get_supports(self, claim_id):
|
||||
def get_supports(self, *claim_ids):
|
||||
def _format_support(outpoint, supported_id, amount, address):
|
||||
return {
|
||||
"txid": outpoint.split(":")[0],
|
||||
|
@ -563,10 +563,15 @@ class SQLiteStorage(object):
|
|||
}
|
||||
|
||||
def _get_supports(transaction):
|
||||
if len(claim_ids) == 1:
|
||||
bind = "=?"
|
||||
else:
|
||||
bind = "in ({})".format(','.join('?' for _ in range(len(claim_ids))))
|
||||
return [
|
||||
_format_support(*support_info)
|
||||
for support_info in transaction.execute(
|
||||
"select * from support where claim_id=?", (claim_id, )
|
||||
"select * from support where claim_id {}".format(bind),
|
||||
tuple(claim_ids)
|
||||
).fetchall()
|
||||
]
|
||||
|
||||
|
@ -683,51 +688,82 @@ class SQLiteStorage(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def get_content_claim(self, stream_hash, include_supports=True):
|
||||
def _get_content_claim(transaction):
|
||||
claim_id = transaction.execute(
|
||||
"select claim.claim_outpoint from content_claim "
|
||||
"inner join claim on claim.claim_outpoint=content_claim.claim_outpoint and content_claim.stream_hash=? "
|
||||
"order by claim.rowid desc", (stream_hash, )
|
||||
def _get_claim_from_stream_hash(transaction):
|
||||
claim_info = transaction.execute(
|
||||
"select c.*, "
|
||||
"case when c.channel_claim_id is not null then "
|
||||
"(select claim_name from claim where claim_id==c.channel_claim_id) "
|
||||
"else null end as channel_name from content_claim "
|
||||
"inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
|
||||
"and content_claim.stream_hash=? order by c.rowid desc", (stream_hash,)
|
||||
).fetchone()
|
||||
if not claim_id:
|
||||
if not claim_info:
|
||||
return None
|
||||
return claim_id[0]
|
||||
channel_name = claim_info[-1]
|
||||
result = _format_claim_response(*claim_info[:-1])
|
||||
if channel_name:
|
||||
result['channel_name'] = channel_name
|
||||
return result
|
||||
|
||||
content_claim_outpoint = yield self.db.runInteraction(_get_content_claim)
|
||||
result = None
|
||||
if content_claim_outpoint:
|
||||
result = yield self.get_claim(content_claim_outpoint, include_supports)
|
||||
result = yield self.db.runInteraction(_get_claim_from_stream_hash)
|
||||
if result and include_supports:
|
||||
supports = yield self.get_supports(result['claim_id'])
|
||||
result['supports'] = supports
|
||||
result['effective_amount'] = float(
|
||||
sum([support['amount'] for support in supports]) + result['amount']
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_claim(self, claim_outpoint, include_supports=True):
|
||||
def _claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
|
||||
r = {
|
||||
"name": name,
|
||||
"claim_id": claim_id,
|
||||
"address": address,
|
||||
"claim_sequence": claim_sequence,
|
||||
"value": ClaimDict.deserialize(serialized.decode('hex')).claim_dict,
|
||||
"height": height,
|
||||
"amount": float(Decimal(amount) / Decimal(COIN)),
|
||||
"nout": int(outpoint.split(":")[1]),
|
||||
"txid": outpoint.split(":")[0],
|
||||
"channel_claim_id": channel_id,
|
||||
"channel_name": None
|
||||
}
|
||||
return r
|
||||
def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True):
|
||||
def _batch_get_claim(transaction):
|
||||
results = {}
|
||||
bind = "({})".format(','.join('?' for _ in range(len(stream_hashes))))
|
||||
claim_infos = transaction.execute(
|
||||
"select content_claim.stream_hash, c.*, "
|
||||
"case when c.channel_claim_id is not null then "
|
||||
"(select claim_name from claim where claim_id==c.channel_claim_id) "
|
||||
"else null end as channel_name from content_claim "
|
||||
"inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
|
||||
"and content_claim.stream_hash in {} order by c.rowid desc".format(bind),
|
||||
tuple(stream_hashes)
|
||||
).fetchall()
|
||||
for claim_info in claim_infos:
|
||||
channel_name = claim_info[-1]
|
||||
stream_hash = claim_info[0]
|
||||
result = _format_claim_response(*claim_info[1:-1])
|
||||
if channel_name:
|
||||
result['channel_name'] = channel_name
|
||||
results[stream_hash] = result
|
||||
return results
|
||||
|
||||
claims = yield self.db.runInteraction(_batch_get_claim)
|
||||
if include_supports:
|
||||
all_supports = {}
|
||||
for support in (yield self.get_supports(*[claim['claim_id'] for claim in claims.values()])):
|
||||
all_supports.setdefault(support['claim_id'], []).append(support)
|
||||
for stream_hash in claims.keys():
|
||||
claim = claims[stream_hash]
|
||||
supports = all_supports.get(claim['claim_id'], [])
|
||||
claim['supports'] = supports
|
||||
claim['effective_amount'] = float(
|
||||
sum([support['amount'] for support in supports]) + claim['amount']
|
||||
)
|
||||
claims[stream_hash] = claim
|
||||
defer.returnValue(claims)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_claim(self, claim_outpoint, include_supports=True):
|
||||
def _get_claim(transaction):
|
||||
claim_info = transaction.execute(
|
||||
"select * from claim where claim_outpoint=?", (claim_outpoint, )
|
||||
).fetchone()
|
||||
result = _claim_response(*claim_info)
|
||||
if result['channel_claim_id']:
|
||||
channel_name_result = transaction.execute(
|
||||
"select claim_name from claim where claim_id=?", (result['channel_claim_id'], )
|
||||
).fetchone()
|
||||
if channel_name_result:
|
||||
result['channel_name'] = channel_name_result[0]
|
||||
claim_info = transaction.execute("select c.*, "
|
||||
"case when c.channel_claim_id is not null then "
|
||||
"(select claim_name from claim where claim_id==c.channel_claim_id) "
|
||||
"else null end as channel_name from claim c where claim_outpoint = ?",
|
||||
(claim_outpoint,)).fetchone()
|
||||
channel_name = claim_info[-1]
|
||||
result = _format_claim_response(*claim_info[:-1])
|
||||
if channel_name:
|
||||
result['channel_name'] = channel_name
|
||||
return result
|
||||
|
||||
result = yield self.db.runInteraction(_get_claim)
|
||||
|
@ -793,3 +829,21 @@ class SQLiteStorage(object):
|
|||
"where r.timestamp is null or r.timestamp < ?",
|
||||
self.clock.seconds() - conf.settings['auto_re_reflect_interval']
|
||||
)
|
||||
|
||||
|
||||
# Helper functions
|
||||
def _format_claim_response(outpoint, claim_id, name, amount, height, serialized, channel_id, address, claim_sequence):
|
||||
r = {
|
||||
"name": name,
|
||||
"claim_id": claim_id,
|
||||
"address": address,
|
||||
"claim_sequence": claim_sequence,
|
||||
"value": ClaimDict.deserialize(serialized.decode('hex')).claim_dict,
|
||||
"height": height,
|
||||
"amount": float(Decimal(amount) / Decimal(COIN)),
|
||||
"nout": int(outpoint.split(":")[1]),
|
||||
"txid": outpoint.split(":")[0],
|
||||
"channel_claim_id": channel_id,
|
||||
"channel_name": None
|
||||
}
|
||||
return r
|
||||
|
|
|
@ -56,18 +56,21 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
|||
self.channel_name = None
|
||||
self.metadata = None
|
||||
|
||||
def set_claim_info(self, claim_info):
|
||||
self.claim_id = claim_info['claim_id']
|
||||
self.txid = claim_info['txid']
|
||||
self.nout = claim_info['nout']
|
||||
self.channel_claim_id = claim_info['channel_claim_id']
|
||||
self.outpoint = "%s:%i" % (self.txid, self.nout)
|
||||
self.claim_name = claim_info['name']
|
||||
self.channel_name = claim_info['channel_name']
|
||||
self.metadata = claim_info['value']['stream']['metadata']
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_claim_info(self, include_supports=True):
|
||||
claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
|
||||
if claim_info:
|
||||
self.claim_id = claim_info['claim_id']
|
||||
self.txid = claim_info['txid']
|
||||
self.nout = claim_info['nout']
|
||||
self.channel_claim_id = claim_info['channel_claim_id']
|
||||
self.outpoint = "%s:%i" % (self.txid, self.nout)
|
||||
self.claim_name = claim_info['name']
|
||||
self.channel_name = claim_info['channel_name']
|
||||
self.metadata = claim_info['value']['stream']['metadata']
|
||||
self.set_claim_info(claim_info)
|
||||
|
||||
defer.returnValue(claim_info)
|
||||
|
||||
|
|
|
@ -6,12 +6,11 @@ import logging
|
|||
|
||||
from twisted.internet import defer, task, reactor
|
||||
from twisted.python.failure import Failure
|
||||
from lbrynet.core.Error import InvalidStreamDescriptorError
|
||||
from lbrynet.reflector.reupload import reflect_file
|
||||
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
|
||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
||||
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor
|
||||
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info
|
||||
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
|
||||
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
|
||||
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
|
||||
|
@ -96,47 +95,35 @@ class EncryptedFileManager(object):
|
|||
suggested_file_name=suggested_file_name
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _start_lbry_file(self, file_info, payment_rate_manager):
|
||||
def _start_lbry_file(self, file_info, payment_rate_manager, claim_info):
|
||||
lbry_file = self._get_lbry_file(
|
||||
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
|
||||
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
|
||||
file_info['suggested_file_name']
|
||||
)
|
||||
yield lbry_file.get_claim_info()
|
||||
if claim_info:
|
||||
lbry_file.set_claim_info(claim_info)
|
||||
try:
|
||||
# verify the stream is valid (we might have downloaded an invalid stream
|
||||
# in the past when the validation check didn't work)
|
||||
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
|
||||
validate_descriptor(stream_info)
|
||||
except InvalidStreamDescriptorError as err:
|
||||
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
|
||||
lbry_file.sd_hash, err.message)
|
||||
yield lbry_file.delete_data()
|
||||
yield self.session.storage.delete_stream(lbry_file.stream_hash)
|
||||
else:
|
||||
try:
|
||||
# restore will raise an Exception if status is unknown
|
||||
lbry_file.restore(file_info['status'])
|
||||
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
|
||||
self.lbry_files.append(lbry_file)
|
||||
if len(self.lbry_files) % 500 == 0:
|
||||
log.info("Started %i files", len(self.lbry_files))
|
||||
except Exception:
|
||||
log.warning("Failed to start %i", file_info.get('rowid'))
|
||||
# restore will raise an Exception if status is unknown
|
||||
lbry_file.restore(file_info['status'])
|
||||
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
|
||||
self.lbry_files.append(lbry_file)
|
||||
if len(self.lbry_files) % 500 == 0:
|
||||
log.info("Started %i files", len(self.lbry_files))
|
||||
except Exception:
|
||||
log.warning("Failed to start %i", file_info.get('rowid'))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _start_lbry_files(self):
|
||||
files = yield self.session.storage.get_all_lbry_files()
|
||||
claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
|
||||
b_prm = self.session.base_payment_rate_manager
|
||||
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
|
||||
|
||||
log.info("Starting %i files", len(files))
|
||||
dl = []
|
||||
for file_info in files:
|
||||
dl.append(self._start_lbry_file(file_info, payment_rate_manager))
|
||||
|
||||
yield defer.DeferredList(dl)
|
||||
claim_info = claim_infos.get(file_info['stream_hash'])
|
||||
self._start_lbry_file(file_info, payment_rate_manager, claim_info)
|
||||
|
||||
log.info("Started %i lbry files", len(self.lbry_files))
|
||||
if self.auto_re_reflect is True:
|
||||
|
|
|
@ -163,6 +163,25 @@ class BlobStorageTests(StorageTest):
|
|||
self.assertEqual(blob_hashes, [])
|
||||
|
||||
|
||||
class SupportsStorageTests(StorageTest):
|
||||
@defer.inlineCallbacks
|
||||
def test_supports_storage(self):
|
||||
claim_ids = [random_lbry_hash() for _ in range(10)]
|
||||
random_supports = [{"txid": random_lbry_hash(), "nout":i, "address": "addr{}".format(i), "amount": i}
|
||||
for i in range(20)]
|
||||
expected_supports = {}
|
||||
for idx, claim_id in enumerate(claim_ids):
|
||||
yield self.storage.save_supports(claim_id, random_supports[idx*2:idx*2+2])
|
||||
for random_support in random_supports[idx*2:idx*2+2]:
|
||||
random_support['claim_id'] = claim_id
|
||||
expected_supports.setdefault(claim_id, []).append(random_support)
|
||||
supports = yield self.storage.get_supports(claim_ids[0])
|
||||
self.assertEqual(supports, expected_supports[claim_ids[0]])
|
||||
all_supports = yield self.storage.get_supports(*claim_ids)
|
||||
for support in all_supports:
|
||||
self.assertIn(support, expected_supports[support['claim_id']])
|
||||
|
||||
|
||||
class StreamStorageTests(StorageTest):
|
||||
@defer.inlineCallbacks
|
||||
def test_store_stream(self, stream_hash=None):
|
||||
|
|
Loading…
Reference in a new issue