diff --git a/lbry/console.py b/lbry/console.py index aff478eb8..a74776e03 100644 --- a/lbry/console.py +++ b/lbry/console.py @@ -1,5 +1,8 @@ import os +import sys +import time from typing import Dict, Any +from tempfile import TemporaryFile import tqdm @@ -9,6 +12,53 @@ from lbry.service.full_node import FullNode from lbry.service.light_client import LightClient +class RedirectOutput: + + silence_lines = [ + b'libprotobuf ERROR google/protobuf/wire_format_lite.cc:626', + ] + + def __init__(self, stream_type: str): + assert stream_type in ('stderr', 'stdout') + self.stream_type = stream_type + self.stream_no = getattr(sys, stream_type).fileno() + self.last_flush = time.time() + self.last_read = 0 + + def __enter__(self): + self.backup = os.dup(self.stream_no) + setattr(sys, self.stream_type, os.fdopen(self.backup, 'w')) + self.file = TemporaryFile() + self.backup = os.dup2(self.file.fileno(), self.stream_no) + + def __exit__(self, exc_type, exc_val, exc_tb): + self.file.close() + os.dup2(self.backup, self.stream_no) + os.close(self.backup) + setattr(sys, self.stream_type, os.fdopen(self.stream_no, 'w')) + + def capture(self): + self.__enter__() + + def release(self): + self.__exit__(None, None, None) + + def flush(self, writer, last=False): + if not last and (time.time() - self.last_flush) < 5: + return + self.file.seek(self.last_read) + for line in self.file.readlines(): + silence = False + for bad_line in self.silence_lines: + if bad_line in line: + silence = True + break + if not silence: + writer(line.decode().rstrip()) + self.last_read = self.file.tell() + self.last_flush = time.time() + + class Console: def __init__(self, service: Service): @@ -65,6 +115,7 @@ class Advanced(Basic): self.sync_steps = [] self.block_savers = 0 self.block_readers = 0 + self.stderr = RedirectOutput('stderr') def get_or_create_bar(self, name, desc, unit, total, leave=False, bar_format=None, postfix=None, position=None): bar = self.bars.get(name) @@ -168,8 +219,13 @@ class Advanced(Basic): def on_sync_progress(self, event): e, d = event['event'], event.get('data', {}) if e.endswith("sync.start"): + self.stderr.capture() self.sync_start(d) + self.stderr.flush(self.bars['sync'].write) elif e.endswith("sync.complete"): + self.stderr.flush(self.bars['sync'].write, True) self.sync_complete() + self.stderr.release() else: + self.stderr.flush(self.bars['sync'].write) self.update_progress(e, d) diff --git a/scripts/simulate_sync_console.py b/scripts/simulate_sync_console.py index ee88edf39..75ba49fd1 100644 --- a/scripts/simulate_sync_console.py +++ b/scripts/simulate_sync_console.py @@ -1,6 +1,10 @@ import asyncio from random import randrange from typing import List +from binascii import unhexlify +from google.protobuf.message import DecodeError + +from lbry.schema.claim import Claim from lbry.blockchain import Ledger from lbry.service import FullNode from lbry.console import Advanced, Basic @@ -8,6 +12,16 @@ from lbry.conf import Config from lbry.db.utils import chunk +def cause_protobuf_stderr(): + try: + Claim.from_bytes(unhexlify( + '005a3c63779597cba4c0e6ee45c3074fc389bd564ccc5d4a90eb4baacb0b028f2f4930' + '0db003d6a27f0cac8be8b45fdda597303208b81845534e4543494c07123e0a420a' + )) + except DecodeError: + pass + + class Simulator: def __init__(self, console): @@ -54,7 +68,9 @@ class Simulator: tasks = [] for file in file_group: if file == files[-1]: + cause_protobuf_stderr() tasks.append(self.sync_block_file(file, self.blocks-blocks_synced, self.txs-txs_synced)) + cause_protobuf_stderr() else: blocks = int(self.blocks / len(files)) blocks_synced += blocks