update Downloader and Publisher

This commit is contained in:
Jack Robison 2018-02-12 14:13:30 -05:00
parent ce4b82d929
commit 0df383b9f2
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 56 additions and 52 deletions

View file

@ -676,12 +676,13 @@ class Daemon(AuthJSONRPCServer):
self.disable_max_key_fee, self.disable_max_key_fee,
conf.settings['data_rate'], timeout) conf.settings['data_rate'], timeout)
try: try:
lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name) lbry_file, finished_deferred = yield self.streams[sd_hash].start(
yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout) claim_dict, name, txid, nout, file_name
finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name, )
claim_dict), finished_deferred.addCallbacks(
lambda e: _download_failed(e, download_id, name, lambda _: _download_finished(download_id, name, claim_dict),
claim_dict)) lambda e: _download_failed(e, download_id, name, claim_dict)
)
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
except Exception as err: except Exception as err:
yield _download_failed(err, download_id, name, claim_dict) yield _download_failed(err, download_id, name, claim_dict)
@ -706,7 +707,8 @@ class Daemon(AuthJSONRPCServer):
if bid <= 0.0: if bid <= 0.0:
raise Exception("Invalid bid") raise Exception("Invalid bid")
if not file_path: if not file_path:
claim_out = yield publisher.publish_stream(name, bid, claim_dict, claim_address, stream_hash = yield self.storage.get_stream_hash_for_sd_hash(claim_dict['stream']['source']['source'])
claim_out = yield publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address,
change_address) change_address)
else: else:
claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path,
@ -715,9 +717,6 @@ class Daemon(AuthJSONRPCServer):
d = reupload.reflect_stream(publisher.lbry_file) d = reupload.reflect_stream(publisher.lbry_file)
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
log.exception) log.exception)
yield self.stream_info_manager.save_outpoint_to_file(publisher.lbry_file.rowid,
claim_out['txid'],
int(claim_out['nout']))
self.analytics_manager.send_claim_action('publish') self.analytics_manager.send_claim_action('publish')
log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'], log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
claim_out['nout']) claim_out['nout'])
@ -1609,7 +1608,14 @@ class Daemon(AuthJSONRPCServer):
'blobs_completed': (int) num_completed, None if full_status is false, 'blobs_completed': (int) num_completed, None if full_status is false,
'blobs_in_stream': (int) None if full_status is false, 'blobs_in_stream': (int) None if full_status is false,
'status': (str) downloader status, None if full_status is false, 'status': (str) downloader status, None if full_status is false,
'outpoint': (str), None if full_status is false or if claim is not found 'claim_id': (str) claim id,
'outpoint': (str) claim outpoint string,
'txid': (str) claim txid,
'nout': (int) claim nout,
'metadata': (dict) claim metadata,
'channel_claim_id': (str) None if claim is not signed
'channel_name': (str) None if claim is not signed
'claim_name': (str) claim name
} }
""" """

View file

@ -116,14 +116,15 @@ class GetStream(object):
raise Exception('No suitable factory was found in {}'.format(factories)) raise Exception('No suitable factory was found in {}'.format(factories))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_downloader(self, factory, stream_metadata): def get_downloader(self, factory, stream_metadata, file_name=None):
# TODO: we should use stream_metadata.options.get_downloader_options # TODO: we should use stream_metadata.options.get_downloader_options
# instead of hard-coding the options to be [self.data_rate] # instead of hard-coding the options to be [self.data_rate]
downloader = yield factory.make_downloader( downloader = yield factory.make_downloader(
stream_metadata, stream_metadata,
[self.data_rate], self.data_rate,
self.payment_rate_manager, self.payment_rate_manager,
download_directory=self.download_directory, self.download_directory,
file_name=file_name
) )
defer.returnValue(downloader) defer.returnValue(downloader)
@ -165,10 +166,10 @@ class GetStream(object):
defer.returnValue(key_fee) defer.returnValue(key_fee)
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_downloader(self, sd_blob): def _create_downloader(self, sd_blob, file_name=None):
stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob) stream_metadata = yield self.sd_identifier.get_metadata_for_sd_blob(sd_blob)
factory = self.get_downloader_factory(stream_metadata.factories) factory = self.get_downloader_factory(stream_metadata.factories)
downloader = yield self.get_downloader(factory, stream_metadata) downloader = yield self.get_downloader(factory, stream_metadata, file_name)
defer.returnValue(downloader) defer.returnValue(downloader)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -178,15 +179,17 @@ class GetStream(object):
defer.returnValue(sd_blob) defer.returnValue(sd_blob)
@defer.inlineCallbacks @defer.inlineCallbacks
def _download(self, sd_blob, name, key_fee): def _download(self, sd_blob, name, key_fee, txid, nout, file_name=None):
self.downloader = yield self._create_downloader(sd_blob) self.downloader = yield self._create_downloader(sd_blob, file_name=file_name)
yield self.pay_key_fee(key_fee, name) yield self.pay_key_fee(key_fee, name)
yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))
yield self.downloader.get_claim_info()
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start() self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self, stream_info, name): def start(self, stream_info, name, txid, nout, file_name=None):
""" """
Start download Start download
@ -203,7 +206,7 @@ class GetStream(object):
self.set_status(DOWNLOAD_METADATA_CODE, name) self.set_status(DOWNLOAD_METADATA_CODE, name)
sd_blob = yield self._download_sd_blob() sd_blob = yield self._download_sd_blob()
yield self._download(sd_blob, name, key_fee) yield self._download(sd_blob, name, key_fee, txid, nout, file_name)
self.set_status(DOWNLOAD_RUNNING_CODE, name) self.set_status(DOWNLOAD_RUNNING_CODE, name)
try: try:

View file

@ -6,9 +6,6 @@ from twisted.internet import defer
from lbrynet.core import file_utils from lbrynet.core import file_utils
from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.lbry_file.StreamDescriptor import publish_sd_blob
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -33,29 +30,27 @@ class Publisher(object):
file_name = os.path.basename(file_path) file_name = os.path.basename(file_path)
with file_utils.get_read_handle(file_path) as read_handle: with file_utils.get_read_handle(file_path) as read_handle:
stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, file_name, self.lbry_file = yield create_lbry_file(self.session, self.lbry_file_manager, file_name,
read_handle) read_handle)
sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager,
self.session.blob_manager, stream_hash)
status = ManagedEncryptedFileDownloader.STATUS_FINISHED
self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, sd_hash,
status=status)
if 'source' not in claim_dict['stream']: if 'source' not in claim_dict['stream']:
claim_dict['stream']['source'] = {} claim_dict['stream']['source'] = {}
claim_dict['stream']['source']['source'] = sd_hash claim_dict['stream']['source']['source'] = self.lbry_file.sd_hash
claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash' claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash'
claim_dict['stream']['source']['contentType'] = get_content_type(file_path) claim_dict['stream']['source']['contentType'] = get_content_type(file_path)
claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here
claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
self.lbry_file.completed = True yield self.session.storage.save_content_claim(
yield self.lbry_file.save_status() self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])
)
yield self.lbry_file.get_claim_info()
defer.returnValue(claim_out) defer.returnValue(claim_out)
@defer.inlineCallbacks @defer.inlineCallbacks
def publish_stream(self, name, bid, claim_dict, claim_address=None, change_address=None): def publish_stream(self, name, bid, claim_dict, stream_hash, claim_address=None, change_address=None):
"""Make a claim without creating a lbry file""" """Make a claim without creating a lbry file"""
claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
yield self.session.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout']))
defer.returnValue(claim_out) defer.returnValue(claim_out)
@defer.inlineCallbacks @defer.inlineCallbacks