企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
区块链以消息为中心,通过发布/订阅服务,所有节点之间的数据同步成为可能。 **注意publish执行后,消息将发给全网,包括本地节点,所以本地节点在接收到自己发的消息后,需要特别做好识别(自己发布的消息自己不应该处理)。** 一般情况下,startNode启动一个节点时候,miner, fullNode应该是为true,miner可以为true可以为false。一般作为挖矿节点,不执行send交易命令,只有全节点才执行send交易命令,但这只是分工,事实上它也可以发起交易,分工的好处是,它可以专注于挖矿,将更多区块放一起集中挖矿。 FullNodesChanne 通道支持处理的消息支持类型:gettxfrompool,tx; MiningChannel 通道支持处理的消息类型:inv(type为tx),tx; GeneralChannel 通道支持处理的消息类型:block,getdata,inv(type为block),getblocks,version; ## 1、开始 每一个网络节点,可能是挖矿节点,也可能是全节点,或者既是挖矿节点,也是全节点。 ``` // Network 节点的数据结构 type Network struct {     Host             host.Host//主机 GeneralChannel *Channel//通用节点     MiningChannel    *Channel//挖矿节点 FullNodesChannel *Channel//全节点 Blockchain *blockchain.Blockchain //Blocks和Transactions消息队列,用于存储新产生的Block或Transaction //一般而言,我们在主程序执行send交易过程中,根据send参数mineNow决定是否立即挖矿,mineNow为true则存储Block消息队列(立即挖矿) //mineNow为false则存储Transaction消息队列(不立即挖矿)。两个消息由节点在startNode后启动消息处理协程进行处理 //一般情况下,为提高挖矿效率,我们会汇聚几个交易在一起,然后挖矿一次 Blocks chan *blockchain.Block//Block类型的通道(带缓冲的通道:在StartNode函数中构建Network实例,缓冲数量200) Transactions chan *blockchain.Transaction//Transaction类型的通道(带缓冲的通道:在StartNode函数中构建Network实例,缓冲数量200) //是否是挖矿节点   Miner            bool } ``` 通过startNode函数,实例化Network时候,将根据参数,对GeneralChannel、MiningChannel、FullNodesChannel全部进行实例化。 ``` //GeneralChannel 通道订阅消息 generalChannel, _ := JoinChannel(ctx, pubsub, host.ID(), GeneralChannel, true) //如果是挖矿节点, miningChannel 订阅消息,否则 miningChannel 不订阅消息 subscribe := false if miner { subscribe = true } miningChannel, _ := JoinChannel(ctx, pubsub, host.ID(), MiningChannel, subscribe) //如果是全节点, fullNodesChannel 订阅消息,否则 fullNodesChannel 不订阅消息 subscribe = false if fullNode { subscribe = true }     fullNodesChannel, _ := JoinChannel(ctx, pubsub, host.ID(), FullNodesChannel, subscribe) ``` 主程序为有界面命令控制程序,三个实例将作为参数传递给cliui: ``` // 各通信通道建立命令行界面对象,监控来自三个通道实例的消息 ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel) ``` ## 2、所有对network的操作命令,将会通过generalChannel实例(其它实例可能没有实例化),publish到全网 对network发送的消息指令类型有: 1. block 2. inv 3. getblocks 4. getdata 5. tx 6. gettxfrompool 7. version 如:发送inv命令: ``` func (net *Network) SendInv(peerId string, _type string, items [][]byte) { inventory := Inv{net.Host.ID().Pretty(), _type, items} payload := GobEncode(inventory) request := append(CmdToBytes("inv"), payload...)     net.GeneralChannel.Publish("接收到 inventory 命令", request, peerId) } ``` ## 3、这些事件消息,将在cliui中开协程进行处理: ``` // HandleStream 处理来自全网发来的且可以处理的消息内容 func (ui *CLIUI) HandleStream(net *Network, content *ChannelContent) { // ui.displayContent(content) if content.Payload != nil { command := BytesToCmd(content.Payload\[:commandLength\])         log.Infof("Received  %s command \\n", command) switch command { case "block":             net.HandleBlocks(content) case "inv":             net.HandleInv(content) case "getblocks":             net.HandleGetBlocks(content) case "getdata":             net.HandleGetData(content) case "tx":             net.HandleTx(content) case "gettxfrompool":             net.HandleGetTxFromPool(content) case "version":             net.HandleVersion(content) default:             log.Warn("Unknown Command") } } } ``` cliui的RUN函数,需要处理来自network的各种事件消息: ``` // Run 开启协程处理各种事件消息 func (ui *CLIUI) Run(net *Network) error { go ui.handleEvents(net) defer ui.end() return ui.app.Run() } ``` handleEvents: ``` func (ui *CLIUI) handleEvents(net *Network) { peerRefreshTicker := time.NewTicker(time.Second) defer peerRefreshTicker.Stop() go ui.readFromLogs(net.Blockchain.InstanceId)  log.Info("HOST ADDR: ", net.Host.Addrs()) for { select { case input := <-ui.inputCh: err := ui.GeneralChannel.Publish(input, nil, "")//未指定消息接收者,意味着所有节点(peer)均会收到 if err != nil {                 log.Errorf("Publish error: %s", err) } ui.displaySelfMessage(input) case <-peerRefreshTicker.C: // 定期刷新peers list   ui.refreshPeers() case m := <-ui.GeneralChannel.Content://如果 GeneralChannel 收到消息             ui.HandleStream(net, m) case m := <-ui.MiningChannel.Content://如果 MiningChannel 收到消息             ui.HandleStream(net, m) case m := <-ui.FullNodesChannel.Content://如果 FullNodesChannel 收到消息             ui.HandleStream(net, m) case <-ui.GeneralChannel.ctx.Done(): return case <-ui.doneCh: return } } } ``` ## 4、一切从Inv(block)消息开始 每次挖出新区块,节点向全网广播inv(资产,在区块链里面inv包括block和交易池中的tx两类,但狭义上说,只有block)消息: ``` func (net *Network) MineTx(memopoolTxs map[string]blockchain.Transaction) { var txs []\*blockchain.Transaction     log.Infof("挖矿的交易数: %d", len(memopoolTxs)) chain := net.Blockchain.ContinueBlockchain() for id := range memopoolTxs {         log.Infof("tx: %s \\n", memopoolTxs[id].ID) tx := memopoolTxs[id\]         log.Info("tx校验: ", chain.VerifyTransaction(&tx)) if chain.VerifyTransaction(&tx) { txs = append(txs, &tx) } } if len(txs) == 0 {         log.Info("无合法的交易") } cbTx := blockchain.MinerTx(MinerAddress, "") txs = append(txs, cbTx) newBlock := chain.MineBlock(txs) UTXOs := blockchain.UTXOSet{Blockchain:chain}     UTXOs.Compute()     log.Info("挖出新的区块") ** //peerId为空,SendInv发布给全网**     net.SendInv("", "block", [][]byte{newBlock.Hash})     memoryPool.ClearAll()//清除内存池中的全部交易     memoryPool.Wg.Done() } ``` 当然,收到请求消息getblocks后,节点应向请求方发一个block类型的inv: ``` func (net *Network) HandleGetBlocks(content *ChannelContent) { var buff bytes.Buffer var payload GetBlocks  buff.Write(content.Payload[commandLength:]) dec := gob.NewDecoder(&buff) err := dec.Decode(&payload) if err != nil {         log.Panic(err) } chain := net.Blockchain.ContinueBlockchain() blockHashes := chain.GetBlockHashes(payload.Height) log.Info("LENGTH:", len(blockHashes)) net.SendInv(payload.SendFrom, "block", blockHashes) } ``` ### 4、每个network节点启动后做什么 ``` 1、向全网请求区块信息,以补全本地区块链 // 每一个节点均有区块链的一个完整副本 err = RequestBlocks(network)//全网发布version命令,接收到该命令的节点,要么将本地version回给它(接收者的区块链height更大),要么向它发送getblocks命令请求自己没有的区块(接收者的区块链height更小) 2、启用协程,处理来自自身网络节点事件**(只有启用了rpc才会有相关事件发生)** //本地的Blocks有了新的block加入队列,或者本地的Transactions有了新的tx加入队列,则将新的block或tx发布给请求者或全网(根据请求者是否为空) go HandleEvents(network) 3、如果是矿工节点,启用协程,不断发送ping命令给全节点 if miner { // 矿工事件循环,以不断地发送一个 ping 给全节点,目的是得到新的交易,为新交易挖矿,并添加到区块链 //矿工节点将定时向全网发送gettxfrompool指令,请求每个节点交易内存池(pending)的待挖矿交易 go network.MinersEventLoop() } if err != nil { panic(err) } 4、运行UI界面,将在Run函数体中启动协程,通过三个通道,循环接收并处理全网节点publish的消息 //(包括generalChannel, miningChannel, fullNodesChannel节点) if err = ui.Run(network); err != nil {         log.Error("运行文字UI发生错误: %s", err) } ```