add test case for restart, fix torrent file update
This commit is contained in:
parent
dd103d0f95
commit
7746ded9b6
5 changed files with 26 additions and 5 deletions
|
@ -5,6 +5,7 @@ import typing
|
||||||
import asyncio
|
import asyncio
|
||||||
import binascii
|
import binascii
|
||||||
import time
|
import time
|
||||||
|
from operator import itemgetter
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from lbry.wallet import SQLiteMixin
|
from lbry.wallet import SQLiteMixin
|
||||||
from lbry.conf import Config
|
from lbry.conf import Config
|
||||||
|
@ -635,6 +636,15 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
|
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
|
||||||
return self.db.run(get_all_lbry_files)
|
return self.db.run(get_all_lbry_files)
|
||||||
|
|
||||||
|
async def get_all_torrent_files(self) -> typing.List[typing.Dict]:
|
||||||
|
def _get_all_torrent_files(transaction):
|
||||||
|
cursor = transaction.execute("select * from file join torrent on file.bt_infohash=torrent.bt_infohash")
|
||||||
|
return [
|
||||||
|
{field: value for field, value in zip(list(map(itemgetter(0), cursor.description)), row)}
|
||||||
|
for row in cursor.fetchall()
|
||||||
|
]
|
||||||
|
return await self.db.run(_get_all_torrent_files)
|
||||||
|
|
||||||
def change_file_status(self, stream_hash: str, new_status: str):
|
def change_file_status(self, stream_hash: str, new_status: str):
|
||||||
log.debug("update file status %s -> %s", stream_hash, new_status)
|
log.debug("update file status %s -> %s", stream_hash, new_status)
|
||||||
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
|
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
|
||||||
|
@ -907,7 +917,7 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
|
|
||||||
async def get_content_claim_for_torrent(self, bt_infohash):
|
async def get_content_claim_for_torrent(self, bt_infohash):
|
||||||
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
|
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
|
||||||
return claims[bt_infohash].as_dict() if claims else None
|
return claims[bt_infohash] if claims else None
|
||||||
|
|
||||||
# # # # # # # # # reflector functions # # # # # # # # #
|
# # # # # # # # # reflector functions # # # # # # # # #
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ class FileManager:
|
||||||
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
|
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
|
||||||
)
|
)
|
||||||
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
|
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
|
||||||
existing[0].set_claim(claim_info, claim)
|
existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim)
|
||||||
else:
|
else:
|
||||||
await self.storage.save_content_claim(
|
await self.storage.save_content_claim(
|
||||||
existing[0].stream_hash, outpoint
|
existing[0].stream_hash, outpoint
|
||||||
|
@ -242,7 +242,7 @@ class FileManager:
|
||||||
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
|
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
|
||||||
)
|
)
|
||||||
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
|
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
|
||||||
stream.set_claim(claim_info, claim)
|
stream.set_claim(claim_info.as_dict() if claim_info else None, claim)
|
||||||
if save_file:
|
if save_file:
|
||||||
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
|
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
|
||||||
return stream
|
return stream
|
||||||
|
|
|
@ -84,6 +84,7 @@ class SourceManager:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
||||||
|
await self.storage.delete_torrent(source.identifier)
|
||||||
self.remove(source)
|
self.remove(source)
|
||||||
if delete_file and source.output_file_exists:
|
if delete_file and source.output_file_exists:
|
||||||
os.remove(source.full_path)
|
os.remove(source.full_path)
|
||||||
|
|
|
@ -161,7 +161,7 @@ class TorrentManager(SourceManager):
|
||||||
async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
|
async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
|
||||||
download_directory: Optional[str], status: str,
|
download_directory: Optional[str], status: str,
|
||||||
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
|
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
|
||||||
added_on: Optional[int]):
|
added_on: Optional[int], **kwargs):
|
||||||
stream = TorrentSource(
|
stream = TorrentSource(
|
||||||
self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
|
self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
|
||||||
download_directory=download_directory, status=status, claim=claim, rowid=rowid,
|
download_directory=download_directory, status=status, claim=claim, rowid=rowid,
|
||||||
|
@ -171,7 +171,9 @@ class TorrentManager(SourceManager):
|
||||||
self.add(stream)
|
self.add(stream)
|
||||||
|
|
||||||
async def initialize_from_database(self):
|
async def initialize_from_database(self):
|
||||||
pass
|
for file in await self.storage.get_all_torrent_files():
|
||||||
|
claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash'])
|
||||||
|
await self._load_stream(None, claim=claim, **file)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
await super().start()
|
await super().start()
|
||||||
|
|
|
@ -66,6 +66,14 @@ class FileCommands(CommandTestCase):
|
||||||
# claim now points to another torrent, update to it
|
# claim now points to another torrent, update to it
|
||||||
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
|
self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent')))
|
||||||
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
|
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
|
||||||
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
|
|
||||||
|
# restart and verify that only one updated stream was recovered
|
||||||
|
self.daemon.file_manager.stop()
|
||||||
|
await self.daemon.file_manager.start()
|
||||||
|
self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih)
|
||||||
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
|
|
||||||
self.assertIn(new_btih, self.client_session._handles)
|
self.assertIn(new_btih, self.client_session._handles)
|
||||||
self.assertNotIn(btih, self.client_session._handles)
|
self.assertNotIn(btih, self.client_session._handles)
|
||||||
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
|
||||||
|
|
Loading…
Add table
Reference in a new issue