updated Headers component to use lbrynet.wallet

This commit is contained in:
Lex Berezhny 2018-08-04 20:20:37 -04:00 committed by Jack Robison
parent a7ef8889dd
commit 310fe4a42c
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 47 additions and 42 deletions

View file

@ -1,13 +1,12 @@
import os import os
import logging import logging
from hashlib import sha256
import treq import treq
import math import math
import binascii import binascii
from hashlib import sha256
from types import SimpleNamespace
from twisted.internet import defer, threads, reactor, error from twisted.internet import defer, threads, reactor, error
from txupnp.upnp import UPnP from txupnp.upnp import UPnP
from lbryum.simple_config import SimpleConfig
from lbryum.constants import HEADERS_URL, HEADER_SIZE
from lbrynet import conf from lbrynet import conf
from lbrynet.core.utils import DeferredDict from lbrynet.core.utils import DeferredDict
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
@ -15,6 +14,7 @@ from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType
from lbrynet.wallet.manager import LbryWalletManager from lbrynet.wallet.manager import LbryWalletManager
from lbrynet.wallet.network import Network
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.daemon.Component import Component from lbrynet.daemon.Component import Component
@ -25,7 +25,7 @@ from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.reflector import ServerFactory as reflector_server_factory
from lbrynet.txlbryum.factory import StratumClient
from lbrynet.core.utils import generate_id from lbrynet.core.utils import generate_id
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -169,12 +169,18 @@ class DatabaseComponent(Component):
self.storage = None self.storage = None
HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest"
HEADER_SIZE = 112
class HeadersComponent(Component): class HeadersComponent(Component):
component_name = HEADERS_COMPONENT component_name = HEADERS_COMPONENT
def __init__(self, component_manager): def __init__(self, component_manager):
super().__init__(component_manager) super().__init__(component_manager)
self.config = SimpleConfig(get_wallet_config()) self.headers_dir = os.path.join(conf.settings['lbryum_wallet_dir'], 'lbc_mainnet')
self.headers_file = os.path.join(self.headers_dir, 'headers')
self.old_file = os.path.join(conf.settings['lbryum_wallet_dir'], 'blockchain_headers')
self._downloading_headers = None self._downloading_headers = None
self._headers_progress_percent = None self._headers_progress_percent = None
@ -190,19 +196,18 @@ class HeadersComponent(Component):
@defer.inlineCallbacks @defer.inlineCallbacks
def fetch_headers_from_s3(self): def fetch_headers_from_s3(self):
local_header_size = self.local_header_file_size() def collector(data, h_file):
self._headers_progress_percent = 0.0
resume_header = {"Range": "bytes={}-".format(local_header_size)}
response = yield treq.get(HEADERS_URL, headers=resume_header)
final_size_after_download = response.length + local_header_size
def collector(data, h_file, start_size):
h_file.write(data) h_file.write(data)
local_size = float(h_file.tell()) local_size = float(h_file.tell())
final_size = float(final_size_after_download) final_size = float(final_size_after_download)
self._headers_progress_percent = math.ceil((local_size - start_size) / (final_size - start_size) * 100) self._headers_progress_percent = math.ceil(local_size / final_size * 100)
if response.code == 406: # our file is bigger local_header_size = self.local_header_file_size()
resume_header = {"Range": "bytes={}-".format(local_header_size)}
response = yield treq.get(HEADERS_URL, headers=resume_header)
got_406 = response.code == 406 # our file is bigger
final_size_after_download = response.length + local_header_size
if got_406:
log.warning("s3 is more out of date than we are") log.warning("s3 is more out of date than we are")
# should have something to download and a final length divisible by the header size # should have something to download and a final length divisible by the header size
elif final_size_after_download and not final_size_after_download % HEADER_SIZE: elif final_size_after_download and not final_size_after_download % HEADER_SIZE:
@ -211,11 +216,11 @@ class HeadersComponent(Component):
if s3_height > local_height: if s3_height > local_height:
if local_header_size: if local_header_size:
log.info("Resuming download of %i bytes from s3", response.length) log.info("Resuming download of %i bytes from s3", response.length)
with open(os.path.join(self.config.path, "blockchain_headers"), "a+b") as headers_file: with open(self.headers_file, "a+b") as headers_file:
yield treq.collect(response, lambda d: collector(d, headers_file, local_header_size)) yield treq.collect(response, lambda d: collector(d, headers_file))
else: else:
with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file: with open(self.headers_file, "wb") as headers_file:
yield treq.collect(response, lambda d: collector(d, headers_file, 0)) yield treq.collect(response, lambda d: collector(d, headers_file))
log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height) log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height)
self._check_header_file_integrity() self._check_header_file_integrity()
else: else:
@ -227,20 +232,19 @@ class HeadersComponent(Component):
return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0) return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0)
def local_header_file_size(self): def local_header_file_size(self):
headers_path = os.path.join(self.config.path, "blockchain_headers") if os.path.isfile(self.headers_file):
if os.path.isfile(headers_path): return os.stat(self.headers_file).st_size
return os.stat(headers_path).st_size
return 0 return 0
@defer.inlineCallbacks @defer.inlineCallbacks
def get_remote_height(self, server, port): def get_remote_height(self):
connected = defer.Deferred() ledger = SimpleNamespace()
connected.addTimeout(3, reactor, lambda *_: None) ledger.config = conf
client = StratumClient(connected) net = Network(ledger)
reactor.connectTCP(server, port, client) net.start()
yield connected yield net.on_connected.first
remote_height = yield client.blockchain_block_get_server_height() remote_height = yield net.get_server_height()
client.client.transport.loseConnection() yield net.stop()
defer.returnValue(remote_height) defer.returnValue(remote_height)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -252,15 +256,10 @@ class HeadersComponent(Component):
if not s3_headers_depth: if not s3_headers_depth:
defer.returnValue(False) defer.returnValue(False)
local_height = self.local_header_file_height() local_height = self.local_header_file_height()
for server_url in self.config.get('default_servers'): remote_height = yield self.get_remote_height()
port = int(self.config.get('default_servers')[server_url]['t']) log.info("remote height: %i, local height: %s", remote_height, local_height)
try: if remote_height > (local_height + s3_headers_depth):
remote_height = yield self.get_remote_height(server_url, port) defer.returnValue(True)
log.info("%s:%i height: %i, local height: %s", server_url, port, remote_height, local_height)
if remote_height > (local_height + s3_headers_depth):
defer.returnValue(True)
except Exception as err:
log.warning("error requesting remote height from %s:%i - %s", server_url, port, err)
defer.returnValue(False) defer.returnValue(False)
def _check_header_file_integrity(self): def _check_header_file_integrity(self):
@ -272,22 +271,25 @@ class HeadersComponent(Component):
checksum_length_in_bytes = checksum_height * HEADER_SIZE checksum_length_in_bytes = checksum_height * HEADER_SIZE
if self.local_header_file_size() < checksum_length_in_bytes: if self.local_header_file_size() < checksum_length_in_bytes:
return return
headers_path = os.path.join(self.config.path, "blockchain_headers") with open(self.headers_file, "rb") as headers_file:
with open(headers_path, "rb") as headers_file:
hashsum.update(headers_file.read(checksum_length_in_bytes)) hashsum.update(headers_file.read(checksum_length_in_bytes))
current_checksum = hashsum.hexdigest() current_checksum = hashsum.hexdigest()
if current_checksum != checksum: if current_checksum != checksum:
msg = "Expected checksum {}, got {}".format(checksum, current_checksum) msg = "Expected checksum {}, got {}".format(checksum, current_checksum)
log.warning("Wallet file corrupted, checksum mismatch. " + msg) log.warning("Wallet file corrupted, checksum mismatch. " + msg)
log.warning("Deleting header file so it can be downloaded again.") log.warning("Deleting header file so it can be downloaded again.")
os.unlink(headers_path) os.unlink(self.headers_file)
elif (self.local_header_file_size() % HEADER_SIZE) != 0: elif (self.local_header_file_size() % HEADER_SIZE) != 0:
log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.") log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.")
with open(headers_path, "rb+") as headers_file: with open(self.headers_file, "rb+") as headers_file:
headers_file.truncate(checksum_length_in_bytes) headers_file.truncate(checksum_length_in_bytes)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
if not os.path.exists(self.headers_dir):
os.mkdir(self.headers_dir)
if os.path.exists(self.old_file):
os.rename(self.old_file, self.headers_file)
self._downloading_headers = yield self.should_download_headers_from_s3() self._downloading_headers = yield self.should_download_headers_from_s3()
if self._downloading_headers: if self._downloading_headers:
try: try:

View file

@ -3,6 +3,9 @@ from torba.basenetwork import BaseNetwork
class Network(BaseNetwork): class Network(BaseNetwork):
def get_server_height(self):
return self.rpc('blockchain.block.get_server_height')
def get_values_for_uris(self, block_hash, *uris): def get_values_for_uris(self, block_hash, *uris):
return self.rpc('blockchain.claimtrie.getvaluesforuris', block_hash, *uris) return self.rpc('blockchain.claimtrie.getvaluesforuris', block_hash, *uris)