lbry-sdk/lbrynet/core/utils.py

165 lines
4.6 KiB
Python
Raw Normal View History

import base64
import datetime
import logging
2015-08-20 17:27:15 +02:00
import random
import socket
2016-12-30 19:35:17 +01:00
import string
import json
2015-08-20 17:27:15 +02:00
2016-11-19 23:58:40 +01:00
import pkg_resources
from lbryschema.claim import ClaimDict
from lbrynet.core.cryptoutils import get_lbry_hash_obj
2016-09-30 06:06:07 +02:00
# digest_size is in bytes and blob hashes are hex encoded (two hex
# characters per byte), so a blob hash string is twice the digest size.
blobhash_length = get_lbry_hash_obj().digest_size * 2
2015-08-20 17:27:15 +02:00
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
2016-10-05 21:16:20 +02:00
# defining these time functions here allows for easier overriding in testing
2016-09-30 06:06:07 +02:00
def now():
    """Return the current local date and time as a naive datetime."""
    current = datetime.datetime.now()
    return current
2016-10-05 21:16:20 +02:00
2016-09-30 06:06:07 +02:00
def utcnow():
    """Return the current UTC date and time as a naive datetime."""
    current_utc = datetime.datetime.utcnow()
    return current_utc
2016-10-05 21:16:20 +02:00
2016-09-30 06:06:07 +02:00
def isonow():
    """Return utc now in isoformat with a trailing 'Z' timezone marker"""
    # Goes through the module-level utcnow() so that overriding it in tests
    # also changes what this function reports.
    stamp = utcnow().isoformat()
    return stamp + 'Z'
2016-09-30 06:06:07 +02:00
def today():
    """Return the current local date and time via datetime.today()."""
    current = datetime.datetime.today()
    return current
2017-01-02 20:52:24 +01:00
def timedelta(**kwargs):
    """Build a datetime.timedelta from the given keyword arguments."""
    delta = datetime.timedelta(**kwargs)
    return delta
def datetime_obj(*args, **kwargs):
    """Build a datetime.datetime, forwarding all arguments unchanged."""
    moment = datetime.datetime(*args, **kwargs)
    return moment
def call_later(delay, func, *args, **kwargs):
    """Schedule func(*args, **kwargs) on the twisted reactor after `delay`.

    @param delay: delay passed straight to reactor.callLater
    @param func: callable to schedule
    @return: whatever reactor.callLater returns
    """
    # Import here to ensure that it gets called after installing a reactor
    # see: http://twistedmatrix.com/documents/current/core/howto/choosing-reactor.html
    from twisted.internet import reactor

    scheduled = reactor.callLater(delay, func, *args, **kwargs)
    return scheduled
def safe_start_looping_call(looping_call, interval_sec):
    """Start looping_call with the given interval unless it is already running."""
    if looping_call.running:
        return
    looping_call.start(interval_sec)
def safe_stop_looping_call(looping_call):
    """Stop looping_call, doing nothing if it is not currently running."""
    if not looping_call.running:
        return
    looping_call.stop()
2015-08-20 17:27:15 +02:00
def generate_id(num=None):
    """Return a raw digest suitable for use as an identifier.

    @param num: optional value whose str() form is hashed; when None,
        512 random bits are hashed instead
    @return: the digest() of the lbry hash object
    """
    if num is None:
        num = random.getrandbits(512)
    seed = str(num)
    # NOTE(review): get_lbry_hash_obj() is presumably a hashlib-style object,
    # whose update() rejects text on Python 3. On Python 2 `str` already is
    # bytes, so the seed is passed through unchanged there.
    if not isinstance(seed, bytes):
        seed = seed.encode('utf-8')
    h = get_lbry_hash_obj()
    h.update(seed)
    return h.digest()
2016-10-05 21:16:20 +02:00
def is_valid_hashcharacter(char):
    """Return True if char is exactly one lowercase hex digit (0-9, a-f).

    @param char: string, the single character to check
    @return: True/False
    """
    # `in` on a string is a substring test, so the length check is required:
    # without it "" and runs such as "ab" would count as a valid character.
    return len(char) == 1 and char in "0123456789abcdef"
2015-08-20 17:27:15 +02:00
def is_valid_blobhash(blobhash):
    """Checks whether the blobhash is the correct length and contains only
    valid characters (0-9, a-f)

    @param blobhash: string, the blobhash to check

    @return: True/False
    """
    # Cheap length test first; only scan the characters when it passes.
    if len(blobhash) != blobhash_length:
        return False
    return all(is_valid_hashcharacter(character) for character in blobhash)
def version_is_greater_than(a, b):
    """Returns True if version a is more recent than version b"""
    parse = pkg_resources.parse_version
    candidate = parse(a)
    baseline = parse(b)
    return candidate > baseline
def deobfuscate(obfustacated):
    """Reverse obfuscate(): undo the rot13 pass, then base64-decode.

    @param obfustacated: rot13'd base64 text (text or ASCII bytes)
    @return: the decoded payload, as returned by base64.b64decode
    """
    # Local import: only this helper needs codecs.
    import codecs

    if isinstance(obfustacated, bytes):
        obfustacated = obfustacated.decode('ascii')
    # rot13 is its own inverse, and the 'rot_13' transform is available
    # through codecs.encode on both Python 2 and Python 3, whereas
    # str.decode('rot13') only exists on Python 2.
    return base64.b64decode(codecs.encode(obfustacated, 'rot_13'))
def obfuscate(plain):
    """Lightly obfuscate plain by base64-encoding it and applying rot13.

    This is obfuscation only, not encryption.

    @param plain: payload to obfuscate (bytes, or text which is UTF-8 encoded)
    @return: the rot13'd base64 text
    """
    # Local import: only this helper needs codecs.
    import codecs

    # b64encode requires bytes on Python 3; on Python 2 `str` already is bytes.
    if not isinstance(plain, bytes):
        plain = plain.encode('utf-8')
    encoded = base64.b64encode(plain).decode('ascii')
    # str.encode('rot13') only exists on Python 2; the codecs 'rot_13'
    # transform works on both major versions.
    return codecs.encode(encoded, 'rot_13')
2017-10-31 17:18:47 +01:00
def check_connection(server="lbry.io", port=80, timeout=2):
    """Attempts to open a socket to server:port and returns True if successful.

    If the host name cannot be resolved, a second attempt is made against
    8.8.8.8:53, which needs no DNS lookup.

    @param server: host name or address to probe
    @param port: TCP port to probe
    @param timeout: timeout passed to socket.create_connection
    @return: True if a connection could be opened, False otherwise
    """
    log.debug('Checking connection to %s:%s', server, port)
    try:
        server = socket.gethostbyname(server)
        # create_connection returns a connected socket; close it right away,
        # since only reachability matters and the descriptor would otherwise leak.
        socket.create_connection((server, port), timeout).close()
        log.debug('Connection successful')
        return True
    except (socket.gaierror, socket.herror):
        log.warning("Failed to connect to %s:%s. Unable to resolve domain. Trying to bypass DNS",
                    server, port)
        try:
            server = "8.8.8.8"
            port = 53
            socket.create_connection((server, port), timeout).close()
            log.debug('Connection successful')
            return True
        except Exception:
            # Best-effort probe: any failure here is reported as "offline".
            log.error("Failed to connect to %s:%s. Maybe the internet connection is not working",
                      server, port)
            return False
    except Exception:
        # Best-effort probe: any failure here is reported as "offline".
        log.error("Failed to connect to %s:%s. Maybe the internet connection is not working",
                  server, port)
        return False
2016-12-30 19:35:17 +01:00
def random_string(length=10, chars=string.ascii_lowercase):
    """Return `length` characters, each drawn with random.choice from chars."""
    picks = []
    for _ in range(length):
        picks.append(random.choice(chars))
    return ''.join(picks)
def short_hash(hash_str):
    """Return the first six characters of hash_str."""
    prefix_length = 6
    return hash_str[0:prefix_length]
2017-03-09 16:39:17 +01:00
def get_sd_hash(stream_info):
    """Extract the sd hash from stream_info.

    @param stream_info: a ClaimDict, or a nested dict holding the hash at
        ['claim']['value']['stream']['source']['source']
    @return: the ClaimDict's source_hash, the value found at that path, or
        None when stream_info is empty or the path cannot be followed
    """
    if not stream_info:
        return None
    if isinstance(stream_info, ClaimDict):
        return stream_info.source_hash
    path = ['claim', 'value', 'stream', 'source', 'source']
    result = safe_dict_descend(stream_info, *path)
    if not result:
        # log.warn is a deprecated alias of log.warning; the arguments are
        # passed lazily so the message is only built if it gets emitted.
        log.warning("Unable to get sd_hash via path %s", path)
    return result
def json_dumps_pretty(obj, **kwargs):
    """Serialize obj to human-readable JSON.

    Defaults to sorted keys, a two-space indent and (',', ': ') separators.
    Any extra keyword arguments are forwarded to json.dumps and take
    precedence over these defaults.

    @param obj: the JSON-serializable object to dump
    @return: the JSON text
    """
    options = {
        'sort_keys': True,
        'indent': 2,
        'separators': (',', ': '),
    }
    # Merge instead of passing both explicitly, so that overriding one of the
    # defaults does not raise a duplicate-keyword TypeError.
    options.update(kwargs)
    return json.dumps(obj, **options)
def safe_dict_descend(source, *path):
    """
    For when you want to do something like
    stream_info['claim']['value']['stream']['source']['source']
    but don't trust that every last one of those keys exists

    @param source: the (usually nested dict) object to walk
    @param path: the keys to follow, in order
    @return: the value at the end of the path, or None as soon as a step
        cannot be taken; with an empty path, source itself
    """
    cur_source = source
    for path_entry in path:
        try:
            if path_entry not in cur_source:
                return None
            # The subscript is inside the try as well: a str or list can pass
            # the membership test above and still reject the key as an index.
            cur_source = cur_source[path_entry]
        except (TypeError, KeyError, IndexError):
            # This happens if we try to keep going along a path that isn't
            # a dictionary (see e.g. test_safe_dict_descend_typeerror)
            return None
    return cur_source