From 310fe4a42cbd9f7ee4c1fc07d52510634c1560fb Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Sat, 4 Aug 2018 20:20:37 -0400 Subject: [PATCH] updated Headers component to use lbrynet.wallet --- lbrynet/daemon/Components.py | 86 ++++++++++++++++++------------------ lbrynet/wallet/network.py | 3 ++ 2 files changed, 47 insertions(+), 42 deletions(-) diff --git a/lbrynet/daemon/Components.py b/lbrynet/daemon/Components.py index 10fa646cf..9b22e0c6f 100644 --- a/lbrynet/daemon/Components.py +++ b/lbrynet/daemon/Components.py @@ -1,13 +1,12 @@ import os import logging -from hashlib import sha256 import treq import math import binascii +from hashlib import sha256 +from types import SimpleNamespace from twisted.internet import defer, threads, reactor, error from txupnp.upnp import UPnP -from lbryum.simple_config import SimpleConfig -from lbryum.constants import HEADERS_URL, HEADER_SIZE from lbrynet import conf from lbrynet.core.utils import DeferredDict from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager @@ -15,6 +14,7 @@ from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType from lbrynet.wallet.manager import LbryWalletManager +from lbrynet.wallet.network import Network from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.daemon.Component import Component @@ -25,7 +25,7 @@ from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.reflector import ServerFactory as reflector_server_factory -from lbrynet.txlbryum.factory import StratumClient + from lbrynet.core.utils import generate_id log = logging.getLogger(__name__) @@ -169,12 +169,18 @@ class DatabaseComponent(Component): self.storage = None +HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest" +HEADER_SIZE = 112 + + class HeadersComponent(Component): component_name = HEADERS_COMPONENT def __init__(self, component_manager): super().__init__(component_manager) - self.config = SimpleConfig(get_wallet_config()) + self.headers_dir = os.path.join(conf.settings['lbryum_wallet_dir'], 'lbc_mainnet') + self.headers_file = os.path.join(self.headers_dir, 'headers') + self.old_file = os.path.join(conf.settings['lbryum_wallet_dir'], 'blockchain_headers') self._downloading_headers = None self._headers_progress_percent = None @@ -190,19 +196,18 @@ class HeadersComponent(Component): @defer.inlineCallbacks def fetch_headers_from_s3(self): - local_header_size = self.local_header_file_size() - self._headers_progress_percent = 0.0 - resume_header = {"Range": "bytes={}-".format(local_header_size)} - response = yield treq.get(HEADERS_URL, headers=resume_header) - final_size_after_download = response.length + local_header_size - - def collector(data, h_file, start_size): + def collector(data, h_file): h_file.write(data) local_size = float(h_file.tell()) final_size = float(final_size_after_download) - self._headers_progress_percent = math.ceil((local_size - start_size) / (final_size - start_size) * 100) + self._headers_progress_percent = math.ceil(local_size / final_size * 100) - if response.code == 406: # our file is bigger + local_header_size = self.local_header_file_size() + resume_header = {"Range": "bytes={}-".format(local_header_size)} + response = yield treq.get(HEADERS_URL, headers=resume_header) + got_406 = response.code == 406 # our file is bigger + final_size_after_download = response.length + local_header_size + if got_406: log.warning("s3 is more out of date than we are") # should have something to download and a final length divisible by the header size elif final_size_after_download and not final_size_after_download % HEADER_SIZE: @@ -211,11 +216,11 @@ class HeadersComponent(Component): if s3_height > local_height: if local_header_size: log.info("Resuming download of %i bytes from s3", response.length) - with open(os.path.join(self.config.path, "blockchain_headers"), "a+b") as headers_file: - yield treq.collect(response, lambda d: collector(d, headers_file, local_header_size)) + with open(self.headers_file, "a+b") as headers_file: + yield treq.collect(response, lambda d: collector(d, headers_file)) else: - with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file: - yield treq.collect(response, lambda d: collector(d, headers_file, 0)) + with open(self.headers_file, "wb") as headers_file: + yield treq.collect(response, lambda d: collector(d, headers_file)) log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height) self._check_header_file_integrity() else: @@ -227,20 +232,19 @@ class HeadersComponent(Component): return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0) def local_header_file_size(self): - headers_path = os.path.join(self.config.path, "blockchain_headers") - if os.path.isfile(headers_path): - return os.stat(headers_path).st_size + if os.path.isfile(self.headers_file): + return os.stat(self.headers_file).st_size return 0 @defer.inlineCallbacks - def get_remote_height(self, server, port): - connected = defer.Deferred() - connected.addTimeout(3, reactor, lambda *_: None) - client = StratumClient(connected) - reactor.connectTCP(server, port, client) - yield connected - remote_height = yield client.blockchain_block_get_server_height() - client.client.transport.loseConnection() + def get_remote_height(self): + ledger = SimpleNamespace() + ledger.config = conf + net = Network(ledger) + net.start() + yield net.on_connected.first + remote_height = yield net.get_server_height() + yield net.stop() defer.returnValue(remote_height) @defer.inlineCallbacks @@ -252,15 +256,10 @@ class HeadersComponent(Component): if not s3_headers_depth: defer.returnValue(False) local_height = self.local_header_file_height() - for server_url in self.config.get('default_servers'): - port = int(self.config.get('default_servers')[server_url]['t']) - try: - remote_height = yield self.get_remote_height(server_url, port) - log.info("%s:%i height: %i, local height: %s", server_url, port, remote_height, local_height) - if remote_height > (local_height + s3_headers_depth): - defer.returnValue(True) - except Exception as err: - log.warning("error requesting remote height from %s:%i - %s", server_url, port, err) + remote_height = yield self.get_remote_height() + log.info("remote height: %i, local height: %s", remote_height, local_height) + if remote_height > (local_height + s3_headers_depth): + defer.returnValue(True) defer.returnValue(False) def _check_header_file_integrity(self): @@ -272,22 +271,25 @@ class HeadersComponent(Component): checksum_length_in_bytes = checksum_height * HEADER_SIZE if self.local_header_file_size() < checksum_length_in_bytes: return - headers_path = os.path.join(self.config.path, "blockchain_headers") - with open(headers_path, "rb") as headers_file: + with open(self.headers_file, "rb") as headers_file: hashsum.update(headers_file.read(checksum_length_in_bytes)) current_checksum = hashsum.hexdigest() if current_checksum != checksum: msg = "Expected checksum {}, got {}".format(checksum, current_checksum) log.warning("Wallet file corrupted, checksum mismatch. " + msg) log.warning("Deleting header file so it can be downloaded again.") - os.unlink(headers_path) + os.unlink(self.headers_file) elif (self.local_header_file_size() % HEADER_SIZE) != 0: log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.") - with open(headers_path, "rb+") as headers_file: + with open(self.headers_file, "rb+") as headers_file: headers_file.truncate(checksum_length_in_bytes) @defer.inlineCallbacks def start(self): + if not os.path.exists(self.headers_dir): + os.mkdir(self.headers_dir) + if os.path.exists(self.old_file): + os.rename(self.old_file, self.headers_file) self._downloading_headers = yield self.should_download_headers_from_s3() if self._downloading_headers: try: diff --git a/lbrynet/wallet/network.py b/lbrynet/wallet/network.py index 12f7d2bde..b6e54dcc0 100644 --- a/lbrynet/wallet/network.py +++ b/lbrynet/wallet/network.py @@ -3,6 +3,9 @@ from torba.basenetwork import BaseNetwork class Network(BaseNetwork): + def get_server_height(self): + return self.rpc('blockchain.block.get_server_height') + def get_values_for_uris(self, block_hash, *uris): return self.rpc('blockchain.claimtrie.getvaluesforuris', block_hash, *uris)