lbry-sdk/lbrynet/core/utils.py

129 lines
3.3 KiB
Python
Raw Normal View History

import base64
import datetime
import distutils.version
import logging
import json
2015-08-20 17:27:15 +02:00
import random
import os
import socket
import sys
import yaml
2015-08-20 17:27:15 +02:00
2016-10-27 21:18:25 +02:00
from lbrynet.conf import settings
from lbrynet.conf import AdjustableSettings
from lbrynet.core.cryptoutils import get_lbry_hash_obj
2016-09-30 06:06:07 +02:00
# digest_size is in bytes, and blob hashes are hex encoded
blobhash_length = get_lbry_hash_obj().digest_size * 2
2015-08-20 17:27:15 +02:00
log = logging.getLogger(__name__)
2016-10-05 21:16:20 +02:00
# defining these time functions here allows for easier overriding in testing
2016-09-30 06:06:07 +02:00
def now():
return datetime.datetime.now()
2016-10-05 21:16:20 +02:00
2016-09-30 06:06:07 +02:00
def utcnow():
return datetime.datetime.utcnow()
2016-10-05 21:16:20 +02:00
2016-09-30 06:06:07 +02:00
def isonow():
"""Return utc now in isoformat with timezone"""
return utcnow().isoformat() + 'Z'
def today():
return datetime.datetime.today()
2015-08-20 17:27:15 +02:00
def generate_id(num=None):
h = get_lbry_hash_obj()
if num is not None:
h.update(str(num))
else:
h.update(str(random.getrandbits(512)))
return h.digest()
2016-10-05 21:16:20 +02:00
def is_valid_hashcharacter(char):
return char in "0123456789abcdef"
2015-08-20 17:27:15 +02:00
def is_valid_blobhash(blobhash):
2016-10-19 00:25:16 +02:00
"""Checks whether the blobhash is the correct length and contains only
valid characters (0-9, a-f)
2015-08-20 17:27:15 +02:00
@param blobhash: string, the blobhash to check
2016-10-19 00:25:16 +02:00
@return: True/False
2015-08-20 17:27:15 +02:00
"""
2016-10-19 00:25:16 +02:00
return len(blobhash) == blobhash_length and all(is_valid_hashcharacter(l) for l in blobhash)
def version_is_greater_than(a, b):
"""Returns True if version a is more recent than version b"""
try:
return distutils.version.StrictVersion(a) > distutils.version.StrictVersion(b)
except ValueError:
return distutils.version.LooseVersion(a) > distutils.version.LooseVersion(b)
def deobfuscate(obfustacated):
return base64.b64decode(obfustacated.decode('rot13'))
def obfuscate(plain):
return base64.b64encode(plain).encode('rot13')
settings_decoders = {
'.json': json.loads,
'.yml': yaml.load
}
settings_encoders = {
'.json': json.dumps,
'.yml': yaml.safe_dump
}
ADJUSTABLE_SETTINGS = AdjustableSettings().get_dict()
def load_settings(path):
ext = os.path.splitext(path)[1]
with open(path, 'r') as settings_file:
data = settings_file.read()
decoder = settings_decoders.get(ext, False)
assert decoder is not False, "Unknown settings format .%s" % ext
return decoder(data)
def save_settings(path):
to_save = {k: v for k, v in settings.__dict__.iteritems() if k in ADJUSTABLE_SETTINGS}
ext = os.path.splitext(path)[1]
encoder = settings_encoders.get(ext, False)
assert encoder is not False, "Unknown settings format .%s" % ext
with open(path, 'w') as settings_file:
settings_file.write(encoder(to_save))
2016-10-13 19:35:55 +02:00
def check_connection(server="www.lbry.io", port=80):
"""Attempts to open a socket to server:port and returns True if successful."""
try:
host = socket.gethostbyname(server)
s = socket.create_connection((host, port), 2)
return True
except Exception as ex:
log.info(
"Failed to connect to %s:%s. Maybe the internet connection is not working",
server, port, exc_info=True)
return False
def setup_certs_for_windows():
if getattr(sys, 'frozen', False) and os.name == "nt":
cert_path = os.path.join(os.path.dirname(sys.executable), "cacert.pem")
os.environ["REQUESTS_CA_BUNDLE"] = cert_path