API call to blob_list with uri parameter now succeeds

This commit is contained in:
Roger Ostrander 2018-02-03 23:08:15 -05:00
parent 5cbfa273e5
commit a4343c3eb3
3 changed files with 72 additions and 2 deletions

View file

@ -134,8 +134,31 @@ def get_sd_hash(stream_info):
return None return None
if isinstance(stream_info, ClaimDict): if isinstance(stream_info, ClaimDict):
return stream_info.source_hash return stream_info.source_hash
return stream_info['stream']['source']['source'] path = ['claim', 'value', 'stream', 'source', 'source']
result = safe_dict_descend(stream_info, *path)
if not result:
log.warn("Unable to get sd_hash via path %s" % path)
return result
def json_dumps_pretty(obj, **kwargs): def json_dumps_pretty(obj, **kwargs):
return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs) return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
def safe_dict_descend(source, *path):
"""
For when you want to do something like
stream_info['claim']['value']['stream']['source']['source']"
but don't trust that every last one of those keys exists
"""
cur_source = source
for path_entry in path:
try:
if path_entry not in cur_source:
return None
except TypeError:
# This happens if we try to keep going along a path that isn't
# a dictionary (see e.g. test_safe_dict_descend_typeerror)
return None
cur_source = cur_source[path_entry]
return cur_source

View file

@ -2881,7 +2881,10 @@ class Daemon(AuthJSONRPCServer):
if uri: if uri:
metadata = yield self._resolve_name(uri) metadata = yield self._resolve_name(uri)
sd_hash = utils.get_sd_hash(metadata) sd_hash = utils.get_sd_hash(metadata)
blobs = yield self.get_blobs_for_sd_hash(sd_hash) try:
blobs = yield self.get_blobs_for_sd_hash(sd_hash)
except NoSuchSDHash:
blobs = []
elif stream_hash: elif stream_hash:
try: try:
blobs = yield self.get_blobs_for_stream_hash(stream_hash) blobs = yield self.get_blobs_for_stream_hash(stream_hash)

View file

@ -31,3 +31,47 @@ class ObfuscationTest(unittest.TestCase):
plain = '' plain = ''
obf = utils.obfuscate(plain) obf = utils.obfuscate(plain)
self.assertEqual(plain, utils.deobfuscate(obf)) self.assertEqual(plain, utils.deobfuscate(obf))
class SafeDictDescendTest(unittest.TestCase):
def test_safe_dict_descend_happy(self):
nested = {
'foo': {
'bar': {
'baz': 3
}
}
}
self.assertEqual(
utils.safe_dict_descend(nested, 'foo', 'bar', 'baz'),
3
)
def test_safe_dict_descend_typeerror(self):
nested = {
'foo': {
'bar': 7
}
}
self.assertIsNone(utils.safe_dict_descend(nested, 'foo', 'bar', 'baz'))
def test_safe_dict_descend_missing(self):
nested = {
'foo': {
'barn': 7
}
}
self.assertIsNone(utils.safe_dict_descend(nested, 'foo', 'bar', 'baz'))
def test_empty_dict_doesnt_explode(self):
nested = {}
self.assertIsNone(utils.safe_dict_descend(nested, 'foo', 'bar', 'baz'))
def test_identity(self):
nested = {
'foo': {
'bar': 7
}
}
self.assertIs(nested, utils.safe_dict_descend(nested))