ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
# 网络层设计与实现 ## Node 一个网络节点(Node)命名为Network。 Node用Network来定义和实现,特指P2P网络节点,更体现Node的本质。 ``` // Network 节点的数据结构 type Network struct {     Host             host.Host//主机     GeneralChannel   *Channel//通用节点     MiningChannel    *Channel//挖矿节点     FullNodesChannel *Channel//全节点     Blockchain       *blockchain.Blockchain     Blocks           chan *blockchain.Block//Block类型的通道     Transactions     chan *blockchain.Transaction//Transaction类型的通道     Miner            bool } ``` ## Channel Channel为通信通道,每个host有三个通信通道,但根据其节点的类别,一般一个节点只用到其中一个通信通道。 ``` // Channel 的数据结构 type Channel struct {     ctx   context.Context     pub   *pubsub.PubSub//发布者     topic *pubsub.Topic     sub   *pubsub.Subscription//订阅者     channelName string//构成Topic名称字符串的组成部分(TopicName="channel:" + channelName)     self        peer.ID     Content     chan *ChannelContent//ChannelContent类型的通道 } ``` GeneralChannel为通用节点,负责列举所有连接到主机(host)的所有peer,这也是所有连接到host的peer,处理除了tx之外的所有命令消息。 FullNodesChannel为全节点,处理与交易相关的tx及gettxfrompool命令,即将新交易放到内存池,以及每秒不断将交易从交易池中取出(这里我们每秒只取出一条交易,可以优化为每次取出多条交易)给挖矿节点进行挖矿。 MiningChannel为挖矿节点,处理来自交易池的inv命令及来自交易池的tx命令。 ## Host P2P的host package定义了Host这一interface。 ``` // 为本主机(host)创建一对新的 RSA 密钥 prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) if err != nil { panic(err)     } transports := libp2p.ChainOptions(         libp2p.Transport(tcp.NewTCPTransport),//支持TCP传输协议         libp2p.Transport(ws.New),//支持websorcket传输协议     ) muxers := libp2p.ChainOptions(         libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),//支持"/yamux/1.0.0"流连接(基于可靠连接的多路I/O复用)         libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),//支持"/mplex/6.7.0"流连接(二进制流多路I/O复用),由LibP2P基于multiplex创建     ) if len(listenPort) == 0 { listenPort = "0"     } listenAddrs := libp2p.ListenAddrStrings(         fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", listenPort),//支持tcp传输         fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", listenPort),//支持websorket传输     ) // Host是参与p2p网络的对象,它实现协议或提供服务。 // 它像服务器一样处理请求,像客户端一样发出请求。 // 之所以称为 Host,是因为它既是 Server 又是 Client(而 Peer 可能会混淆)。 // 1、创建host // 重要:创建主机host //-如果没有提供transport和listen addresses,节点将监听在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0"; //-如果没有提供transport的选项,节点使用TCP和websorcket传输协议 //-如果multiplexer配置没有提供,节点缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流连接配置 //-如果没有提供security transport,主机使用go-libp2p的noise和/或tls加密的transport来加密所有的traffic(新版本libp2p已经不再支持security transport参数设置) //-如果没有提供peer的identity,它产生一个随机RSA 2048键值对,并由它导出一个新的identity //-如果没有提供peerstore,主机使用一个空的peerstore来进行初始化 host, err := libp2p.New(         ctx,         transports,         listenAddrs,         muxers,         libp2p.Identity(prvKey),     ) ``` 上述代码中第一步创建Host: ``` host, err := libp2p.New(...) ``` 我们追溯New函数,它来自于libp2p.go,最终调用的是: ``` func NewWithoutDefaults(ctx context.Context, opts ...Option) (host.Host, error) { varcfg Config if err := cfg.Apply(opts...); err != nil { returnnil, err     } return cfg.NewNode(ctx) } ``` 我们继续追溯cfg.NewNode(ctx),在P2Plib的config.go,关键代码如下: ``` func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { swrm, err := cfg.makeSwarm(ctx) if err != nil { returnnil, err     } h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{         ConnManager:  cfg.ConnManager,         AddrsFactory: cfg.AddrsFactory,         NATManager:   cfg.NATManager,         EnablePing:   !cfg.DisablePing,         UserAgent:    cfg.UserAgent,     }) ... h.Start() if router != nil { return outed.Wrap(h, router), nil     } return h, nil } ``` security transport,默认的值为: ``` var DefaultSecurity = libp2p.ChainOptions( Security(noise.ID, noise.New), Security(tls.ID, tls.New), ) ``` 上述代码的第一个关键是: ``` swrm, err := cfg.makeSwarm(ctx) ``` 我们追溯进去,看看cfg.makeSwarm(ctx): ``` func (cfg *Config) makeSwarm(ctx context.Context) (*swarm.Swarm, error) { //从config保存的公钥得到pid pid, err := peer.IDFromPublicKey(cfg.PeerKey.GetPublic()) ... swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.ConnectionGater) return swrm, nil ``` 我们继续追溯swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.ConnectionGater): ``` func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc metrics.Reporter, extra ...interface{}) *Swarm { s := &Swarm{         local: local,         peers: peers,         bwc:   bwc,     } ... return s ``` 可见,peer.ID被赋值到Swarm对象的local变量。 我们回到函数: ``` func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) { swrm, err := cfg.makeSwarm(ctx) if err != nil { returnnil, err     } h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{         ConnManager:  cfg.ConnManager,         AddrsFactory: cfg.AddrsFactory,         NATManager:   cfg.NATManager,         EnablePing:   !cfg.DisablePing,         UserAgent:    cfg.UserAgent,     }) ... h.Start() if router != nil { return outed.Wrap(h, router), nil     } return h, nil } ``` 前面已经讨论完了swrm, err := cfg.makeSwarm(ctx),我们继续往下看,swrm成为创建Host的一个参数: ``` h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{         ConnManager:  cfg.ConnManager,         AddrsFactory: cfg.AddrsFactory,         NATManager:   cfg.NATManager,         EnablePing:   !cfg.DisablePing,         UserAgent:    cfg.UserAgent,     }) ``` bhost是一个package: ``` bhost "github.com/libp2p/go-libp2p/p2p/host/basic" ``` 我们查看上面的NewHost,进入到basichost package(basic_host.go): 定义了basichost: ``` type BasicHost struct  ``` 然后BasicHost实现了Host的所有接口方法,其中NewHost接口实现如下: ``` func NewHost(ctx context.Context, n network.Network, opts \*HostOpts) (\*BasicHost, error) { hostCtx, cancel := context.WithCancel(ctx) h := &BasicHost{         network:                 n,         mux:                     msmux.NewMultistreamMuxer(),         negtimeout:              DefaultNegotiationTimeout,         AddrsFactory:            DefaultAddrsFactory,         maResolver:              madns.DefaultResolver,         eventbus:                eventbus.NewBus(),         addrChangeChan:          make(chanstruct{}, 1),         ctx:                     hostCtx,         ctxCancel:               cancel,         disableSignedPeerRecord: opts.DisableSignedPeerRecord,     } ... return h, nil ``` swrm作为参数传给了n network.Network。而实现的接口: ``` func (h *BasicHost) ID() peer.ID { return h.Network().LocalPeer() } ``` h.Network()返回Swarm对象(Swarm是一个struct,实现了接口network.Network(Network是一个interface)) ``` func (h *BasicHost) Network() network.Network { return h.network } ``` 我们看看Swarm的函数LocalPeer(),正好返回的是local(即peer的ID): ``` func (s *Swarm) LocalPeer() peer.ID { return s.local } ``` ### 小结 1、主机实际上是BasicHost struct,它实现了Host interface,peer.ID在创建host时候已经在Host中得到了(host.ID()得到的即是peer.ID)。 2、同时Swarm struct实现了libp2p的network.Network interface。 3、BasicHost和Swarm均由p2plib提供。 ## Peer Peer为对等端,是host的第三方视觉的概念。 Peer以ID为唯一标识,peer.ID是通过哈希peer的公钥而派生,并编码其哈希输出为multihash的结果。 peer.ID是往后不同节点之间进行通信的重要参数,它代表一个Host,或者说,我们可以通过peer.ID获得一个具体的Host对象。如发送虚拟币: ``` func (net *Network) SendTx(peerId string, transaction *blockchain.Transaction) {  memoryPool.Add(*transaction) tnx := Tx{net.Host.ID().Pretty(), transaction.Serializer()} payload := GobEncode(tnx) request := append(CmdToBytes("tx"), payload...) // 给全节点(FullNodes)第通信通道发布此消息,全节点将进行处理 net.FullNodesChannel.Publish("接收到 Send transaction 命令", request, peerId) } ``` 如同Host一样,peer package也是在libp2p库中定义,所在的文件是peer.go,不同的是,在peer.go中并没有定义一个peer的struct,而是直接在peer package中定义ID: ``` type ID string ``` 但显然ID是一个mutihash的值,如需要对外呈现需要使用base58编码后得到人可以识别的字符串: ``` func (id ID) String() string { return id.Pretty() } ``` Pretty方法如下: ``` func (id ID) Pretty() string { returnIDB58Encode(id) } ``` ## 网络通信流程 一切从startNode开始。 main.go: ``` cli.StartNode(listenPort, minerAddress, miner, fullNode, func(net *p2p.Network) {//最后一个参数是回调函数,获得net实例 if rpc { cli.P2p = net//启动节点后设置cli的P2P实例,net为启动节点函数的回调函数参数被回调后返回的Network实例 go jsonrpc.StartServer(cli, rpc, rpcPort, rpcAddr)                 }  }) ``` 其中cli的结构: ``` type CommandLinestruct {     Blockchain    *blockchain.Blockchain     P2p           *p2p.Network     CloseDbAlways bool//每次命令执行完毕是否关闭数据库 } ``` 其中istenPort, minerAddress, miner, fullNode等参数的值来自于命令startnode执行时获得的命令行参数。 cli.StartNode实现: ``` // StartNode 启动节点,其中fn为回调函数,p2p.StartNode调用过程中调用fn,设置p2p.Network实例 func (cli *CommandLine) StartNode(listenPort, minerAddress string, miner, fullNode bool, fn func(*p2p.Network)) { if miner {         log.Infof("作为矿工正在启动节点: %s\\n", listenPort) iflen(minerAddress) > 0 { if wallet.ValidateAddress(minerAddress) {                 log.Info("正在挖矿,接收奖励的地址是:", minerAddress)             } else {                 log.Fatal("请提供一个合法的矿工地址")             }         }     } else {         log.Infof("在: %s\\n端口上启动节点", listenPort)     } chain := cli.Blockchain.ContinueBlockchain()     p2p.StartNode(chain, listenPort, minerAddress, miner, fullNode, fn) } ``` 在获得了blockchain实例后,调用p2p package的StartNode函数: ``` // StartNode 启动一个节点 func StartNode(chain *blockchain.Blockchain, listenPort, minerAddress string, miner, fullNode bool, callback func(*Network)) { var r io.Reader r = rand.Reader//没有指定seed,使用随机种子 MinerAddress = minerAddress ctx, cancel := context.WithCancel(context.Background()) defercancel() defer chain.Database.Close()//函数运行结束,关闭区块链数据库 go appUtils.CloseDB(chain)//启动协程,遇到程序强行终止信号时关闭数据库,退出程序 // 为本主机(host)创建一对新的 RSA 密钥 prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) if err != nil { panic(err)     } transports := libp2p.ChainOptions(         libp2p.Transport(tcp.NewTCPTransport),//支持TCP传输协议         libp2p.Transport(ws.New),//支持websorcket传输协议     ) muxers := libp2p.ChainOptions(         libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),//支持"/yamux/1.0.0"流连接(基于可靠连接的多路I/O复用)         libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),//支持"/mplex/6.7.0"流连接(二进制流多路I/O复用),由LibP2P基于multiplex创建     ) if len(listenPort) == 0 { listenPort = "0"     } listenAddrs := libp2p.ListenAddrStrings(         fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", listenPort),//支持tcp传输         fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", listenPort),//支持websorket传输     ) // Host是参与p2p网络的对象,它实现协议或提供服务。 // 它像服务器一样处理请求,像客户端一样发出请求。 // 之所以称为 Host,是因为它既是 Server 又是 Client(而 Peer 可能会混淆)。 // 1、创建host // 重要:创建主机host //-如果没有提供transport和listen addresses,节点将监听在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0"; //-如果没有提供transport的选项,节点使用TCP和websorcket传输协议 //-如果multiplexer配置没有提供,节点缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流连接配置 //-如果没有提供security transport,主机使用go-libp2p的noise和/或tls加密的transport来加密所有的traffic(新版本libp2p已经不再支持security transport参数设置) //-如果没有提供peer的identity,它产生一个随机RSA 2048键值对,并由它导出一个新的identity //-如果没有提供peerstore,主机使用一个空的peerstore来进行初始化 host, err := libp2p.New(         ctx,         transports,         listenAddrs,         muxers,         libp2p.Identity(prvKey),     ) if err != nil { panic(err)     } for _, addr := range host.Addrs() {         fmt.Println("正在监听在", addr)     }     log.Info("主机已创建: ", host.ID()) // 2、使用GossipSub路由,创建一个新的基于Gossip 协议的 PubSub 服务系统 // 任何一个主机节点,都是一个订阅发布服务系统 // 这是整个区块链网络运行的关键所在 pub, err := pubsub.NewGossipSub(ctx, host) if err != nil { panic(err)     } // 3、构建三个通信通道,通信通道使用发布-订阅系统,在不同节点之间传递信息 // 之所以需要三个通道,是因为未来规划不同节点拥有不同的功能,不同功能的节点完成不同类型的任务。 // 三个通道的消息独立,只有订阅了该通道消息的节点,才能收到该通道的消息,然后进行处理,以完成相应的任务。 // 任何一个节点,均创建了三个通道实例,这意味着人一个节点都可以根据需要,选择任意一个通道发送消息 // 在订阅上,一个具体的节点, GeneralChannel 订阅将消息,如果是采矿节点(miner==true),miningChannel 会接收到消息, // 如果是全节点(fullNode==true),fullNodesChannel会接受到消息 //GeneralChannel 通道订阅消息 generalChannel, _ := JoinChannel(ctx, pub, host.ID(), GeneralChannel, true) subscribe := false if miner { subscribe = true     } //如果是挖矿节点, miningChannel 订阅消息,否则 miningChannel 不订阅消息 miningChannel, _ := JoinChannel(ctx, pub, host.ID(), MiningChannel, subscribe) subscribe = false if fullNode { subscribe = true     } //如果是全节点, fullNodesChannel 订阅消息,否则 fullNodesChannel 不订阅消息 fullNodesChannel, _ := JoinChannel(ctx, pub, host.ID(), FullNodesChannel, subscribe) // 3、为各通信通道建立命令行界面对象 ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel) // 4、建立对等端(peer)发现机制(discovery),使得本节点可以被网络上的其它节点发现 //同时将主机(host)连接到所有已经发现的对等端(peer) err = SetupDiscovery(ctx, host) if err != nil { panic(err)     } network := &Network{         Host:             host,         GeneralChannel:   generalChannel,         MiningChannel:    miningChannel,         FullNodesChannel: fullNodesChannel,         Blockchain:       chain,         Blocks:           make(chan *blockchain.Block, 200),         Transactions:     make(chan *blockchain.Transaction, 200),         Miner:            miner,     } // 5、回调,将节点(network)实例传回 callback(network) // 6、向全网请求区块信息,以补全本地区块链 // 每一个节点均有区块链的一个完整副本 err = RequestBlocks(network) // 7、启用协程,处理节点事件 goHandleEvents(network) // 8、如果是矿工节点,启用协程,不断发送ping命令给全节点 if miner { // 矿工事件循环,以不断地发送一个ping给全节点,目的是得到新的交易,为它挖矿,并添加到区块链 go network.MinersEventLoop()     } if err != nil { panic(err)     } // 9、运行UI界面,将在Run函数体中启动协程,循环接收并处理全网节点publish的消息 iferr = ui.Run(network); err != nil {         log.Error("运行文字UI发生错误: %s", err)     } } ```