move store_stream and store_file to standalone functions

This commit is contained in:
Jack Robison 2019-02-14 18:17:50 -05:00
parent f1b60e3ef2
commit a228d20137
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -154,6 +154,35 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
return files return files
def store_stream(transaction: sqlite3.Connection, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
# add the head blob and set it to be announced
transaction.execute(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?)",
(
sd_blob.blob_hash, sd_blob.length, 0, 1, "pending", 0, 0,
descriptor.blobs[0].blob_hash, descriptor.blobs[0].length, 0, 1, "pending", 0, 0
)
)
# add the rest of the blobs with announcement off
if len(descriptor.blobs) > 2:
transaction.executemany(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
[(blob.blob_hash, blob.length, 0, 0, "pending", 0, 0)
for blob in descriptor.blobs[1:-1]]
)
# associate the blobs to the stream
transaction.execute("insert or ignore into stream values (?, ?, ?, ?, ?)",
(descriptor.stream_hash, sd_blob.blob_hash, descriptor.key,
binascii.hexlify(descriptor.stream_name.encode()).decode(),
binascii.hexlify(descriptor.suggested_file_name.encode()).decode()))
# add the stream
transaction.executemany(
"insert or ignore into stream_blob values (?, ?, ?, ?)",
[(descriptor.stream_hash, blob.blob_hash, blob.blob_num, blob.iv)
for blob in descriptor.blobs]
)
def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'): def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor'):
blob_hashes = [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]] blob_hashes = [(blob.blob_hash, ) for blob in descriptor.blobs[:-1]]
blob_hashes.append((descriptor.sd_hash, )) blob_hashes.append((descriptor.sd_hash, ))
@ -164,6 +193,15 @@ def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor
transaction.executemany("delete from blob where blob_hash=?", blob_hashes) transaction.executemany("delete from blob where blob_hash=?", blob_hashes)
def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: str, download_directory: str,
data_payment_rate: float, status: str):
transaction.execute(
"insert or replace into file values (?, ?, ?, ?, ?)",
(stream_hash, binascii.hexlify(file_name.encode()).decode(),
binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status)
)
class SQLiteStorage(SQLiteMixin): class SQLiteStorage(SQLiteMixin):
CREATE_TABLES_QUERY = """ CREATE_TABLES_QUERY = """
pragma foreign_keys=on; pragma foreign_keys=on;
@ -391,35 +429,7 @@ class SQLiteStorage(SQLiteMixin):
return streams is not None return streams is not None
def store_stream(self, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'): def store_stream(self, sd_blob: 'BlobFile', descriptor: 'StreamDescriptor'):
def _store_stream(transaction: sqlite3.Connection): return self.db.run(store_stream, sd_blob, descriptor)
# add the head blob and set it to be announced
transaction.execute(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?, ?)",
(
sd_blob.blob_hash, sd_blob.length, 0, 1, "pending", 0, 0,
descriptor.blobs[0].blob_hash, descriptor.blobs[0].length, 0, 1, "pending", 0, 0
)
)
# add the rest of the blobs with announcement off
if len(descriptor.blobs) > 2:
transaction.executemany(
"insert or ignore into blob values (?, ?, ?, ?, ?, ?, ?)",
[(blob.blob_hash, blob.length, 0, 0, "pending", 0, 0)
for blob in descriptor.blobs[1:-1]]
)
# associate the blobs to the stream
transaction.execute("insert or ignore into stream values (?, ?, ?, ?, ?)",
(descriptor.stream_hash, sd_blob.blob_hash, descriptor.key,
binascii.hexlify(descriptor.stream_name.encode()).decode(),
binascii.hexlify(descriptor.suggested_file_name.encode()).decode()))
# add the stream
transaction.executemany(
"insert or ignore into stream_blob values (?, ?, ?, ?)",
[(descriptor.stream_hash, blob.blob_hash, blob.blob_num, blob.iv)
for blob in descriptor.blobs]
)
return self.db.run(_store_stream)
def get_blobs_for_stream(self, stream_hash, only_completed=False) -> typing.Awaitable[typing.List[BlobInfo]]: def get_blobs_for_stream(self, stream_hash, only_completed=False) -> typing.Awaitable[typing.List[BlobInfo]]:
def _get_blobs_for_stream(transaction): def _get_blobs_for_stream(transaction):
@ -475,17 +485,13 @@ class SQLiteStorage(SQLiteMixin):
def save_published_file(self, stream_hash: str, file_name: str, download_directory: str, data_payment_rate: float, def save_published_file(self, stream_hash: str, file_name: str, download_directory: str, data_payment_rate: float,
status="finished"): status="finished"):
return self.db.execute( return self.db.run(store_file, stream_hash, file_name, download_directory, data_payment_rate, status)
"insert into file values (?, ?, ?, ?, ?)",
(stream_hash, binascii.hexlify(file_name.encode()).decode(),
binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status)
)
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
return self.db.run(get_all_lbry_files) return self.db.run(get_all_lbry_files)
def change_file_status(self, stream_hash: str, new_status: str): def change_file_status(self, stream_hash: str, new_status: str):
log.info("update file status %s -> %s", stream_hash, new_status) log.debug("update file status %s -> %s", stream_hash, new_status)
return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash)) return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: str, file_name: str): def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: str, file_name: str):