asyncio.sleep() -> sleep() and asyncio.Event() to Event() in order to reduce overall diff after dropping curio.py

This commit is contained in:
Lex Berezhny 2018-12-06 20:19:16 -05:00
parent a6de3a9642
commit 78fd434686
5 changed files with 16 additions and 13 deletions

View file

@ -14,6 +14,7 @@ import array
import ast import ast
import os import os
import time import time
from asyncio import sleep
from bisect import bisect_right from bisect import bisect_right
from collections import namedtuple from collections import namedtuple
from glob import glob from glob import glob
@ -448,7 +449,7 @@ class DB:
return history return history
self.logger.warning(f'limited_history: tx hash ' self.logger.warning(f'limited_history: tx hash '
f'not found (reorg?), retrying...') f'not found (reorg?), retrying...')
await asyncio.sleep(0.25) await sleep(0.25)
# -- Undo information # -- Undo information
@ -617,7 +618,7 @@ class DB:
return utxos return utxos
self.logger.warning(f'all_utxos: tx hash not ' self.logger.warning(f'all_utxos: tx hash not '
f'found (reorg?), retrying...') f'found (reorg?), retrying...')
await asyncio.sleep(0.25) await sleep(0.25)
async def lookup_utxos(self, prevouts): async def lookup_utxos(self, prevouts):
'''For each prevout, lookup it up in the DB and return a (hashX, '''For each prevout, lookup it up in the DB and return a (hashX,

View file

@ -11,7 +11,7 @@ import asyncio
import itertools import itertools
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from asyncio import Lock from asyncio import Lock, sleep
from collections import defaultdict from collections import defaultdict
import attr import attr
@ -117,7 +117,7 @@ class MemPool:
while True: while True:
self.logger.info(f'{len(self.txs):,d} txs ' self.logger.info(f'{len(self.txs):,d} txs '
f'touching {len(self.hashXs):,d} addresses') f'touching {len(self.hashXs):,d} addresses')
await asyncio.sleep(self.log_status_secs) await sleep(self.log_status_secs)
await synchronized_event.wait() await synchronized_event.wait()
async def _refresh_histogram(self, synchronized_event): async def _refresh_histogram(self, synchronized_event):
@ -126,7 +126,7 @@ class MemPool:
async with self.lock: async with self.lock:
# Threaded as can be expensive # Threaded as can be expensive
await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000) await asyncio.get_event_loop().run_in_executor(None, self._update_histogram, 100_000)
await asyncio.sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS) await sleep(self.coin.MEMPOOL_HISTOGRAM_REFRESH_SECS)
def _update_histogram(self, bin_size): def _update_histogram(self, bin_size):
# Build a histogram by fee rate # Build a histogram by fee rate
@ -212,7 +212,7 @@ class MemPool:
synchronized_event.set() synchronized_event.set()
synchronized_event.clear() synchronized_event.clear()
await self.api.on_mempool(touched, height) await self.api.on_mempool(touched, height)
await asyncio.sleep(self.refresh_secs) await sleep(self.refresh_secs)
async def _process_mempool(self, all_hashes): async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes # Re-sync with the new set of hashes

View file

@ -26,7 +26,7 @@
'''Merkle trees, branches, proofs and roots.''' '''Merkle trees, branches, proofs and roots.'''
import asyncio from asyncio import Event
from math import ceil, log from math import ceil, log
from torba.server.hash import double_sha256 from torba.server.hash import double_sha256
@ -169,7 +169,7 @@ class MerkleCache:
self.source_func = source_func self.source_func = source_func
self.length = 0 self.length = 0
self.depth_higher = 0 self.depth_higher = 0
self.initialized = asyncio.Event() self.initialized = Event()
def _segment_length(self): def _segment_length(self):
return 1 << self.depth_higher return 1 << self.depth_higher

View file

@ -12,6 +12,7 @@ import random
import socket import socket
import ssl import ssl
import time import time
from asyncio import Event, sleep
from collections import defaultdict, Counter from collections import defaultdict, Counter
from torba.tasks import TaskGroup from torba.tasks import TaskGroup
@ -150,7 +151,7 @@ class PeerManager:
self.logger.info(f'detected {proxy}') self.logger.info(f'detected {proxy}')
return return
self.logger.info('no proxy detected, will try later') self.logger.info('no proxy detected, will try later')
await asyncio.sleep(900) await sleep(900)
async def _note_peers(self, peers, limit=2, check_ports=False, async def _note_peers(self, peers, limit=2, check_ports=False,
source=None): source=None):
@ -178,7 +179,7 @@ class PeerManager:
use_peers = new_peers use_peers = new_peers
for peer in use_peers: for peer in use_peers:
self.logger.info(f'accepted new peer {peer} from {source}') self.logger.info(f'accepted new peer {peer} from {source}')
peer.retry_event = asyncio.Event() peer.retry_event = Event()
self.peers.add(peer) self.peers.add(peer)
await self.group.add(self._monitor_peer(peer)) await self.group.add(self._monitor_peer(peer))

View file

@ -16,6 +16,7 @@ import os
import pylru import pylru
import ssl import ssl
import time import time
from asyncio import Event, sleep
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
@ -130,7 +131,7 @@ class SessionManager:
self.mn_cache_height = 0 self.mn_cache_height = 0
self.mn_cache = [] self.mn_cache = []
self.session_event = asyncio.Event() self.session_event = Event()
# Set up the RPC request handlers # Set up the RPC request handlers
cmds = ('add_peer daemon_url disconnect getinfo groups log peers ' cmds = ('add_peer daemon_url disconnect getinfo groups log peers '
@ -206,7 +207,7 @@ class SessionManager:
log_interval = self.env.log_sessions log_interval = self.env.log_sessions
if log_interval: if log_interval:
while True: while True:
await asyncio.sleep(log_interval) await sleep(log_interval)
data = self._session_data(for_log=True) data = self._session_data(for_log=True)
for line in text.sessions_lines(data): for line in text.sessions_lines(data):
self.logger.info(line) self.logger.info(line)
@ -248,7 +249,7 @@ class SessionManager:
async def _clear_stale_sessions(self): async def _clear_stale_sessions(self):
'''Cut off sessions that haven't done anything for 10 minutes.''' '''Cut off sessions that haven't done anything for 10 minutes.'''
while True: while True:
await asyncio.sleep(60) await sleep(60)
stale_cutoff = time.time() - self.env.session_timeout stale_cutoff = time.time() - self.env.session_timeout
stale_sessions = [session for session in self.sessions stale_sessions = [session for session in self.sessions
if session.last_recv < stale_cutoff] if session.last_recv < stale_cutoff]