forked from LBRYCommunity/lbry-sdk

Compare commits: master...spv-server (4 commits)

9376e092c4
3c4574f11f
9b3f7e133b
9b0a99b161

9 changed files with 32 additions and 32 deletions
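Taken together, the four commits move the Loggly handler out of lbry.extras.daemon and into torba as torba.loggly_handler, parameterize it with an explicit tag and token instead of hard-coded lbry values, drop the logger argument that was being threaded through the claims reader's process initializer, and wire the wallet server up with a console handler plus an optional Loggly handler driven by a LOGGLY_TOKEN environment variable.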
```diff
@@ -12,11 +12,13 @@ import aiohttp
 from aiohttp.web import GracefulExit
 from docopt import docopt
 
+from torba.loggly_handler import get_loggly_handler
 from lbry import __version__ as lbrynet_version
-from lbry.extras.daemon.loggly_handler import get_loggly_handler
 from lbry.conf import Config, CLIConfig
 from lbry.extras.daemon.Daemon import Daemon
 
+LOGGLY_TOKEN = 'BQEzZmMzLJHgAGxkBF00LGD0YGuyATVgAmqxAQEuAQZ2BQH4'
+
 log = logging.getLogger('lbry')
 
 
```
```diff
@@ -248,9 +250,7 @@ def setup_logging(logger: logging.Logger, args: argparse.Namespace, conf: Config
             logger.getChild('lbry').setLevel(logging.DEBUG)
 
     if conf.share_usage_data:
-        loggly_handler = get_loggly_handler()
-        loggly_handler.setLevel(logging.ERROR)
-        logger.getChild('lbry').addHandler(loggly_handler)
+        logger.getChild('lbry').addHandler(get_loggly_handler(f'lbrynet-{lbrynet_version}', LOGGLY_TOKEN))
 
 
 def run_daemon(args: argparse.Namespace, conf: Config):
```
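Note that the removed setLevel(logging.ERROR) call is not lost: the reworked get_loggly_handler (last hunk of the handler module below) now applies the level itself, defaulting to logging.ERROR. A minimal sketch of that equivalence, assuming the import path from the hunk above and a placeholder token obfuscated the same way as the real one:

```python
import base64
import codecs
import logging

from torba.loggly_handler import get_loggly_handler

# Placeholder token, pre-obfuscated the way the real one is (base64, then rot13).
fake_token = codecs.encode(base64.b64encode(b'not-a-real-token').decode(), 'rot_13')

handler = get_loggly_handler('lbrynet-0.38.0', fake_token)  # tag value is illustrative
assert handler.level == logging.ERROR  # setLevel now happens inside the factory
```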
```diff
@@ -1,7 +1,6 @@
 import time
 import struct
 import sqlite3
-import logging
 from operator import itemgetter
 from typing import Tuple, List, Dict, Union, Type, Optional
 from binascii import unhexlify
```
```diff
@@ -74,7 +73,6 @@ class ReaderState:
     is_tracking_metrics: bool
     ledger: Type[BaseLedger]
     query_timeout: float
-    log: logging.Logger
 
     def close(self):
         self.db.close()
```
```diff
@@ -97,14 +95,14 @@ class ReaderState:
 ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx')
 
 
-def initializer(log, _path, _ledger_name, query_timeout, _measure=False):
+def initializer(_path, _ledger_name, query_timeout, _measure=False):
     db = sqlite3.connect(_path, isolation_level=None, uri=True)
     db.row_factory = sqlite3.Row
     ctx.set(
         ReaderState(
             db=db, stack=[], metrics={}, is_tracking_metrics=_measure,
             ledger=MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger,
-            query_timeout=query_timeout, log=log
+            query_timeout=query_timeout
         )
     )
 
```
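For context, initializer runs once in every reader process that the executor spawns; a minimal sketch of the wiring, mirroring the call sites changed later in this diff (the import path, database path, and timeout are illustrative assumptions):

```python
from concurrent.futures import ProcessPoolExecutor

from lbry.wallet.server.db import reader  # assumed import path for the module above

executor = ProcessPoolExecutor(
    max_workers=4,
    initializer=reader.initializer,           # note: no leading logger argument anymore
    initargs=('claims.db', 'mainnet', 0.25),  # (_path, _ledger_name, query_timeout)
)
```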
```diff
@@ -172,9 +170,7 @@ def execute_query(sql, values) -> List:
         if context.is_tracking_metrics:
             context.metrics['execute_query'][-1]['sql'] = plain_sql
         if str(err) == "interrupted":
-            context.log.warning("interrupted slow sqlite query:\n%s", plain_sql)
             raise SQLiteInterruptedError(context.metrics)
-        context.log.exception('failed running query', exc_info=err)
         raise SQLiteOperationalError(context.metrics)
 
 
```
```diff
@@ -67,7 +67,7 @@ class LBRYSessionManager(SessionManager):
         path = os.path.join(self.env.db_dir, 'claims.db')
         args = dict(
             initializer=reader.initializer,
-            initargs=(self.logger, path, self.env.coin.NET, self.env.database_query_timeout,
+            initargs=(path, self.env.coin.NET, self.env.database_query_timeout,
                       self.env.track_metrics)
         )
         if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
```
```diff
@@ -112,7 +112,7 @@ async def search(executor, kwargs):
 
 
 async def main(db_path, max_query_time):
-    args = dict(initializer=initializer, initargs=(log, db_path, MainNetLedger, 0.25))
+    args = dict(initializer=initializer, initargs=(db_path, MainNetLedger, 0.25))
     workers = max(os.cpu_count(), 4)
     log.info(f"using {workers} reader processes")
     query_executor = ProcessPoolExecutor(workers, **args)
```
```diff
@@ -45,7 +45,7 @@ async def run_times(executor, iterations, show=True):
 
 async def main():
     executor = ProcessPoolExecutor(
-        4, initializer=reader.initializer, initargs=(log, db_path, 'mainnet', 1.0, True)
+        4, initializer=reader.initializer, initargs=(db_path, 'mainnet', 1.0, True)
     )
     #await run_times(executor, 4, show=False)
     #await run_times(executor, 1)
```
```diff
@@ -46,7 +46,7 @@ class TestSQLDB(unittest.TestCase):
         db_url = 'file:test_sqldb?mode=memory&cache=shared'
         self.sql = writer.SQLDB(self, db_url)
         self.addCleanup(self.sql.close)
-        reader.initializer(logging.getLogger(__name__), db_url, 'regtest', self.query_timeout)
+        reader.initializer(db_url, 'regtest', self.query_timeout)
         self.addCleanup(reader.cleanup)
         self.timer = Timer('BlockProcessor')
         self.sql.open()
```
```diff
@@ -1,22 +1,24 @@
 import asyncio
-from aiohttp.client_exceptions import ClientError
 import json
+import codecs
+import base64
 import logging.handlers
-import traceback
 import aiohttp
-from lbry import utils, __version__
+from aiohttp.client_exceptions import ClientError
 
 
-LOGGLY_TOKEN = 'BQEzZmMzLJHgAGxkBF00LGD0YGuyATVgAmqxAQEuAQZ2BQH4'
+def deobfuscate(obfustacated):
+    return base64.b64decode(codecs.encode(obfustacated, 'rot_13')).decode()
 
 
 class JsonFormatter(logging.Formatter):
     """Format log records using json serialization"""
 
     def __init__(self, **kwargs):
+        super().__init__()
         self.attributes = kwargs
 
-    def format(self, record):
+    def format(self, record: logging.LogRecord):
         data = {
             'loggerName': record.name,
             'asciTime': self.formatTime(record),
```
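The new deobfuscate helper replaces the dropped lbry.utils dependency and reverses a base64-plus-rot13 obfuscation. A round-trip check; the obfuscate counterpart here is written for illustration and is not part of the diff:

```python
import base64
import codecs


def obfuscate(plain: str) -> str:
    # Hypothetical inverse, for illustration only: base64-encode, then rot13.
    return codecs.encode(base64.b64encode(plain.encode()).decode(), 'rot_13')


def deobfuscate(obfustacated: str) -> str:
    # Same body as the helper added in this hunk.
    return base64.b64decode(codecs.encode(obfustacated, 'rot_13')).decode()


assert deobfuscate(obfuscate('not-a-real-token')) == 'not-a-real-token'
```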
```diff
@@ -34,23 +36,15 @@ class JsonFormatter(logging.Formatter):
 
 
 class HTTPSLogglyHandler(logging.Handler):
-    def __init__(self, loggly_token: str, fqdn=False, localname=None, facility=None, cookies=None):
+    def __init__(self, loggly_token: str, tag: str, fqdn=False, localname=None, facility=None, cookies=None):
         super().__init__()
         self.fqdn = fqdn
         self.localname = localname
         self.facility = facility
         self.cookies = cookies or {}
-        self.url = "https://logs-01.loggly.com/inputs/{token}/tag/{tag}".format(
-            token=utils.deobfuscate(loggly_token), tag='lbrynet-' + __version__
-        )
+        self.url = f"https://logs-01.loggly.com/inputs/{deobfuscate(loggly_token)}/tag/{tag}"
         self._loop = asyncio.get_event_loop()
-        self._session = aiohttp.ClientSession()
+        self._session = aiohttp.ClientSession(loop=self._loop)
 
-    def get_full_message(self, record):
-        if record.exc_info:
-            return '\n'.join(traceback.format_exception(*record.exc_info))
-        else:
-            return record.getMessage()
-
     async def _emit(self, record, retry=True):
         data = self.format(record).encode()
```
```diff
@@ -76,7 +70,8 @@ class HTTPSLogglyHandler(logging.Handler):
             pass
 
 
-def get_loggly_handler():
-    handler = HTTPSLogglyHandler(LOGGLY_TOKEN)
+def get_loggly_handler(tag: str, token: str, level=logging.ERROR) -> HTTPSLogglyHandler:
+    handler = HTTPSLogglyHandler(token, tag)
     handler.setFormatter(JsonFormatter())
+    handler.setLevel(level)
     return handler
```
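With the tag and token now parameters, the factory serves both sides of the codebase. The two call sites this diff introduces follow one tag convention; gathered here for reference, with the surrounding names (lbrynet_version, LOGGLY_TOKEN, env) belonging to their respective modules, not to this one:

```python
# Daemon side (lbry cli hunk above): the tag carries the lbrynet version.
client_handler = get_loggly_handler(f'lbrynet-{lbrynet_version}', LOGGLY_TOKEN)

# Wallet-server side (torba Server hunk below): fixed tag, token from the environment.
server_handler = get_loggly_handler('wallet-server', env.loggly_token)
```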
```diff
@@ -29,6 +29,7 @@ class Env:
         pass
 
     def __init__(self, coin=None):
+        self.loggly_token = self.default('LOGGLY_TOKEN', '')
         self.logger = class_logger(__name__, self.__class__.__name__)
         self.allow_root = self.boolean('ALLOW_ROOT', False)
         self.host = self.default('HOST', 'localhost')
```
```diff
@@ -4,6 +4,7 @@ import asyncio
 from concurrent.futures.thread import ThreadPoolExecutor
 
 import torba
+from torba.loggly_handler import get_loggly_handler
 from torba.server.mempool import MemPool, MemPoolAPI
 
 
```
```diff
@@ -67,6 +68,13 @@ class Server:
     def __init__(self, env):
         self.env = env
         self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
+        console_handler = logging.StreamHandler()
+        console_handler.setFormatter(
+            logging.Formatter("%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s")
+        )
+        self.log.addHandler(console_handler)
+        if self.env.loggly_token:
+            self.log.addHandler(get_loggly_handler('wallet-server', self.env.loggly_token))
         self.shutdown_event = asyncio.Event()
         self.cancellable_tasks = []
```
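The console handler added here is plain stdlib logging; a standalone sketch of the same wiring, runnable outside the server, shows the line format it produces:

```python
import logging

log = logging.getLogger('torba').getChild('Server')
console_handler = logging.StreamHandler()
console_handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s")
)
log.addHandler(console_handler)
log.setLevel(logging.INFO)
log.info("server starting")
# prints something like: 2019-06-20 12:00:00,000 INFO     torba.Server:10: server starting
```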