From 45fb673e49d6ba6d2040d0ae9ba85b99d7d2d275 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Tue, 23 Jul 2019 13:45:10 -0300
Subject: [PATCH] dont fail when s3 is out of reach

---
 lbry/lbry/extras/daemon/Components.py         | 51 ++++++++-----------
 .../test_wallet_server_sessions.py            | 27 ++++++++--
 torba/torba/client/baseheader.py              |  2 +-
 3 files changed, 46 insertions(+), 34 deletions(-)

diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py
index 232a66481..24caf5aa5 100644
--- a/lbry/lbry/extras/daemon/Components.py
+++ b/lbry/lbry/extras/daemon/Components.py
@@ -96,12 +96,9 @@ class DatabaseComponent(Component):
         self.storage = None
 
 
-HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest"
-HEADER_SIZE = 112
-
-
 class HeadersComponent(Component):
     component_name = HEADERS_COMPONENT
+    HEADERS_URL = "https://headers.lbry.io/blockchain_headers_latest"
 
     def __init__(self, component_manager):
         super().__init__(component_manager)
@@ -137,23 +134,19 @@ class HeadersComponent(Component):
     async def fetch_headers_from_s3(self):
         local_header_size = self.headers.bytes_size
         resume_header = {"Range": f"bytes={local_header_size}-"}
-        async with utils.aiohttp_request('get', HEADERS_URL, headers=resume_header) as response:
-            if response.status == 406 or response.content_length < HEADER_SIZE:  # our file is bigger
+        async with utils.aiohttp_request('get', self.HEADERS_URL, headers=resume_header) as response:
+            if response.status == 406 or response.content_length < self.headers.header_size:  # our file is bigger
                 log.warning("s3 is more out of date than we are")
                 return
-            if response.content_length % HEADER_SIZE != 0:
+            if response.content_length % self.headers.header_size != 0:
                 log.warning("s3 appears to have corrupted header")
                 return
             final_size_after_download = response.content_length + local_header_size
             if local_header_size > 0:
                 log.info("Resuming download of %i bytes from s3", response.content_length)
-            buffer, header_size = b'', self.headers.header_size
-            async for chunk in response.content.iter_any():
-                chunk = buffer + chunk
-                remaining = len(chunk) % header_size
-                chunk, buffer = chunk[:-remaining], bytes(chunk[-remaining:])
-                if not chunk:
-                    continue
+            while not response.content.at_eof():
+                max_read = min(self.headers.header_size * 10000, final_size_after_download - self.headers.bytes_size)
+                chunk = await response.content.readexactly(max_read)
                 if not await self.headers.connect(len(self.headers), chunk):
                     log.warning("Error connecting downloaded headers from at %s.", self.headers.height)
                     return
@@ -167,13 +160,11 @@ class HeadersComponent(Component):
         return 0
 
     async def get_downloadable_header_height(self) -> typing.Optional[int]:
-        try:
-            async with utils.aiohttp_request('HEAD', HEADERS_URL) as response:
-                if response.status != 200:
-                    log.warning("Header download error, unexpected response code: %s", response.status)
-                return response.content_length // HEADER_SIZE
-        except OSError:
-            log.exception("Failed to download headers using https.")
+        async with utils.aiohttp_request('HEAD', self.HEADERS_URL) as response:
+            if response.status != 200:
+                log.warning("Header download error, unexpected response code: %s", response.status)
+                return -1
+            return response.content_length // self.headers.header_size
 
     async def should_download_headers_from_s3(self) -> bool:
         if self.conf.blockchain_name != "lbrycrd_main":
@@ -196,17 +187,17 @@ class HeadersComponent(Component):
         if os.path.exists(self.old_file):
             log.warning("Moving old headers from %s to %s.", self.old_file, self.headers_file)
             os.rename(self.old_file, self.headers_file)
-        await self.headers.open()
-        await self.headers.repair()
-        if await self.should_download_headers_from_s3():
-            try:
+        try:
+            await self.headers.open()
+            await self.headers.repair()
+            if await self.should_download_headers_from_s3():
                 self.is_downloading_headers = True
                 await self.fetch_headers_from_s3()
-            except Exception as err:
-                log.error("failed to fetch headers from s3: %s", err)
-            finally:
-                self.is_downloading_headers = False
-        await self.headers.close()
+        except Exception as err:
+            log.error("failed to fetch headers from s3: %s", err)
+        finally:
+            self.is_downloading_headers = False
+            await self.headers.close()
 
     async def stop(self):
         pass
diff --git a/lbry/tests/integration/test_wallet_server_sessions.py b/lbry/tests/integration/test_wallet_server_sessions.py
index 03dd4e027..b2e9c779d 100644
--- a/lbry/tests/integration/test_wallet_server_sessions.py
+++ b/lbry/tests/integration/test_wallet_server_sessions.py
@@ -1,9 +1,11 @@
 import asyncio
+import os
 
-from torba.client.basenetwork import ClientSession
-from torba.orchstr8 import Conductor
-from torba.testcase import IntegrationTestCase
 import lbry.wallet
+from lbry.testcase import CommandTestCase
+from lbry.extras.daemon.Components import HeadersComponent
+from torba.client.basenetwork import ClientSession
+from torba.testcase import IntegrationTestCase
 
 
 class TestSessionBloat(IntegrationTestCase):
@@ -29,3 +31,22 @@ class TestSessionBloat(IntegrationTestCase):
             await session.send_request('server.banner', ())
         self.assertTrue(session.is_closing())
         self.assertEqual(len(self.conductor.spv_node.server.session_mgr.sessions), 0)
+
+
+class TestHeadersComponent(CommandTestCase):
+
+    LEDGER = lbry.wallet
+
+    async def asyncSetUp(self):
+        await super().asyncSetUp()
+        self.component_manager = self.daemon.component_manager
+        self.component_manager.conf.blockchain_name = 'lbrycrd_main'
+        self.headers_component = HeadersComponent(self.component_manager)
+
+    async def test_cant_reach_host(self):
+        HeadersComponent.HEADERS_URL = 'notthere/'
+        os.unlink(self.headers_component.headers.path)
+        # test is that this doesnt raise
+        await self.headers_component.start()
+        self.assertTrue(self.component_manager.get_components_status()['blockchain_headers'])
+        self.assertEqual(await self.headers_component.get_status(), {})
\ No newline at end of file
diff --git a/torba/torba/client/baseheader.py b/torba/torba/client/baseheader.py
index 339dd5e54..22cc7aef6 100644
--- a/torba/torba/client/baseheader.py
+++ b/torba/torba/client/baseheader.py
@@ -115,7 +115,7 @@ class BaseHeaders:
                 # .seek()/.write()/.truncate() might also .flush() when needed
                 # the goal here is mainly to ensure we're definitely flush()'ing
                 self.io.flush()
-                self._size = None
+                self._size = self.io.tell() // self.header_size
             added += written
             if bail:
                 break