[tests] Add P2PDataStore class
P2PDataStore subclasses NodeConnCB. This class keeps a store of txs and blocks and responds to getdata requests from the node-under-test.
This commit is contained in:
parent
cc046f66a7
commit
c32cf9f622
2 changed files with 155 additions and 3 deletions
|
@ -44,6 +44,11 @@ NODE_UNSUPPORTED_SERVICE_BIT_5 = (1 << 5)
|
|||
NODE_UNSUPPORTED_SERVICE_BIT_7 = (1 << 7)
|
||||
NODE_NETWORK_LIMITED = (1 << 10)
|
||||
|
||||
MSG_TX = 1
|
||||
MSG_BLOCK = 2
|
||||
MSG_WITNESS_FLAG = 1 << 30
|
||||
MSG_TYPE_MASK = 0xffffffff >> 2
|
||||
|
||||
# Serialization/deserialization tools
|
||||
def sha256(s):
|
||||
return hashlib.new('sha256', s).digest()
|
||||
|
@ -204,8 +209,6 @@ class CAddress():
|
|||
return "CAddress(nServices=%i ip=%s port=%i)" % (self.nServices,
|
||||
self.ip, self.port)
|
||||
|
||||
MSG_WITNESS_FLAG = 1<<30
|
||||
|
||||
class CInv():
|
||||
typemap = {
|
||||
0: "Error",
|
||||
|
|
|
@ -10,7 +10,9 @@ This python code was modified from ArtForz' public domain half-a-node, as
|
|||
found in the mini-node branch of http://github.com/jgarzik/pynode.
|
||||
|
||||
P2PConnection: A low-level connection object to a node's P2P interface
|
||||
P2PInterface: A high-level interface object for communicating to a node over P2P"""
|
||||
P2PInterface: A high-level interface object for communicating to a node over P2P
|
||||
P2PDataStore: A p2p interface class that keeps a store of transactions and blocks
|
||||
and can respond correctly to getdata and getheaders messages"""
|
||||
import asyncore
|
||||
from collections import defaultdict
|
||||
from io import BytesIO
|
||||
|
@ -356,10 +358,22 @@ class P2PInterface(P2PConnection):
|
|||
wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
||||
|
||||
def wait_for_getdata(self, timeout=60):
|
||||
"""Waits for a getdata message.
|
||||
|
||||
Receiving any getdata message will satisfy the predicate. the last_message["getdata"]
|
||||
value must be explicitly cleared before calling this method, or this will return
|
||||
immediately with success. TODO: change this method to take a hash value and only
|
||||
return true if the correct block/tx has been requested."""
|
||||
test_function = lambda: self.last_message.get("getdata")
|
||||
wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
||||
|
||||
def wait_for_getheaders(self, timeout=60):
|
||||
"""Waits for a getheaders message.
|
||||
|
||||
Receiving any getheaders message will satisfy the predicate. the last_message["getheaders"]
|
||||
value must be explicitly cleared before calling this method, or this will return
|
||||
immediately with success. TODO: change this method to take a hash value and only
|
||||
return true if the correct block header has been requested."""
|
||||
test_function = lambda: self.last_message.get("getheaders")
|
||||
wait_until(test_function, timeout=timeout, lock=mininode_lock)
|
||||
|
||||
|
@ -440,3 +454,138 @@ def network_thread_join(timeout=10):
|
|||
for thread in network_threads:
|
||||
thread.join(timeout)
|
||||
assert not thread.is_alive()
|
||||
|
||||
class P2PDataStore(P2PInterface):
|
||||
"""A P2P data store class.
|
||||
|
||||
Keeps a block and transaction store and responds correctly to getdata and getheaders requests."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.reject_code_received = None
|
||||
self.reject_reason_received = None
|
||||
# store of blocks. key is block hash, value is a CBlock object
|
||||
self.block_store = {}
|
||||
self.last_block_hash = ''
|
||||
# store of txs. key is txid, value is a CTransaction object
|
||||
self.tx_store = {}
|
||||
self.getdata_requests = []
|
||||
|
||||
def on_getdata(self, message):
|
||||
"""Check for the tx/block in our stores and if found, reply with an inv message."""
|
||||
for inv in message.inv:
|
||||
self.getdata_requests.append(inv.hash)
|
||||
if (inv.type & MSG_TYPE_MASK) == MSG_TX and inv.hash in self.tx_store.keys():
|
||||
self.send_message(msg_tx(self.tx_store[inv.hash]))
|
||||
elif (inv.type & MSG_TYPE_MASK) == MSG_BLOCK and inv.hash in self.block_store.keys():
|
||||
self.send_message(msg_block(self.block_store[inv.hash]))
|
||||
else:
|
||||
logger.debug('getdata message type {} received.'.format(hex(inv.type)))
|
||||
|
||||
def on_getheaders(self, message):
|
||||
"""Search back through our block store for the locator, and reply with a headers message if found."""
|
||||
|
||||
locator, hash_stop = message.locator, message.hashstop
|
||||
|
||||
# Assume that the most recent block added is the tip
|
||||
if not self.block_store:
|
||||
return
|
||||
|
||||
headers_list = [self.block_store[self.last_block_hash]]
|
||||
maxheaders = 2000
|
||||
while headers_list[-1].sha256 not in locator.vHave:
|
||||
# Walk back through the block store, adding headers to headers_list
|
||||
# as we go.
|
||||
prev_block_hash = headers_list[-1].hashPrevBlock
|
||||
if prev_block_hash in self.block_store:
|
||||
prev_block_header = self.block_store[prev_block_hash]
|
||||
headers_list.append(prev_block_header)
|
||||
if prev_block_header.sha256 == hash_stop:
|
||||
# if this is the hashstop header, stop here
|
||||
break
|
||||
else:
|
||||
logger.debug('block hash {} not found in block store'.format(hex(prev_block_hash)))
|
||||
break
|
||||
|
||||
# Truncate the list if there are too many headers
|
||||
headers_list = headers_list[:-maxheaders - 1:-1]
|
||||
response = msg_headers(headers_list)
|
||||
|
||||
if response is not None:
|
||||
self.send_message(response)
|
||||
|
||||
def on_reject(self, message):
|
||||
"""Store reject reason and code for testing."""
|
||||
self.reject_code_received = message.code
|
||||
self.reject_reason_received = message.reason
|
||||
|
||||
def send_blocks_and_test(self, blocks, rpc, success=True, request_block=True, reject_code=None, reject_reason=None, timeout=60):
|
||||
"""Send blocks to test node and test whether the tip advances.
|
||||
|
||||
- add all blocks to our block_store
|
||||
- send a headers message for the final block
|
||||
- the on_getheaders handler will ensure that any getheaders are responded to
|
||||
- if request_block is True: wait for getdata for each of the blocks. The on_getdata handler will
|
||||
ensure that any getdata messages are responded to
|
||||
- if success is True: assert that the node's tip advances to the most recent block
|
||||
- if success is False: assert that the node's tip doesn't advance
|
||||
- if reject_code and reject_reason are set: assert that the correct reject message is received"""
|
||||
|
||||
with mininode_lock:
|
||||
self.reject_code_received = None
|
||||
self.reject_reason_received = None
|
||||
|
||||
for block in blocks:
|
||||
self.block_store[block.sha256] = block
|
||||
self.last_block_hash = block.sha256
|
||||
|
||||
self.send_message(msg_headers([blocks[-1]]))
|
||||
|
||||
if request_block:
|
||||
wait_until(lambda: blocks[-1].sha256 in self.getdata_requests, timeout=timeout, lock=mininode_lock)
|
||||
|
||||
if success:
|
||||
wait_until(lambda: rpc.getbestblockhash() == blocks[-1].hash, timeout=timeout)
|
||||
else:
|
||||
assert rpc.getbestblockhash() != blocks[-1].hash
|
||||
|
||||
if reject_code is not None:
|
||||
wait_until(lambda: self.reject_code_received == reject_code, lock=mininode_lock)
|
||||
if reject_reason is not None:
|
||||
wait_until(lambda: self.reject_reason_received == reject_reason, lock=mininode_lock)
|
||||
|
||||
def send_txs_and_test(self, txs, rpc, success=True, reject_code=None, reject_reason=None):
|
||||
"""Send txs to test node and test whether they're accepted to the mempool.
|
||||
|
||||
- add all txs to our tx_store
|
||||
- send tx messages for all txs
|
||||
- if success is True: assert that the tx is accepted to the mempool
|
||||
- if success is False: assert that the tx is not accepted to the mempool
|
||||
- if reject_code and reject_reason are set: assert that the correct reject message is received."""
|
||||
|
||||
with mininode_lock:
|
||||
self.reject_code_received = None
|
||||
self.reject_reason_received = None
|
||||
|
||||
for tx in txs:
|
||||
self.tx_store[tx.sha256] = tx
|
||||
|
||||
for tx in txs:
|
||||
self.send_message(msg_tx(tx))
|
||||
|
||||
self.sync_with_ping()
|
||||
|
||||
raw_mempool = rpc.getrawmempool()
|
||||
if success:
|
||||
# Check that all txs are now in the mempool
|
||||
for tx in txs:
|
||||
assert tx.hash in raw_mempool, "{} not found in mempool".format(tx.hash)
|
||||
else:
|
||||
# Check that none of the txs are now in the mempool
|
||||
for tx in txs:
|
||||
assert tx.hash not in raw_mempool, "{} tx found in mempool".format(tx.hash)
|
||||
|
||||
if reject_code is not None:
|
||||
wait_until(lambda: self.reject_code_received == reject_code, lock=mininode_lock)
|
||||
if reject_reason is not None:
|
||||
wait_until(lambda: self.reject_reason_received == reject_reason, lock=mininode_lock)
|
||||
|
|
Loading…
Reference in a new issue