Merge #16197: net: Use mockable time for tx download
fab3658356
[qa] Test that getdata requests work as expected (Suhas Daftuar)fa883ab35a
net: Use mockable time for tx download (MarcoFalke) Pull request description: Two commits: * First commit changes to mockable time for tx download (refactoring, should only have an effect on regtest) * Second commit adds a test that uses mocktime to test tx download ACKs for top commit: laanwj: code review ACK 16197/commits/fab365835639a3da03f8ad9a58a0db6c6c4c2314 jamesob: ACKfab3658356
Tree-SHA512: 3a64a3e283ec4bab1f6e506404b11f0a564a5b61d2a7508ae738a61f035e57220484c66e0ae47d847fe9f7e3ff5cc834909d7b34a9bbcea6abe01f8742806908
This commit is contained in:
commit
c77f7cdbd1
5 changed files with 218 additions and 32 deletions
|
@ -68,13 +68,13 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
|
||||||
/** Maximum number of announced transactions from a peer */
|
/** Maximum number of announced transactions from a peer */
|
||||||
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
|
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
|
||||||
/** How many microseconds to delay requesting transactions from inbound peers */
|
/** How many microseconds to delay requesting transactions from inbound peers */
|
||||||
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; // 2 seconds
|
static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}};
|
||||||
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
|
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
|
||||||
static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; // 1 minute
|
static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}};
|
||||||
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
|
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
|
||||||
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; // 2 seconds
|
static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}};
|
||||||
/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
|
/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
|
||||||
static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL;
|
static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{GETDATA_TX_INTERVAL * 10};
|
||||||
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
|
static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
|
||||||
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
|
"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
|
||||||
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
|
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
|
||||||
|
@ -340,16 +340,16 @@ struct CNodeState {
|
||||||
/* Track when to attempt download of announced transactions (process
|
/* Track when to attempt download of announced transactions (process
|
||||||
* time in micros -> txid)
|
* time in micros -> txid)
|
||||||
*/
|
*/
|
||||||
std::multimap<int64_t, uint256> m_tx_process_time;
|
std::multimap<std::chrono::microseconds, uint256> m_tx_process_time;
|
||||||
|
|
||||||
//! Store all the transactions a peer has recently announced
|
//! Store all the transactions a peer has recently announced
|
||||||
std::set<uint256> m_tx_announced;
|
std::set<uint256> m_tx_announced;
|
||||||
|
|
||||||
//! Store transactions which were requested by us, with timestamp
|
//! Store transactions which were requested by us, with timestamp
|
||||||
std::map<uint256, int64_t> m_tx_in_flight;
|
std::map<uint256, std::chrono::microseconds> m_tx_in_flight;
|
||||||
|
|
||||||
//! Periodically check for stuck getdata requests
|
//! Periodically check for stuck getdata requests
|
||||||
int64_t m_check_expiry_timer{0};
|
std::chrono::microseconds m_check_expiry_timer{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
TxDownloadState m_tx_download;
|
TxDownloadState m_tx_download;
|
||||||
|
@ -391,7 +391,7 @@ struct CNodeState {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Keeps track of the time (in microseconds) when transactions were requested last time
|
// Keeps track of the time (in microseconds) when transactions were requested last time
|
||||||
limitedmap<uint256, int64_t> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
|
limitedmap<uint256, std::chrono::microseconds> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
|
||||||
|
|
||||||
/** Map maintaining per-node state. */
|
/** Map maintaining per-node state. */
|
||||||
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main);
|
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main);
|
||||||
|
@ -688,16 +688,16 @@ void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
g_already_asked_for.erase(txid);
|
g_already_asked_for.erase(txid);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
std::chrono::microseconds GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
auto it = g_already_asked_for.find(txid);
|
auto it = g_already_asked_for.find(txid);
|
||||||
if (it != g_already_asked_for.end()) {
|
if (it != g_already_asked_for.end()) {
|
||||||
return it->second;
|
return it->second;
|
||||||
}
|
}
|
||||||
return 0;
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
void UpdateTxRequestTime(const uint256& txid, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
auto it = g_already_asked_for.find(txid);
|
auto it = g_already_asked_for.find(txid);
|
||||||
if (it == g_already_asked_for.end()) {
|
if (it == g_already_asked_for.end()) {
|
||||||
|
@ -707,17 +707,17 @@ void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
std::chrono::microseconds CalculateTxGetDataTime(const uint256& txid, std::chrono::microseconds current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
int64_t process_time;
|
std::chrono::microseconds process_time;
|
||||||
int64_t last_request_time = GetTxRequestTime(txid);
|
const auto last_request_time = GetTxRequestTime(txid);
|
||||||
// First time requesting this tx
|
// First time requesting this tx
|
||||||
if (last_request_time == 0) {
|
if (last_request_time.count() == 0) {
|
||||||
process_time = current_time;
|
process_time = current_time;
|
||||||
} else {
|
} else {
|
||||||
// Randomize the delay to avoid biasing some peers over others (such as due to
|
// Randomize the delay to avoid biasing some peers over others (such as due to
|
||||||
// fixed ordering of peer processing in ThreadMessageHandler)
|
// fixed ordering of peer processing in ThreadMessageHandler)
|
||||||
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY);
|
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRandMicros(MAX_GETDATA_RANDOM_DELAY);
|
||||||
}
|
}
|
||||||
|
|
||||||
// We delay processing announcements from inbound peers
|
// We delay processing announcements from inbound peers
|
||||||
|
@ -726,7 +726,7 @@ int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool u
|
||||||
return process_time;
|
return process_time;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
void RequestTx(CNodeState* state, const uint256& txid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
|
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
|
||||||
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
||||||
|
@ -740,7 +740,7 @@ void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_L
|
||||||
|
|
||||||
// Calculate the time to try requesting this transaction. Use
|
// Calculate the time to try requesting this transaction. Use
|
||||||
// fPreferredDownload as a proxy for outbound peers.
|
// fPreferredDownload as a proxy for outbound peers.
|
||||||
int64_t process_time = CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload);
|
const auto process_time = CalculateTxGetDataTime(txid, current_time, !state->fPreferredDownload);
|
||||||
|
|
||||||
peer_download_state.m_tx_process_time.emplace(process_time, txid);
|
peer_download_state.m_tx_process_time.emplace(process_time, txid);
|
||||||
}
|
}
|
||||||
|
@ -2223,7 +2223,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
||||||
LOCK(cs_main);
|
LOCK(cs_main);
|
||||||
|
|
||||||
uint32_t nFetchFlags = GetFetchFlags(pfrom);
|
uint32_t nFetchFlags = GetFetchFlags(pfrom);
|
||||||
int64_t nNow = GetTimeMicros();
|
const auto current_time = GetTime<std::chrono::microseconds>();
|
||||||
|
|
||||||
for (CInv &inv : vInv)
|
for (CInv &inv : vInv)
|
||||||
{
|
{
|
||||||
|
@ -2255,7 +2255,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
||||||
if (fBlocksOnly) {
|
if (fBlocksOnly) {
|
||||||
LogPrint(BCLog::NET, "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->GetId());
|
LogPrint(BCLog::NET, "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->GetId());
|
||||||
} else if (!fAlreadyHave && !fImporting && !fReindex && !::ChainstateActive().IsInitialBlockDownload()) {
|
} else if (!fAlreadyHave && !fImporting && !fReindex && !::ChainstateActive().IsInitialBlockDownload()) {
|
||||||
RequestTx(State(pfrom->GetId()), inv.hash, nNow);
|
RequestTx(State(pfrom->GetId()), inv.hash, current_time);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2529,12 +2529,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
||||||
}
|
}
|
||||||
if (!fRejectedParents) {
|
if (!fRejectedParents) {
|
||||||
uint32_t nFetchFlags = GetFetchFlags(pfrom);
|
uint32_t nFetchFlags = GetFetchFlags(pfrom);
|
||||||
int64_t nNow = GetTimeMicros();
|
const auto current_time = GetTime<std::chrono::microseconds>();
|
||||||
|
|
||||||
for (const CTxIn& txin : tx.vin) {
|
for (const CTxIn& txin : tx.vin) {
|
||||||
CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash);
|
CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash);
|
||||||
pfrom->AddInventoryKnown(_inv);
|
pfrom->AddInventoryKnown(_inv);
|
||||||
if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow);
|
if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, current_time);
|
||||||
}
|
}
|
||||||
AddOrphanTx(ptx, pfrom->GetId());
|
AddOrphanTx(ptx, pfrom->GetId());
|
||||||
|
|
||||||
|
@ -3906,6 +3906,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
|
||||||
connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
|
connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
|
||||||
|
|
||||||
// Detect whether we're stalling
|
// Detect whether we're stalling
|
||||||
|
const auto current_time = GetTime<std::chrono::microseconds>();
|
||||||
|
// nNow is the current system time (GetTimeMicros is not mockable) and
|
||||||
|
// should be replaced by the mockable current_time eventually
|
||||||
nNow = GetTimeMicros();
|
nNow = GetTimeMicros();
|
||||||
if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) {
|
if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) {
|
||||||
// Stalling only triggers when the block download window cannot move. During normal steady state,
|
// Stalling only triggers when the block download window cannot move. During normal steady state,
|
||||||
|
@ -3998,9 +4001,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
|
||||||
// were unresponsive in the past.
|
// were unresponsive in the past.
|
||||||
// Eventually we should consider disconnecting peers, but this is
|
// Eventually we should consider disconnecting peers, but this is
|
||||||
// conservative.
|
// conservative.
|
||||||
if (state.m_tx_download.m_check_expiry_timer <= nNow) {
|
if (state.m_tx_download.m_check_expiry_timer <= current_time) {
|
||||||
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
|
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
|
||||||
if (it->second <= nNow - TX_EXPIRY_INTERVAL) {
|
if (it->second <= current_time - TX_EXPIRY_INTERVAL) {
|
||||||
LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
|
LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
|
||||||
state.m_tx_download.m_tx_announced.erase(it->first);
|
state.m_tx_download.m_tx_announced.erase(it->first);
|
||||||
state.m_tx_download.m_tx_in_flight.erase(it++);
|
state.m_tx_download.m_tx_in_flight.erase(it++);
|
||||||
|
@ -4010,11 +4013,11 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
|
||||||
}
|
}
|
||||||
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
|
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
|
||||||
// so that we're not doing this for all peers at the same time.
|
// so that we're not doing this for all peers at the same time.
|
||||||
state.m_tx_download.m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL);
|
state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL / 2 + GetRandMicros(TX_EXPIRY_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
|
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
|
||||||
while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
|
while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
|
||||||
const uint256 txid = tx_process_time.begin()->second;
|
const uint256 txid = tx_process_time.begin()->second;
|
||||||
// Erase this entry from tx_process_time (it may be added back for
|
// Erase this entry from tx_process_time (it may be added back for
|
||||||
// processing at a later time, see below)
|
// processing at a later time, see below)
|
||||||
|
@ -4023,22 +4026,22 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
|
||||||
if (!AlreadyHave(inv)) {
|
if (!AlreadyHave(inv)) {
|
||||||
// If this transaction was last requested more than 1 minute ago,
|
// If this transaction was last requested more than 1 minute ago,
|
||||||
// then request.
|
// then request.
|
||||||
int64_t last_request_time = GetTxRequestTime(inv.hash);
|
const auto last_request_time = GetTxRequestTime(inv.hash);
|
||||||
if (last_request_time <= nNow - GETDATA_TX_INTERVAL) {
|
if (last_request_time <= current_time - GETDATA_TX_INTERVAL) {
|
||||||
LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
|
LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
|
||||||
vGetData.push_back(inv);
|
vGetData.push_back(inv);
|
||||||
if (vGetData.size() >= MAX_GETDATA_SZ) {
|
if (vGetData.size() >= MAX_GETDATA_SZ) {
|
||||||
connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
|
connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
|
||||||
vGetData.clear();
|
vGetData.clear();
|
||||||
}
|
}
|
||||||
UpdateTxRequestTime(inv.hash, nNow);
|
UpdateTxRequestTime(inv.hash, current_time);
|
||||||
state.m_tx_download.m_tx_in_flight.emplace(inv.hash, nNow);
|
state.m_tx_download.m_tx_in_flight.emplace(inv.hash, current_time);
|
||||||
} else {
|
} else {
|
||||||
// This transaction is in flight from someone else; queue
|
// This transaction is in flight from someone else; queue
|
||||||
// up processing to happen after the download times out
|
// up processing to happen after the download times out
|
||||||
// (with a slight delay for inbound peers, to prefer
|
// (with a slight delay for inbound peers, to prefer
|
||||||
// requests to outbound peers).
|
// requests to outbound peers).
|
||||||
int64_t next_process_time = CalculateTxGetDataTime(txid, nNow, !state.fPreferredDownload);
|
const auto next_process_time = CalculateTxGetDataTime(txid, current_time, !state.fPreferredDownload);
|
||||||
tx_process_time.emplace(next_process_time, txid);
|
tx_process_time.emplace(next_process_time, txid);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -667,6 +667,11 @@ uint64_t GetRand(uint64_t nMax) noexcept
|
||||||
return FastRandomContext(g_mock_deterministic_tests).randrange(nMax);
|
return FastRandomContext(g_mock_deterministic_tests).randrange(nMax);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept
|
||||||
|
{
|
||||||
|
return std::chrono::microseconds{GetRand(duration_max.count())};
|
||||||
|
}
|
||||||
|
|
||||||
int GetRandInt(int nMax) noexcept
|
int GetRandInt(int nMax) noexcept
|
||||||
{
|
{
|
||||||
return GetRand(nMax);
|
return GetRand(nMax);
|
||||||
|
|
|
@ -10,7 +10,8 @@
|
||||||
#include <crypto/common.h>
|
#include <crypto/common.h>
|
||||||
#include <uint256.h>
|
#include <uint256.h>
|
||||||
|
|
||||||
#include <stdint.h>
|
#include <chrono> // For std::chrono::microseconds
|
||||||
|
#include <cstdint>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -69,6 +70,7 @@
|
||||||
*/
|
*/
|
||||||
void GetRandBytes(unsigned char* buf, int num) noexcept;
|
void GetRandBytes(unsigned char* buf, int num) noexcept;
|
||||||
uint64_t GetRand(uint64_t nMax) noexcept;
|
uint64_t GetRand(uint64_t nMax) noexcept;
|
||||||
|
std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept;
|
||||||
int GetRandInt(int nMax) noexcept;
|
int GetRandInt(int nMax) noexcept;
|
||||||
uint256 GetRandHash() noexcept;
|
uint256 GetRandHash() noexcept;
|
||||||
|
|
||||||
|
|
175
test/functional/p2p_tx_download.py
Executable file
175
test/functional/p2p_tx_download.py
Executable file
|
@ -0,0 +1,175 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# Copyright (c) 2019 The Bitcoin Core developers
|
||||||
|
# Distributed under the MIT software license, see the accompanying
|
||||||
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||||
|
"""
|
||||||
|
Test transaction download behavior
|
||||||
|
"""
|
||||||
|
|
||||||
|
from test_framework.messages import (
|
||||||
|
CInv,
|
||||||
|
CTransaction,
|
||||||
|
FromHex,
|
||||||
|
MSG_TX,
|
||||||
|
MSG_TYPE_MASK,
|
||||||
|
msg_inv,
|
||||||
|
msg_notfound,
|
||||||
|
)
|
||||||
|
from test_framework.mininode import (
|
||||||
|
P2PInterface,
|
||||||
|
mininode_lock,
|
||||||
|
)
|
||||||
|
from test_framework.test_framework import BitcoinTestFramework
|
||||||
|
from test_framework.util import (
|
||||||
|
assert_equal,
|
||||||
|
wait_until,
|
||||||
|
)
|
||||||
|
from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class TestP2PConn(P2PInterface):
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.tx_getdata_count = 0
|
||||||
|
|
||||||
|
def on_getdata(self, message):
|
||||||
|
for i in message.inv:
|
||||||
|
if i.type & MSG_TYPE_MASK == MSG_TX:
|
||||||
|
self.tx_getdata_count += 1
|
||||||
|
|
||||||
|
|
||||||
|
# Constants from net_processing
|
||||||
|
GETDATA_TX_INTERVAL = 60 # seconds
|
||||||
|
MAX_GETDATA_RANDOM_DELAY = 2 # seconds
|
||||||
|
INBOUND_PEER_TX_DELAY = 2 # seconds
|
||||||
|
MAX_GETDATA_IN_FLIGHT = 100
|
||||||
|
TX_EXPIRY_INTERVAL = GETDATA_TX_INTERVAL * 10
|
||||||
|
|
||||||
|
# Python test constants
|
||||||
|
NUM_INBOUND = 10
|
||||||
|
MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY
|
||||||
|
|
||||||
|
|
||||||
|
class TxDownloadTest(BitcoinTestFramework):
|
||||||
|
def set_test_params(self):
|
||||||
|
self.setup_clean_chain = False
|
||||||
|
self.num_nodes = 2
|
||||||
|
|
||||||
|
def test_tx_requests(self):
|
||||||
|
self.log.info("Test that we request transactions from all our peers, eventually")
|
||||||
|
|
||||||
|
txid = 0xdeadbeef
|
||||||
|
|
||||||
|
self.log.info("Announce the txid from each incoming peer to node 0")
|
||||||
|
msg = msg_inv([CInv(t=1, h=txid)])
|
||||||
|
for p in self.nodes[0].p2ps:
|
||||||
|
p.send_message(msg)
|
||||||
|
p.sync_with_ping()
|
||||||
|
|
||||||
|
outstanding_peer_index = [i for i in range(len(self.nodes[0].p2ps))]
|
||||||
|
|
||||||
|
def getdata_found(peer_index):
|
||||||
|
p = self.nodes[0].p2ps[peer_index]
|
||||||
|
with mininode_lock:
|
||||||
|
return p.last_message.get("getdata") and p.last_message["getdata"].inv[-1].hash == txid
|
||||||
|
|
||||||
|
node_0_mocktime = int(time.time())
|
||||||
|
while outstanding_peer_index:
|
||||||
|
node_0_mocktime += MAX_GETDATA_INBOUND_WAIT
|
||||||
|
self.nodes[0].setmocktime(node_0_mocktime)
|
||||||
|
wait_until(lambda: any(getdata_found(i) for i in outstanding_peer_index))
|
||||||
|
for i in outstanding_peer_index:
|
||||||
|
if getdata_found(i):
|
||||||
|
outstanding_peer_index.remove(i)
|
||||||
|
|
||||||
|
self.nodes[0].setmocktime(0)
|
||||||
|
self.log.info("All outstanding peers received a getdata")
|
||||||
|
|
||||||
|
def test_inv_block(self):
|
||||||
|
self.log.info("Generate a transaction on node 0")
|
||||||
|
tx = self.nodes[0].createrawtransaction(
|
||||||
|
inputs=[{ # coinbase
|
||||||
|
"txid": self.nodes[0].getblock(self.nodes[0].getblockhash(1))['tx'][0],
|
||||||
|
"vout": 0
|
||||||
|
}],
|
||||||
|
outputs={ADDRESS_BCRT1_UNSPENDABLE: 50 - 0.00025},
|
||||||
|
)
|
||||||
|
tx = self.nodes[0].signrawtransactionwithkey(
|
||||||
|
hexstring=tx,
|
||||||
|
privkeys=[self.nodes[0].get_deterministic_priv_key().key],
|
||||||
|
)['hex']
|
||||||
|
ctx = FromHex(CTransaction(), tx)
|
||||||
|
txid = int(ctx.rehash(), 16)
|
||||||
|
|
||||||
|
self.log.info(
|
||||||
|
"Announce the transaction to all nodes from all {} incoming peers, but never send it".format(NUM_INBOUND))
|
||||||
|
msg = msg_inv([CInv(t=1, h=txid)])
|
||||||
|
for p in self.peers:
|
||||||
|
p.send_message(msg)
|
||||||
|
p.sync_with_ping()
|
||||||
|
|
||||||
|
self.log.info("Put the tx in node 0's mempool")
|
||||||
|
self.nodes[0].sendrawtransaction(tx)
|
||||||
|
|
||||||
|
# Since node 1 is connected outbound to an honest peer (node 0), it
|
||||||
|
# should get the tx within a timeout. (Assuming that node 0
|
||||||
|
# announced the tx within the timeout)
|
||||||
|
# The timeout is the sum of
|
||||||
|
# * the worst case until the tx is first requested from an inbound
|
||||||
|
# peer, plus
|
||||||
|
# * the first time it is re-requested from the outbound peer, plus
|
||||||
|
# * 2 seconds to avoid races
|
||||||
|
timeout = 2 + (MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY) + (
|
||||||
|
GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY)
|
||||||
|
self.log.info("Tx should be received at node 1 after {} seconds".format(timeout))
|
||||||
|
self.sync_mempools(timeout=timeout)
|
||||||
|
|
||||||
|
def test_in_flight_max(self):
|
||||||
|
self.log.info("Test that we don't request more than {} transactions from any peer, every {} minutes".format(
|
||||||
|
MAX_GETDATA_IN_FLIGHT, TX_EXPIRY_INTERVAL / 60))
|
||||||
|
txids = [i for i in range(MAX_GETDATA_IN_FLIGHT + 2)]
|
||||||
|
|
||||||
|
p = self.nodes[0].p2ps[0]
|
||||||
|
|
||||||
|
with mininode_lock:
|
||||||
|
p.tx_getdata_count = 0
|
||||||
|
|
||||||
|
p.send_message(msg_inv([CInv(t=1, h=i) for i in txids]))
|
||||||
|
wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT, lock=mininode_lock)
|
||||||
|
with mininode_lock:
|
||||||
|
assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT)
|
||||||
|
|
||||||
|
self.log.info("Now check that if we send a NOTFOUND for a transaction, we'll get one more request")
|
||||||
|
p.send_message(msg_notfound(vec=[CInv(t=1, h=txids[0])]))
|
||||||
|
wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT + 1, timeout=10, lock=mininode_lock)
|
||||||
|
with mininode_lock:
|
||||||
|
assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT + 1)
|
||||||
|
|
||||||
|
WAIT_TIME = TX_EXPIRY_INTERVAL // 2 + TX_EXPIRY_INTERVAL
|
||||||
|
self.log.info("if we wait about {} minutes, we should eventually get more requests".format(WAIT_TIME / 60))
|
||||||
|
self.nodes[0].setmocktime(int(time.time() + WAIT_TIME))
|
||||||
|
wait_until(lambda: p.tx_getdata_count == MAX_GETDATA_IN_FLIGHT + 2)
|
||||||
|
self.nodes[0].setmocktime(0)
|
||||||
|
|
||||||
|
def run_test(self):
|
||||||
|
# Setup the p2p connections
|
||||||
|
self.peers = []
|
||||||
|
for node in self.nodes:
|
||||||
|
for i in range(NUM_INBOUND):
|
||||||
|
self.peers.append(node.add_p2p_connection(TestP2PConn()))
|
||||||
|
|
||||||
|
self.log.info("Nodes are setup with {} incoming connections each".format(NUM_INBOUND))
|
||||||
|
|
||||||
|
# Test the in-flight max first, because we want no transactions in
|
||||||
|
# flight ahead of this test.
|
||||||
|
self.test_in_flight_max()
|
||||||
|
|
||||||
|
self.test_inv_block()
|
||||||
|
|
||||||
|
self.test_tx_requests()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
TxDownloadTest().main()
|
|
@ -91,6 +91,7 @@ BASE_SCRIPTS = [
|
||||||
'wallet_labels.py',
|
'wallet_labels.py',
|
||||||
'p2p_segwit.py',
|
'p2p_segwit.py',
|
||||||
'p2p_timeouts.py',
|
'p2p_timeouts.py',
|
||||||
|
'p2p_tx_download.py',
|
||||||
'wallet_dump.py',
|
'wallet_dump.py',
|
||||||
'wallet_listtransactions.py',
|
'wallet_listtransactions.py',
|
||||||
# vv Tests less than 60s vv
|
# vv Tests less than 60s vv
|
||||||
|
|
Loading…
Reference in a new issue