asyncifying minimal amount of the old blob/p2p code

Lex Berezhny 2018-12-15 15:29:25 -05:00
parent 248baf58b4
commit fb4e94b04a
7 changed files with 60 additions and 69 deletions
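Every hunk below follows the same pattern: storage and file-manager entry points become async def coroutines, and the Twisted code that still uses @defer.inlineCallbacks bridges to them through the newly imported f2d helper from lbrynet.extras.compat. That helper is not part of this diff; the sketch below only illustrates the kind of asyncio-to-Deferred bridge it is assumed to be (the real implementation may differ, and this presumes the asyncio-backed Twisted reactor so both run on one event loop):

    import asyncio
    from twisted.internet import defer

    def f2d(awaitable):
        # Assumed shape of the compat helper: schedule the coroutine on the shared
        # asyncio loop and hand it back to Twisted code as a Deferred, so call
        # sites can write `yield f2d(storage.store_stream(...))` as in the hunks below.
        return defer.Deferred.fromFuture(asyncio.ensure_future(awaitable))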

View file

@@ -9,6 +9,7 @@ from binascii import hexlify
 from twisted.internet import defer
 from twisted.protocols.basic import FileSender
+from lbrynet.extras.compat import f2d
 from lbrynet.p2p.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType
 from lbrynet.p2p.StreamDescriptor import format_sd_info, get_stream_hash, validate_descriptor
 from lbrynet.blob.CryptStreamCreator import CryptStreamCreator
@@ -119,13 +120,13 @@ def create_lbry_file(blob_manager, storage, payment_rate_manager, lbry_file_mana
     sd_hash = yield descriptor_writer.create_descriptor(sd_info)
     log.debug("saving the stream")
-    yield storage.store_stream(
+    yield f2d(storage.store_stream(
         sd_info['stream_hash'], sd_hash, sd_info['stream_name'], sd_info['key'],
         sd_info['suggested_file_name'], sd_info['blobs']
-    )
+    ))
     log.debug("adding to the file manager")
-    lbry_file = yield lbry_file_manager.add_published_file(
+    lbry_file = yield f2d(lbry_file_manager.add_published_file(
         sd_info['stream_hash'], sd_hash, hexlify(file_directory.encode()), payment_rate_manager,
         payment_rate_manager.min_blob_data_payment_rate
-    )
+    ))
     defer.returnValue(lbry_file)

View file

@@ -6,6 +6,7 @@ from binascii import hexlify, unhexlify
 from twisted.internet import defer
 from lbrynet import conf
+from lbrynet.extras.compat import f2d
 from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager
 from lbrynet.p2p.HTTPBlobDownloader import HTTPBlobDownloader
 from lbrynet.utils import short_hash
@@ -70,13 +71,11 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
         self.channel_name = claim_info['channel_name']
         self.metadata = claim_info['value']['stream']['metadata']
 
-    @defer.inlineCallbacks
-    def get_claim_info(self, include_supports=True):
-        claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
+    async def get_claim_info(self, include_supports=True):
+        claim_info = await self.storage.get_content_claim(self.stream_hash, include_supports)
         if claim_info:
             self.set_claim_info(claim_info)
-
-        defer.returnValue(claim_info)
+        return claim_info
 
     @property
     def saving_status(self):
@@ -180,10 +179,10 @@ class ManagedEncryptedFileDownloaderFactory:
         if file_name:
             file_name = hexlify(file_name.encode())
         hex_download_directory = hexlify(download_directory.encode())
-        lbry_file = yield self.lbry_file_manager.add_downloaded_file(
+        lbry_file = yield f2d(self.lbry_file_manager.add_downloaded_file(
            stream_hash, metadata.source_blob_hash, hex_download_directory, payment_rate_manager,
            data_rate, file_name=file_name, download_mirrors=download_mirrors
-        )
+        ))
         defer.returnValue(lbry_file)
 
     @staticmethod

View file

@@ -8,6 +8,7 @@ from binascii import hexlify, unhexlify
 from twisted.internet import defer, task, reactor
 from twisted.python.failure import Failure
 from lbrynet import conf
+from lbrynet.extras.compat import f2d
 from lbrynet.extras.reflector.reupload import reflect_file
 from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloader
 from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
@@ -41,11 +42,9 @@ class EncryptedFileManager:
         self.lbry_files = []
         self.lbry_file_reflector = task.LoopingCall(self.reflect_lbry_files)
 
-    @defer.inlineCallbacks
     def setup(self):
-        yield self._add_to_sd_identifier()
-        yield self._start_lbry_files()
-        log.info("Started file manager")
+        self._add_to_sd_identifier()
+        return self._start_lbry_files()
 
     def get_lbry_file_status(self, lbry_file):
         return self.storage.get_lbry_file_status(lbry_file.rowid)
@@ -115,10 +114,9 @@ class EncryptedFileManager:
         except Exception:
             log.warning("Failed to start %i", file_info.get('rowid'))
 
-    @defer.inlineCallbacks
-    def _start_lbry_files(self):
-        files = yield self.storage.get_all_lbry_files()
-        claim_infos = yield self.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
+    async def _start_lbry_files(self):
+        files = await self.storage.get_all_lbry_files()
+        claim_infos = await self.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
         prm = self.payment_rate_manager
         log.info("Starting %i files", len(files))
@@ -151,14 +149,13 @@ class EncryptedFileManager:
         log.info("Stopping %i lbry files", len(self.lbry_files))
         yield defer.DeferredList([self._stop_lbry_file(lbry_file) for lbry_file in list(self.lbry_files)])
 
-    @defer.inlineCallbacks
-    def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate):
+    async def add_published_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager, blob_data_rate):
         status = ManagedEncryptedFileDownloader.STATUS_FINISHED
-        stream_metadata = yield get_sd_info(self.storage, stream_hash, include_blobs=False)
+        stream_metadata = await get_sd_info(self.storage, stream_hash, include_blobs=False)
         key = stream_metadata['key']
         stream_name = stream_metadata['stream_name']
         file_name = stream_metadata['suggested_file_name']
-        rowid = yield self.storage.save_published_file(
+        rowid = await self.storage.save_published_file(
             stream_hash, file_name, download_directory, blob_data_rate, status
         )
         lbry_file = self._get_lbry_file(
@@ -166,37 +163,36 @@ class EncryptedFileManager:
             stream_metadata['suggested_file_name'], download_mirrors=None
         )
         lbry_file.restore(status)
-        yield lbry_file.get_claim_info()
+        await lbry_file.get_claim_info()
         self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
         self.lbry_files.append(lbry_file)
-        defer.returnValue(lbry_file)
+        return lbry_file
 
-    @defer.inlineCallbacks
-    def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None,
+    async def add_downloaded_file(self, stream_hash, sd_hash, download_directory, payment_rate_manager=None,
                             blob_data_rate=None, status=None, file_name=None, download_mirrors=None):
         status = status or ManagedEncryptedFileDownloader.STATUS_STOPPED
         payment_rate_manager = payment_rate_manager or self.payment_rate_manager
         blob_data_rate = blob_data_rate or payment_rate_manager.min_blob_data_payment_rate
-        stream_metadata = yield get_sd_info(self.storage, stream_hash, include_blobs=False)
+        stream_metadata = await get_sd_info(self.storage, stream_hash, include_blobs=False)
         key = stream_metadata['key']
         stream_name = stream_metadata['stream_name']
         file_name = file_name or stream_metadata['suggested_file_name']
         # when we save the file we'll atomic touch the nearest file to the suggested file name
         # that doesn't yet exist in the download directory
-        rowid = yield self.storage.save_downloaded_file(
+        rowid = await self.storage.save_downloaded_file(
             stream_hash, hexlify(os.path.basename(unhexlify(file_name))), download_directory, blob_data_rate
         )
-        file_name = (yield self.storage.get_filename_for_rowid(rowid)).decode()
+        file_name = (await self.storage.get_filename_for_rowid(rowid)).decode()
         lbry_file = self._get_lbry_file(
             rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
             stream_metadata['suggested_file_name'], download_mirrors
         )
         lbry_file.restore(status)
-        yield lbry_file.get_claim_info(include_supports=False)
+        await lbry_file.get_claim_info(include_supports=False)
         self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
         self.lbry_files.append(lbry_file)
-        defer.returnValue(lbry_file)
+        return lbry_file
 
     @defer.inlineCallbacks
     def delete_lbry_file(self, lbry_file, delete_file=False):
@@ -222,7 +218,7 @@ class EncryptedFileManager:
             del self.storage.content_claim_callbacks[lbry_file.stream_hash]
         yield lbry_file.delete_data()
-        yield self.storage.delete_stream(lbry_file.stream_hash)
+        yield f2d(self.storage.delete_stream(lbry_file.stream_hash))
         if delete_file and os.path.isfile(full_path):
             os.remove(full_path)

View file

@@ -1,6 +1,6 @@
 import os
 import logging
-from twisted.internet import defer, threads
+from twisted.internet import defer
 from twisted.web.client import FileBodyProducer
 from twisted.python.failure import Failure
 from lbrynet.cryptoutils import get_lbry_hash_obj
@@ -112,19 +112,19 @@ class BlobFile:
             self._verified = False
             self.saved_verified_blob = False
 
-            def delete_from_file_system():
-                if os.path.isfile(self.file_path):
-                    os.remove(self.file_path)
+            #def delete_from_file_system():
+            if os.path.isfile(self.file_path):
+                os.remove(self.file_path)
 
-            d = threads.deferToThread(delete_from_file_system)
+            #d = threads.deferToThread(delete_from_file_system)
 
             def log_error(err):
                 log.warning("An error occurred deleting %s: %s",
                             str(self.file_path), err.getErrorMessage())
                 return err
 
-            d.addErrback(log_error)
-            return d
+            #d.addErrback(log_error)
+            return #d
         else:
             return defer.fail(Failure(
                 ValueError("File is currently being read or written and cannot be deleted")))

View file

@@ -4,6 +4,7 @@ import traceback
 from binascii import hexlify, unhexlify
 from twisted.internet import defer, threads
+from lbrynet.extras.compat import f2d
 from lbrynet.p2p.StreamDescriptor import save_sd_info
 from lbrynet.blob.client.CryptStreamDownloader import CryptStreamDownloader
 from lbrynet.p2p.client.StreamProgressManager import FullStreamProgressManager
@@ -28,16 +29,15 @@ class EncryptedFileDownloader(CryptStreamDownloader):
     @defer.inlineCallbacks
     def delete_data(self):
-        crypt_infos = yield self.storage.get_blobs_for_stream(self.stream_hash)
+        crypt_infos = yield f2d(self.storage.get_blobs_for_stream(self.stream_hash))
         blob_hashes = [b.blob_hash for b in crypt_infos if b.blob_hash]
-        sd_hash = yield self.storage.get_sd_blob_hash_for_stream(self.stream_hash)
+        sd_hash = yield f2d(self.storage.get_sd_blob_hash_for_stream(self.stream_hash))
         blob_hashes.append(sd_hash)
         yield self.blob_manager.delete_blobs(blob_hashes)
 
     def stop(self, err=None):
-        d = self._close_output()
-        d.addCallback(lambda _: CryptStreamDownloader.stop(self, err=err))
-        return d
+        self._close_output()
+        return CryptStreamDownloader.stop(self, err=err)
 
     def _get_progress_manager(self, download_manager):
         return FullStreamProgressManager(self._finished_downloading,
@@ -174,14 +174,14 @@ class EncryptedFileSaver(EncryptedFileDownloader):
     def _close_output(self):
         self.file_handle, file_handle = None, self.file_handle
 
-        def close_file():
-            if file_handle is not None:
-                name = file_handle.name
-                file_handle.close()
-                if self.completed is False:
-                    os.remove(name)
-        return threads.deferToThread(close_file)
+        #def close_file():
+        if file_handle is not None:
+            name = file_handle.name
+            file_handle.close()
+            if self.completed is False:
+                os.remove(name)
+        #return threads.deferToThread(close_file)
 
     def _get_write_func(self):
         def write_func(data):

View file

@@ -3,6 +3,7 @@ import os
 from binascii import unhexlify
 from sqlite3 import IntegrityError
 from twisted.internet import threads, defer
+from lbrynet.extras.compat import f2d
 from lbrynet.blob.blob_file import BlobFile
 from lbrynet.blob.creator import BlobFileCreator
@@ -26,15 +27,13 @@ class DiskBlobManager:
         self.blobs = {}
         self.blob_hashes_to_delete = {}  # {blob_hash: being_deleted (True/False)}
 
-    @defer.inlineCallbacks
-    def setup(self):
+    async def setup(self):
         if self._node_datastore is not None:
-            raw_blob_hashes = yield self.storage.get_all_finished_blobs()
+            raw_blob_hashes = await self.storage.get_all_finished_blobs()
             self._node_datastore.completed_blobs.update(raw_blob_hashes)
-        defer.returnValue(True)
 
-    def stop(self):
-        return defer.succeed(True)
+    async def stop(self):
+        pass
 
     def get_blob(self, blob_hash, length=None):
         """Return a blob identified by blob_hash, which may be a new blob or a
@@ -112,7 +111,7 @@ class DiskBlobManager:
         except Exception as e:
             log.warning("Failed to delete blob file. Reason: %s", e)
         try:
-            yield self.storage.delete_blobs_from_db(bh_to_delete_from_db)
+            yield f2d(self.storage.delete_blobs_from_db(bh_to_delete_from_db))
         except IntegrityError as err:
            if str(err) != "FOREIGN KEY constraint failed":
                raise err

View file

@@ -299,8 +299,7 @@ def format_sd_info(stream_type, stream_name, key, suggested_file_name, stream_ha
     }
 
 
-@defer.inlineCallbacks
-def get_sd_info(storage, stream_hash, include_blobs):
+async def get_sd_info(storage, stream_hash, include_blobs):
     """
     Get an sd info dictionary from storage
@@ -329,16 +328,13 @@ def get_sd_info(storage, stream_hash, include_blobs):
         ]
     }
     """
-
-    stream_info = yield storage.get_stream_info(stream_hash)
+    stream_info = await storage.get_stream_info(stream_hash)
     blobs = []
     if include_blobs:
-        blobs = yield storage.get_blobs_for_stream(stream_hash)
-    defer.returnValue(
-        format_sd_info(
-            EncryptedFileStreamType, stream_info[0], stream_info[1],
-            stream_info[2], stream_hash, format_blobs(blobs)
-        )
+        blobs = await storage.get_blobs_for_stream(stream_hash)
+    return format_sd_info(
+        EncryptedFileStreamType, stream_info[0], stream_info[1],
+        stream_info[2], stream_hash, format_blobs(blobs)
     )
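With get_sd_info converted to a coroutine, the async callers in EncryptedFileManager simply await it, while any remaining Deferred-based code has to wrap it, mirroring the f2d pattern used throughout this commit. A hypothetical consumer on the Twisted side (illustrative only, not part of this diff):

    from twisted.internet import defer
    from lbrynet.extras.compat import f2d
    from lbrynet.p2p.StreamDescriptor import get_sd_info

    @defer.inlineCallbacks
    def describe_stream(storage, stream_hash):
        # hypothetical helper, not from this commit: consume the now-async
        # get_sd_info from code that still uses @defer.inlineCallbacks
        sd_info = yield f2d(get_sd_info(storage, stream_hash, include_blobs=False))
        defer.returnValue(sd_info['stream_name'])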