download headers from s3 using treq

This commit is contained in:
Victor Shyba 2018-05-05 01:16:26 -03:00
parent d03fc80eac
commit 3982e15091

View file

@ -3,14 +3,12 @@ from collections import defaultdict, deque
import datetime import datetime
import logging import logging
from decimal import Decimal from decimal import Decimal
import treq
from zope.interface import implements from zope.interface import implements
from twisted.internet import threads, reactor, defer, task from twisted.internet import threads, reactor, defer, task
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.python.threadpool import ThreadPool
from twisted._threads._ithreads import AlreadyQuit
from twisted.internet.error import ConnectionAborted from twisted.internet.error import ConnectionAborted
from txrequests import Session as _TxRequestsSession
from requests import Session as requestsSession
from lbryum import wallet as lbryum_wallet from lbryum import wallet as lbryum_wallet
from lbryum.network import Network from lbryum.network import Network
@ -36,29 +34,6 @@ from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class TxRequestsSession(_TxRequestsSession):
# Session from txrequests would throw AlreadyQuit errors, this catches them
def __init__(self, pool=None, minthreads=1, maxthreads=4, **kwargs):
requestsSession.__init__(self, **kwargs) # pylint: disable=non-parent-init-called
self.ownPool = False
if pool is None:
self.ownPool = True
pool = ThreadPool(minthreads=minthreads, maxthreads=maxthreads)
# unclosed ThreadPool leads to reactor hangs at shutdown
# this is a problem in many situation, so better enforce pool stop here
def stop_pool():
try:
pool.stop()
except AlreadyQuit:
pass
reactor.addSystemEventTrigger("after", "shutdown", stop_pool)
self.pool = pool
if self.ownPool:
pool.start()
class ReservedPoints(object): class ReservedPoints(object):
def __init__(self, identifier, amount): def __init__(self, identifier, amount):
self.identifier = identifier self.identifier = identifier
@ -118,20 +93,18 @@ class Wallet(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def fetch_headers_from_s3(self): def fetch_headers_from_s3(self):
with TxRequestsSession() as s: response = yield treq.get(HEADERS_URL)
r = yield s.get(HEADERS_URL) if not response.length % HEADER_SIZE: # should be divisible by the header size
raw_headers = r.content s3_height = (response.length / HEADER_SIZE) - 1
if not len(raw_headers) % HEADER_SIZE: # should be divisible by the header size local_height = self.local_header_file_height()
s3_height = (len(raw_headers) / HEADER_SIZE) - 1 if s3_height > local_height:
local_height = self.local_header_file_height() with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file:
if s3_height > local_height: yield treq.collect(response, headers_file.write)
with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file: log.info("fetched headers from s3 (s3 height: %i)", s3_height)
headers_file.write(raw_headers)
log.info("fetched headers from s3 (s3 height: %i)", s3_height)
else:
log.warning("s3 is more out of date than we are")
else: else:
log.error("invalid size for headers from s3") log.warning("s3 is more out of date than we are")
else:
log.error("invalid size for headers from s3")
def local_header_file_height(self): def local_header_file_height(self):
headers_path = os.path.join(self.config.path, "blockchain_headers") headers_path = os.path.join(self.config.path, "blockchain_headers")