forked from LBRYCommunity/lbry-sdk
remove headers component
This commit is contained in:
parent
c30db15efa
commit
d8fed79810
5 changed files with 5 additions and 160 deletions
|
@ -1,8 +1,6 @@
|
|||
import hashlib
|
||||
import os
|
||||
import asyncio
|
||||
import logging
|
||||
import math
|
||||
import binascii
|
||||
import typing
|
||||
import base58
|
||||
|
@ -21,7 +19,6 @@ from lbry.extras.daemon.Component import Component
|
|||
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||
from lbry.extras.daemon.storage import SQLiteStorage
|
||||
from lbry.wallet import LbryWalletManager
|
||||
from lbry.wallet.header import Headers
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -29,7 +26,6 @@ log = logging.getLogger(__name__)
|
|||
|
||||
DATABASE_COMPONENT = "database"
|
||||
BLOB_COMPONENT = "blob_manager"
|
||||
HEADERS_COMPONENT = "blockchain_headers"
|
||||
WALLET_COMPONENT = "wallet"
|
||||
DHT_COMPONENT = "dht"
|
||||
HASH_ANNOUNCER_COMPONENT = "hash_announcer"
|
||||
|
@ -97,135 +93,9 @@ class DatabaseComponent(Component):
|
|||
self.storage = None
|
||||
|
||||
|
||||
class HeadersComponent(Component):
|
||||
component_name = HEADERS_COMPONENT
|
||||
HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest"
|
||||
CHECKPOINT = ('100b33ca3d0b86a48f0d6d6f30458a130ecb89d5affefe4afccb134d5a40f4c2', 600_000)
|
||||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
self.headers_dir = os.path.join(self.conf.wallet_dir, 'lbc_mainnet')
|
||||
self.headers_file = os.path.join(self.headers_dir, 'headers')
|
||||
self.old_file = os.path.join(self.conf.wallet_dir, 'blockchain_headers')
|
||||
self.headers = Headers(self.headers_file)
|
||||
self.is_downloading_headers = False
|
||||
self._headers_progress_percent = 0
|
||||
|
||||
@property
|
||||
def component(self):
|
||||
return self
|
||||
|
||||
def _round_progress(self, local_height, remote_height):
|
||||
return min(max(math.ceil(float(local_height) / float(remote_height) * 100), 0), 100)
|
||||
|
||||
async def get_status(self) -> dict:
|
||||
progress = None
|
||||
if self.is_downloading_headers:
|
||||
progress = self._headers_progress_percent
|
||||
elif self.component_manager.has_component(WALLET_COMPONENT):
|
||||
wallet_manager = self.component_manager.get_component(WALLET_COMPONENT)
|
||||
if wallet_manager and wallet_manager.ledger.network.remote_height > 0:
|
||||
local_height = wallet_manager.ledger.headers.height
|
||||
remote_height = wallet_manager.ledger.network.remote_height
|
||||
progress = self._round_progress(local_height, remote_height)
|
||||
return {
|
||||
'downloading_headers': True,
|
||||
'download_progress': progress
|
||||
} if progress is not None and progress < 100 else {}
|
||||
|
||||
async def fetch_headers_from_s3(self):
|
||||
local_header_size = self.local_header_file_size()
|
||||
resume_header = {"Range": f"bytes={local_header_size}-"}
|
||||
async with utils.aiohttp_request('get', self.HEADERS_URL, headers=resume_header) as response:
|
||||
if response.status == 406 or response.content_length < self.headers.header_size: # our file is bigger
|
||||
log.warning("s3 is more out of date than we are")
|
||||
return
|
||||
final_size_after_download = response.content_length + local_header_size
|
||||
if final_size_after_download % self.headers.header_size != 0:
|
||||
log.warning("s3 appears to have corrupted header")
|
||||
return
|
||||
write_mode = "wb"
|
||||
if local_header_size > 0:
|
||||
log.info("Resuming download of %i bytes from s3", response.content_length)
|
||||
write_mode = "a+b"
|
||||
with open(self.headers_file, write_mode) as fd:
|
||||
while not response.content.at_eof():
|
||||
local_header_size += fd.write(await response.content.readany())
|
||||
self._headers_progress_percent = self._round_progress(
|
||||
local_header_size, final_size_after_download
|
||||
)
|
||||
|
||||
def local_header_file_size(self) -> int:
|
||||
if os.path.isfile(self.headers_file):
|
||||
return os.stat(self.headers_file).st_size
|
||||
return 0
|
||||
|
||||
async def get_downloadable_header_height(self) -> typing.Optional[int]:
|
||||
async with utils.aiohttp_request('HEAD', self.HEADERS_URL) as response:
|
||||
if response.status != 200:
|
||||
log.warning("Header download error, unexpected response code: %s", response.status)
|
||||
return -1
|
||||
return response.content_length // self.headers.header_size
|
||||
|
||||
async def should_download_headers_from_s3(self) -> bool:
|
||||
if self.conf.blockchain_name != "lbrycrd_main":
|
||||
return False
|
||||
s3_headers_depth = self.conf.s3_headers_depth
|
||||
if not s3_headers_depth:
|
||||
return False
|
||||
|
||||
local_height = self.local_header_file_size() // self.headers.header_size
|
||||
remote_height = await self.get_downloadable_header_height()
|
||||
if remote_height is not None:
|
||||
log.info("remote height: %i, local height: %i", remote_height, local_height)
|
||||
if remote_height > (local_height + s3_headers_depth):
|
||||
return True
|
||||
return False
|
||||
|
||||
def verify_checkpoint(self):
|
||||
expected_hash, at_height = self.CHECKPOINT
|
||||
if self.local_header_file_size() // self.headers.header_size < at_height:
|
||||
return False
|
||||
hash = hashlib.sha256()
|
||||
chunk_size = self.headers.header_size * 1000
|
||||
with open(self.headers_file, 'rb') as header_file:
|
||||
data = header_file.read(chunk_size)
|
||||
while data and header_file.tell() <= at_height * self.headers.header_size:
|
||||
hash.update(data)
|
||||
data = header_file.read(chunk_size)
|
||||
return hash.hexdigest() == expected_hash
|
||||
|
||||
async def start(self):
|
||||
if not os.path.exists(self.headers_dir):
|
||||
os.mkdir(self.headers_dir)
|
||||
if os.path.exists(self.old_file):
|
||||
log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file)
|
||||
os.rename(self.old_file, self.headers_file)
|
||||
|
||||
try:
|
||||
if await self.should_download_headers_from_s3():
|
||||
self.is_downloading_headers = True
|
||||
await self.fetch_headers_from_s3()
|
||||
except Exception as err:
|
||||
log.error("failed to fetch headers from s3: %s", err)
|
||||
finally:
|
||||
self.is_downloading_headers = False
|
||||
# fixme: workaround, this should happen before download but happens after because headers.connect fail
|
||||
if not self.verify_checkpoint():
|
||||
log.info("Checkpoint failed, verifying headers using slower method.")
|
||||
await self.headers.open()
|
||||
await self.headers.repair()
|
||||
await self.headers.close()
|
||||
else:
|
||||
log.info("Header checkpoint verified.")
|
||||
|
||||
async def stop(self):
|
||||
pass
|
||||
|
||||
|
||||
class WalletComponent(Component):
|
||||
component_name = WALLET_COMPONENT
|
||||
depends_on = [DATABASE_COMPONENT, HEADERS_COMPONENT]
|
||||
depends_on = [DATABASE_COMPONENT]
|
||||
|
||||
def __init__(self, component_manager):
|
||||
super().__init__(component_manager)
|
||||
|
|
|
@ -5,7 +5,7 @@ from torba.testcase import AsyncioTestCase
|
|||
from lbry.conf import Config
|
||||
from lbry.extras import cli
|
||||
from lbry.extras.daemon.Components import (
|
||||
DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
|
||||
DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
|
||||
HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
|
||||
UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
|
||||
)
|
||||
|
@ -20,7 +20,7 @@ class CLIIntegrationTest(AsyncioTestCase):
|
|||
conf.share_usage_data = False
|
||||
conf.api = 'localhost:5299'
|
||||
conf.components_to_skip = (
|
||||
DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
|
||||
DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
|
||||
HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
|
||||
UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
|
||||
)
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
import asyncio
|
||||
import os
|
||||
|
||||
import lbry.wallet
|
||||
from lbry import __version__ as sdk_version
|
||||
from lbry.testcase import CommandTestCase
|
||||
from lbry.extras.daemon.Components import HeadersComponent
|
||||
from torba.client.basenetwork import ClientSession
|
||||
from torba.testcase import IntegrationTestCase
|
||||
|
||||
|
@ -45,22 +42,3 @@ class TestSegwitServer(IntegrationTestCase):
|
|||
|
||||
async def test_at_least_it_starts(self):
|
||||
await asyncio.wait_for(self.ledger.network.get_headers(0, 1), 1.0)
|
||||
|
||||
|
||||
class TestHeadersComponent(CommandTestCase):
|
||||
|
||||
LEDGER = lbry.wallet
|
||||
|
||||
async def asyncSetUp(self):
|
||||
await super().asyncSetUp()
|
||||
self.component_manager = self.daemon.component_manager
|
||||
self.component_manager.conf.blockchain_name = 'lbrycrd_main'
|
||||
self.headers_component = HeadersComponent(self.component_manager)
|
||||
|
||||
async def test_cant_reach_host(self):
|
||||
HeadersComponent.HEADERS_URL = 'notthere/'
|
||||
os.unlink(self.headers_component.headers.path)
|
||||
# test is that this doesn't raise
|
||||
await self.headers_component.start()
|
||||
self.assertTrue(self.component_manager.get_components_status()['blockchain_headers'])
|
||||
self.assertEqual(await self.headers_component.get_status(), {})
|
|
@ -6,7 +6,6 @@ from lbry.extras.daemon.ComponentManager import ComponentManager
|
|||
from lbry.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT
|
||||
from lbry.extras.daemon.Components import HASH_ANNOUNCER_COMPONENT, UPNP_COMPONENT
|
||||
from lbry.extras.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
|
||||
from lbry.extras.daemon.Components import HEADERS_COMPONENT
|
||||
from lbry.extras.daemon import Components
|
||||
|
||||
|
||||
|
@ -15,7 +14,6 @@ class TestComponentManager(AsyncioTestCase):
|
|||
def setUp(self):
|
||||
self.default_components_sort = [
|
||||
[
|
||||
Components.HeadersComponent,
|
||||
Components.DatabaseComponent,
|
||||
Components.ExchangeRateManagerComponent,
|
||||
Components.UPnPComponent
|
||||
|
@ -152,7 +150,7 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase):
|
|||
skip_components=[
|
||||
DATABASE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT,
|
||||
PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT,
|
||||
HEADERS_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT],
|
||||
EXCHANGE_RATE_MANAGER_COMPONENT],
|
||||
wallet=FakeDelayedWallet,
|
||||
stream_manager=FakeDelayedStreamManager,
|
||||
blob_manager=FakeDelayedBlobManager
|
||||
|
|
|
@ -9,7 +9,6 @@ from lbry.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, WAL
|
|||
from lbry.extras.daemon.Components import HASH_ANNOUNCER_COMPONENT
|
||||
from lbry.extras.daemon.Components import UPNP_COMPONENT, BLOB_COMPONENT
|
||||
from lbry.extras.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
|
||||
from lbry.extras.daemon.Components import HEADERS_COMPONENT
|
||||
from lbry.extras.daemon.Daemon import Daemon as LBRYDaemon
|
||||
from lbry.wallet import LbryWalletManager
|
||||
from torba.client.wallet import Wallet
|
||||
|
@ -31,7 +30,7 @@ def get_test_daemon(conf: Config, with_fee=False):
|
|||
conf, skip_components=[
|
||||
DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, UPNP_COMPONENT,
|
||||
PEER_PROTOCOL_SERVER_COMPONENT, HASH_ANNOUNCER_COMPONENT,
|
||||
EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT,
|
||||
EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
|
||||
RATE_LIMITER_COMPONENT],
|
||||
file_manager=FakeFileManager
|
||||
)
|
||||
|
|
Loading…
Add table
Reference in a new issue