split headers download into own component

-add component statuses
This commit is contained in:
Jack Robison 2018-07-25 15:32:01 -04:00
parent 5b3103e41b
commit e3c3fafa1e
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 181 additions and 152 deletions

View file

@ -4,27 +4,23 @@ import datetime
import logging import logging
from decimal import Decimal from decimal import Decimal
import treq
from zope.interface import implements from zope.interface import implements
from twisted.internet import threads, reactor, defer, task from twisted.internet import threads, reactor, defer, task
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.internet.error import ConnectionAborted from twisted.internet.error import ConnectionAborted
from hashlib import sha256
from lbryum import wallet as lbryum_wallet from lbryum import wallet as lbryum_wallet
from lbryum.network import Network from lbryum.network import Network
from lbryum.simple_config import SimpleConfig from lbryum.simple_config import SimpleConfig
from lbryum.constants import COIN from lbryum.constants import COIN
from lbryum.commands import Commands from lbryum.commands import Commands
from lbryum.errors import InvalidPassword from lbryum.errors import InvalidPassword
from lbryum.constants import HEADERS_URL, HEADER_SIZE
from lbryschema.uri import parse_lbry_uri from lbryschema.uri import parse_lbry_uri
from lbryschema.claim import ClaimDict from lbryschema.claim import ClaimDict
from lbryschema.error import DecodeError from lbryschema.error import DecodeError
from lbryschema.decode import smart_decode from lbryschema.decode import smart_decode
from lbrynet.txlbryum.factory import StratumClient
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.utils import DeferredDict from lbrynet.core.utils import DeferredDict
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
@ -92,107 +88,8 @@ class Wallet(object):
self._batch_count = 20 self._batch_count = 20
self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims) self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims)
@defer.inlineCallbacks
def fetch_headers_from_s3(self):
local_header_size = self.local_header_file_size()
resume_header = {"Range": "bytes={}-".format(local_header_size)}
response = yield treq.get(HEADERS_URL, headers=resume_header)
got_406 = response.code == 406 # our file is bigger
final_size_after_download = response.length + local_header_size
if got_406:
log.warning("s3 is more out of date than we are")
# should have something to download and a final length divisible by the header size
elif final_size_after_download and not final_size_after_download % HEADER_SIZE:
s3_height = (final_size_after_download / HEADER_SIZE) - 1
local_height = self.local_header_file_height()
if s3_height > local_height:
if local_header_size:
log.info("Resuming download of %i bytes from s3", response.length)
with open(os.path.join(self.config.path, "blockchain_headers"), "a+b") as headers_file:
yield treq.collect(response, headers_file.write)
else:
with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file:
yield treq.collect(response, headers_file.write)
log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height)
self._check_header_file_integrity()
else:
log.warning("s3 is more out of date than we are")
else:
log.error("invalid size for headers from s3")
def local_header_file_height(self):
return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0)
def local_header_file_size(self):
headers_path = os.path.join(self.config.path, "blockchain_headers")
if os.path.isfile(headers_path):
return os.stat(headers_path).st_size
return 0
@defer.inlineCallbacks
def get_remote_height(self, server, port):
connected = defer.Deferred()
connected.addTimeout(3, reactor, lambda *_: None)
client = StratumClient(connected)
reactor.connectTCP(server, port, client)
yield connected
remote_height = yield client.blockchain_block_get_server_height()
client.client.transport.loseConnection()
defer.returnValue(remote_height)
@defer.inlineCallbacks
def should_download_headers_from_s3(self):
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main":
defer.returnValue(False)
self._check_header_file_integrity()
s3_headers_depth = conf.settings['s3_headers_depth']
if not s3_headers_depth:
defer.returnValue(False)
local_height = self.local_header_file_height()
for server_url in self.config.get('default_servers'):
port = int(self.config.get('default_servers')[server_url]['t'])
try:
remote_height = yield self.get_remote_height(server_url, port)
log.info("%s:%i height: %i, local height: %s", server_url, port, remote_height, local_height)
if remote_height > (local_height + s3_headers_depth):
defer.returnValue(True)
except Exception as err:
log.warning("error requesting remote height from %s:%i - %s", server_url, port, err)
defer.returnValue(False)
def _check_header_file_integrity(self):
# TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main":
return
hashsum = sha256()
checksum_height, checksum = conf.settings['HEADERS_FILE_SHA256_CHECKSUM']
checksum_length_in_bytes = checksum_height * HEADER_SIZE
if self.local_header_file_size() < checksum_length_in_bytes:
return
headers_path = os.path.join(self.config.path, "blockchain_headers")
with open(headers_path, "rb") as headers_file:
hashsum.update(headers_file.read(checksum_length_in_bytes))
current_checksum = hashsum.hexdigest()
if current_checksum != checksum:
msg = "Expected checksum {}, got {}".format(checksum, current_checksum)
log.warning("Wallet file corrupted, checksum mismatch. " + msg)
log.warning("Deleting header file so it can be downloaded again.")
os.unlink(headers_path)
elif (self.local_header_file_size() % HEADER_SIZE) != 0:
log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.")
with open(headers_path, "rb+") as headers_file:
headers_file.truncate(checksum_length_in_bytes)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
should_download_headers = yield self.should_download_headers_from_s3()
if should_download_headers:
try:
yield self.fetch_headers_from_s3()
except Exception as err:
log.error("failed to fetch headers from s3: %s", err)
log.info("Starting wallet.") log.info("Starting wallet.")
yield self._start() yield self._start()
self.stopped = False self.stopped = False

View file

@ -1,8 +1,12 @@
import os import os
import logging import logging
from hashlib import sha256
import miniupnpc import miniupnpc
import treq
import math
from twisted.internet import defer, threads, reactor, error from twisted.internet import defer, threads, reactor, error
from lbryum.simple_config import SimpleConfig
from lbryum.constants import HEADERS_URL, HEADER_SIZE
from lbrynet import conf from lbrynet import conf
from lbrynet.core.Session import Session from lbrynet.core.Session import Session
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType
@ -17,6 +21,7 @@ from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.reflector import ServerFactory as reflector_server_factory
from lbrynet.txlbryum.factory import StratumClient
from lbrynet.core.utils import generate_id from lbrynet.core.utils import generate_id
@ -25,6 +30,7 @@ log = logging.getLogger(__name__)
# settings must be initialized before this file is imported # settings must be initialized before this file is imported
DATABASE_COMPONENT = "database" DATABASE_COMPONENT = "database"
HEADERS_COMPONENT = "blockchain_headers"
WALLET_COMPONENT = "wallet" WALLET_COMPONENT = "wallet"
SESSION_COMPONENT = "session" SESSION_COMPONENT = "session"
DHT_COMPONENT = "dht" DHT_COMPONENT = "dht"
@ -35,6 +41,24 @@ PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
REFLECTOR_COMPONENT = "reflector" REFLECTOR_COMPONENT = "reflector"
UPNP_COMPONENT = "upnp" UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
def get_wallet_config():
wallet_type = GCS('wallet')
if wallet_type == conf.LBRYCRD_WALLET:
raise ValueError('LBRYcrd Wallet is no longer supported')
elif wallet_type != conf.LBRYUM_WALLET:
raise ValueError('Wallet Type {} is not valid'.format(wallet_type))
lbryum_servers = {address: {'t': str(port)}
for address, port in GCS('lbryum_servers')}
config = {
'auto_connect': True,
'chain': GCS('blockchain_name'),
'default_servers': lbryum_servers
}
if 'use_keyring' in conf.settings:
config['use_keyring'] = GCS('use_keyring')
if conf.settings['lbryum_wallet_dir']:
config['lbryum_path'] = GCS('lbryum_wallet_dir')
return config
class ConfigSettings(object): class ConfigSettings(object):
@ -138,9 +162,142 @@ class DatabaseComponent(Component):
self.storage = None self.storage = None
class HeadersComponent(Component):
component_name = HEADERS_COMPONENT
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.config = SimpleConfig(get_wallet_config())
self._downloading_headers = None
self._headers_progress_percent = None
@property
def component(self):
return self
def get_status(self):
if self._downloading_headers is None:
return {}
return {
'downloading_headers': self._downloading_headers,
'download_progress': self._headers_progress_percent
}
@defer.inlineCallbacks
def fetch_headers_from_s3(self):
def collector(data, h_file):
h_file.write(data)
local_size = float(h_file.tell())
final_size = float(final_size_after_download)
self._headers_progress_percent = math.ceil(local_size / final_size * 100)
local_header_size = self.local_header_file_size()
resume_header = {"Range": "bytes={}-".format(local_header_size)}
response = yield treq.get(HEADERS_URL, headers=resume_header)
got_406 = response.code == 406 # our file is bigger
final_size_after_download = response.length + local_header_size
if got_406:
log.warning("s3 is more out of date than we are")
# should have something to download and a final length divisible by the header size
elif final_size_after_download and not final_size_after_download % HEADER_SIZE:
s3_height = (final_size_after_download / HEADER_SIZE) - 1
local_height = self.local_header_file_height()
if s3_height > local_height:
if local_header_size:
log.info("Resuming download of %i bytes from s3", response.length)
with open(os.path.join(self.config.path, "blockchain_headers"), "a+b") as headers_file:
yield treq.collect(response, lambda d: collector(d, headers_file))
else:
with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file:
yield treq.collect(response, lambda d: collector(d, headers_file))
log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height)
self._check_header_file_integrity()
else:
log.warning("s3 is more out of date than we are")
else:
log.error("invalid size for headers from s3")
def local_header_file_height(self):
return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0)
def local_header_file_size(self):
headers_path = os.path.join(self.config.path, "blockchain_headers")
if os.path.isfile(headers_path):
return os.stat(headers_path).st_size
return 0
@defer.inlineCallbacks
def get_remote_height(self, server, port):
connected = defer.Deferred()
connected.addTimeout(3, reactor, lambda *_: None)
client = StratumClient(connected)
reactor.connectTCP(server, port, client)
yield connected
remote_height = yield client.blockchain_block_get_server_height()
client.client.transport.loseConnection()
defer.returnValue(remote_height)
@defer.inlineCallbacks
def should_download_headers_from_s3(self):
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main":
defer.returnValue(False)
self._check_header_file_integrity()
s3_headers_depth = conf.settings['s3_headers_depth']
if not s3_headers_depth:
defer.returnValue(False)
local_height = self.local_header_file_height()
for server_url in self.config.get('default_servers'):
port = int(self.config.get('default_servers')[server_url]['t'])
try:
remote_height = yield self.get_remote_height(server_url, port)
log.info("%s:%i height: %i, local height: %s", server_url, port, remote_height, local_height)
if remote_height > (local_height + s3_headers_depth):
defer.returnValue(True)
except Exception as err:
log.warning("error requesting remote height from %s:%i - %s", server_url, port, err)
defer.returnValue(False)
def _check_header_file_integrity(self):
# TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main":
return
hashsum = sha256()
checksum_height, checksum = conf.settings['HEADERS_FILE_SHA256_CHECKSUM']
checksum_length_in_bytes = checksum_height * HEADER_SIZE
if self.local_header_file_size() < checksum_length_in_bytes:
return
headers_path = os.path.join(self.config.path, "blockchain_headers")
with open(headers_path, "rb") as headers_file:
hashsum.update(headers_file.read(checksum_length_in_bytes))
current_checksum = hashsum.hexdigest()
if current_checksum != checksum:
msg = "Expected checksum {}, got {}".format(checksum, current_checksum)
log.warning("Wallet file corrupted, checksum mismatch. " + msg)
log.warning("Deleting header file so it can be downloaded again.")
os.unlink(headers_path)
elif (self.local_header_file_size() % HEADER_SIZE) != 0:
log.warning("Header file is good up to checkpoint height, but incomplete. Truncating to checkpoint.")
with open(headers_path, "rb+") as headers_file:
headers_file.truncate(checksum_length_in_bytes)
@defer.inlineCallbacks
def start(self):
self._downloading_headers = yield self.should_download_headers_from_s3()
if self._downloading_headers:
try:
yield self.fetch_headers_from_s3()
except Exception as err:
log.error("failed to fetch headers from s3: %s", err)
def stop(self):
return defer.succeed(None)
class WalletComponent(Component): class WalletComponent(Component):
component_name = WALLET_COMPONENT component_name = WALLET_COMPONENT
depends_on = [DATABASE_COMPONENT] depends_on = [DATABASE_COMPONENT, HEADERS_COMPONENT]
def __init__(self, component_manager): def __init__(self, component_manager):
Component.__init__(self, component_manager) Component.__init__(self, component_manager)
@ -150,34 +307,26 @@ class WalletComponent(Component):
def component(self): def component(self):
return self.wallet return self.wallet
@defer.inlineCallbacks
def get_status(self):
if not self.wallet:
return
local_height = self.wallet.network.get_local_height()
remote_height = self.wallet.network.get_server_height()
best_hash = yield self.wallet.get_best_blockhash()
defer.returnValue({
'blocks': local_height,
'blocks_behind': remote_height - local_height,
'best_blockhash': best_hash,
'is_encrypted': self.wallet.wallet.use_encryption
})
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
storage = self.component_manager.get_component(DATABASE_COMPONENT) storage = self.component_manager.get_component(DATABASE_COMPONENT)
wallet_type = GCS('wallet') config = get_wallet_config()
self.wallet = LBRYumWallet(storage, config)
if wallet_type == conf.LBRYCRD_WALLET: yield self.wallet.start()
raise ValueError('LBRYcrd Wallet is no longer supported')
elif wallet_type == conf.LBRYUM_WALLET:
log.info("Using lbryum wallet")
lbryum_servers = {address: {'t': str(port)}
for address, port in GCS('lbryum_servers')}
config = {
'auto_connect': True,
'chain': GCS('blockchain_name'),
'default_servers': lbryum_servers
}
if 'use_keyring' in conf.settings:
config['use_keyring'] = GCS('use_keyring')
if conf.settings['lbryum_wallet_dir']:
config['lbryum_path'] = GCS('lbryum_wallet_dir')
self.wallet = LBRYumWallet(storage, config)
yield self.wallet.start()
else:
raise ValueError('Wallet Type {} is not valid'.format(wallet_type))
@defer.inlineCallbacks @defer.inlineCallbacks
def stop(self): def stop(self):

View file

@ -727,29 +727,12 @@ class Daemon(AuthJSONRPCServer):
'code': connection_code, 'code': connection_code,
'message': CONNECTION_MESSAGES[connection_code], 'message': CONNECTION_MESSAGES[connection_code],
}, },
'wallet_is_encrypted': wallet_is_encrypted,
'blocks_behind': remote_height - local_height, # deprecated. remove from UI, then here
'blockchain_status': {
'blocks': local_height,
'blocks_behind': remote_height - local_height,
'best_blockhash': best_hash,
},
'dht_node_status': {
'node_id': conf.settings.node_id.encode('hex'),
'peers_in_routing_table': 0 if not self.component_manager.all_components_running(DHT_COMPONENT) else
len(self.dht_node.contacts)
}
} }
if session_status: for component in self.component_manager.components:
blobs = yield self.session.blob_manager.get_all_verified_blobs() status = yield defer.maybeDeferred(component.get_status)
announce_queue_size = self.session.hash_announcer.hash_queue_size() if status:
should_announce_blobs = yield self.session.blob_manager.count_should_announce_blobs() response[component.component_name] = status
response['session_status'] = {
'managed_blobs': len(blobs),
'managed_streams': len(self.file_manager.lbry_files),
'announce_queue_size': announce_queue_size,
'should_announce_blobs': should_announce_blobs,
}
defer.returnValue(response) defer.returnValue(response)
def jsonrpc_version(self): def jsonrpc_version(self):