adds added_at field to files

Oleg Silkin 2019-10-10 19:46:00 -04:00 committed by Lex Berezhny
parent 223bcdc261
commit 4237331222
8 changed files with 110 additions and 17 deletions

View file

@@ -52,7 +52,7 @@ class DatabaseComponent(Component):
     @staticmethod
     def get_current_db_revision():
-        return 11
+        return 12

     @property
     def revision_filename(self):
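
Note: bumping get_current_db_revision is what makes existing installations run the new migration; on startup the database component compares the revision recorded on disk with this value and hands the gap to the migrator (see migrate_db below). A rough sketch of that trigger, with illustrative names rather than the component's actual internals:

    # sketch only: stored_revision stands in for whatever the component
    # reads back from its revision file on disk
    stored_revision = 11
    target_revision = 12  # DatabaseComponent.get_current_db_revision()
    if stored_revision < target_revision:
        migrate_db(conf, stored_revision, target_revision)  # walks 11 -> 12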

View file

@@ -1769,7 +1769,8 @@ class Daemon(metaclass=JSONRPCServerType):
         Usage:
             file_list [--sd_hash=<sd_hash>] [--file_name=<file_name>] [--stream_hash=<stream_hash>]
-                      [--rowid=<rowid>] [--claim_id=<claim_id>] [--outpoint=<outpoint>] [--txid=<txid>] [--nout=<nout>]
+                      [--rowid=<rowid>] [--added_at=<added_at>] [--claim_id=<claim_id>]
+                      [--outpoint=<outpoint>] [--txid=<txid>] [--nout=<nout>]
                       [--channel_claim_id=<channel_claim_id>] [--channel_name=<channel_name>]
                       [--claim_name=<claim_name>] [--blobs_in_stream=<blobs_in_stream>]
                       [--blobs_remaining=<blobs_remaining>] [--sort=<sort_by>]
@@ -1782,6 +1783,7 @@ class Daemon(metaclass=JSONRPCServerType):
                                                        downloads folder
             --stream_hash=<stream_hash>            : (str) get file with matching stream hash
             --rowid=<rowid>                        : (int) get file with matching row id
+            --added_at=<added_at>                  : (int) get file with matching time of insertion
             --claim_id=<claim_id>                  : (str) get file with matching claim id
             --outpoint=<outpoint>                  : (str) get file with matching claim outpoint
             --txid=<txid>                          : (str) get file with matching claim txid
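
For reference, the new filter is reachable over the daemon's JSON-RPC interface as well as the CLI. A minimal sketch, assuming a local daemon on the default port 5279 and the requests package (both assumptions, not part of this diff):

    import requests

    # exact match on the insertion timestamp; the value is a unix time
    response = requests.post("http://localhost:5279", json={
        "method": "file_list",
        "params": {"added_at": 1570000000},
    }).json()
    print(response["result"])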

View file

@@ -273,6 +273,7 @@ class JSONResponseEncoder(JSONEncoder):
             'channel_name': managed_stream.channel_name,
             'claim_name': managed_stream.claim_name,
             'content_fee': managed_stream.content_fee,
+            'added_at': managed_stream.added_at,
             'height': tx_height,
             'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
             'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None
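
With this encoder change every serialized file entry carries the timestamp; a stream whose row was never saved still reports added_at as None, which serializes to null. An illustrative excerpt of the resulting dict (values made up):

    {
        'claim_name': 'some-claim',
        'content_fee': None,
        'added_at': 1570750000,  # unix time the file row was inserted
        'height': 842,
    }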

View file

@@ -29,6 +29,8 @@ def migrate_db(conf, start, end):
         from .migrate9to10 import do_migration
     elif current == 10:
         from .migrate10to11 import do_migration
+    elif current == 11:
+        from .migrate11to12 import do_migration
     else:
         raise Exception(f"DB migration of version {current} to {current+1} is not available")
     try:

View file

@@ -0,0 +1,69 @@
+import sqlite3
+import os
+import time
+
+
+def do_migration(conf):
+    db_path = os.path.join(conf.data_dir, 'lbrynet.sqlite')
+    connection = sqlite3.connect(db_path)
+    connection.row_factory = sqlite3.Row
+    cursor = connection.cursor()
+
+    current_columns = []
+    for col_info in cursor.execute("pragma table_info('file');").fetchall():
+        current_columns.append(col_info[1])
+    if 'added_at' in current_columns:
+        connection.close()
+        print('already migrated')
+        return
+
+    # follow the 12 step schema change procedure
+    cursor.execute("pragma foreign_keys=off")
+
+    # we don't have any indexes, views or triggers, so step 3 is skipped.
+    cursor.execute("drop table if exists new_file")
+    cursor.execute("""
+        create table if not exists new_file (
+            stream_hash text not null primary key references stream,
+            file_name text,
+            download_directory text,
+            blob_data_rate text not null,
+            status text not null,
+            saved_file integer not null,
+            content_fee text,
+            added_at integer not null
+        );
+    """)
+
+    # step 5: transfer content from old to new
+    select = "select * from file"
+    # the old file table has seven columns; stamp each migrated row with
+    # the migration time since no original insertion time was recorded
+    for (stream_hash, file_name, download_dir, data_rate, status, saved_file, fee) \
+            in cursor.execute(select).fetchall():
+        added_at = int(time.time())
+        cursor.execute(
+            "insert into new_file values (?, ?, ?, ?, ?, ?, ?, ?)",
+            (stream_hash, file_name, download_dir, data_rate, status, saved_file, fee, added_at)
+        )
+
+    # step 6: drop old table
+    cursor.execute("drop table file")
+
+    # step 7: rename new table to old table
+    cursor.execute("alter table new_file rename to file")
+
+    # step 8: we aren't using indexes, views or triggers so skip
+    # step 9: no views so skip
+
+    # step 10: foreign key check
+    cursor.execute("pragma foreign_key_check;")
+
+    # step 11: commit transaction
+    connection.commit()
+
+    # step 12: re-enable foreign keys
+    connection.execute("pragma foreign_keys=on;")
+
+    # done :)
+    connection.close()
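
A quick way to confirm the migration took effect is to re-read the schema the same way do_migration does, after it returns (conf here is the same config object passed to do_migration):

    import os
    import sqlite3

    db_path = os.path.join(conf.data_dir, 'lbrynet.sqlite')
    connection = sqlite3.connect(db_path)
    columns = [row[1] for row in connection.execute("pragma table_info('file');")]
    connection.close()
    assert 'added_at' in columns  # new column present after the 11 -> 12 step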

View file

@@ -103,8 +103,8 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
     stream_hashes = tuple(
         stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file").fetchall()
     )
-    for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, _,
-         sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
+    for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, added_at,
+         _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
             transaction, "select file.rowid, file.*, stream.*, c.* "
                          "from file inner join stream on file.stream_hash=stream.stream_hash "
                          "inner join content_claim cc on file.stream_hash=cc.stream_hash "
@@ -119,6 +119,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
         files.append(
             {
                 "rowid": rowid,
+                "added_at": added_at,
                 "stream_hash": stream_hash,
                 "file_name": file_name,  # hex
                 "download_directory": download_dir,  # hex
@@ -180,17 +181,20 @@ def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'):
 def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
                download_directory: typing.Optional[str], data_payment_rate: float, status: str,
-               content_fee: typing.Optional[Transaction]) -> int:
+               content_fee: typing.Optional[Transaction], added_at: typing.Optional[int] = None) -> int:
     if not file_name and not download_directory:
         encoded_file_name, encoded_download_dir = None, None
     else:
         encoded_file_name = binascii.hexlify(file_name.encode()).decode()
         encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
+    time_added = added_at or int(time.time())
     transaction.execute(
-        "insert or replace into file values (?, ?, ?, ?, ?, ?, ?)",
+        "insert or replace into file values (?, ?, ?, ?, ?, ?, ?, ?)",
         (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
          1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
-         None if not content_fee else binascii.hexlify(content_fee.raw).decode())
+         None if not content_fee else binascii.hexlify(content_fee.raw).decode(),
+         time_added)
     ).fetchall()
     return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
@@ -246,7 +250,8 @@ class SQLiteStorage(SQLiteMixin):
             blob_data_rate real not null,
             status text not null,
             saved_file integer not null,
-            content_fee text
+            content_fee text,
+            added_at integer not null
         );

         create table if not exists content_claim (
@@ -448,18 +453,20 @@ class SQLiteStorage(SQLiteMixin):
     def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str],
                              download_directory: typing.Optional[str], data_payment_rate: float,
-                             content_fee: typing.Optional[Transaction] = None) -> typing.Awaitable[int]:
+                             content_fee: typing.Optional[Transaction] = None,
+                             added_at: typing.Optional[int] = None) -> typing.Awaitable[int]:
         return self.save_published_file(
             stream_hash, file_name, download_directory, data_payment_rate, status="running",
-            content_fee=content_fee
+            content_fee=content_fee, added_at=added_at
         )

     def save_published_file(self, stream_hash: str, file_name: typing.Optional[str],
                             download_directory: typing.Optional[str], data_payment_rate: float,
                             status: str = "finished",
-                            content_fee: typing.Optional[Transaction] = None) -> typing.Awaitable[int]:
+                            content_fee: typing.Optional[Transaction] = None,
+                            added_at: typing.Optional[int] = None) -> typing.Awaitable[int]:
         return self.db.run(store_file, stream_hash, file_name, download_directory, data_payment_rate, status,
-                           content_fee)
+                           content_fee, added_at)

     async def update_manually_removed_files_since_last_run(self):
         """

View file

@@ -1,5 +1,6 @@
 import os
 import asyncio
+import time
 import typing
 import logging
 import binascii
@@ -54,6 +55,7 @@ class ManagedStream:
         'sd_hash',
         'download_directory',
         '_file_name',
+        '_added_at',
         '_status',
         'stream_claim_info',
         'download_id',
@@ -79,7 +81,8 @@ class ManagedStream:
                  download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
                  descriptor: typing.Optional[StreamDescriptor] = None,
                  content_fee: typing.Optional['Transaction'] = None,
-                 analytics_manager: typing.Optional['AnalyticsManager'] = None):
+                 analytics_manager: typing.Optional['AnalyticsManager'] = None,
+                 added_at: typing.Optional[int] = None):
         self.loop = loop
         self.config = config
         self.blob_manager = blob_manager
@@ -91,6 +94,7 @@ class ManagedStream:
         self.download_id = download_id or binascii.hexlify(generate_id()).decode()
         self.rowid = rowid
         self.content_fee = content_fee
+        self._added_at = added_at
         self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
         self.analytics_manager = analytics_manager
@@ -117,6 +121,10 @@ class ManagedStream:
     def file_name(self) -> typing.Optional[str]:
         return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)

+    @property
+    def added_at(self) -> typing.Optional[int]:
+        return self._added_at
+
     @property
     def status(self) -> str:
         return self._status
@@ -253,8 +261,9 @@ class ManagedStream:
             file_name, download_dir = self._file_name, self.download_directory
         else:
             file_name, download_dir = None, None
+        self._added_at = int(time.time())
         self.rowid = await self.blob_manager.storage.save_downloaded_file(
-            self.stream_hash, file_name, download_dir, 0.0
+            self.stream_hash, file_name, download_dir, 0.0, added_at=self._added_at
         )
         if self.status != self.STATUS_RUNNING:
             await self.update_status(self.STATUS_RUNNING)
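
The timestamp is minted once, when the stream's file row is first written, and read back from storage on later runs; until that first save the property returns None. A simplified stand-in for that lifecycle (hypothetical class, not the real ManagedStream):

    import time

    class StampedFile:
        def __init__(self, added_at=None):
            self._added_at = added_at  # value restored from storage, or None

        @property
        def added_at(self):
            return self._added_at

        def save_first_time(self):
            self._added_at = int(time.time())  # stamped when the row is created

    f = StampedFile()
    assert f.added_at is None
    f.save_first_time()
    assert isinstance(f.added_at, int)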

View file

@@ -31,6 +31,7 @@ filter_fields = [
     'rowid',
     'status',
     'file_name',
+    'added_at',
     'sd_hash',
     'stream_hash',
     'claim_name',
@@ -115,7 +116,8 @@ class StreamManager:
     async def add_stream(self, rowid: int, sd_hash: str, file_name: typing.Optional[str],
                          download_directory: typing.Optional[str], status: str,
-                         claim: typing.Optional['StoredStreamClaim'], content_fee: typing.Optional['Transaction']):
+                         claim: typing.Optional['StoredStreamClaim'], content_fee: typing.Optional['Transaction'],
+                         added_at: typing.Optional[int]):
         try:
             descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
         except InvalidStreamDescriptorError as err:
@@ -124,7 +126,7 @@ class StreamManager:
         stream = ManagedStream(
             self.loop, self.config, self.blob_manager, descriptor.sd_hash, download_directory, file_name, status,
             claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor,
-            analytics_manager=self.analytics_manager
+            analytics_manager=self.analytics_manager, added_at=added_at
         )
         self.streams[sd_hash] = stream
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
@@ -156,7 +158,8 @@ class StreamManager:
                 add_stream_tasks.append(self.loop.create_task(self.add_stream(
                     file_info['rowid'], file_info['sd_hash'], file_name,
                     download_directory, file_info['status'],
-                    file_info['claim'], file_info['content_fee']
+                    file_info['claim'], file_info['content_fee'],
+                    file_info['added_at']
                 )))
         if add_stream_tasks:
             await asyncio.gather(*add_stream_tasks, loop=self.loop)
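
Since 'added_at' is now in filter_fields, it should also be usable as a sort key for file_list, not just as a filter. A sketch reusing the earlier assumptions (local daemon on port 5279, requests package); whether reverse applies here is inferred from the daemon's existing sort handling, not from this diff:

    import requests

    # newest downloads first, assuming added_at sorts like the other fields
    response = requests.post("http://localhost:5279", json={
        "method": "file_list",
        "params": {"sort": "added_at", "reverse": True},
    }).json()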