Merge pull request #6094

2a22d4b Fix comptool send_message call when MAX_INV_SZ reached (Suhas Daftuar)
574db48 Fix potential race conditions in p2p testing framework (Suhas Daftuar)
5487975 Don't run invalidblockrequest.py in travis until race condition is fixed (Suhas Daftuar)
ef32817 Fix mininode disconnections to work with select (Suhas Daftuar)
This commit is contained in:
Wladimir J. van der Laan 2015-05-04 07:52:44 +02:00
commit 82d06e2338
No known key found for this signature in database
GPG key ID: 74810B012346C9A6
3 changed files with 91 additions and 70 deletions

View file

@ -25,6 +25,8 @@ generator that returns TestInstance objects. See below for definition.
# on_getheaders: provide headers via BlockStore # on_getheaders: provide headers via BlockStore
# on_getdata: provide blocks via BlockStore # on_getdata: provide blocks via BlockStore
global mininode_lock
class TestNode(NodeConnCB): class TestNode(NodeConnCB):
def __init__(self, block_store, tx_store): def __init__(self, block_store, tx_store):
@ -148,10 +150,11 @@ class TestManager(object):
max_tries = 10 / sleep_time # Wait at most 10 seconds max_tries = 10 / sleep_time # Wait at most 10 seconds
while max_tries > 0: while max_tries > 0:
done = True done = True
for c in self.connections: with mininode_lock:
if c.cb.verack_received is False: for c in self.connections:
done = False if c.cb.verack_received is False:
break done = False
break
if done: if done:
break break
time.sleep(sleep_time) time.sleep(sleep_time)
@ -161,10 +164,11 @@ class TestManager(object):
while received_pongs is not True: while received_pongs is not True:
time.sleep(0.05) time.sleep(0.05)
received_pongs = True received_pongs = True
for c in self.connections: with mininode_lock:
if c.cb.received_ping_response(counter) is not True: for c in self.connections:
received_pongs = False if c.cb.received_ping_response(counter) is not True:
break received_pongs = False
break
# sync_blocks: Wait for all connections to request the blockhash given # sync_blocks: Wait for all connections to request the blockhash given
# then send get_headers to find out the tip of each node, and synchronize # then send get_headers to find out the tip of each node, and synchronize
@ -173,8 +177,9 @@ class TestManager(object):
# Wait for nodes to request block (50ms sleep * 20 tries * num_blocks) # Wait for nodes to request block (50ms sleep * 20 tries * num_blocks)
max_tries = 20*num_blocks max_tries = 20*num_blocks
while max_tries > 0: while max_tries > 0:
results = [ blockhash in c.cb.block_request_map and with mininode_lock:
c.cb.block_request_map[blockhash] for c in self.connections ] results = [ blockhash in c.cb.block_request_map and
c.cb.block_request_map[blockhash] for c in self.connections ]
if False not in results: if False not in results:
break break
time.sleep(0.05) time.sleep(0.05)
@ -199,8 +204,9 @@ class TestManager(object):
# Wait for nodes to request transaction (50ms sleep * 20 tries * num_events) # Wait for nodes to request transaction (50ms sleep * 20 tries * num_events)
max_tries = 20*num_events max_tries = 20*num_events
while max_tries > 0: while max_tries > 0:
results = [ txhash in c.cb.tx_request_map and with mininode_lock:
c.cb.tx_request_map[txhash] for c in self.connections ] results = [ txhash in c.cb.tx_request_map and
c.cb.tx_request_map[txhash] for c in self.connections ]
if False not in results: if False not in results:
break break
time.sleep(0.05) time.sleep(0.05)
@ -221,19 +227,21 @@ class TestManager(object):
self.ping_counter += 1 self.ping_counter += 1
# Sort inv responses from each node # Sort inv responses from each node
[ c.cb.lastInv.sort() for c in self.connections ] with mininode_lock:
[ c.cb.lastInv.sort() for c in self.connections ]
# Verify that the tip of each connection all agree with each other, and # Verify that the tip of each connection all agree with each other, and
# with the expected outcome (if given) # with the expected outcome (if given)
def check_results(self, blockhash, outcome): def check_results(self, blockhash, outcome):
for c in self.connections: with mininode_lock:
if outcome is None: for c in self.connections:
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash: if outcome is None:
if c.cb.bestblockhash != self.connections[0].cb.bestblockhash:
return False
elif ((c.cb.bestblockhash == blockhash) != outcome):
# print c.cb.bestblockhash, blockhash, outcome
return False return False
elif ((c.cb.bestblockhash == blockhash) != outcome): return True
# print c.cb.bestblockhash, blockhash, outcome
return False
return True
# Either check that the mempools all agree with each other, or that # Either check that the mempools all agree with each other, or that
# txhash's presence in the mempool matches the outcome specified. # txhash's presence in the mempool matches the outcome specified.
@ -242,16 +250,17 @@ class TestManager(object):
# perhaps it would be useful to add the ability to check explicitly that # perhaps it would be useful to add the ability to check explicitly that
# a particular tx's existence in the mempool is the same across all nodes. # a particular tx's existence in the mempool is the same across all nodes.
def check_mempool(self, txhash, outcome): def check_mempool(self, txhash, outcome):
for c in self.connections: with mininode_lock:
if outcome is None: for c in self.connections:
# Make sure the mempools agree with each other if outcome is None:
if c.cb.lastInv != self.connections[0].cb.lastInv: # Make sure the mempools agree with each other
# print c.rpc.getrawmempool() if c.cb.lastInv != self.connections[0].cb.lastInv:
# print c.rpc.getrawmempool()
return False
elif ((txhash in c.cb.lastInv) != outcome):
# print c.rpc.getrawmempool(), c.cb.lastInv
return False return False
elif ((txhash in c.cb.lastInv) != outcome): return True
# print c.rpc.getrawmempool(), c.cb.lastInv
return False
return True
def run(self): def run(self):
# Wait until verack is received # Wait until verack is received
@ -272,9 +281,10 @@ class TestManager(object):
block = b_or_t block = b_or_t
block_outcome = outcome block_outcome = outcome
# Add to shared block_store, set as current block # Add to shared block_store, set as current block
self.block_store.add_block(block) with mininode_lock:
for c in self.connections: self.block_store.add_block(block)
c.cb.block_request_map[block.sha256] = False for c in self.connections:
c.cb.block_request_map[block.sha256] = False
# Either send inv's to each node and sync, or add # Either send inv's to each node and sync, or add
# to invqueue for later inv'ing. # to invqueue for later inv'ing.
if (test_instance.sync_every_block): if (test_instance.sync_every_block):
@ -288,10 +298,11 @@ class TestManager(object):
assert(isinstance(b_or_t, CTransaction)) assert(isinstance(b_or_t, CTransaction))
tx = b_or_t tx = b_or_t
tx_outcome = outcome tx_outcome = outcome
# Add to shared tx store # Add to shared tx store and clear map entry
self.tx_store.add_transaction(tx) with mininode_lock:
for c in self.connections: self.tx_store.add_transaction(tx)
c.cb.tx_request_map[tx.sha256] = False for c in self.connections:
c.cb.tx_request_map[tx.sha256] = False
# Again, either inv to all nodes or save for later # Again, either inv to all nodes or save for later
if (test_instance.sync_every_tx): if (test_instance.sync_every_tx):
[ c.cb.send_inv(tx) for c in self.connections ] [ c.cb.send_inv(tx) for c in self.connections ]
@ -302,7 +313,7 @@ class TestManager(object):
invqueue.append(CInv(1, tx.sha256)) invqueue.append(CInv(1, tx.sha256))
# Ensure we're not overflowing the inv queue # Ensure we're not overflowing the inv queue
if len(invqueue) == MAX_INV_SZ: if len(invqueue) == MAX_INV_SZ:
[ c.sb.send_message(msg_inv(invqueue)) for c in self.connections ] [ c.send_message(msg_inv(invqueue)) for c in self.connections ]
invqueue = [] invqueue = []
# Do final sync if we weren't syncing on every block or every tx. # Do final sync if we weren't syncing on every block or every tx.

View file

@ -61,10 +61,11 @@ class TestManager(NodeConnCB):
time.sleep(2) time.sleep(2)
total_requests = 0 total_requests = 0
for key in self.blockReqCounts: with mininode_lock:
total_requests += self.blockReqCounts[key] for key in self.blockReqCounts:
if self.blockReqCounts[key] > 1: total_requests += self.blockReqCounts[key]
raise AssertionError("Error, test failed: block %064x requested more than once" % key) if self.blockReqCounts[key] > 1:
raise AssertionError("Error, test failed: block %064x requested more than once" % key)
if total_requests > MAX_REQUESTS: if total_requests > MAX_REQUESTS:
raise AssertionError("Error, too many blocks (%d) requested" % total_requests) raise AssertionError("Error, too many blocks (%d) requested" % total_requests)
print "Round %d: success (total requests: %d)" % (count, total_requests) print "Round %d: success (total requests: %d)" % (count, total_requests)

View file

@ -26,7 +26,7 @@ import sys
import random import random
import cStringIO import cStringIO
import hashlib import hashlib
from threading import Lock from threading import RLock
from threading import Thread from threading import Thread
import logging import logging
import copy import copy
@ -37,6 +37,19 @@ MY_SUBVERSION = "/python-mininode-tester:0.0.1/"
MAX_INV_SZ = 50000 MAX_INV_SZ = 50000
# Keep our own socket map for asyncore, so that we can track disconnects
# ourselves (to workaround an issue with closing an asyncore socket when
# using select)
mininode_socket_map = dict()
# One lock for synchronizing all data access between the networking thread (see
# NetworkThread below) and the thread running the test logic. For simplicity,
# NodeConn acquires this lock whenever delivering a message to to a NodeConnCB,
# and whenever adding anything to the send buffer (in send_message()). This
# lock should be acquired in the thread running the test logic to synchronize
# access to any data shared with the NodeConnCB or NodeConn.
mininode_lock = RLock()
# Serialization/deserialization tools # Serialization/deserialization tools
def sha256(s): def sha256(s):
return hashlib.new('sha256', s).digest() return hashlib.new('sha256', s).digest()
@ -975,10 +988,6 @@ class msg_reject(object):
# Reimplement the on_* functions to provide handling for events # Reimplement the on_* functions to provide handling for events
class NodeConnCB(object): class NodeConnCB(object):
def __init__(self): def __init__(self):
# Acquire on all callbacks -- overkill for now since asyncore is
# single-threaded, but may be useful for synchronizing access to
# member variables in derived classes.
self.cbLock = Lock()
self.verack_received = False self.verack_received = False
# Derived classes should call this function once to set the message map # Derived classes should call this function once to set the message map
@ -1004,7 +1013,7 @@ class NodeConnCB(object):
} }
def deliver(self, conn, message): def deliver(self, conn, message):
with self.cbLock: with mininode_lock:
try: try:
self.cbmap[message.command](conn, message) self.cbmap[message.command](conn, message)
except: except:
@ -1076,7 +1085,7 @@ class NodeConn(asyncore.dispatcher):
} }
def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"): def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"):
asyncore.dispatcher.__init__(self) asyncore.dispatcher.__init__(self, map=mininode_socket_map)
self.log = logging.getLogger("NodeConn(%s:%d)" % (dstaddr, dstport)) self.log = logging.getLogger("NodeConn(%s:%d)" % (dstaddr, dstport))
self.dstaddr = dstaddr self.dstaddr = dstaddr
self.dstport = dstport self.dstport = dstport
@ -1089,7 +1098,6 @@ class NodeConn(asyncore.dispatcher):
self.state = "connecting" self.state = "connecting"
self.network = net self.network = net
self.cb = callback self.cb = callback
self.sendbufLock = Lock() # for protecting the sendbuffer
self.disconnect = False self.disconnect = False
# stuff version msg into sendbuf # stuff version msg into sendbuf
@ -1140,24 +1148,18 @@ class NodeConn(asyncore.dispatcher):
return True return True
def writable(self): def writable(self):
if self.disconnect: with mininode_lock:
self.handle_close()
return False
else:
self.sendbufLock.acquire()
length = len(self.sendbuf) length = len(self.sendbuf)
self.sendbufLock.release() return (length > 0)
return (length > 0)
def handle_write(self): def handle_write(self):
self.sendbufLock.acquire() with mininode_lock:
try: try:
sent = self.send(self.sendbuf) sent = self.send(self.sendbuf)
except: except:
self.handle_close() self.handle_close()
return return
self.sendbuf = self.sendbuf[sent:] self.sendbuf = self.sendbuf[sent:]
self.sendbufLock.release()
def got_data(self): def got_data(self):
while True: while True:
@ -1201,7 +1203,6 @@ class NodeConn(asyncore.dispatcher):
def send_message(self, message, pushbuf=False): def send_message(self, message, pushbuf=False):
if self.state != "connected" and not pushbuf: if self.state != "connected" and not pushbuf:
return return
self.sendbufLock.acquire()
self.show_debug_msg("Send %s" % repr(message)) self.show_debug_msg("Send %s" % repr(message))
command = message.command command = message.command
data = message.serialize() data = message.serialize()
@ -1214,9 +1215,9 @@ class NodeConn(asyncore.dispatcher):
h = sha256(th) h = sha256(th)
tmsg += h[:4] tmsg += h[:4]
tmsg += data tmsg += data
self.sendbuf += tmsg with mininode_lock:
self.last_sent = time.time() self.sendbuf += tmsg
self.sendbufLock.release() self.last_sent = time.time()
def got_message(self, message): def got_message(self, message):
if message.command == "version": if message.command == "version":
@ -1229,12 +1230,20 @@ class NodeConn(asyncore.dispatcher):
def disconnect_node(self): def disconnect_node(self):
self.disconnect = True self.disconnect = True
self.send_message(self.messagemap['ping']())
class NetworkThread(Thread): class NetworkThread(Thread):
def run(self): def run(self):
asyncore.loop(0.1, True) while mininode_socket_map:
# We check for whether to disconnect outside of the asyncore
# loop to workaround the behavior of asyncore when using
# select
disconnected = []
for fd, obj in mininode_socket_map.items():
if obj.disconnect:
disconnected.append(obj)
[ obj.handle_close() for obj in disconnected ]
asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
# An exception we can raise if we detect a potential disconnect # An exception we can raise if we detect a potential disconnect