p2p的网络发现协议使用了Kademlia protocol 来处理网络的节点发现。节点查找和节点更新。Kademlia protocol使用了UDP协议来进行网络通信。
阅读这部分的代码建议先看看references里面的Kademlia协议简介来看看什么是Kademlia协议。
首先看看数据结构。 网络传输了4种数据包(UDP协议是基于报文的协议。传输的是一个一个数据包),分别是ping,pong,findnode和neighbors。 下面分别定义了4种报文的格式。
// RPC packet types
const (
pingPacket = iota + 1 // zero is 'reserved'
pongPacket
findnodePacket
neighborsPacket
)
// RPC request structures
type (
ping struct {
Version uint //协议版本
From, To rpcEndpoint //源IP地址 目的IP地址
Expiration uint64 //超时时间
// Ignore additional fields (for forward compatibility).
//可以忽略的字段。 为了向前兼容
Rest []rlp.RawValue `rlp:"tail"`
}
// pong is the reply to ping.
// ping包的回应
pong struct {
// This field should mirror the UDP envelope address
// of the ping packet, which provides a way to discover the
// the external address (after NAT).
// 目的IP地址
To rpcEndpoint
// 说明这个pong包是回应那个ping包的。 包含了ping包的hash值
ReplyTok []byte // This contains the hash of the ping packet.
//包超时的绝对时间。 如果收到包的时候超过了这个时间,那么包被认为是超时的。
Expiration uint64 // Absolute timestamp at which the packet becomes invalid.
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// findnode 是用来查询距离target比较近的节点
// findnode is a query for nodes close to the given target.
findnode struct {
// 目的节点
Target NodeID // doesn't need to be an actual public key
Expiration uint64
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
// reply to findnode
// findnode的回应
neighbors struct {
//距离target比较近的节点值。
Nodes []rpcNode
Expiration uint64
// Ignore additional fields (for forward compatibility).
Rest []rlp.RawValue `rlp:"tail"`
}
rpcNode struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP uint16 // for discovery protocol
TCP uint16 // for RLPx protocol
ID NodeID
}
rpcEndpoint struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP uint16 // for discovery protocol
TCP uint16 // for RLPx protocol
}
)
定义了两个接口类型,packet接口类型应该是给4种不同类型的包分派不同的handle方法。 conn接口定义了一个udp的连接的功能。
type packet interface {
handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
name() string
}
type conn interface {
ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
Close() error
LocalAddr() net.Addr
}
udp的结构, 需要注意的是最后一个字段*Table是go里面的匿名字段。 也就是说udp可以直接调用匿名字段Table的方法。
// udp implements the RPC protocol.
type udp struct {
conn conn //网络连接
netrestrict *netutil.Netlist
priv *ecdsa.PrivateKey //私钥,自己的ID是通过这个来生成的。
ourEndpoint rpcEndpoint
addpending chan *pending //用来申请一个pending
gotreply chan reply //用来获取回应的队列
closing chan struct{} //用来关闭的队列
nat nat.Interface
*Table
}
pending 和reply 结构。 这两个结构用户内部的go routine之间进行通信的结构体。
// pending represents a pending reply.
// some implementations of the protocol wish to send more than one
// reply packet to findnode. in general, any neighbors packet cannot
// be matched up with a specific findnode packet.
// our implementation handles this by storing a callback function for
// each pending reply. incoming packets from a node are dispatched
// to all the callback functions for that node.
// pending结构 代表正在等待一个reply
// 我们通过为每一个pending reply 存储一个callback来实现这个功能。从一个节点来的所有数据包都会分配到这个节点对应的callback上面。
type pending struct {
// these fields must match in the reply.
from NodeID
ptype byte
// time when the request must complete
deadline time.Time
// callback is called when a matching reply arrives. if it returns
// true, the callback is removed from the pending reply queue.
// if it returns false, the reply is considered incomplete and
// the callback will be invoked again for the next matching reply.
//如果返回值是true。那么callback会从队列里面移除。 如果返回false,那么认为reply还没有完成,会继续等待下一次reply.
callback func(resp interface{}) (done bool)
// errc receives nil when the callback indicates completion or an
// error if no further reply is received within the timeout.
errc chan<- error
}
type reply struct {
from NodeID
ptype byte
data interface{}
// loop indicates whether there was
// a matching request by sending on this channel.
//通过往这个channel上面发送消息来表示匹配到一个请求。
matched chan<- bool
}
UDP的创建
// ListenUDP returns a new table that listens for UDP packets on laddr.
func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, error) {
addr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
tab, _, err := newUDP(priv, conn, natm, nodeDBPath, netrestrict)
if err != nil {
return nil, err
}
log.Info("UDP listener up", "self", tab.self)
return tab, nil
}
func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, *udp, error) {
udp := &udp{
conn: c,
priv: priv,
netrestrict: netrestrict,
closing: make(chan struct{}),
gotreply: make(chan reply),
addpending: make(chan *pending),
}
realaddr := c.LocalAddr().(*net.UDPAddr)
if natm != nil { //natm nat mapping 用来获取外网地址
if !realaddr.IP.IsLoopback() { //如果地址是本地环回地址
go nat.Map(natm, udp.closing, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
}
// TODO: react to external IP changes over time.
if ext, err := natm.ExternalIP(); err == nil {
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
// TODO: separate TCP port
udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
//创建一个table 后续会介绍。 Kademlia的主要逻辑在这个类里面实现。
tab, err := newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
if err != nil {
return nil, nil, err
}
udp.Table = tab //匿名字段的赋值
go udp.loop() //go routine
go udp.readLoop() //用来网络数据读取。
return udp.Table, udp, nil
}
有疑问加站长微信联系(非本文作者)