dont fail when s3 is out of reach

This commit is contained in:
Victor Shyba 2019-07-23 13:45:10 -03:00 committed by Lex Berezhny
parent 93b5f1809e
commit 45fb673e49
3 changed files with 46 additions and 34 deletions

View file

@ -96,12 +96,9 @@ class DatabaseComponent(Component):
self.storage = None self.storage = None
HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest"
HEADER_SIZE = 112
class HeadersComponent(Component): class HeadersComponent(Component):
component_name = HEADERS_COMPONENT component_name = HEADERS_COMPONENT
HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest"
def __init__(self, component_manager): def __init__(self, component_manager):
super().__init__(component_manager) super().__init__(component_manager)
@ -137,23 +134,19 @@ class HeadersComponent(Component):
async def fetch_headers_from_s3(self): async def fetch_headers_from_s3(self):
local_header_size = self.headers.bytes_size local_header_size = self.headers.bytes_size
resume_header = {"Range": f"bytes={local_header_size}-"} resume_header = {"Range": f"bytes={local_header_size}-"}
async with utils.aiohttp_request('get', HEADERS_URL, headers=resume_header) as response: async with utils.aiohttp_request('get', self.HEADERS_URL, headers=resume_header) as response:
if response.status == 406 or response.content_length < HEADER_SIZE: # our file is bigger if response.status == 406 or response.content_length < self.headers.header_size: # our file is bigger
log.warning("s3 is more out of date than we are") log.warning("s3 is more out of date than we are")
return return
if response.content_length % HEADER_SIZE != 0: if response.content_length % self.headers.header_size != 0:
log.warning("s3 appears to have corrupted header") log.warning("s3 appears to have corrupted header")
return return
final_size_after_download = response.content_length + local_header_size final_size_after_download = response.content_length + local_header_size
if local_header_size > 0: if local_header_size > 0:
log.info("Resuming download of %i bytes from s3", response.content_length) log.info("Resuming download of %i bytes from s3", response.content_length)
buffer, header_size = b'', self.headers.header_size while not response.content.at_eof():
async for chunk in response.content.iter_any(): max_read = min(self.headers.header_size * 10000, final_size_after_download - self.headers.bytes_size)
chunk = buffer + chunk chunk = await response.content.readexactly(max_read)
remaining = len(chunk) % header_size
chunk, buffer = chunk[:-remaining], bytes(chunk[-remaining:])
if not chunk:
continue
if not await self.headers.connect(len(self.headers), chunk): if not await self.headers.connect(len(self.headers), chunk):
log.warning("Error connecting downloaded headers from at %s.", self.headers.height) log.warning("Error connecting downloaded headers from at %s.", self.headers.height)
return return
@ -167,13 +160,11 @@ class HeadersComponent(Component):
return 0 return 0
async def get_downloadable_header_height(self) -> typing.Optional[int]: async def get_downloadable_header_height(self) -> typing.Optional[int]:
try: async with utils.aiohttp_request('HEAD', self.HEADERS_URL) as response:
async with utils.aiohttp_request('HEAD', HEADERS_URL) as response:
if response.status != 200: if response.status != 200:
log.warning("Header download error, unexpected response code: %s", response.status) log.warning("Header download error, unexpected response code: %s", response.status)
return response.content_length // HEADER_SIZE return -1
except OSError: return response.content_length // self.headers.header_size
log.exception("Failed to download headers using https.")
async def should_download_headers_from_s3(self) -> bool: async def should_download_headers_from_s3(self) -> bool:
if self.conf.blockchain_name != "lbrycrd_main": if self.conf.blockchain_name != "lbrycrd_main":
@ -196,10 +187,10 @@ class HeadersComponent(Component):
if os.path.exists(self.old_file): if os.path.exists(self.old_file):
log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file) log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file)
os.rename(self.old_file, self.headers_file) os.rename(self.old_file, self.headers_file)
try:
await self.headers.open() await self.headers.open()
await self.headers.repair() await self.headers.repair()
if await self.should_download_headers_from_s3(): if await self.should_download_headers_from_s3():
try:
self.is_downloading_headers = True self.is_downloading_headers = True
await self.fetch_headers_from_s3() await self.fetch_headers_from_s3()
except Exception as err: except Exception as err:

View file

@ -1,9 +1,11 @@
import asyncio import asyncio
import os
from torba.client.basenetwork import ClientSession
from torba.orchstr8 import Conductor
from torba.testcase import IntegrationTestCase
import lbry.wallet import lbry.wallet
from lbry.testcase import CommandTestCase
from lbry.extras.daemon.Components import HeadersComponent
from torba.client.basenetwork import ClientSession
from torba.testcase import IntegrationTestCase
class TestSessionBloat(IntegrationTestCase): class TestSessionBloat(IntegrationTestCase):
@ -29,3 +31,22 @@ class TestSessionBloat(IntegrationTestCase):
await session.send_request('server.banner', ()) await session.send_request('server.banner', ())
self.assertTrue(session.is_closing()) self.assertTrue(session.is_closing())
self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0) self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0)
class TestHeadersComponent(CommandTestCase):
LEDGER = lbry.wallet
async def asyncSetUp(self):
await super().asyncSetUp()
self.component_manager = self.daemon.component_manager
self.component_manager.conf.blockchain_name = 'lbrycrd_main'
self.headers_component = HeadersComponent(self.component_manager)
async def test_cant_reach_host(self):
HeadersComponent.HEADERS_URL = 'notthere/'
os.unlink(self.headers_component.headers.path)
# test is that this doesnt raise
await self.headers_component.start()
self.assertTrue(self.component_manager.get_components_status()['blockchain_headers'])
self.assertEqual(await self.headers_component.get_status(), {})

View file

@ -115,7 +115,7 @@ class BaseHeaders:
# .seek()/.write()/.truncate() might also .flush() when needed # .seek()/.write()/.truncate() might also .flush() when needed
# the goal here is mainly to ensure we're definitely flush()'ing # the goal here is mainly to ensure we're definitely flush()'ing
self.io.flush() self.io.flush()
self._size = None self._size = self.io.tell() // self.header_size
added += written added += written
if bail: if bail:
break break