add saved_file and content_fee columns to file table

-drop not null constraints for file_name and download_directory

-add migrator
This commit is contained in:
Jack Robison 2019-05-07 14:30:35 -04:00
parent 84381ff76c
commit d7032b12d7
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
7 changed files with 178 additions and 57 deletions

View file

@ -63,7 +63,7 @@ class DatabaseComponent(Component):
@staticmethod
def get_current_db_revision():
return 10
return 11
@property
def revision_filename(self):

View file

@ -24,6 +24,8 @@ def migrate_db(conf, start, end):
from .migrate8to9 import do_migration
elif current == 9:
from .migrate9to10 import do_migration
elif current == 10:
from .migrate10to11 import do_migration
else:
raise Exception("DB migration of version {} to {} is not available".format(current,
current+1))

View file

@ -0,0 +1,46 @@
import sqlite3
import os
import binascii
def do_migration(conf):
    """Migrate the database from revision 10 to 11.

    Rebuilds the ``file`` table to:
      * add a not-null ``saved_file`` flag (1 if the download still exists on disk)
      * add a nullable ``content_fee`` column (populated later, inserted as NULL here)
      * drop the not-null constraints on ``file_name`` and ``download_directory``,
        replacing the legacy ``'{stream}'`` sentinel (and dangling paths) with NULL.

    :param conf: config object providing ``data_dir``, the directory containing
        ``lbrynet.sqlite``.
    """
    db_path = os.path.join(conf.data_dir, "lbrynet.sqlite")
    connection = sqlite3.connect(db_path)
    try:
        cursor = connection.cursor()
        # disable FK enforcement while the table is dropped/renamed so other
        # tables referencing file/stream don't block the rebuild
        cursor.execute(
            "pragma foreign_keys=off;"
        )
        cursor.execute("""
            create table if not exists new_file (
                stream_hash text primary key not null references stream,
                file_name text,
                download_directory text,
                blob_data_rate real not null,
                status text not null,
                saved_file integer not null,
                content_fee text
            );
        """)
        # fetchall() materializes the old rows so the same cursor can be reused
        # for the inserts below
        for (stream_hash, file_name, download_dir, data_rate, status) in cursor.execute(
                "select * from file").fetchall():
            saved_file = 0
            if download_dir != '{stream}' and file_name != '{stream}':
                try:
                    # columns store hex-encoded paths; binascii.Error is a
                    # ValueError subclass, so bad hex is caught below
                    if os.path.isfile(os.path.join(binascii.unhexlify(download_dir).decode(),
                                                   binascii.unhexlify(file_name).decode())):
                        saved_file = 1
                    else:
                        # file was removed from disk: clear the stale path
                        download_dir, file_name = None, None
                except (OSError, ValueError):
                    download_dir, file_name = None, None
            else:
                # '{stream}' sentinel meant "no local file" in the old schema
                download_dir, file_name = None, None
            cursor.execute(
                "insert into new_file values (?, ?, ?, ?, ?, ?, NULL)",
                (stream_hash, file_name, download_dir, data_rate, status, saved_file)
            )
        cursor.execute("drop table file")
        cursor.execute("alter table new_file rename to file")
        connection.commit()
    finally:
        # always release the handle, even if the migration fails midway,
        # so the database file isn't left locked
        connection.close()

View file

@ -8,6 +8,7 @@ import time
from torba.client.basedatabase import SQLiteMixin
from lbrynet.conf import Config
from lbrynet.wallet.dewies import dewies_to_lbc, lbc_to_dewies
from lbrynet.wallet.transaction import Transaction
from lbrynet.schema.claim import Claim
from lbrynet.dht.constants import data_expiration
from lbrynet.blob.blob_info import BlobInfo
@ -114,8 +115,8 @@ def _batched_select(transaction, query, parameters, batch_size=900):
def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
files = []
signed_claims = {}
for (rowid, stream_hash, file_name, download_dir, data_rate, status, _, sd_hash, stream_key,
stream_name, suggested_file_name, *claim_args) in _batched_select(
for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, _,
sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
transaction, "select file.rowid, file.*, stream.*, c.* "
"from file inner join stream on file.stream_hash=stream.stream_hash "
"inner join content_claim cc on file.stream_hash=cc.stream_hash "
@ -141,7 +142,11 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
"key": stream_key,
"stream_name": stream_name, # hex
"suggested_file_name": suggested_file_name, # hex
"claim": claim
"claim": claim,
"saved_file": bool(saved_file),
"content_fee": None if not raw_content_fee else Transaction(
binascii.unhexlify(raw_content_fee)
)
}
)
for claim_name, claim_id in _batched_select(
@ -188,16 +193,20 @@ def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor
def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], data_payment_rate: float, status: str) -> int:
download_directory: typing.Optional[str], data_payment_rate: float, status: str,
content_fee: typing.Optional[Transaction]) -> int:
if not file_name and not download_directory:
encoded_file_name, encoded_download_dir = "{stream}", "{stream}"
encoded_file_name, encoded_download_dir = None, None
else:
encoded_file_name = binascii.hexlify(file_name.encode()).decode()
encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
transaction.execute(
"insert or replace into file values (?, ?, ?, ?, ?)",
(stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status)
"insert or replace into file values (?, ?, ?, ?, ?, ?, ?)",
(stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
None if not content_fee else content_fee.raw.decode())
)
return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
@ -246,10 +255,12 @@ class SQLiteStorage(SQLiteMixin):
create table if not exists file (
stream_hash text primary key not null references stream,
file_name text not null,
download_directory text not null,
file_name text,
download_directory text,
blob_data_rate real not null,
status text not null
status text not null,
saved_file integer not null,
content_fee text
);
create table if not exists content_claim (
@ -430,7 +441,7 @@ class SQLiteStorage(SQLiteMixin):
def set_files_as_streaming(self, stream_hashes: typing.List[str]):
def _set_streaming(transaction: sqlite3.Connection):
transaction.executemany(
"update file set file_name='{stream}', download_directory='{stream}' where stream_hash=?",
"update file set file_name=null, download_directory=null where stream_hash=?",
[(stream_hash, ) for stream_hash in stream_hashes]
)
@ -509,16 +520,42 @@ class SQLiteStorage(SQLiteMixin):
# # # # # # # # # file stuff # # # # # # # # #
def save_downloaded_file(self, stream_hash, file_name, download_directory,
data_payment_rate) -> typing.Awaitable[int]:
def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], data_payment_rate: float,
content_fee: typing.Optional[Transaction] = None) -> typing.Awaitable[int]:
return self.save_published_file(
stream_hash, file_name, download_directory, data_payment_rate, status="running"
stream_hash, file_name, download_directory, data_payment_rate, status="running",
content_fee=content_fee
)
def save_published_file(self, stream_hash: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], data_payment_rate: float,
status="finished") -> typing.Awaitable[int]:
return self.db.run(store_file, stream_hash, file_name, download_directory, data_payment_rate, status)
status: str = "finished",
content_fee: typing.Optional[Transaction] = None) -> typing.Awaitable[int]:
return self.db.run(store_file, stream_hash, file_name, download_directory, data_payment_rate, status,
content_fee)
async def update_manually_removed_files_since_last_run(self):
"""
Update files that have been removed from the downloads directory since the last run
"""
def update_manually_removed_files(transaction: sqlite3.Connection):
removed = []
for (stream_hash, download_directory, file_name) in transaction.execute(
"select stream_hash, download_directory, file_name from file where saved_file=1"
).fetchall():
if download_directory and file_name and os.path.isfile(
os.path.join(binascii.unhexlify(download_directory.encode()).decode(),
binascii.unhexlify(file_name.encode()).decode())):
continue
else:
removed.append((stream_hash,))
if removed:
transaction.executemany(
"update file set file_name=null, download_directory=null, saved_file=0 where stream_hash=?",
removed
)
return await self.db.run(update_manually_removed_files)
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
return self.db.run(get_all_lbry_files)
@ -530,7 +567,7 @@ class SQLiteStorage(SQLiteMixin):
async def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: typing.Optional[str],
file_name: typing.Optional[str]):
if not file_name or not download_dir:
encoded_file_name, encoded_download_dir = "{stream}", "{stream}"
encoded_file_name, encoded_download_dir = None, None
else:
encoded_file_name = binascii.hexlify(file_name.encode()).decode()
encoded_download_dir = binascii.hexlify(download_dir.encode()).decode()
@ -538,18 +575,34 @@ class SQLiteStorage(SQLiteMixin):
encoded_download_dir, encoded_file_name, stream_hash,
))
async def recover_streams(self, descriptors_and_sds: typing.List[typing.Tuple['StreamDescriptor', 'BlobFile']],
async def save_content_fee(self, stream_hash: str, content_fee: Transaction):
return await self.db.execute("update file set content_fee=? where stream_hash=?", (
binascii.hexlify(content_fee.raw), stream_hash,
))
async def set_saved_file(self, stream_hash: str):
return await self.db.execute("update file set saved_file=1 where stream_hash=?", (
stream_hash,
))
async def clear_saved_file(self, stream_hash: str):
return await self.db.execute("update file set saved_file=0 where stream_hash=?", (
stream_hash,
))
async def recover_streams(self, descriptors_and_sds: typing.List[typing.Tuple['StreamDescriptor', 'BlobFile',
typing.Optional[Transaction]]],
download_directory: str):
def _recover(transaction: sqlite3.Connection):
stream_hashes = [d.stream_hash for d, s in descriptors_and_sds]
for descriptor, sd_blob in descriptors_and_sds:
stream_hashes = [x[0].stream_hash for x in descriptors_and_sds]
for descriptor, sd_blob, content_fee in descriptors_and_sds:
content_claim = transaction.execute(
"select * from content_claim where stream_hash=?", (descriptor.stream_hash, )
).fetchone()
delete_stream(transaction, descriptor) # this will also delete the content claim
store_stream(transaction, sd_blob, descriptor)
store_file(transaction, descriptor.stream_hash, os.path.basename(descriptor.suggested_file_name),
download_directory, 0.0, 'stopped')
download_directory, 0.0, 'stopped', content_fee=content_fee)
if content_claim:
transaction.execute("insert or ignore into content_claim values (?, ?)", content_claim)
transaction.executemany(

View file

@ -351,6 +351,7 @@ class ManagedStream:
self.finished_writing.set()
log.info("finished saving file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id,
self.sd_hash[:6], self.full_path)
await self.blob_manager.storage.set_saved_file(self.stream_hash)
except Exception as err:
if os.path.isfile(output_path):
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)

View file

@ -21,6 +21,7 @@ if typing.TYPE_CHECKING:
from lbrynet.extras.daemon.analytics import AnalyticsManager
from lbrynet.extras.daemon.storage import SQLiteStorage, StoredStreamClaim
from lbrynet.wallet import LbryWalletManager
from lbrynet.wallet.transaction import Transaction
from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
log = logging.getLogger(__name__)
@ -55,7 +56,9 @@ comparison_operators = {
def path_or_none(p) -> typing.Optional[str]:
return None if p == '{stream}' else binascii.unhexlify(p).decode()
if not p:
return
return binascii.unhexlify(p).decode()
class StreamManager:
@ -70,8 +73,8 @@ class StreamManager:
self.node = node
self.analytics_manager = analytics_manager
self.streams: typing.Dict[str, ManagedStream] = {}
self.resume_downloading_task: asyncio.Task = None
self.re_reflect_task: asyncio.Task = None
self.resume_saving_task: typing.Optional[asyncio.Task] = None
self.re_reflect_task: typing.Optional[asyncio.Task] = None
self.update_stream_finished_futs: typing.List[asyncio.Future] = []
self.running_reflector_uploads: typing.List[asyncio.Task] = []
self.started = asyncio.Event(loop=self.loop)
@ -84,7 +87,8 @@ class StreamManager:
to_restore = []
async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str,
suggested_file_name: str, key: str) -> typing.Optional[StreamDescriptor]:
suggested_file_name: str, key: str,
content_fee: typing.Optional['Transaction']) -> typing.Optional[StreamDescriptor]:
sd_blob = self.blob_manager.get_blob(sd_hash)
blobs = await self.storage.get_blobs_for_stream(stream_hash)
descriptor = await StreamDescriptor.recover(
@ -92,12 +96,13 @@ class StreamManager:
)
if not descriptor:
return
to_restore.append((descriptor, sd_blob))
to_restore.append((descriptor, sd_blob, content_fee))
await asyncio.gather(*[
recover_stream(
file_info['sd_hash'], file_info['stream_hash'], binascii.unhexlify(file_info['stream_name']).decode(),
binascii.unhexlify(file_info['suggested_file_name']).decode(), file_info['key']
binascii.unhexlify(file_info['suggested_file_name']).decode(), file_info['key'],
file_info['content_fee']
) for file_info in file_infos
])
@ -109,7 +114,7 @@ class StreamManager:
async def add_stream(self, rowid: int, sd_hash: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], status: str,
claim: typing.Optional['StoredStreamClaim']):
claim: typing.Optional['StoredStreamClaim'], content_fee: typing.Optional['Transaction']):
try:
descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
except InvalidStreamDescriptorError as err:
@ -117,16 +122,18 @@ class StreamManager:
return
stream = ManagedStream(
self.loop, self.config, self.blob_manager, descriptor.sd_hash, download_directory, file_name, status,
claim, rowid=rowid, descriptor=descriptor, analytics_manager=self.analytics_manager
claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor,
analytics_manager=self.analytics_manager
)
self.streams[sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
async def load_streams_from_database(self):
async def load_and_resume_streams_from_database(self):
to_recover = []
to_start = []
# this will set streams marked as finished and are missing blobs as being stopped
# await self.storage.sync_files_to_blobs()
await self.storage.update_manually_removed_files_since_last_run()
for file_info in await self.storage.get_all_lbry_files():
# if the sd blob is not verified, try to reconstruct it from the database
# this could either be because the blob files were deleted manually or save_blobs was not true when
@ -138,29 +145,33 @@ class StreamManager:
await self.recover_streams(to_recover)
log.info("Initializing %i files", len(to_start))
if to_start:
await asyncio.gather(*[
self.add_stream(
file_info['rowid'], file_info['sd_hash'], path_or_none(file_info['file_name']),
path_or_none(file_info['download_directory']), file_info['status'],
file_info['claim']
) for file_info in to_start
])
to_resume_saving = []
add_stream_tasks = []
for file_info in to_start:
file_name = path_or_none(file_info['file_name'])
download_directory = path_or_none(file_info['download_directory'])
if file_name and download_directory and not file_info['saved_file'] and file_info['status'] == 'running':
to_resume_saving.append((file_name, download_directory, file_info['sd_hash']))
add_stream_tasks.append(self.loop.create_task(self.add_stream(
file_info['rowid'], file_info['sd_hash'], file_name,
download_directory, file_info['status'],
file_info['claim'], file_info['content_fee']
)))
if add_stream_tasks:
await asyncio.gather(*add_stream_tasks, loop=self.loop)
log.info("Started stream manager with %i files", len(self.streams))
async def resume(self):
if not self.node:
log.warning("no DHT node given, resuming downloads trusting that we can contact reflector")
t = [
self.loop.create_task(
stream.start(node=self.node, save_now=(stream.full_path is not None))
if not stream.full_path else
stream.save_file(node=self.node)
) for stream in self.streams.values() if stream.running
]
if t:
log.info("resuming %i downloads", len(t))
await asyncio.gather(*t, loop=self.loop)
if to_resume_saving:
self.resume_saving_task = self.loop.create_task(self.resume(to_resume_saving))
async def resume(self, to_resume_saving):
log.info("Resuming saving %i files", len(to_resume_saving))
await asyncio.gather(
*(self.streams[sd_hash].save_file(file_name, download_directory, node=self.node)
for (file_name, download_directory, sd_hash) in to_resume_saving),
loop=self.loop
)
async def reflect_streams(self):
while True:
@ -182,14 +193,13 @@ class StreamManager:
await asyncio.sleep(300, loop=self.loop)
async def start(self):
await self.load_streams_from_database()
self.resume_downloading_task = self.loop.create_task(self.resume())
await self.load_and_resume_streams_from_database()
self.re_reflect_task = self.loop.create_task(self.reflect_streams())
self.started.set()
def stop(self):
if self.resume_downloading_task and not self.resume_downloading_task.done():
self.resume_downloading_task.cancel()
if self.resume_saving_task and not self.resume_saving_task.done():
self.resume_saving_task.cancel()
if self.re_reflect_task and not self.re_reflect_task.done():
self.re_reflect_task.cancel()
while self.streams:
@ -387,6 +397,7 @@ class StreamManager:
lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')
)
log.info("paid fee of %s for %s", fee_amount, uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
self.streams[stream.sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)

View file

@ -239,6 +239,7 @@ class FileCommands(CommandTestCase):
await self.daemon.jsonrpc_file_delete(claim_name='icanpay')
await self.assertBalance(self.account, '9.925679')
response = await self.daemon.jsonrpc_get('lbry://icanpay')
raw_content_fee = response.content_fee.raw
await self.ledger.wait(response.content_fee)
await self.assertBalance(self.account, '8.925555')
self.assertEqual(len(self.daemon.jsonrpc_file_list()), 1)
@ -252,3 +253,10 @@ class FileCommands(CommandTestCase):
self.assertEqual(
await self.blockchain.get_balance(), starting_balance + block_reward_and_claim_fee
)
# restart the daemon and make sure the fee is still there
self.daemon.stream_manager.stop()
await self.daemon.stream_manager.start()
self.assertEqual(len(self.daemon.jsonrpc_file_list()), 1)
self.assertEqual(self.daemon.jsonrpc_file_list()[0].content_fee.raw, raw_content_fee)