From 42373312225606ce4b4d320de5a2c32e9caed8dd Mon Sep 17 00:00:00 2001
From: Oleg Silkin
Date: Thu, 10 Oct 2019 19:46:00 -0400
Subject: [PATCH] adds `added_at` field to files

---
 lbry/lbry/extras/daemon/Components.py         |  2 +-
 lbry/lbry/extras/daemon/Daemon.py             |  4 +-
 .../extras/daemon/json_response_encoder.py    |  1 +
 .../lbry/extras/daemon/migrator/dbmigrator.py |  2 +
 .../extras/daemon/migrator/migrate11to12.py   | 69 +++++++++++++++++++
 lbry/lbry/extras/daemon/storage.py            | 27 +++++---
 lbry/lbry/stream/managed_stream.py            | 13 +++-
 lbry/lbry/stream/stream_manager.py            |  9 ++-
 8 files changed, 110 insertions(+), 17 deletions(-)
 create mode 100644 lbry/lbry/extras/daemon/migrator/migrate11to12.py

diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py
index a440c29af..9ead6bc68 100644
--- a/lbry/lbry/extras/daemon/Components.py
+++ b/lbry/lbry/extras/daemon/Components.py
@@ -52,7 +52,7 @@ class DatabaseComponent(Component):
 
     @staticmethod
     def get_current_db_revision():
-        return 11
+        return 12
 
     @property
     def revision_filename(self):
diff --git a/lbry/lbry/extras/daemon/Daemon.py b/lbry/lbry/extras/daemon/Daemon.py
index eca6a2b03..701828a81 100644
--- a/lbry/lbry/extras/daemon/Daemon.py
+++ b/lbry/lbry/extras/daemon/Daemon.py
@@ -1769,7 +1769,8 @@ class Daemon(metaclass=JSONRPCServerType):
 
         Usage:
             file_list [--sd_hash=<sd_hash>] [--file_name=<file_name>] [--stream_hash=<stream_hash>]
-                      [--rowid=<rowid>] [--claim_id=<claim_id>] [--outpoint=<outpoint>] [--txid=<txid>] [--nout=<nout>]
+                      [--rowid=<rowid>] [--added_at=<added_at>] [--claim_id=<claim_id>]
+                      [--outpoint=<outpoint>] [--txid=<txid>] [--nout=<nout>]
                       [--channel_claim_id=<channel_claim_id>] [--channel_name=<channel_name>]
                       [--claim_name=<claim_name>] [--blobs_in_stream=<blobs_in_stream>]
                       [--blobs_remaining=<blobs_remaining>] [--sort=<sort_by>]
@@ -1782,6 +1783,7 @@
                                                            downloads folder
             --stream_hash=<stream_hash>        : (str) get file with matching stream hash
             --rowid=<rowid>                    : (int) get file with matching row id
+            --added_at=<added_at>              : (int) get file with matching time of insertion
             --claim_id=<claim_id>              : (str) get file with matching claim id
             --outpoint=<outpoint>              : (str) get file with matching claim outpoint
             --txid=<txid>                      : (str) get file with matching claim txid
diff --git a/lbry/lbry/extras/daemon/json_response_encoder.py b/lbry/lbry/extras/daemon/json_response_encoder.py
index 36117beae..318fe8084 100644
--- a/lbry/lbry/extras/daemon/json_response_encoder.py
+++ b/lbry/lbry/extras/daemon/json_response_encoder.py
@@ -273,6 +273,7 @@ class JSONResponseEncoder(JSONEncoder):
             'channel_name': managed_stream.channel_name,
             'claim_name': managed_stream.claim_name,
             'content_fee': managed_stream.content_fee,
+            'added_at': managed_stream.added_at,
             'height': tx_height,
             'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
             'timestamp': self.ledger.headers[tx_height]['timestamp'] if 0 < tx_height <= best_height else None
diff --git a/lbry/lbry/extras/daemon/migrator/dbmigrator.py b/lbry/lbry/extras/daemon/migrator/dbmigrator.py
index 2959c1415..807cb1907 100644
--- a/lbry/lbry/extras/daemon/migrator/dbmigrator.py
+++ b/lbry/lbry/extras/daemon/migrator/dbmigrator.py
@@ -29,6 +29,8 @@ def migrate_db(conf, start, end):
         from .migrate9to10 import do_migration
     elif current == 10:
         from .migrate10to11 import do_migration
+    elif current == 11:
+        from .migrate11to12 import do_migration
     else:
         raise Exception(f"DB migration of version {current} to {current+1} is not available")
     try:
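Note on the dispatch above: migrate_db steps through revisions one at a time, so a database at
revision 9 passes through 9->10, 10->11, and now 11->12 in sequence. A minimal sketch of the same
chaining pattern (hypothetical standalone code with a made-up module path, not the daemon's exact
implementation):

    import importlib

    def run_migrations(conf, start: int, end: int):
        # apply each single-step migration in order, e.g. 11 -> 12
        for current in range(start, end):
            step = importlib.import_module(f"migrator.migrate{current}to{current + 1}")
            step.do_migration(conf)  # each step module exposes do_migration(conf)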
diff --git a/lbry/lbry/extras/daemon/migrator/migrate11to12.py b/lbry/lbry/extras/daemon/migrator/migrate11to12.py
new file mode 100644
index 000000000..3d2bda5d2
--- /dev/null
+++ b/lbry/lbry/extras/daemon/migrator/migrate11to12.py
@@ -0,0 +1,69 @@
+import sqlite3
+import os
+import time
+
+
+def do_migration(conf):
+    db_path = os.path.join(conf.data_dir, 'lbrynet.sqlite')
+    connection = sqlite3.connect(db_path)
+    connection.row_factory = sqlite3.Row
+    cursor = connection.cursor()
+
+    current_columns = []
+    for col_info in cursor.execute("pragma table_info('file');").fetchall():
+        current_columns.append(col_info[1])
+
+    if 'added_at' in current_columns:
+        connection.close()
+        print('already migrated')
+        return
+
+    # follow the SQLite 12-step schema change procedure
+    cursor.execute("pragma foreign_keys=off")
+
+    # we don't have any indexes, views or triggers, so step 3 is skipped.
+    cursor.execute("drop table if exists new_file")
+    cursor.execute("""
+        create table if not exists new_file (
+            stream_hash text not null primary key references stream,
+            file_name text,
+            download_directory text,
+            blob_data_rate text not null,
+            status text not null,
+            saved_file integer not null,
+            content_fee text,
+            added_at integer not null
+        );
+    """)
+
+    # step 5: transfer content from old to new. the old table has seven
+    # columns, so seven names are unpacked; added_at is appended as the
+    # eighth value of each new row.
+    select = "select * from file"
+    for (stream_hash, file_name, download_dir, data_rate, status, saved_file, fee) \
+            in cursor.execute(select).fetchall():
+        added_at = int(time.time())
+        cursor.execute(
+            "insert into new_file values (?, ?, ?, ?, ?, ?, ?, ?)",
+            (stream_hash, file_name, download_dir, data_rate, status, saved_file, fee, added_at)
+        )
+
+    # step 6: drop old table
+    cursor.execute("drop table file")
+
+    # step 7: rename new table to old table
+    cursor.execute("alter table new_file rename to file")
+
+    # step 8: we aren't using indexes, views or triggers so skip
+    # step 9: no views so skip
+    # step 10: foreign key check
+    cursor.execute("pragma foreign_key_check;")
+
+    # step 11: commit transaction
+    connection.commit()
+
+    # step 12: re-enable foreign keys
+    connection.execute("pragma foreign_keys=on;")
+
+    # done :)
+    connection.close()
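A quick way to sanity-check the new migration is to point it at a scratch database that still has
the seven-column rev-11 `file` table (a hypothetical test sketch; `SimpleNamespace` stands in for
the daemon's conf object, and the import path assumes the `lbry.extras.daemon.migrator` package
layout implied by the relative imports in dbmigrator.py):

    import os
    import sqlite3
    import tempfile
    from types import SimpleNamespace

    from lbry.extras.daemon.migrator.migrate11to12 import do_migration

    data_dir = tempfile.mkdtemp()
    db = sqlite3.connect(os.path.join(data_dir, 'lbrynet.sqlite'))
    # minimal rev-11 shape: a stream table plus the old seven-column file table
    db.executescript("""
        create table stream (stream_hash text primary key);
        create table file (
            stream_hash text not null primary key references stream,
            file_name text,
            download_directory text,
            blob_data_rate real not null,
            status text not null,
            saved_file integer not null,
            content_fee text
        );
    """)
    db.close()

    do_migration(SimpleNamespace(data_dir=data_dir))

    db = sqlite3.connect(os.path.join(data_dir, 'lbrynet.sqlite'))
    columns = [row[1] for row in db.execute("pragma table_info('file');")]
    assert 'added_at' in columns  # the rebuilt table gained the new column
    db.close()

Running it a second time should just print "already migrated" and return, thanks to the column
check at the top of do_migration.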
diff --git a/lbry/lbry/extras/daemon/storage.py b/lbry/lbry/extras/daemon/storage.py
index ff5202e7e..b580a3502 100644
--- a/lbry/lbry/extras/daemon/storage.py
+++ b/lbry/lbry/extras/daemon/storage.py
@@ -103,8 +103,8 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
     stream_hashes = tuple(
         stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file").fetchall()
     )
-    for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, _,
-         sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
+    for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, added_at,
+         _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
             transaction, "select file.rowid, file.*, stream.*, c.* "
                          "from file inner join stream on file.stream_hash=stream.stream_hash "
                          "inner join content_claim cc on file.stream_hash=cc.stream_hash "
@@ -119,6 +119,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
         files.append(
             {
                 "rowid": rowid,
+                "added_at": added_at,
                 "stream_hash": stream_hash,
                 "file_name": file_name,  # hex
                 "download_directory": download_dir,  # hex
@@ -180,17 +181,20 @@ def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor
 
 def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
                download_directory: typing.Optional[str], data_payment_rate: float, status: str,
-               content_fee: typing.Optional[Transaction]) -> int:
+               content_fee: typing.Optional[Transaction], added_at: typing.Optional[int] = None) -> int:
     if not file_name and not download_directory:
         encoded_file_name, encoded_download_dir = None, None
     else:
         encoded_file_name = binascii.hexlify(file_name.encode()).decode()
         encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
+    time_added = added_at or int(time.time())
     transaction.execute(
-        "insert or replace into file values (?, ?, ?, ?, ?, ?, ?)",
+        "insert or replace into file values (?, ?, ?, ?, ?, ?, ?, ?)",
         (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
          1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name)))
-         else 0, None if not content_fee else binascii.hexlify(content_fee.raw).decode())
+         else 0,
+         None if not content_fee else binascii.hexlify(content_fee.raw).decode(),
+         time_added
+        )
     ).fetchall()
     return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
 
@@ -246,7 +250,8 @@ class SQLiteStorage(SQLiteMixin):
                 blob_data_rate real not null,
                 status text not null,
                 saved_file integer not null,
-                content_fee text
+                content_fee text,
+                added_at integer not null
             );
 
             create table if not exists content_claim (
@@ -448,18 +453,20 @@ class SQLiteStorage(SQLiteMixin):
 
     def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str],
                              download_directory: typing.Optional[str], data_payment_rate: float,
-                             content_fee: typing.Optional[Transaction] = None) -> typing.Awaitable[int]:
+                             content_fee: typing.Optional[Transaction] = None,
+                             added_at: typing.Optional[int] = None) -> typing.Awaitable[int]:
         return self.save_published_file(
             stream_hash, file_name, download_directory, data_payment_rate, status="running",
-            content_fee=content_fee
+            content_fee=content_fee, added_at=added_at
        )
 
     def save_published_file(self, stream_hash: str, file_name: typing.Optional[str],
                             download_directory: typing.Optional[str], data_payment_rate: float,
                             status: str = "finished",
-                            content_fee: typing.Optional[Transaction] = None) -> typing.Awaitable[int]:
+                            content_fee: typing.Optional[Transaction] = None,
+                            added_at: typing.Optional[int] = None) -> typing.Awaitable[int]:
         return self.db.run(store_file, stream_hash, file_name, download_directory, data_payment_rate, status,
-                           content_fee)
+                           content_fee, added_at)
 
     async def update_manually_removed_files_since_last_run(self):
         """
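Once a daemon has migrated, the new field shows up in every file_list result and can be used as a
filter or sort key (the usage block above adds --added_at). An illustrative client call, assuming
a daemon listening on the default localhost:5279 JSON-RPC endpoint and the flat result list this
version of the API returns:

    import requests  # any HTTP client works; requests is only for illustration

    response = requests.post('http://localhost:5279', json={
        'method': 'file_list',
        'params': {'sort': 'added_at', 'reverse': True},  # newest downloads first
    }).json()

    for file_info in response['result']:
        # added_at is unix epoch seconds, stamped when the file row was first written
        print(file_info['added_at'], file_info['claim_name'])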
diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py
index 3b2da43e1..4488f5296 100644
--- a/lbry/lbry/stream/managed_stream.py
+++ b/lbry/lbry/stream/managed_stream.py
@@ -1,5 +1,6 @@
 import os
 import asyncio
+import time
 import typing
 import logging
 import binascii
@@ -54,6 +55,7 @@ class ManagedStream:
         'sd_hash',
         'download_directory',
         '_file_name',
+        '_added_at',
         '_status',
         'stream_claim_info',
         'download_id',
@@ -79,7 +81,8 @@
                  download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
                  descriptor: typing.Optional[StreamDescriptor] = None,
                  content_fee: typing.Optional['Transaction'] = None,
-                 analytics_manager: typing.Optional['AnalyticsManager'] = None):
+                 analytics_manager: typing.Optional['AnalyticsManager'] = None,
+                 added_at: typing.Optional[int] = None):
         self.loop = loop
         self.config = config
         self.blob_manager = blob_manager
@@ -91,6 +94,7 @@
         self.download_id = download_id or binascii.hexlify(generate_id()).decode()
         self.rowid = rowid
         self.content_fee = content_fee
+        self._added_at = added_at
         self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
         self.analytics_manager = analytics_manager
 
@@ -117,6 +121,10 @@
     def file_name(self) -> typing.Optional[str]:
         return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)
 
+    @property
+    def added_at(self) -> typing.Optional[int]:
+        return self._added_at
+
     @property
     def status(self) -> str:
         return self._status
@@ -253,8 +261,9 @@
             file_name, download_dir = self._file_name, self.download_directory
         else:
             file_name, download_dir = None, None
+        self._added_at = int(time.time())
         self.rowid = await self.blob_manager.storage.save_downloaded_file(
-            self.stream_hash, file_name, download_dir, 0.0
+            self.stream_hash, file_name, download_dir, 0.0, added_at=self._added_at
         )
         if self.status != self.STATUS_RUNNING:
             await self.update_status(self.STATUS_RUNNING)
diff --git a/lbry/lbry/stream/stream_manager.py b/lbry/lbry/stream/stream_manager.py
index 3a45d01b9..442a9e7c8 100644
--- a/lbry/lbry/stream/stream_manager.py
+++ b/lbry/lbry/stream/stream_manager.py
@@ -31,6 +31,7 @@ filter_fields = [
     'rowid',
     'status',
     'file_name',
+    'added_at',
     'sd_hash',
     'stream_hash',
     'claim_name',
@@ -115,7 +116,8 @@ class StreamManager:
 
     async def add_stream(self, rowid: int, sd_hash: str, file_name: typing.Optional[str],
                          download_directory: typing.Optional[str], status: str,
-                         claim: typing.Optional['StoredStreamClaim'], content_fee: typing.Optional['Transaction']):
+                         claim: typing.Optional['StoredStreamClaim'], content_fee: typing.Optional['Transaction'],
+                         added_at: typing.Optional[int]):
         try:
             descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
         except InvalidStreamDescriptorError as err:
@@ -124,7 +126,7 @@
         stream = ManagedStream(
             self.loop, self.config, self.blob_manager, descriptor.sd_hash, download_directory, file_name, status,
             claim, content_fee=content_fee, rowid=rowid, descriptor=descriptor,
-            analytics_manager=self.analytics_manager
+            analytics_manager=self.analytics_manager, added_at=added_at
         )
         self.streams[sd_hash] = stream
         self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
@@ -156,7 +158,8 @@
                 add_stream_tasks.append(self.loop.create_task(self.add_stream(
                     file_info['rowid'], file_info['sd_hash'], file_name,
                     download_directory, file_info['status'],
-                    file_info['claim'], file_info['content_fee']
+                    file_info['claim'], file_info['content_fee'],
+                    file_info['added_at']
                 )))
         if add_stream_tasks:
             await asyncio.gather(*add_stream_tasks, loop=self.loop)
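Taken together, the timestamp round-trips cleanly: ManagedStream stamps _added_at with
int(time.time()) just before save_downloaded_file()/store_file() persist the row,
get_all_lbry_files() reads it back, and add_stream() threads it into the restored ManagedStream,
so the value survives daemon restarts instead of being re-stamped on every load. Because the
column holds plain unix epoch seconds, rendering is left to clients; a minimal stdlib
illustration (the sample value is arbitrary):

    from datetime import datetime, timezone

    added_at = 1570750000  # example: epoch seconds as stored in the new column
    print(datetime.fromtimestamp(added_at, tz=timezone.utc).isoformat())
    # -> 2019-10-10T23:26:40+00:00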