diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 6770faca4..9e272a7d5 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -3,14 +3,12 @@ from collections import defaultdict, deque import datetime import logging from decimal import Decimal + +import treq from zope.interface import implements from twisted.internet import threads, reactor, defer, task from twisted.python.failure import Failure -from twisted.python.threadpool import ThreadPool -from twisted._threads._ithreads import AlreadyQuit from twisted.internet.error import ConnectionAborted -from txrequests import Session as _TxRequestsSession -from requests import Session as requestsSession from lbryum import wallet as lbryum_wallet from lbryum.network import Network @@ -36,29 +34,6 @@ from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError log = logging.getLogger(__name__) -class TxRequestsSession(_TxRequestsSession): - # Session from txrequests would throw AlreadyQuit errors, this catches them - def __init__(self, pool=None, minthreads=1, maxthreads=4, **kwargs): - requestsSession.__init__(self, **kwargs) # pylint: disable=non-parent-init-called - self.ownPool = False - if pool is None: - self.ownPool = True - pool = ThreadPool(minthreads=minthreads, maxthreads=maxthreads) - # unclosed ThreadPool leads to reactor hangs at shutdown - # this is a problem in many situation, so better enforce pool stop here - - def stop_pool(): - try: - pool.stop() - except AlreadyQuit: - pass - - reactor.addSystemEventTrigger("after", "shutdown", stop_pool) - self.pool = pool - if self.ownPool: - pool.start() - - class ReservedPoints(object): def __init__(self, identifier, amount): self.identifier = identifier @@ -118,20 +93,18 @@ class Wallet(object): @defer.inlineCallbacks def fetch_headers_from_s3(self): - with TxRequestsSession() as s: - r = yield s.get(HEADERS_URL) - raw_headers = r.content - if not len(raw_headers) % HEADER_SIZE: # should be divisible by the header size - s3_height = (len(raw_headers) / HEADER_SIZE) - 1 - local_height = self.local_header_file_height() - if s3_height > local_height: - with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file: - headers_file.write(raw_headers) - log.info("fetched headers from s3 (s3 height: %i)", s3_height) - else: - log.warning("s3 is more out of date than we are") + response = yield treq.get(HEADERS_URL) + if not response.length % HEADER_SIZE: # should be divisible by the header size + s3_height = (response.length / HEADER_SIZE) - 1 + local_height = self.local_header_file_height() + if s3_height > local_height: + with open(os.path.join(self.config.path, "blockchain_headers"), "wb") as headers_file: + yield treq.collect(response, headers_file.write) + log.info("fetched headers from s3 (s3 height: %i)", s3_height) else: - log.error("invalid size for headers from s3") + log.warning("s3 is more out of date than we are") + else: + log.error("invalid size for headers from s3") def local_header_file_height(self): headers_path = os.path.join(self.config.path, "blockchain_headers")