Merge #13517: qa: Remove need to handle the network thread in tests

fa87da2f17 qa: Avoid start/stop of the network thread mid-test (MarcoFalke)

Pull request description:

  This simplifies test writing by removing the need to handle the network thread in tests. E.g. start thread, join thread, restart thread mid-test, adding p2p connections at the "right" time, ...

Tree-SHA512: 533642f12fef5496f1933855edcdab1a7ed901d088d34911749cd0f9e044c8a6cb1f89985ac3a7f41a512943663e4e270a61978f6f072143ae050cd102d4eab8
This commit is contained in:
Wladimir J. van der Laan 2018-06-29 18:04:25 +02:00
commit a6ed99a1e6
No known key found for this signature in database
GPG key ID: 1E4AED62986CD25D
26 changed files with 95 additions and 220 deletions

View file

@ -76,7 +76,7 @@ over the network (`CBlock`, `CTransaction`, etc, along with the network-level
wrappers for them, `msg_block`, `msg_tx`, etc).
- P2P tests have two threads. One thread handles all network communication
with the bitcoind(s) being tested (using python's asyncore package); the other
with the bitcoind(s) being tested in a callback-based event loop; the other
implements the test logic.
- `P2PConnection` is the class used to connect to a bitcoind. `P2PInterface`
@ -84,10 +84,6 @@ contains the higher level logic for processing P2P payloads and connecting to
the Bitcoin Core node application logic. For custom behaviour, subclass the
P2PInterface object and override the callback methods.
- Call `network_thread_start()` after all `P2PInterface` objects are created to
start the networking thread. (Continue with the test logic in your existing
thread.)
- Can be used to write tests where specific P2P protocol behavior is tested.
Examples tests are `p2p_unrequested_blocks.py`, `p2p_compactblocks.py`.

View file

@ -21,8 +21,6 @@ from test_framework.mininode import (
mininode_lock,
msg_block,
msg_getdata,
network_thread_join,
network_thread_start,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
@ -135,9 +133,6 @@ class ExampleTest(BitcoinTestFramework):
# Create P2P connections to two of the nodes
self.nodes[0].add_p2p_connection(BaseNode())
# Start up network handling in another thread. This needs to be called
# after the P2P connections have been created.
network_thread_start()
# wait_for_verack ensures that the P2P connection is fully up.
self.nodes[0].p2p.wait_for_verack()
@ -189,14 +184,9 @@ class ExampleTest(BitcoinTestFramework):
connect_nodes(self.nodes[1], 2)
self.log.info("Add P2P connection to node2")
# We can't add additional P2P connections once the network thread has started. Disconnect the connection
# to node0, wait for the network thread to terminate, then connect to node2. This is specific to
# the current implementation of the network thread and may be improved in future.
self.nodes[0].disconnect_p2ps()
network_thread_join()
self.nodes[2].add_p2p_connection(BaseNode())
network_thread_start()
self.nodes[2].p2p.wait_for_verack()
self.log.info("Wait for node2 reach current tip. Test that it has propagated all the blocks to us")

View file

@ -33,16 +33,16 @@ import time
from test_framework.blocktools import (create_block, create_coinbase)
from test_framework.key import CECKey
from test_framework.mininode import (CBlockHeader,
COutPoint,
CTransaction,
CTxIn,
CTxOut,
network_thread_join,
network_thread_start,
P2PInterface,
msg_block,
msg_headers)
from test_framework.messages import (
CBlockHeader,
COutPoint,
CTransaction,
CTxIn,
CTxOut,
msg_block,
msg_headers
)
from test_framework.mininode import P2PInterface
from test_framework.script import (CScript, OP_TRUE)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
@ -98,8 +98,6 @@ class AssumeValidTest(BitcoinTestFramework):
# Connect to node0
p2p0 = self.nodes[0].add_p2p_connection(BaseNode())
network_thread_start()
self.nodes[0].p2p.wait_for_verack()
# Build the blockchain
@ -160,9 +158,7 @@ class AssumeValidTest(BitcoinTestFramework):
self.block_time += 1
height += 1
# We're adding new connections so terminate the network thread
self.nodes[0].disconnect_p2ps()
network_thread_join()
# Start node1 and node2 with assumevalid so they accept a block with a bad signature.
self.start_node(1, extra_args=["-assumevalid=" + hex(block102.sha256)])
@ -172,8 +168,6 @@ class AssumeValidTest(BitcoinTestFramework):
p2p1 = self.nodes[1].add_p2p_connection(BaseNode())
p2p2 = self.nodes[2].add_p2p_connection(BaseNode())
network_thread_start()
p2p0.wait_for_verack()
p2p1.wait_for_verack()
p2p2.wait_for_verack()

View file

@ -20,7 +20,7 @@ from test_framework.messages import (
uint256_from_compact,
uint256_from_str,
)
from test_framework.mininode import P2PDataStore, network_thread_start, network_thread_join
from test_framework.mininode import P2PDataStore
from test_framework.script import (
CScript,
MAX_SCRIPT_ELEMENT_SIZE,
@ -1299,7 +1299,6 @@ class FullBlockTest(BitcoinTestFramework):
Helper to connect and wait for version handshake."""
self.nodes[0].add_p2p_connection(P2PDataStore())
network_thread_start()
# We need to wait for the initial getheaders from the peer before we
# start populating our blockstore. If we don't, then we may run ahead
# to the next subtest before we receive the getheaders. We'd then send
@ -1314,7 +1313,6 @@ class FullBlockTest(BitcoinTestFramework):
The node gets disconnected several times in this test. This helper
method reconnects the p2p and restarts the network thread."""
self.nodes[0].disconnect_p2ps()
network_thread_join()
self.bootstrap_p2p()
def sync_blocks(self, blocks, success=True, reject_code=None, reject_reason=None, request_block=True, reconnect=False, timeout=60):

View file

@ -67,10 +67,6 @@ class BIP65Test(BitcoinTestFramework):
def run_test(self):
self.nodes[0].add_p2p_connection(P2PInterface())
network_thread_start()
# wait_for_verack ensures that the P2P connection is fully up.
self.nodes[0].p2p.wait_for_verack()
self.log.info("Mining %d blocks", CLTV_HEIGHT - 2)

View file

@ -49,7 +49,7 @@ import time
from test_framework.blocktools import create_coinbase, create_block
from test_framework.messages import ToHex, CTransaction
from test_framework.mininode import network_thread_start, P2PDataStore
from test_framework.mininode import P2PDataStore
from test_framework.script import (
CScript,
OP_CHECKSEQUENCEVERIFY,
@ -183,7 +183,6 @@ class BIP68_112_113Test(BitcoinTestFramework):
def run_test(self):
self.nodes[0].add_p2p_connection(P2PDataStore())
network_thread_start()
self.nodes[0].p2p.wait_for_verack()
self.log.info("Generate blocks in the past for coinbase outputs.")

View file

@ -56,8 +56,6 @@ class BIP66Test(BitcoinTestFramework):
def run_test(self):
self.nodes[0].add_p2p_connection(P2PInterface())
network_thread_start()
# wait_for_verack ensures that the P2P connection is fully up.
self.nodes[0].p2p.wait_for_verack()

View file

@ -57,7 +57,6 @@ class MaxUploadTest(BitcoinTestFramework):
for _ in range(3):
p2p_conns.append(self.nodes[0].add_p2p_connection(TestP2PConn()))
network_thread_start()
for p2pc in p2p_conns:
p2pc.wait_for_verack()
@ -148,8 +147,6 @@ class MaxUploadTest(BitcoinTestFramework):
# Reconnect to self.nodes[0]
self.nodes[0].add_p2p_connection(TestP2PConn())
network_thread_start()
self.nodes[0].p2p.wait_for_verack()
#retrieve 20 blocks which should be enough to break the 1MB limit

View file

@ -15,7 +15,7 @@ Generate 427 more blocks.
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
from test_framework.mininode import CTransaction, network_thread_start
from test_framework.messages import CTransaction
from test_framework.blocktools import create_coinbase, create_block, add_witness_commitment
from test_framework.script import CScript
from io import BytesIO
@ -50,7 +50,6 @@ class NULLDUMMYTest(BitcoinTestFramework):
self.wit_address = self.nodes[0].addwitnessaddress(self.address)
self.wit_ms_address = self.nodes[0].addmultisigaddress(1, [self.address], '', 'p2sh-segwit')['address']
network_thread_start()
self.coinbase_blocks = self.nodes[0].generate(2) # Block 2
coinbase_txid = []
for i in self.coinbase_blocks:

View file

@ -12,7 +12,7 @@ import re
from test_framework.blocktools import create_block, create_coinbase
from test_framework.messages import msg_block
from test_framework.mininode import P2PInterface, network_thread_start, mininode_lock
from test_framework.mininode import P2PInterface, mininode_lock
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import wait_until
@ -65,7 +65,6 @@ class VersionBitsWarningTest(BitcoinTestFramework):
# Handy alias
node = self.nodes[0]
node.add_p2p_connection(P2PInterface())
network_thread_start()
node.p2p.wait_for_verack()
# Mine one period worth of blocks

View file

@ -788,13 +788,11 @@ class CompactBlocksTest(BitcoinTestFramework):
assert_equal(int(node.getbestblockhash(), 16), block.sha256)
def run_test(self):
# Setup the p2p connections and start up the network thread.
# Setup the p2p connections
self.test_node = self.nodes[0].add_p2p_connection(TestP2PConn())
self.segwit_node = self.nodes[1].add_p2p_connection(TestP2PConn(), services=NODE_NETWORK|NODE_WITNESS)
self.old_node = self.nodes[1].add_p2p_connection(TestP2PConn(), services=NODE_NETWORK)
network_thread_start()
self.test_node.wait_for_verack()
# We will need UTXOs to construct transactions in later tests.

View file

@ -47,9 +47,8 @@ class FeeFilterTest(BitcoinTestFramework):
node1.generate(1)
sync_blocks(self.nodes)
# Setup the p2p connections and start up the network thread.
# Setup the p2p connections
self.nodes[0].add_p2p_connection(TestP2PConn())
network_thread_start()
self.nodes[0].p2p.wait_for_verack()
# Test that invs are received for all txs at feerate of 20 sat/byte

View file

@ -18,7 +18,6 @@ from test_framework.mininode import (
msg_block,
msg_getdata,
msg_getheaders,
network_thread_start,
wait_until,
)
from test_framework.test_framework import BitcoinTestFramework
@ -76,8 +75,6 @@ class P2PFingerprintTest(BitcoinTestFramework):
# last month but that have over a month's worth of work are also withheld.
def run_test(self):
node0 = self.nodes[0].add_p2p_connection(P2PInterface())
network_thread_start()
node0.wait_for_verack()
# Set node time to 60 days ago

View file

@ -14,7 +14,7 @@ import copy
from test_framework.blocktools import create_block, create_coinbase, create_transaction
from test_framework.messages import COIN
from test_framework.mininode import network_thread_start, P2PDataStore
from test_framework.mininode import P2PDataStore
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
@ -28,8 +28,6 @@ class InvalidBlockRequestTest(BitcoinTestFramework):
# Add p2p connection to node0
node = self.nodes[0] # convenience reference to the node
node.add_p2p_connection(P2PDataStore())
network_thread_start()
node.p2p.wait_for_verack()
best_block = node.getblock(node.getbestblockhash())

View file

@ -13,7 +13,7 @@ from test_framework.messages import (
CTxIn,
CTxOut,
)
from test_framework.mininode import network_thread_start, P2PDataStore, network_thread_join
from test_framework.mininode import P2PDataStore
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
@ -32,7 +32,6 @@ class InvalidTxRequestTest(BitcoinTestFramework):
Helper to connect and wait for version handshake."""
for _ in range(num_connections):
self.nodes[0].add_p2p_connection(P2PDataStore())
network_thread_start()
self.nodes[0].p2p.wait_for_verack()
def reconnect_p2p(self, **kwargs):
@ -41,7 +40,6 @@ class InvalidTxRequestTest(BitcoinTestFramework):
The node gets disconnected several times in this test. This helper
method reconnects the p2p and restarts the network thread."""
self.nodes[0].disconnect_p2ps()
network_thread_join()
self.bootstrap_p2p(**kwargs)
def run_test(self):

View file

@ -103,8 +103,6 @@ class P2PLeakTest(BitcoinTestFramework):
unsupported_service_bit5_node = self.nodes[0].add_p2p_connection(CLazyNode(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_5)
unsupported_service_bit7_node = self.nodes[0].add_p2p_connection(CLazyNode(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_7)
network_thread_start()
wait_until(lambda: no_version_bannode.ever_connected, timeout=10, lock=mininode_lock)
wait_until(lambda: no_version_idlenode.ever_connected, timeout=10, lock=mininode_lock)
wait_until(lambda: no_verack_idlenode.version_received, timeout=10, lock=mininode_lock)
@ -126,9 +124,8 @@ class P2PLeakTest(BitcoinTestFramework):
self.nodes[0].disconnect_p2ps()
# Wait until all connections are closed and the network thread has terminated
# Wait until all connections are closed
wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0)
network_thread_join()
# Make sure no unexpected messages came in
assert(no_version_bannode.unexpected_msg == False)
@ -143,11 +140,9 @@ class P2PLeakTest(BitcoinTestFramework):
allowed_service_bit5_node = self.nodes[0].add_p2p_connection(P2PInterface(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_5)
allowed_service_bit7_node = self.nodes[0].add_p2p_connection(P2PInterface(), services=NODE_NETWORK|NODE_UNSUPPORTED_SERVICE_BIT_7)
# Network thread stopped when all previous P2PInterfaces disconnected. Restart it
network_thread_start()
wait_until(lambda: allowed_service_bit5_node.message_count["verack"], lock=mininode_lock)
wait_until(lambda: allowed_service_bit7_node.message_count["verack"], lock=mininode_lock)
if __name__ == '__main__':
P2PLeakTest().main()

View file

@ -21,7 +21,6 @@ class P2PMempoolTests(BitcoinTestFramework):
def run_test(self):
# Add a p2p connection
self.nodes[0].add_p2p_connection(P2PInterface())
network_thread_start()
self.nodes[0].p2p.wait_for_verack()
#request mempool

View file

@ -9,7 +9,7 @@ and that it responds to getdata requests for blocks correctly:
- send a block within 288 + 2 of the tip
- disconnect peers who request blocks older than that."""
from test_framework.messages import CInv, msg_getdata, msg_verack
from test_framework.mininode import NODE_BLOOM, NODE_NETWORK_LIMITED, NODE_WITNESS, P2PInterface, wait_until, mininode_lock, network_thread_start, network_thread_join
from test_framework.mininode import NODE_BLOOM, NODE_NETWORK_LIMITED, NODE_WITNESS, P2PInterface, wait_until, mininode_lock
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, disconnect_nodes, connect_nodes_bi, sync_blocks
@ -48,7 +48,6 @@ class NodeNetworkLimitedTest(BitcoinTestFramework):
def run_test(self):
node = self.nodes[0].add_p2p_connection(P2PIgnoreInv())
network_thread_start()
node.wait_for_verack()
expected_services = NODE_BLOOM | NODE_WITNESS | NODE_NETWORK_LIMITED
@ -74,9 +73,7 @@ class NodeNetworkLimitedTest(BitcoinTestFramework):
self.log.info("Check local address relay, do a fresh connection.")
self.nodes[0].disconnect_p2ps()
network_thread_join()
node1 = self.nodes[0].add_p2p_connection(P2PIgnoreInv())
network_thread_start()
node1.wait_for_verack()
node1.send_message(msg_verack())

View file

@ -1964,9 +1964,8 @@ class SegWitTest(BitcoinTestFramework):
self.utxo.pop(0)
def run_test(self):
# Setup the p2p connections and start up the network thread.
# Setup the p2p connections
# self.test_node sets NODE_WITNESS|NODE_NETWORK
self.test_node = self.nodes[0].add_p2p_connection(TestP2PConn(), services=NODE_NETWORK|NODE_WITNESS)
# self.old_node sets only NODE_NETWORK
@ -1974,8 +1973,6 @@ class SegWitTest(BitcoinTestFramework):
# self.std_node is for testing node1 (fRequireStandard=true)
self.std_node = self.nodes[1].add_p2p_connection(TestP2PConn(), services=NODE_NETWORK|NODE_WITNESS)
network_thread_start()
# Keep a place to store utxo's that can be used in later tests
self.utxo = []

View file

@ -90,7 +90,6 @@ from test_framework.mininode import (
CBlockHeader,
CInv,
NODE_WITNESS,
network_thread_start,
P2PInterface,
mininode_lock,
msg_block,
@ -238,15 +237,11 @@ class SendHeadersTest(BitcoinTestFramework):
return [int(x, 16) for x in all_hashes]
def run_test(self):
# Setup the p2p connections and start up the network thread.
# Setup the p2p connections
inv_node = self.nodes[0].add_p2p_connection(BaseNode())
# Make sure NODE_NETWORK is not set for test_node, so no block download
# will occur outside of direct fetching
test_node = self.nodes[0].add_p2p_connection(BaseNode(), services=NODE_WITNESS)
network_thread_start()
# Test logic begins here
inv_node.wait_for_verack()
test_node.wait_for_verack()

View file

@ -38,13 +38,11 @@ class TimeoutsTest(BitcoinTestFramework):
self.num_nodes = 1
def run_test(self):
# Setup the p2p connections and start up the network thread.
# Setup the p2p connections
no_verack_node = self.nodes[0].add_p2p_connection(TestP2PConn())
no_version_node = self.nodes[0].add_p2p_connection(TestP2PConn(), send_version=False)
no_send_node = self.nodes[0].add_p2p_connection(TestP2PConn(), send_version=False)
network_thread_start()
sleep(1)
assert no_verack_node.is_connected

View file

@ -73,15 +73,11 @@ class AcceptBlockTest(BitcoinTestFramework):
self.setup_nodes()
def run_test(self):
# Setup the p2p connections and start up the network thread.
# Setup the p2p connections
# test_node connects to node0 (not whitelisted)
test_node = self.nodes[0].add_p2p_connection(P2PInterface())
# min_work_node connects to node1 (whitelisted)
min_work_node = self.nodes[1].add_p2p_connection(P2PInterface())
network_thread_start()
# Test logic begins here
test_node.wait_for_verack()
min_work_node.wait_for_verack()
@ -204,10 +200,8 @@ class AcceptBlockTest(BitcoinTestFramework):
self.nodes[0].disconnect_p2ps()
self.nodes[1].disconnect_p2ps()
network_thread_join()
test_node = self.nodes[0].add_p2p_connection(P2PInterface())
network_thread_start()
test_node.wait_for_verack()
test_node.send_message(msg_block(block_h1f))
@ -293,8 +287,6 @@ class AcceptBlockTest(BitcoinTestFramework):
self.nodes[0].disconnect_p2ps()
test_node = self.nodes[0].add_p2p_connection(P2PInterface())
network_thread_start()
test_node.wait_for_verack()
# We should have failed reorg and switched back to 290 (but have block 291)

View file

@ -41,7 +41,6 @@ from test_framework.messages import (
)
from test_framework.mininode import (
P2PInterface,
network_thread_start,
)
@ -262,7 +261,6 @@ class BlockchainTest(BitcoinTestFramework):
# Start a P2P connection since we'll need to create some blocks.
node.add_p2p_connection(P2PInterface())
network_thread_start()
node.p2p.wait_for_verack()
current_height = node.getblock(node.getbestblockhash())['height']

View file

@ -13,11 +13,10 @@ P2PConnection: A low-level connection object to a node's P2P interface
P2PInterface: A high-level interface object for communicating to a node over P2P
P2PDataStore: A p2p interface class that keeps a store of transactions and blocks
and can respond correctly to getdata and getheaders messages"""
import asyncore
import asyncio
from collections import defaultdict
from io import BytesIO
import logging
import socket
import struct
import sys
import threading
@ -57,7 +56,8 @@ MAGIC_BYTES = {
"regtest": b"\xfa\xbf\xb5\xda", # regtest
}
class P2PConnection(asyncore.dispatcher):
class P2PConnection(asyncio.Protocol):
"""A low-level connection object to a node's P2P interface.
This class is responsible for:
@ -71,68 +71,59 @@ class P2PConnection(asyncore.dispatcher):
sub-classed and the on_message() callback overridden."""
def __init__(self):
# All P2PConnections must be created before starting the NetworkThread.
# assert that the network thread is not running.
assert not network_thread_running()
super().__init__(map=mininode_socket_map)
self._conn_open = False
# The underlying transport of the connection.
# Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe
self._transport = None
@property
def is_connected(self):
return self._conn_open
return self._transport is not None
def peer_connect(self, dstaddr, dstport, net="regtest"):
assert not self.is_connected
self.dstaddr = dstaddr
self.dstport = dstport
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.sendbuf = b""
# The initial message to send after the connection was made:
self.on_connection_send_msg = None
self.recvbuf = b""
self._asyncore_pre_connection = True
self.network = net
self.disconnect = False
logger.debug('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport))
try:
self.connect((dstaddr, dstport))
except:
self.handle_close()
loop = NetworkThread.network_event_loop
conn_gen_unsafe = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport)
conn_gen = lambda: loop.call_soon_threadsafe(loop.create_task, conn_gen_unsafe)
return conn_gen
def peer_disconnect(self):
# Connection could have already been closed by other end.
if self.is_connected:
self.disconnect = True # Signal asyncore to disconnect
NetworkThread.network_event_loop.call_soon_threadsafe(lambda: self._transport and self._transport.abort())
# Connection and disconnection methods
def handle_connect(self):
"""asyncore callback when a connection is opened."""
if not self.is_connected:
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self._conn_open = True
self._asyncore_pre_connection = False
self.on_open()
def connection_made(self, transport):
"""asyncio callback when a connection is opened."""
assert not self._transport
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
self._transport = transport
if self.on_connection_send_msg:
self.send_message(self.on_connection_send_msg)
self.on_connection_send_msg = None # Never used again
self.on_open()
def handle_close(self):
"""asyncore callback when a connection is closed."""
logger.debug("Closing connection to: %s:%d" % (self.dstaddr, self.dstport))
self._conn_open = False
def connection_lost(self, exc):
"""asyncio callback when a connection is closed."""
if exc:
logger.warning("Connection lost to {}:{} due to {}".format(self.dstaddr, self.dstport, exc))
else:
logger.debug("Closed connection to: %s:%d" % (self.dstaddr, self.dstport))
self._transport = None
self.recvbuf = b""
self.sendbuf = b""
try:
self.close()
except:
pass
self.on_close()
# Socket read methods
def handle_read(self):
"""asyncore callback when data is read from the socket."""
t = self.recv(8192)
def data_received(self, t):
"""asyncio callback when data is read from the socket."""
if len(t) > 0:
self.recvbuf += t
self._on_data()
@ -179,30 +170,6 @@ class P2PConnection(asyncore.dispatcher):
# Socket write methods
def writable(self):
"""asyncore method to determine whether the handle_write() callback should be called on the next loop."""
with mininode_lock:
length = len(self.sendbuf)
return length > 0 or self._asyncore_pre_connection
def handle_write(self):
"""asyncore callback when data should be written to the socket."""
with mininode_lock:
# asyncore does not expose socket connection, only the first read/write
# event, thus we must check connection manually here to know when we
# actually connect
if self._asyncore_pre_connection:
self.handle_connect()
if not self.writable():
return
try:
sent = self.send(self.sendbuf)
except:
self.handle_close()
return
self.sendbuf = self.sendbuf[sent:]
def send_message(self, message):
"""Send a P2P message over the socket.
@ -212,15 +179,7 @@ class P2PConnection(asyncore.dispatcher):
raise IOError('Not connected')
self._log_message("send", message)
tmsg = self._build_message(message)
with mininode_lock:
if len(self.sendbuf) == 0:
try:
sent = self.send(tmsg)
self.sendbuf = tmsg[sent:]
except BlockingIOError:
self.sendbuf = tmsg
else:
self.sendbuf += tmsg
NetworkThread.network_event_loop.call_soon_threadsafe(lambda: self._transport and self._transport.write(tmsg))
# Class utility methods
@ -274,7 +233,7 @@ class P2PInterface(P2PConnection):
self.nServices = 0
def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=True, **kwargs):
super().peer_connect(*args, **kwargs)
create_conn = super().peer_connect(*args, **kwargs)
if send_version:
# Send a version msg
@ -284,7 +243,9 @@ class P2PInterface(P2PConnection):
vt.addrTo.port = self.dstport
vt.addrFrom.ip = "0.0.0.0"
vt.addrFrom.port = 0
self.sendbuf = self._build_message(vt) # Will be sent right after handle_connect
self.on_connection_send_msg = vt # Will be sent soon after connection_made
return create_conn
# Message receiving methods
@ -408,56 +369,35 @@ class P2PInterface(P2PConnection):
self.ping_counter += 1
# Keep our own socket map for asyncore, so that we can track disconnects
# ourselves (to work around an issue with closing an asyncore socket when
# using select)
mininode_socket_map = dict()
# One lock for synchronizing all data access between the networking thread (see
# One lock for synchronizing all data access between the network event loop (see
# NetworkThread below) and the thread running the test logic. For simplicity,
# P2PConnection acquires this lock whenever delivering a message to a P2PInterface,
# and whenever adding anything to the send buffer (in send_message()). This
# lock should be acquired in the thread running the test logic to synchronize
# P2PConnection acquires this lock whenever delivering a message to a P2PInterface.
# This lock should be acquired in the thread running the test logic to synchronize
# access to any data shared with the P2PInterface or P2PConnection.
mininode_lock = threading.RLock()
class NetworkThread(threading.Thread):
network_event_loop = None
def __init__(self):
super().__init__(name="NetworkThread")
# There is only one event loop and no more than one thread must be created
assert not self.network_event_loop
NetworkThread.network_event_loop = asyncio.new_event_loop()
def run(self):
while mininode_socket_map:
# We check for whether to disconnect outside of the asyncore
# loop to work around the behavior of asyncore when using
# select
disconnected = []
for fd, obj in mininode_socket_map.items():
if obj.disconnect:
disconnected.append(obj)
[obj.handle_close() for obj in disconnected]
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
logger.debug("Network thread closing")
"""Start the network thread."""
self.network_event_loop.run_forever()
def network_thread_start():
"""Start the network thread."""
# Only one network thread may run at a time
assert not network_thread_running()
def close(self, timeout=10):
"""Close the connections and network event loop."""
self.network_event_loop.call_soon_threadsafe(self.network_event_loop.stop)
wait_until(lambda: not self.network_event_loop.is_running(), timeout=timeout)
self.network_event_loop.close()
self.join(timeout)
NetworkThread().start()
def network_thread_running():
"""Return whether the network thread is running."""
return any([thread.name == "NetworkThread" for thread in threading.enumerate()])
def network_thread_join(timeout=10):
"""Wait timeout seconds for the network thread to terminate.
Throw if the network thread doesn't terminate in timeout seconds."""
network_threads = [thread for thread in threading.enumerate() if thread.name == "NetworkThread"]
assert len(network_threads) <= 1
for thread in network_threads:
thread.join(timeout)
assert not thread.is_alive()
class P2PDataStore(P2PInterface):
"""A P2P data store class.

View file

@ -18,6 +18,7 @@ import time
from .authproxy import JSONRPCException
from . import coverage
from .test_node import TestNode
from .mininode import NetworkThread
from .util import (
MAX_NODES,
PortSeed,
@ -83,6 +84,7 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
"""Sets test framework defaults. Do not override this method. Instead, override the set_test_params() method"""
self.setup_clean_chain = False
self.nodes = []
self.network_thread = None
self.mocktime = 0
self.supports_cli = False
self.bind_to_localhost_only = True
@ -144,6 +146,10 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
self.options.tmpdir = tempfile.mkdtemp(prefix="test")
self._start_logging()
self.log.debug('Setting up network thread')
self.network_thread = NetworkThread()
self.network_thread.start()
success = TestStatus.FAILED
try:
@ -171,6 +177,8 @@ class BitcoinTestFramework(metaclass=BitcoinTestMetaClass):
print("Testcase failed. Attaching python debugger. Enter ? for help")
pdb.set_trace()
self.log.debug('Closing down network thread')
self.network_thread.close()
if not self.options.noshutdown:
self.log.info("Stopping nodes")
if self.nodes:

View file

@ -289,7 +289,7 @@ class TestNode():
if 'dstaddr' not in kwargs:
kwargs['dstaddr'] = '127.0.0.1'
p2p_conn.peer_connect(*args, **kwargs)
p2p_conn.peer_connect(*args, **kwargs)()
self.p2ps.append(p2p_conn)
return p2p_conn
@ -343,10 +343,10 @@ class TestNodeCLI():
def batch(self, requests):
results = []
for request in requests:
try:
results.append(dict(result=request()))
except JSONRPCException as e:
results.append(dict(error=e))
try:
results.append(dict(result=request()))
except JSONRPCException as e:
results.append(dict(error=e))
return results
def send_cli(self, command=None, *args, **kwargs):