Fix potential race conditions in p2p testing framework
Previously, each NodeConnCB had its own lock to synchronize the data structures shared by the testing thread and the networking thread, and each NodeConn used an additional lock to guard its send buffer. This commit replaces those locks with a single global lock (mininode_lock) that synchronizes access to all data structures shared by the two threads. It also updates comptool and maxblocksinflight to the new synchronization semantics, eliminating previous race conditions within comptool, and re-enables invalidblockrequest.py in travis.
parent 5487975ca3
commit 574db4816f
4 changed files with 75 additions and 63 deletions
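
In outline, the new scheme works like the sketch below: every structure shared between the networking thread and the test thread is guarded by one global reentrant lock, which the networking thread takes while mutating and the test thread takes while polling. This is illustrative code, not part of the commit; FakeConnCB, network_thread, and wait_for_verack are hypothetical stand-ins for NodeConnCB, NetworkThread, and comptool's wait loops.

    import time
    from threading import RLock, Thread

    mininode_lock = RLock()  # one global lock, as introduced in mininode.py

    class FakeConnCB(object):
        def __init__(self):
            self.verack_received = False  # state shared with the test thread

    def network_thread(cb):
        # Networking side: mutate shared state only while holding the lock.
        time.sleep(0.1)
        with mininode_lock:
            cb.verack_received = True

    def wait_for_verack(cb, attempts=100):
        # Test side: poll shared state under the same global lock.
        for _ in range(attempts):
            with mininode_lock:
                if cb.verack_received:
                    return True
            time.sleep(0.05)
        return False

    cb = FakeConnCB()
    Thread(target=network_thread, args=(cb,)).start()
    assert wait_for_verack(cb)
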
rpc-tests.sh
@@ -31,7 +31,7 @@ testScripts=(
     'merkle_blocks.py'
 #    'forknotify.py'
     'maxblocksinflight.py'
-#    'invalidblockrequest.py'
+    'invalidblockrequest.py'
 );
 if [ "x${ENABLE_BITCOIND}${ENABLE_UTILS}${ENABLE_WALLET}" = "x111" ]; then
     for (( i = 0; i < ${#testScripts[@]}; i++ ))
comptool.py
@@ -25,6 +25,8 @@ generator that returns TestInstance objects. See below for definition.
 # on_getheaders: provide headers via BlockStore
 # on_getdata: provide blocks via BlockStore
 
+global mininode_lock
+
 class TestNode(NodeConnCB):
 
     def __init__(self, block_store, tx_store):
@@ -148,6 +150,7 @@ class TestManager(object):
         max_tries = 10 / sleep_time # Wait at most 10 seconds
         while max_tries > 0:
             done = True
+            with mininode_lock:
                 for c in self.connections:
                     if c.cb.verack_received is False:
                         done = False
@@ -161,6 +164,7 @@ class TestManager(object):
         while received_pongs is not True:
             time.sleep(0.05)
             received_pongs = True
+            with mininode_lock:
                 for c in self.connections:
                     if c.cb.received_ping_response(counter) is not True:
                         received_pongs = False
@@ -173,6 +177,7 @@ class TestManager(object):
         # Wait for nodes to request block (50ms sleep * 20 tries * num_blocks)
         max_tries = 20*num_blocks
         while max_tries > 0:
+            with mininode_lock:
                 results = [ blockhash in c.cb.block_request_map and
                             c.cb.block_request_map[blockhash] for c in self.connections ]
             if False not in results:
@@ -199,6 +204,7 @@ class TestManager(object):
         # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
         max_tries = 20*num_events
         while max_tries > 0:
+            with mininode_lock:
                 results = [ txhash in c.cb.tx_request_map and
                             c.cb.tx_request_map[txhash] for c in self.connections ]
             if False not in results:
@@ -221,11 +227,13 @@ class TestManager(object):
             self.ping_counter += 1
 
         # Sort inv responses from each node
+        with mininode_lock:
             [ c.cb.lastInv.sort() for c in self.connections ]
 
     # Verify that the tip of each connection all agree with each other, and
     # with the expected outcome (if given)
     def check_results(self, blockhash, outcome):
+        with mininode_lock:
             for c in self.connections:
                 if outcome is None:
                     if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
@@ -242,6 +250,7 @@ class TestManager(object):
     # perhaps it would be useful to add the ability to check explicitly that
     # a particular tx's existence in the mempool is the same across all nodes.
     def check_mempool(self, txhash, outcome):
+        with mininode_lock:
             for c in self.connections:
                 if outcome is None:
                     # Make sure the mempools agree with each other
@@ -272,6 +281,7 @@ class TestManager(object):
                 block = b_or_t
                 block_outcome = outcome
                 # Add to shared block_store, set as current block
+                with mininode_lock:
                     self.block_store.add_block(block)
                     for c in self.connections:
                         c.cb.block_request_map[block.sha256] = False
@@ -288,7 +298,8 @@ class TestManager(object):
                 assert(isinstance(b_or_t, CTransaction))
                 tx = b_or_t
                 tx_outcome = outcome
-                # Add to shared tx store
+                # Add to shared tx store and clear map entry
+                with mininode_lock:
                     self.tx_store.add_transaction(tx)
                     for c in self.connections:
                         c.cb.tx_request_map[tx.sha256] = False
maxblocksinflight.py
@@ -61,6 +61,7 @@ class TestManager(NodeConnCB):
             time.sleep(2)
 
             total_requests = 0
+            with mininode_lock:
                 for key in self.blockReqCounts:
                     total_requests += self.blockReqCounts[key]
                     if self.blockReqCounts[key] > 1:
mininode.py
@@ -26,7 +26,7 @@ import sys
 import random
 import cStringIO
 import hashlib
-from threading import Lock
+from threading import RLock
 from threading import Thread
 import logging
 import copy
@@ -42,6 +42,14 @@ MAX_INV_SZ = 50000
 # using select)
 mininode_socket_map = dict()
 
+# One lock for synchronizing all data access between the networking thread (see
+# NetworkThread below) and the thread running the test logic. For simplicity,
+# NodeConn acquires this lock whenever delivering a message to a NodeConnCB,
+# and whenever adding anything to the send buffer (in send_message()). This
+# lock should be acquired in the thread running the test logic to synchronize
+# access to any data shared with the NodeConnCB or NodeConn.
+mininode_lock = RLock()
+
 # Serialization/deserialization tools
 def sha256(s):
     return hashlib.new('sha256', s).digest()
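
A note on why the import above switches from Lock to RLock: the networking thread acquires mininode_lock in deliver() before invoking a callback, and callbacks typically reply by calling send_message(), which acquires the same lock again on the same thread. A non-reentrant Lock would deadlock on that second acquisition. The sketch below is standalone and illustrative; deliver and send_message here are simplified stand-ins for the NodeConnCB/NodeConn methods.

    from threading import RLock

    mininode_lock = RLock()

    def send_message(msg):
        with mininode_lock:      # second acquisition, same thread
            print("queued %r" % (msg,))

    def deliver(msg):
        with mininode_lock:      # first acquisition
            # a callback such as on_ping typically replies immediately:
            send_message("pong")

    deliver("ping")              # completes; a plain Lock would hang here
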
@@ -980,10 +988,6 @@ class msg_reject(object):
 # Reimplement the on_* functions to provide handling for events
 class NodeConnCB(object):
     def __init__(self):
-        # Acquire on all callbacks -- overkill for now since asyncore is
-        # single-threaded, but may be useful for synchronizing access to
-        # member variables in derived classes.
-        self.cbLock = Lock()
         self.verack_received = False
 
     # Derived classes should call this function once to set the message map
@@ -1009,7 +1013,7 @@ class NodeConnCB(object):
         }
 
     def deliver(self, conn, message):
-        with self.cbLock:
+        with mininode_lock:
             try:
                 self.cbmap[message.command](conn, message)
             except:
@@ -1094,7 +1098,6 @@ class NodeConn(asyncore.dispatcher):
         self.state = "connecting"
         self.network = net
         self.cb = callback
-        self.sendbufLock = Lock() # for protecting the sendbuffer
         self.disconnect = False
 
         # stuff version msg into sendbuf
@@ -1145,20 +1148,18 @@ class NodeConn(asyncore.dispatcher):
         return True
 
     def writable(self):
-        self.sendbufLock.acquire()
-        length = len(self.sendbuf)
-        self.sendbufLock.release()
+        with mininode_lock:
+            length = len(self.sendbuf)
         return (length > 0)
 
     def handle_write(self):
-        self.sendbufLock.acquire()
-        try:
-            sent = self.send(self.sendbuf)
-        except:
-            self.handle_close()
-            return
-        self.sendbuf = self.sendbuf[sent:]
-        self.sendbufLock.release()
+        with mininode_lock:
+            try:
+                sent = self.send(self.sendbuf)
+            except:
+                self.handle_close()
+                return
+            self.sendbuf = self.sendbuf[sent:]
 
     def got_data(self):
         while True:
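
Replacing the bare acquire()/release() pairs with the context-manager form also fixes a latent lock leak: the old handle_write() returned from its except branch after handle_close() without ever calling sendbufLock.release(). A with-block releases the lock on every exit path, including early returns and exceptions. The following standalone sketch (illustrative, not commit code) demonstrates the difference:

    from threading import Lock

    lock = Lock()

    def leaky():
        lock.acquire()
        raise IOError("send failed")      # matching release() is never reached

    def safe():
        with lock:
            raise IOError("send failed")  # released automatically while unwinding

    try:
        leaky()
    except IOError:
        pass
    assert not lock.acquire(False)  # still held: leaky() leaked the lock
    lock.release()                  # clean up manually

    try:
        safe()
    except IOError:
        pass
    assert lock.acquire(False)      # free again: the with-block released it
    lock.release()
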
@@ -1202,7 +1203,6 @@ class NodeConn(asyncore.dispatcher):
     def send_message(self, message, pushbuf=False):
         if self.state != "connected" and not pushbuf:
             return
-        self.sendbufLock.acquire()
         self.show_debug_msg("Send %s" % repr(message))
         command = message.command
         data = message.serialize()
@@ -1215,9 +1215,9 @@ class NodeConn(asyncore.dispatcher):
         h = sha256(th)
         tmsg += h[:4]
         tmsg += data
+        with mininode_lock:
             self.sendbuf += tmsg
             self.last_sent = time.time()
-        self.sendbufLock.release()
 
     def got_message(self, message):
         if message.command == "version":