import base64
import codecs
import datetime
import random
import socket
import string
import json
import traceback
import functools
import logging
import pkg_resources

from twisted.python.failure import Failure
from twisted.internet import defer

from lbrynet.schema.claim import ClaimDict
from lbrynet.cryptoutils import get_lbry_hash_obj

log = logging.getLogger(__name__)


# defining these time functions here allows for easier overriding in testing
def now():
    """Return the current local time as a naive ``datetime``."""
    return datetime.datetime.now()


def utcnow():
    """Return the current UTC time as a naive ``datetime``."""
    return datetime.datetime.utcnow()


def isonow():
    """Return utc now in isoformat with timezone"""
    return utcnow().isoformat() + 'Z'


def today():
    """Return ``datetime.datetime.today()``."""
    return datetime.datetime.today()


def timedelta(**kwargs):
    """Build a ``datetime.timedelta`` from the given keyword arguments."""
    return datetime.timedelta(**kwargs)


def datetime_obj(*args, **kwargs):
    """Build a ``datetime.datetime`` from the given arguments."""
    return datetime.datetime(*args, **kwargs)


def call_later(delay, func, *args, **kwargs):
    """Schedule ``func(*args, **kwargs)`` on the reactor after ``delay``.

    Returns whatever ``reactor.callLater`` returns.
    """
    # Import here to ensure that it gets called after installing a reactor
    # see: http://twistedmatrix.com/documents/current/core/howto/choosing-reactor.html
    from twisted.internet import reactor
    return reactor.callLater(delay, func, *args, **kwargs)


def safe_start_looping_call(looping_call, interval_sec):
    """Start ``looping_call`` with the given interval unless it is already running."""
    if not looping_call.running:
        looping_call.start(interval_sec)


def safe_stop_looping_call(looping_call):
    """Stop ``looping_call`` if it is currently running."""
    if looping_call.running:
        looping_call.stop()


def generate_id(num=None):
    """Return the digest of an lbry hash object fed with an identifier seed.

    When ``num`` is given, the hash input is ``str(num)`` encoded to bytes, so
    the result is reproducible for the same ``num``. Otherwise 512 random bits
    (from the ``random`` module) are used as the seed.
    """
    h = get_lbry_hash_obj()
    if num is not None:
        h.update(str(num).encode())
    else:
        h.update(str(random.getrandbits(512)).encode())
    return h.digest()


def version_is_greater_than(a, b):
    """Returns True if version a is more recent than version b"""
    return pkg_resources.parse_version(a) > pkg_resources.parse_version(b)


def rot13(some_str):
    """Return ``some_str`` encoded with the ``rot_13`` codec."""
    return codecs.encode(some_str, 'rot_13')


def deobfuscate(obfustacated):
    """Reverse :func:`obfuscate`: rot13, then base64-decode, then decode to ``str``."""
    return base64.b64decode(rot13(obfustacated)).decode()


def obfuscate(plain):
    """Return the base64 encoding of ``plain`` passed through rot13.

    ``plain`` is handed directly to ``base64.b64encode``, so it must be a
    bytes-like object; the result is a ``str``.
    """
    return rot13(base64.b64encode(plain).decode())


def check_connection(server="lbry.io", port=80, timeout=5):
    """Attempts to open a socket to server:port and returns True if
    successful.

    If the host name cannot be resolved, a second attempt is made against
    8.8.8.8:53 so that a DNS failure alone is not reported as "no connection".
    Any other failure is logged and ``False`` is returned; this function does
    not raise for connection errors.

    Every socket opened here is closed before returning.
    """
    log.debug('Checking connection to %s:%s', server, port)
    try:
        server = socket.gethostbyname(server)
        socket.create_connection((server, port), timeout).close()
        log.debug('Connection successful')
        return True
    except (socket.gaierror, socket.herror):
        log.warning("Failed to connect to %s:%s. Unable to resolve domain. Trying to bypass DNS",
                    server, port)
        try:
            server = "8.8.8.8"
            port = 53
            # Close the probe socket immediately; previously it was left open,
            # leaking a descriptor on every call that took this path.
            socket.create_connection((server, port), timeout).close()
            log.debug('Connection successful')
            return True
        except Exception:
            log.error("Failed to connect to %s:%s. Maybe the internet connection is not working",
                      server, port)
            return False
    except Exception:
        log.error("Failed to connect to %s:%s. Maybe the internet connection is not working",
                  server, port)
        return False


def random_string(length=10, chars=string.ascii_lowercase):
    """Return a string of ``length`` characters drawn at random from ``chars``."""
    return ''.join(random.choice(chars) for _ in range(length))


def short_hash(hash_str):
    """Return the first six characters of ``hash_str``."""
    return hash_str[:6]


def get_sd_hash(stream_info):
    """Extract the sd hash from ``stream_info``.

    Returns ``None`` for falsy input, ``stream_info.source_hash`` for a
    ``ClaimDict``, and otherwise the value found at
    ``claim -> value -> stream -> source -> source`` in a nested dict. A
    warning is logged when that lookup yields nothing.
    """
    if not stream_info:
        return None
    if isinstance(stream_info, ClaimDict):
        return stream_info.source_hash
    result = stream_info.get('claim', {}).\
        get('value', {}).\
        get('stream', {}).\
        get('source', {}).\
        get('source')
    if not result:
        log.warning("Unable to get sd_hash")
    return result


def json_dumps_pretty(obj, **kwargs):
    """Serialize ``obj`` to JSON with sorted keys and a two-space indent."""
    return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)


class DeferredLockContextManager:
    """Context-manager wrapper around a lock object.

    NOTE(review): ``__enter__`` and ``__exit__`` contain ``yield`` and are
    therefore generator functions. Calling them only creates a generator
    object; their bodies never run, so ``acquire()`` / ``release()`` are never
    invoked by a ``with`` statement. In addition, the generator returned by
    ``__exit__`` is truthy, which makes ``with`` suppress any exception raised
    in its body. The code is left unchanged here because a synchronous context
    manager cannot wait on a Deferred-returning ``acquire()`` and existing
    callers may rely on the current (non-locking) behavior -- confirm with the
    call sites before changing it.
    """

    def __init__(self, lock):
        self._lock = lock

    def __enter__(self):
        yield self._lock.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        yield self._lock.release()


@defer.inlineCallbacks
def DeferredDict(d, consumeErrors=False):
    """Wait on a mapping of key -> Deferred and return key -> result.

    The values of ``d`` are gathered with ``defer.DeferredList``. Only entries
    whose Deferred succeeded appear in the returned dict; failed entries are
    omitted. ``consumeErrors`` is forwarded to ``DeferredList``.
    """
    keys = []
    pending = []
    for key, deferred in d.items():
        keys.append(key)
        pending.append(deferred)
    results = yield defer.DeferredList(pending, consumeErrors=consumeErrors)
    response = {
        key: result
        for key, (success, result) in zip(keys, results)
        if success
    }
    defer.returnValue(response)


class DeferredProfiler:
    """Collect per-function call timings for functions wrapped with
    :meth:`profiled_deferred` and log a summary of them."""

    def __init__(self):
        # Maps a profiled function to a list of
        # (start_time, finished_time, caller_stack, success) tuples.
        self.profile_results = {}

    def add_result(self, fn, start_time, finished_time, stack, success):
        """Record one finished call of ``fn``.

        ``fn`` must already have an entry in ``profile_results`` (one is
        created when the function is decorated).
        """
        self.profile_results[fn].append((start_time, finished_time, stack, success))

    def show_profile_results(self, fn):
        """Log call statistics for ``fn``, grouped by calling stack.

        Callers are reported from most to least frequent; for each one the
        success/failure counts and the longest, shortest and average call
        duration are logged.
        """
        # Snapshot so results added while logging do not affect the report.
        recorded = list(self.profile_results[fn])

        # Group in a single pass: caller -> [(start, finished, duration, success)]
        calls_by_caller = {}
        for start, finished, caller, success in recorded:
            calls_by_caller.setdefault(caller, []).append(
                (start, finished, finished - start, success)
            )

        log.info("called %s %i times from %i sources\n", fn.__name__, len(recorded),
                 len(calls_by_caller))
        for caller in sorted(calls_by_caller, key=lambda c: len(calls_by_caller[c]),
                             reverse=True):
            call_info = calls_by_caller[caller]
            durations = [r[2] for r in call_info]
            own_time = sum(durations)
            successes = sum(1 for r in call_info if r[3])
            log.info(
                "%i successes and %i failures\nlongest %f, shortest %f, avg %f\ncaller:\n%s",
                successes,
                len(call_info) - successes,
                max(durations),
                min(durations),
                own_time / float(len(call_info)),
                caller
            )

    def profiled_deferred(self, reactor=None):
        """Return a decorator that profiles calls to the decorated function.

        Each call is run through ``defer.maybeDeferred`` and timed with
        ``reactor.seconds()``; the two stack frames above the wrapper are
        recorded as the caller. A summary is logged via
        :meth:`show_profile_results` after reactor shutdown.

        ``reactor`` defaults to the globally installed Twisted reactor.
        """
        if not reactor:
            from twisted.internet import reactor

        def _cb(result, fn, start, caller_info):
            # Runs for both success and failure (attached with addBoth).
            if isinstance(result, (Failure, Exception)):
                error = result
                result = None
            else:
                error = None
            self.add_result(fn, start, reactor.seconds(), caller_info, error is None)
            if error is None:
                return result
            # Re-raise so the error keeps propagating down the errback chain.
            raise error

        def _profiled_deferred(fn):
            reactor.addSystemEventTrigger("after", "shutdown", self.show_profile_results, fn)
            self.profile_results[fn] = []

            @functools.wraps(fn)
            def _wrapper(*args, **kwargs):
                # [-3:-1] drops this wrapper's own frame and keeps the two
                # frames above it as the caller description.
                caller_info = "".join(traceback.format_list(traceback.extract_stack()[-3:-1]))
                start = reactor.seconds()
                d = defer.maybeDeferred(fn, *args, **kwargs)
                d.addBoth(_cb, fn, start, caller_info)
                return d

            return _wrapper

        return _profiled_deferred


_profiler = DeferredProfiler()
profile_deferred = _profiler.profiled_deferred