fix updating content claim for a file

-fix file name and download dir not being hex encoded during publish, add repair script
This commit is contained in:
Jack Robison 2019-01-31 12:32:52 -05:00
parent 3589cc9977
commit e60e5b3919
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 55 additions and 15 deletions

View file

@ -400,16 +400,15 @@ class SQLiteStorage(SQLiteMixin):
def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate): def save_downloaded_file(self, stream_hash, file_name, download_directory, data_payment_rate):
return self.save_published_file( return self.save_published_file(
stream_hash, binascii.hexlify(file_name.encode()).decode(), stream_hash, file_name, download_directory, data_payment_rate, status="running"
binascii.hexlify(download_directory.encode()).decode(), data_payment_rate,
status="running"
) )
def save_published_file(self, stream_hash: str, file_name: str, download_directory: str, data_payment_rate: float, def save_published_file(self, stream_hash: str, file_name: str, download_directory: str, data_payment_rate: float,
status="finished"): status="finished"):
return self.db.execute( return self.db.execute(
"insert into file values (?, ?, ?, ?, ?)", "insert into file values (?, ?, ?, ?, ?)",
(stream_hash, file_name, download_directory, data_payment_rate, status) (stream_hash, binascii.hexlify(file_name.encode()).decode(),
binascii.hexlify(download_directory.encode()).decode(), data_payment_rate, status)
) )
async def get_all_lbry_files(self) -> typing.List[typing.Dict]: async def get_all_lbry_files(self) -> typing.List[typing.Dict]:

View file

@ -6,6 +6,7 @@ from lbrynet.extras.daemon.mime_types import guess_media_type
from lbrynet.stream.downloader import StreamDownloader from lbrynet.stream.downloader import StreamDownloader
from lbrynet.stream.descriptor import StreamDescriptor from lbrynet.stream.descriptor import StreamDescriptor
from lbrynet.stream.reflector.client import StreamReflectorClient from lbrynet.stream.reflector.client import StreamReflectorClient
from lbrynet.schema.claim import ClaimDict
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbrynet.extras.daemon.storage import StoredStreamClaim from lbrynet.extras.daemon.storage import StoredStreamClaim
from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.blob.blob_manager import BlobFileManager
@ -99,11 +100,11 @@ class ManagedStream:
def as_dict(self) -> typing.Dict: def as_dict(self) -> typing.Dict:
full_path = os.path.join(self.download_directory, self.file_name) full_path = os.path.join(self.download_directory, self.file_name)
if not os.path.exists(full_path): if not os.path.isfile(full_path):
full_path = None full_path = None
mime_type = guess_media_type(os.path.basename(self.file_name)) mime_type = guess_media_type(os.path.basename(self.file_name))
if self.downloader: if self.downloader and self.downloader.written_bytes:
written_bytes = self.downloader.written_bytes written_bytes = self.downloader.written_bytes
elif full_path: elif full_path:
written_bytes = os.stat(full_path).st_size written_bytes = os.stat(full_path).st_size
@ -199,3 +200,11 @@ class ManagedStream:
self.fully_reflected.set() self.fully_reflected.set()
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
return sent return sent
def set_claim(self, claim_info: typing.Dict, claim: ClaimDict):
self.stream_claim_info = StoredStreamClaim(
self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
claim_info['name'], claim_info['amount'], claim_info['height'], claim_info['hex'],
claim.certificate_id, claim_info['address'], claim_info['claim_sequence'],
claim_info.get('channel_name')
)

View file

@ -59,6 +59,10 @@ class StreamManager:
self.resume_downloading_task: asyncio.Task = None self.resume_downloading_task: asyncio.Task = None
self.update_stream_finished_futs: typing.List[asyncio.Future] = [] self.update_stream_finished_futs: typing.List[asyncio.Future] = []
async def _update_content_claim(self, stream: ManagedStream):
claim_info = await self.storage.get_content_claim(stream.stream_hash)
stream.set_claim(claim_info, ClaimDict.load_dict(claim_info['value']))
async def load_streams_from_database(self): async def load_streams_from_database(self):
infos = await self.storage.get_all_lbry_files() infos = await self.storage.get_all_lbry_files()
for file_info in infos: for file_info in infos:
@ -77,6 +81,7 @@ class StreamManager:
downloader, file_info['status'], file_info['claim'] downloader, file_info['status'], file_info['claim']
) )
self.streams.add(stream) self.streams.add(stream)
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
async def resume(self): async def resume(self):
if not self.node: if not self.node:
@ -123,6 +128,7 @@ class StreamManager:
iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
stream = await ManagedStream.create(self.loop, self.blob_manager, file_path, key, iv_generator) stream = await ManagedStream.create(self.loop, self.blob_manager, file_path, key, iv_generator)
self.streams.add(stream) self.streams.add(stream)
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
if self.config.reflector_servers: if self.config.reflector_servers:
host, port = random.choice(self.config.reflector_servers) host, port = random.choice(self.config.reflector_servers)
self.loop.create_task(stream.upload_to_reflector(host, port)) self.loop.create_task(stream.upload_to_reflector(host, port))
@ -182,16 +188,9 @@ class StreamManager:
await self.blob_manager.storage.save_content_claim( await self.blob_manager.storage.save_content_claim(
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}" downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
) )
stored_claim = StoredStreamClaim(
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
claim_info['name'], claim_info['amount'], claim_info['height'], claim_info['hex'],
claim.certificate_id, claim_info['address'], claim_info['claim_sequence'],
claim_info.get('channel_name')
)
stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory, stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory,
os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING, os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING)
stored_claim) stream.set_claim(claim_info, claim)
self.streams.add(stream) self.streams.add(stream)
try: try:
await stream.downloader.wrote_bytes_event.wait() await stream.downloader.wrote_bytes_event.wait()

View file

@ -0,0 +1,33 @@
import os
import binascii
import sqlite3
from lbrynet.conf import Config
def main():
conf = Config()
db = sqlite3.connect(os.path.join(conf.data_dir, 'lbrynet.sqlite'))
cur = db.cursor()
files = cur.execute("select stream_hash, file_name, download_directory from file").fetchall()
update = {}
for stream_hash, file_name, download_directory in files:
try:
binascii.unhexlify(file_name)
except binascii.Error:
try:
binascii.unhexlify(download_directory)
except binascii.Error:
update[stream_hash] = (
binascii.hexlify(file_name.encode()).decode(), binascii.hexlify(download_directory.encode()).decode()
)
if update:
print(f"repair {len(update)} streams")
for stream_hash, (file_name, download_directory) in update.items():
cur.execute('update file set file_name=?, download_directory=? where stream_hash=?',
(file_name, download_directory, stream_hash))
db.commit()
db.close()
if __name__ == "__main__":
main()