BIP0144+peer: specify the wire encoding type when reading/writing messages

This commit modifies the base peer struct to ascertain when a peer is
able to understand the new witness encoding, and specify the peer’s
supported encoding explicitly before/after the version handshake.
This commit is contained in:
Olaoluwa Osuntokun 2016-10-18 16:45:18 -07:00 committed by Dave Collins
parent 192bfbf123
commit 7a1456aae5
2 changed files with 52 additions and 17 deletions

View file

@ -91,8 +91,12 @@ func invSummary(invList []*wire.InvVect) string {
switch iv.Type {
case wire.InvTypeError:
return fmt.Sprintf("error %s", iv.Hash)
case wire.InvTypeWitnessBlock:
return fmt.Sprintf("witness block %s", iv.Hash)
case wire.InvTypeBlock:
return fmt.Sprintf("block %s", iv.Hash)
case wire.InvTypeWitnessTx:
return fmt.Sprintf("witness tx %s", iv.Hash)
case wire.InvTypeTx:
return fmt.Sprintf("tx %s", iv.Hash)
}

View file

@ -297,6 +297,7 @@ func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress,
type outMsg struct {
msg wire.Message
doneChan chan<- struct{}
encoding wire.MessageEncoding
}
// stallControlCmd represents the command of a stall control message.
@ -412,6 +413,8 @@ type Peer struct {
sendHeadersPreferred bool // peer sent a sendheaders message
verAckReceived bool
wireEncoding wire.MessageEncoding
knownInventory *mruInventoryMap
prevGetBlocksMtx sync.Mutex
prevGetBlocksBegin *chainhash.Hash
@ -1016,14 +1019,14 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
wire.MultipleAddressVersion)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
reason)
return p.writeMessage(rejectMsg)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
}
// Updating a bunch of stats.
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
p.statsMtx.Lock()
p.lastBlock = msg.LastBlock
p.startingHeight = msg.LastBlock
// Set the peer's time offset.
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()
@ -1034,14 +1037,27 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Set the peer's ID.
p.id = atomic.AddInt32(&nodeCount, 1)
// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services
// Set the remote peer's user agent.
p.userAgent = msg.UserAgent
p.flagsMtx.Unlock()
// Once the version message has been exchanged, we're able to determine
// if this peer knows how to encode witness data over the wire
// protocol. If so, then we'll switch to a decoding mode which is
// prepared for the new transaction format introduced as part of
// BIP0144.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.wireEncoding = wire.WitnessEncoding
}
return nil
}
@ -1081,9 +1097,9 @@ func (p *Peer) handlePongMsg(msg *wire.MsgPong) {
}
// readMessage reads the next bitcoin message from the peer with logging.
func (p *Peer) readMessage() (wire.Message, []byte, error) {
n, msg, buf, err := wire.ReadMessageN(p.conn, p.ProtocolVersion(),
p.cfg.ChainParams.Net)
func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) {
n, msg, buf, err := wire.ReadMessageWithEncodingN(p.conn,
p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding)
atomic.AddUint64(&p.bytesReceived, uint64(n))
if p.cfg.Listeners.OnRead != nil {
p.cfg.Listeners.OnRead(p, n, msg, err)
@ -1114,7 +1130,7 @@ func (p *Peer) readMessage() (wire.Message, []byte, error) {
}
// writeMessage sends a bitcoin message to the peer with logging.
func (p *Peer) writeMessage(msg wire.Message) error {
func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
// Don't do anything if we're disconnecting.
if atomic.LoadInt32(&p.disconnect) != 0 {
return nil
@ -1136,8 +1152,8 @@ func (p *Peer) writeMessage(msg wire.Message) error {
}))
log.Tracef("%v", newLogClosure(func() string {
var buf bytes.Buffer
err := wire.WriteMessage(&buf, msg, p.ProtocolVersion(),
p.cfg.ChainParams.Net)
_, err := wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(),
p.cfg.ChainParams.Net, enc)
if err != nil {
return err.Error()
}
@ -1145,8 +1161,8 @@ func (p *Peer) writeMessage(msg wire.Message) error {
}))
// Write the message to the peer.
n, err := wire.WriteMessageN(p.conn, msg, p.ProtocolVersion(),
p.cfg.ChainParams.Net)
n, err := wire.WriteMessageWithEncodingN(p.conn, msg,
p.ProtocolVersion(), p.cfg.ChainParams.Net, enc)
atomic.AddUint64(&p.bytesSent, uint64(n))
if p.cfg.Listeners.OnWrite != nil {
p.cfg.Listeners.OnWrite(p, n, msg, err)
@ -1408,7 +1424,7 @@ out:
// Read a message and stop the idle timer as soon as the read
// is done. The timer is reset below for the next iteration if
// needed.
rmsg, buf, err := p.readMessage()
rmsg, buf, err := p.readMessage(p.wireEncoding)
idleTimer.Stop()
if err != nil {
// In order to allow regression tests with malformed messages, don't
@ -1768,7 +1784,9 @@ out:
}
p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}
if err := p.writeMessage(msg.msg); err != nil {
err := p.writeMessage(msg.msg, msg.encoding)
if err != nil {
p.Disconnect()
if p.shouldLogWriteError(err) {
log.Errorf("Failed to send message to "+
@ -1844,6 +1862,18 @@ out:
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
}
// QueueMessageWithEncoding adds the passed bitcoin message to the peer send
// queue. This function is identical to QueueMessage, however it allows the
// caller to specify the wire encoding type that should be used when
// encoding/decoding blocks and transactions.
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
encoding wire.MessageEncoding) {
// Avoid risk of deadlock if goroutine already exited. The goroutine
// we will be sending to hangs around until it knows for a fact that
// it is marked as disconnected and *then* it drains the channels.
@ -1855,7 +1885,7 @@ func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
}
return
}
p.outputQueue <- outMsg{msg: msg, doneChan: doneChan}
p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan}
}
// QueueInventory adds the passed inventory to the inventory send queue which
@ -1987,7 +2017,7 @@ func (p *Peer) WaitForDisconnect() {
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
msg, _, err := p.readMessage()
msg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
@ -1999,7 +2029,7 @@ func (p *Peer) readRemoteVersionMsg() error {
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
errStr)
return p.writeMessage(rejectMsg)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
}
if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
@ -2019,7 +2049,7 @@ func (p *Peer) writeLocalVersionMsg() error {
return err
}
return p.writeMessage(localVerMsg)
return p.writeMessage(localVerMsg, wire.LatestEncoding)
}
// negotiateInboundProtocol waits to receive a version message from the peer
@ -2062,6 +2092,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer {
p := Peer{
inbound: inbound,
wireEncoding: wire.BaseEncoding,
knownInventory: newMruInventoryMap(maxKnownInventory),
stallControl: make(chan stallControlMsg, 1), // nonblocking sync
outputQueue: make(chan outMsg, outputBufferSize),