区块链以消息为中心,通过发布/订阅服务,所有节点之间的数据同步成为可能。
**注意publish执行后,消息将发给全网,包括本地节点,所以本地节点在接收到自己发的消息后,需要特别做好识别(自己发布的消息自己不应该处理)。**
一般情况下,startNode启动一个节点时候,miner, fullNode应该是为true,miner可以为true可以为false。一般作为挖矿节点,不执行send交易命令,只有全节点才执行send交易命令,但这只是分工,事实上它也可以发起交易,分工的好处是,它可以专注于挖矿,将更多区块放一起集中挖矿。
FullNodesChanne 通道支持处理的消息支持类型:gettxfrompool,tx;
MiningChannel 通道支持处理的消息类型:inv(type为tx),tx;
GeneralChannel 通道支持处理的消息类型:block,getdata,inv(type为block),getblocks,version;
## 1、开始
每一个网络节点,可能是挖矿节点,也可能是全节点,或者既是挖矿节点,也是全节点。
```
// Network 节点的数据结构
type Network struct {
Host host.Host//主机
GeneralChannel *Channel//通用节点
MiningChannel *Channel//挖矿节点
FullNodesChannel *Channel//全节点
Blockchain *blockchain.Blockchain
//Blocks和Transactions消息队列,用于存储新产生的Block或Transaction
//一般而言,我们在主程序执行send交易过程中,根据send参数mineNow决定是否立即挖矿,mineNow为true则存储Block消息队列(立即挖矿)
//mineNow为false则存储Transaction消息队列(不立即挖矿)。两个消息由节点在startNode后启动消息处理协程进行处理
//一般情况下,为提高挖矿效率,我们会汇聚几个交易在一起,然后挖矿一次
Blocks chan *blockchain.Block//Block类型的通道(带缓冲的通道:在StartNode函数中构建Network实例,缓冲数量200)
Transactions chan *blockchain.Transaction//Transaction类型的通道(带缓冲的通道:在StartNode函数中构建Network实例,缓冲数量200)
//是否是挖矿节点
Miner bool
}
```
通过startNode函数,实例化Network时候,将根据参数,对GeneralChannel、MiningChannel、FullNodesChannel全部进行实例化。
```
//GeneralChannel 通道订阅消息
generalChannel, _ := JoinChannel(ctx, pubsub, host.ID(), GeneralChannel, true)
//如果是挖矿节点, miningChannel 订阅消息,否则 miningChannel 不订阅消息
subscribe := false
if miner {
subscribe = true
}
miningChannel, _ := JoinChannel(ctx, pubsub, host.ID(), MiningChannel, subscribe)
//如果是全节点, fullNodesChannel 订阅消息,否则 fullNodesChannel 不订阅消息
subscribe = false
if fullNode {
subscribe = true
}
fullNodesChannel, _ := JoinChannel(ctx, pubsub, host.ID(), FullNodesChannel, subscribe)
```
主程序为有界面命令控制程序,三个实例将作为参数传递给cliui:
```
// 各通信通道建立命令行界面对象,监控来自三个通道实例的消息
ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel)
```
## 2、所有对network的操作命令,将会通过generalChannel实例(其它实例可能没有实例化),publish到全网
对network发送的消息指令类型有:
1. block
2. inv
3. getblocks
4. getdata
5. tx
6. gettxfrompool
7. version
如:发送inv命令:
```
func (net *Network) SendInv(peerId string, _type string, items [][]byte) {
inventory := Inv{net.Host.ID().Pretty(), _type, items}
payload := GobEncode(inventory)
request := append(CmdToBytes("inv"), payload...)
net.GeneralChannel.Publish("接收到 inventory 命令", request, peerId)
}
```
## 3、这些事件消息,将在cliui中开协程进行处理:
```
// HandleStream 处理来自全网发来的且可以处理的消息内容
func (ui *CLIUI) HandleStream(net *Network, content *ChannelContent) {
// ui.displayContent(content)
if content.Payload != nil {
command := BytesToCmd(content.Payload\[:commandLength\])
log.Infof("Received %s command \\n", command)
switch command {
case "block":
net.HandleBlocks(content)
case "inv":
net.HandleInv(content)
case "getblocks":
net.HandleGetBlocks(content)
case "getdata":
net.HandleGetData(content)
case "tx":
net.HandleTx(content)
case "gettxfrompool":
net.HandleGetTxFromPool(content)
case "version":
net.HandleVersion(content)
default:
log.Warn("Unknown Command")
}
}
}
```
cliui的RUN函数,需要处理来自network的各种事件消息:
```
// Run 开启协程处理各种事件消息
func (ui *CLIUI) Run(net *Network) error {
go ui.handleEvents(net)
defer ui.end()
return ui.app.Run()
}
```
handleEvents:
```
func (ui *CLIUI) handleEvents(net *Network) {
peerRefreshTicker := time.NewTicker(time.Second)
defer peerRefreshTicker.Stop()
go ui.readFromLogs(net.Blockchain.InstanceId)
log.Info("HOST ADDR: ", net.Host.Addrs())
for {
select {
case input := <-ui.inputCh:
err := ui.GeneralChannel.Publish(input, nil, "")//未指定消息接收者,意味着所有节点(peer)均会收到
if err != nil {
log.Errorf("Publish error: %s", err)
}
ui.displaySelfMessage(input)
case <-peerRefreshTicker.C:
// 定期刷新peers list
ui.refreshPeers()
case m := <-ui.GeneralChannel.Content://如果 GeneralChannel 收到消息
ui.HandleStream(net, m)
case m := <-ui.MiningChannel.Content://如果 MiningChannel 收到消息
ui.HandleStream(net, m)
case m := <-ui.FullNodesChannel.Content://如果 FullNodesChannel 收到消息
ui.HandleStream(net, m)
case <-ui.GeneralChannel.ctx.Done():
return
case <-ui.doneCh:
return
}
}
}
```
## 4、一切从Inv(block)消息开始
每次挖出新区块,节点向全网广播inv(资产,在区块链里面inv包括block和交易池中的tx两类,但狭义上说,只有block)消息:
```
func (net *Network) MineTx(memopoolTxs map[string]blockchain.Transaction) {
var txs []\*blockchain.Transaction
log.Infof("挖矿的交易数: %d", len(memopoolTxs))
chain := net.Blockchain.ContinueBlockchain()
for id := range memopoolTxs {
log.Infof("tx: %s \\n", memopoolTxs[id].ID)
tx := memopoolTxs[id\]
log.Info("tx校验: ", chain.VerifyTransaction(&tx))
if chain.VerifyTransaction(&tx) {
txs = append(txs, &tx)
}
}
if len(txs) == 0 {
log.Info("无合法的交易")
}
cbTx := blockchain.MinerTx(MinerAddress, "")
txs = append(txs, cbTx)
newBlock := chain.MineBlock(txs)
UTXOs := blockchain.UTXOSet{Blockchain:chain}
UTXOs.Compute()
log.Info("挖出新的区块")
** //peerId为空,SendInv发布给全网**
net.SendInv("", "block", [][]byte{newBlock.Hash})
memoryPool.ClearAll()//清除内存池中的全部交易
memoryPool.Wg.Done()
}
```
当然,收到请求消息getblocks后,节点应向请求方发一个block类型的inv:
```
func (net *Network) HandleGetBlocks(content *ChannelContent) {
var buff bytes.Buffer
var payload GetBlocks
buff.Write(content.Payload[commandLength:])
dec := gob.NewDecoder(&buff)
err := dec.Decode(&payload)
if err != nil {
log.Panic(err)
}
chain := net.Blockchain.ContinueBlockchain()
blockHashes := chain.GetBlockHashes(payload.Height)
log.Info("LENGTH:", len(blockHashes))
net.SendInv(payload.SendFrom, "block", blockHashes)
}
```
### 4、每个network节点启动后做什么
```
1、向全网请求区块信息,以补全本地区块链
// 每一个节点均有区块链的一个完整副本
err = RequestBlocks(network)//全网发布version命令,接收到该命令的节点,要么将本地version回给它(接收者的区块链height更大),要么向它发送getblocks命令请求自己没有的区块(接收者的区块链height更小)
2、启用协程,处理来自自身网络节点事件**(只有启用了rpc才会有相关事件发生)**
//本地的Blocks有了新的block加入队列,或者本地的Transactions有了新的tx加入队列,则将新的block或tx发布给请求者或全网(根据请求者是否为空)
go HandleEvents(network)
3、如果是矿工节点,启用协程,不断发送ping命令给全节点
if miner {
// 矿工事件循环,以不断地发送一个 ping 给全节点,目的是得到新的交易,为新交易挖矿,并添加到区块链
//矿工节点将定时向全网发送gettxfrompool指令,请求每个节点交易内存池(pending)的待挖矿交易
go network.MinersEventLoop()
}
if err != nil {
panic(err)
}
4、运行UI界面,将在Run函数体中启动协程,通过三个通道,循环接收并处理全网节点publish的消息
//(包括generalChannel, miningChannel, fullNodesChannel节点)
if err = ui.Run(network); err != nil {
log.Error("运行文字UI发生错误: %s", err)
}
```
- 重要更新说明
- linechain发布
- linechain新版设计
- 引言一
- 引言二
- 引言三
- vs-code设置及开发环境设置
- BoltDB数据库应用
- 关于Go语言、VS-code的一些Tips
- 区块链的架构
- 网络通信与区块链
- 单元测试
- 比特币脚本语言
- 关于区块链的一些概念
- 区块链组件
- 区块链第一版:基本原型
- 区块链第二版:增加工作量证明
- 区块链第三版:持久化
- 区块链第四版:交易
- 区块链第五版:实现钱包
- 区块链第六版:实现UTXO集
- 区块链第七版:网络
- 阶段小结
- 区块链第八版:P2P
- P2P网络架构
- 区块链网络层
- P2P区块链最简体验
- libp2p建立P2P网络的关键概念
- 区块链结构层设计与实现
- 用户交互层设计与实现
- 网络层设计与实现
- 建立节点发现机制
- 向区块链网络请求区块信息
- 向区块链网络发布消息
- 运行区块链
- LineChain
- 系统运行流程
- Multihash
- 区块链网络的节点发现机制深入探讨
- DHT
- Bootstrap
- 连接到所有引导节点
- Advertise
- 搜索其它peers
- 连接到搜到的其它peers
- 区块链网络的消息订发布-订阅机制深入探讨
- LineChain:适用于智能合约编程的脚本语言支持
- LineChain:解决分叉问题
- LineChain:多重签名
- libp2p升级到v0.22版本
- 以太坊基础
- 重温以太坊的树结构
- 世界状态树
- (智能合约)账户存储树
- 交易树
- 交易收据树
- 小结
- 以太坊的存储结构
- 以太坊状态数据库
- MPT
- 以太坊POW共识算法
- 智能合约存储
- Polygon Edge
- block结构
- transaction数据结构
- 数据结构小结
- 关于本区块链的一些说明
- UML工具-PlantUML
- libp2p介绍
- JSON-RPC
- docker制作:启动多个应用系统
- Dockerfile
- docker-entrypoint.sh
- supervisord.conf
- docker run
- nginx.conf
- docker基础操作整理
- jupyter计算交互环境
- git技巧一
- git技巧二
- 使用github项目的最佳实践
- windows下package管理工具