use sd_hash instead of claim_id as a key for Daemon.streams dictionary

This commit is contained in:
Kay Kurokawa 2017-11-06 12:26:42 -05:00 committed by Jack Robison
parent 38834a083c
commit 50586fa39d
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF

View file

@ -391,7 +391,7 @@ class Daemon(AuthJSONRPCServer):
def _stop_streams(self): def _stop_streams(self):
"""stop pending GetStream downloads""" """stop pending GetStream downloads"""
for claim_id, stream in self.streams.iteritems(): for sd_hash, stream in self.streams.iteritems():
stream.cancel(reason="daemon shutdown") stream.cancel(reason="daemon shutdown")
def _shutdown(self): def _shutdown(self):
@ -639,7 +639,7 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(report) defer.returnValue(report)
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_name(self, name, claim_dict, claim_id, timeout=None, file_name=None): def _download_name(self, name, claim_dict, sd_hash, timeout=None, file_name=None):
""" """
Add a lbry file to the file manager, start the download, and return the new lbry file. Add a lbry file to the file manager, start the download, and return the new lbry file.
If it already exists in the file manager, return the existing lbry file If it already exists in the file manager, return the existing lbry file
@ -655,21 +655,21 @@ class Daemon(AuthJSONRPCServer):
self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, self.analytics_manager.send_download_errored(error, download_id, name, claim_dict,
report) report)
if claim_id in self.streams: if sd_hash in self.streams:
downloader = self.streams[claim_id] downloader = self.streams[sd_hash]
result = yield downloader.finished_deferred result = yield downloader.finished_deferred
defer.returnValue(result) defer.returnValue(result)
else: else:
download_id = utils.random_string() download_id = utils.random_string()
self.analytics_manager.send_download_started(download_id, name, claim_dict) self.analytics_manager.send_download_started(download_id, name, claim_dict)
self.streams[claim_id] = GetStream(self.sd_identifier, self.session, self.streams[sd_hash] = GetStream(self.sd_identifier, self.session,
self.exchange_rate_manager, self.max_key_fee, self.exchange_rate_manager, self.max_key_fee,
self.disable_max_key_fee, self.disable_max_key_fee,
conf.settings['data_rate'], timeout, conf.settings['data_rate'], timeout,
file_name) file_name)
try: try:
lbry_file, finished_deferred = yield self.streams[claim_id].start(claim_dict, name) lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name)
finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name, finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name,
claim_dict), claim_dict),
lambda e: _download_failed(e, download_id, name, lambda e: _download_failed(e, download_id, name,
@ -682,11 +682,11 @@ class Daemon(AuthJSONRPCServer):
log.warning('Failed to get %s (%s)', name, err) log.warning('Failed to get %s (%s)', name, err)
else: else:
log.error('Failed to get %s (%s)', name, err) log.error('Failed to get %s (%s)', name, err)
if self.streams[claim_id].downloader: if self.streams[sd_hash].downloader:
yield self.streams[claim_id].downloader.stop(err) yield self.streams[sd_hash].downloader.stop(err)
result = {'error': err.message} result = {'error': err.message}
finally: finally:
del self.streams[claim_id] del self.streams[sd_hash]
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -1506,13 +1506,12 @@ class Daemon(AuthJSONRPCServer):
resolved = resolved['claim'] resolved = resolved['claim']
name = resolved['name'] name = resolved['name']
claim_id = resolved['claim_id']
claim_dict = ClaimDict.load_dict(resolved['value']) claim_dict = ClaimDict.load_dict(resolved['value'])
sd_hash = claim_dict.source_hash sd_hash = claim_dict.source_hash
if claim_id in self.streams: if sd_hash in self.streams:
log.info("Already waiting on lbry://%s to start downloading", name) log.info("Already waiting on lbry://%s to start downloading", name)
yield self.streams[claim_id].data_downloading_deferred yield self.streams[sd_hash].data_downloading_deferred
lbry_file = yield self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) lbry_file = yield self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False)
@ -1525,7 +1524,7 @@ class Daemon(AuthJSONRPCServer):
log.info('Already have a file for %s', name) log.info('Already have a file for %s', name)
result = yield self._get_lbry_file_dict(lbry_file, full_status=True) result = yield self._get_lbry_file_dict(lbry_file, full_status=True)
else: else:
result = yield self._download_name(name, claim_dict, claim_id, timeout=timeout, result = yield self._download_name(name, claim_dict, sd_hash, timeout=timeout,
file_name=file_name) file_name=file_name)
response = yield self._render_response(result) response = yield self._render_response(result)
defer.returnValue(response) defer.returnValue(response)
@ -1621,8 +1620,8 @@ class Daemon(AuthJSONRPCServer):
else: else:
for lbry_file in lbry_files: for lbry_file in lbry_files:
file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash
if lbry_file.claim_id in self.streams: if lbry_file.sd_hash in self.streams:
del self.streams[lbry_file.claim_id] del self.streams[lbry_file.sd_hash]
yield self.lbry_file_manager.delete_lbry_file(lbry_file, yield self.lbry_file_manager.delete_lbry_file(lbry_file,
delete_file=delete_from_download_dir) delete_file=delete_from_download_dir)
log.info("Deleted file: %s", file_name) log.info("Deleted file: %s", file_name)