# 网络层设计与实现
## Node
一个网络节点(Node)命名为Network。
Node用Network来定义和实现,特指P2P网络节点,更体现Node的本质。
```
// Network 节点的数据结构
type Network struct {
Host host.Host//主机
GeneralChannel *Channel//通用节点
MiningChannel *Channel//挖矿节点
FullNodesChannel *Channel//全节点
Blockchain *blockchain.Blockchain
Blocks chan *blockchain.Block//Block类型的通道
Transactions chan *blockchain.Transaction//Transaction类型的通道
Miner bool
}
```
## Channel
Channel为通信通道,每个host有三个通信通道,但根据其节点的类别,一般一个节点只用到其中一个通信通道。
```
// Channel 的数据结构
type Channel struct {
ctx context.Context
pub *pubsub.PubSub//发布者
topic *pubsub.Topic
sub *pubsub.Subscription//订阅者
channelName string//构成Topic名称字符串的组成部分(TopicName="channel:" + channelName)
self peer.ID
Content chan *ChannelContent//ChannelContent类型的通道
}
```
GeneralChannel为通用节点,负责列举所有连接到主机(host)的所有peer,这也是所有连接到host的peer,处理除了tx之外的所有命令消息。
FullNodesChannel为全节点,处理与交易相关的tx及gettxfrompool命令,即将新交易放到内存池,以及每秒不断将交易从交易池中取出(这里我们每秒只取出一条交易,可以优化为每次取出多条交易)给挖矿节点进行挖矿。
MiningChannel为挖矿节点,处理来自交易池的inv命令及来自交易池的tx命令。
## Host
P2P的host package定义了Host这一interface。
```
// 为本主机(host)创建一对新的 RSA 密钥
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
panic(err)
}
transports := libp2p.ChainOptions(
libp2p.Transport(tcp.NewTCPTransport),//支持TCP传输协议
libp2p.Transport(ws.New),//支持websorcket传输协议
)
muxers := libp2p.ChainOptions(
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),//支持"/yamux/1.0.0"流连接(基于可靠连接的多路I/O复用)
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),//支持"/mplex/6.7.0"流连接(二进制流多路I/O复用),由LibP2P基于multiplex创建
)
if len(listenPort) == 0 {
listenPort = "0"
}
listenAddrs := libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", listenPort),//支持tcp传输
fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", listenPort),//支持websorket传输
)
// Host是参与p2p网络的对象,它实现协议或提供服务。
// 它像服务器一样处理请求,像客户端一样发出请求。
// 之所以称为 Host,是因为它既是 Server 又是 Client(而 Peer 可能会混淆)。
// 1、创建host
// 重要:创建主机host
//-如果没有提供transport和listen addresses,节点将监听在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0";
//-如果没有提供transport的选项,节点使用TCP和websorcket传输协议
//-如果multiplexer配置没有提供,节点缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流连接配置
//-如果没有提供security transport,主机使用go-libp2p的noise和/或tls加密的transport来加密所有的traffic(新版本libp2p已经不再支持security transport参数设置)
//-如果没有提供peer的identity,它产生一个随机RSA 2048键值对,并由它导出一个新的identity
//-如果没有提供peerstore,主机使用一个空的peerstore来进行初始化
host, err := libp2p.New(
ctx,
transports,
listenAddrs,
muxers,
libp2p.Identity(prvKey),
)
```
上述代码中第一步创建Host:
```
host, err := libp2p.New(...)
```
我们追溯New函数,它来自于libp2p.go,最终调用的是:
```
func NewWithoutDefaults(ctx context.Context, opts ...Option) (host.Host, error) {
varcfg Config
if err := cfg.Apply(opts...); err != nil {
returnnil, err
}
return cfg.NewNode(ctx)
}
```
我们继续追溯cfg.NewNode(ctx),在P2Plib的config.go,关键代码如下:
```
func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
swrm, err := cfg.makeSwarm(ctx)
if err != nil {
returnnil, err
}
h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{
ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager,
EnablePing: !cfg.DisablePing,
UserAgent: cfg.UserAgent,
})
...
h.Start()
if router != nil {
return outed.Wrap(h, router), nil
}
return h, nil
}
```
security transport,默认的值为:
```
var DefaultSecurity = libp2p.ChainOptions(
Security(noise.ID, noise.New),
Security(tls.ID, tls.New),
)
```
上述代码的第一个关键是:
```
swrm, err := cfg.makeSwarm(ctx)
```
我们追溯进去,看看cfg.makeSwarm(ctx):
```
func (cfg *Config) makeSwarm(ctx context.Context) (*swarm.Swarm, error) {
//从config保存的公钥得到pid
pid, err := peer.IDFromPublicKey(cfg.PeerKey.GetPublic())
...
swrm := swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.ConnectionGater)
return swrm, nil
```
我们继续追溯swarm.NewSwarm(ctx, pid, cfg.Peerstore, cfg.Reporter, cfg.ConnectionGater):
```
func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc metrics.Reporter, extra ...interface{}) *Swarm {
s := &Swarm{
local: local,
peers: peers,
bwc: bwc,
}
...
return s
```
可见,peer.ID被赋值到Swarm对象的local变量。
我们回到函数:
```
func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
swrm, err := cfg.makeSwarm(ctx)
if err != nil {
returnnil, err
}
h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{
ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager,
EnablePing: !cfg.DisablePing,
UserAgent: cfg.UserAgent,
})
...
h.Start()
if router != nil {
return outed.Wrap(h, router), nil
}
return h, nil
}
```
前面已经讨论完了swrm, err := cfg.makeSwarm(ctx),我们继续往下看,swrm成为创建Host的一个参数:
```
h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{
ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager,
EnablePing: !cfg.DisablePing,
UserAgent: cfg.UserAgent,
})
```
bhost是一个package:
```
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
```
我们查看上面的NewHost,进入到basichost package(basic_host.go):
定义了basichost:
```
type BasicHost struct
```
然后BasicHost实现了Host的所有接口方法,其中NewHost接口实现如下:
```
func NewHost(ctx context.Context, n network.Network, opts \*HostOpts) (\*BasicHost, error) {
hostCtx, cancel := context.WithCancel(ctx)
h := &BasicHost{
network: n,
mux: msmux.NewMultistreamMuxer(),
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
eventbus: eventbus.NewBus(),
addrChangeChan: make(chanstruct{}, 1),
ctx: hostCtx,
ctxCancel: cancel,
disableSignedPeerRecord: opts.DisableSignedPeerRecord,
}
...
return h, nil
```
swrm作为参数传给了n network.Network。而实现的接口:
```
func (h *BasicHost) ID() peer.ID {
return h.Network().LocalPeer()
}
```
h.Network()返回Swarm对象(Swarm是一个struct,实现了接口network.Network(Network是一个interface))
```
func (h *BasicHost) Network() network.Network {
return h.network
}
```
我们看看Swarm的函数LocalPeer(),正好返回的是local(即peer的ID):
```
func (s *Swarm) LocalPeer() peer.ID {
return s.local
}
```
### 小结
1、主机实际上是BasicHost struct,它实现了Host interface,peer.ID在创建host时候已经在Host中得到了(host.ID()得到的即是peer.ID)。
2、同时Swarm struct实现了libp2p的network.Network interface。
3、BasicHost和Swarm均由p2plib提供。
## Peer
Peer为对等端,是host的第三方视觉的概念。
Peer以ID为唯一标识,peer.ID是通过哈希peer的公钥而派生,并编码其哈希输出为multihash的结果。
peer.ID是往后不同节点之间进行通信的重要参数,它代表一个Host,或者说,我们可以通过peer.ID获得一个具体的Host对象。如发送虚拟币:
```
func (net *Network) SendTx(peerId string, transaction *blockchain.Transaction) {
memoryPool.Add(*transaction)
tnx := Tx{net.Host.ID().Pretty(), transaction.Serializer()}
payload := GobEncode(tnx)
request := append(CmdToBytes("tx"), payload...)
// 给全节点(FullNodes)第通信通道发布此消息,全节点将进行处理
net.FullNodesChannel.Publish("接收到 Send transaction 命令", request, peerId)
}
```
如同Host一样,peer package也是在libp2p库中定义,所在的文件是peer.go,不同的是,在peer.go中并没有定义一个peer的struct,而是直接在peer package中定义ID:
```
type ID string
```
但显然ID是一个mutihash的值,如需要对外呈现需要使用base58编码后得到人可以识别的字符串:
```
func (id ID) String() string {
return id.Pretty()
}
```
Pretty方法如下:
```
func (id ID) Pretty() string {
returnIDB58Encode(id)
}
```
## 网络通信流程
一切从startNode开始。
main.go:
```
cli.StartNode(listenPort, minerAddress, miner, fullNode, func(net *p2p.Network) {//最后一个参数是回调函数,获得net实例
if rpc {
cli.P2p = net//启动节点后设置cli的P2P实例,net为启动节点函数的回调函数参数被回调后返回的Network实例
go jsonrpc.StartServer(cli, rpc, rpcPort, rpcAddr)
}
})
```
其中cli的结构:
```
type CommandLinestruct {
Blockchain *blockchain.Blockchain
P2p *p2p.Network
CloseDbAlways bool//每次命令执行完毕是否关闭数据库
}
```
其中istenPort, minerAddress, miner, fullNode等参数的值来自于命令startnode执行时获得的命令行参数。
cli.StartNode实现:
```
// StartNode 启动节点,其中fn为回调函数,p2p.StartNode调用过程中调用fn,设置p2p.Network实例
func (cli *CommandLine) StartNode(listenPort, minerAddress string, miner, fullNode bool, fn func(*p2p.Network)) {
if miner {
log.Infof("作为矿工正在启动节点: %s\\n", listenPort)
iflen(minerAddress) > 0 {
if wallet.ValidateAddress(minerAddress) {
log.Info("正在挖矿,接收奖励的地址是:", minerAddress)
} else {
log.Fatal("请提供一个合法的矿工地址")
}
}
} else {
log.Infof("在: %s\\n端口上启动节点", listenPort)
}
chain := cli.Blockchain.ContinueBlockchain()
p2p.StartNode(chain, listenPort, minerAddress, miner, fullNode, fn)
}
```
在获得了blockchain实例后,调用p2p package的StartNode函数:
```
// StartNode 启动一个节点
func StartNode(chain *blockchain.Blockchain, listenPort, minerAddress string, miner, fullNode bool, callback func(*Network)) {
var r io.Reader
r = rand.Reader//没有指定seed,使用随机种子
MinerAddress = minerAddress
ctx, cancel := context.WithCancel(context.Background())
defercancel()
defer chain.Database.Close()//函数运行结束,关闭区块链数据库
go appUtils.CloseDB(chain)//启动协程,遇到程序强行终止信号时关闭数据库,退出程序
// 为本主机(host)创建一对新的 RSA 密钥
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
panic(err)
}
transports := libp2p.ChainOptions(
libp2p.Transport(tcp.NewTCPTransport),//支持TCP传输协议
libp2p.Transport(ws.New),//支持websorcket传输协议
)
muxers := libp2p.ChainOptions(
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),//支持"/yamux/1.0.0"流连接(基于可靠连接的多路I/O复用)
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),//支持"/mplex/6.7.0"流连接(二进制流多路I/O复用),由LibP2P基于multiplex创建
)
if len(listenPort) == 0 {
listenPort = "0"
}
listenAddrs := libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/tcp/%s", listenPort),//支持tcp传输
fmt.Sprintf("/ip4/0.0.0.0/tcp/%s/ws", listenPort),//支持websorket传输
)
// Host是参与p2p网络的对象,它实现协议或提供服务。
// 它像服务器一样处理请求,像客户端一样发出请求。
// 之所以称为 Host,是因为它既是 Server 又是 Client(而 Peer 可能会混淆)。
// 1、创建host
// 重要:创建主机host
//-如果没有提供transport和listen addresses,节点将监听在多地址(mutiaddresses): "/ip4/0.0.0.0/tcp/0" 和 "/ip6/::/tcp/0";
//-如果没有提供transport的选项,节点使用TCP和websorcket传输协议
//-如果multiplexer配置没有提供,节点缺省使用"yamux/1.0.0" 和 "mplux/6.7.0"流连接配置
//-如果没有提供security transport,主机使用go-libp2p的noise和/或tls加密的transport来加密所有的traffic(新版本libp2p已经不再支持security transport参数设置)
//-如果没有提供peer的identity,它产生一个随机RSA 2048键值对,并由它导出一个新的identity
//-如果没有提供peerstore,主机使用一个空的peerstore来进行初始化
host, err := libp2p.New(
ctx,
transports,
listenAddrs,
muxers,
libp2p.Identity(prvKey),
)
if err != nil {
panic(err)
}
for _, addr := range host.Addrs() {
fmt.Println("正在监听在", addr)
}
log.Info("主机已创建: ", host.ID())
// 2、使用GossipSub路由,创建一个新的基于Gossip 协议的 PubSub 服务系统
// 任何一个主机节点,都是一个订阅发布服务系统
// 这是整个区块链网络运行的关键所在
pub, err := pubsub.NewGossipSub(ctx, host)
if err != nil {
panic(err)
}
// 3、构建三个通信通道,通信通道使用发布-订阅系统,在不同节点之间传递信息
// 之所以需要三个通道,是因为未来规划不同节点拥有不同的功能,不同功能的节点完成不同类型的任务。
// 三个通道的消息独立,只有订阅了该通道消息的节点,才能收到该通道的消息,然后进行处理,以完成相应的任务。
// 任何一个节点,均创建了三个通道实例,这意味着人一个节点都可以根据需要,选择任意一个通道发送消息
// 在订阅上,一个具体的节点, GeneralChannel 订阅将消息,如果是采矿节点(miner==true),miningChannel 会接收到消息,
// 如果是全节点(fullNode==true),fullNodesChannel会接受到消息
//GeneralChannel 通道订阅消息
generalChannel, _ := JoinChannel(ctx, pub, host.ID(), GeneralChannel, true)
subscribe := false
if miner {
subscribe = true
}
//如果是挖矿节点, miningChannel 订阅消息,否则 miningChannel 不订阅消息
miningChannel, _ := JoinChannel(ctx, pub, host.ID(), MiningChannel, subscribe)
subscribe = false
if fullNode {
subscribe = true
}
//如果是全节点, fullNodesChannel 订阅消息,否则 fullNodesChannel 不订阅消息
fullNodesChannel, _ := JoinChannel(ctx, pub, host.ID(), FullNodesChannel, subscribe)
// 3、为各通信通道建立命令行界面对象
ui := NewCLIUI(generalChannel, miningChannel, fullNodesChannel)
// 4、建立对等端(peer)发现机制(discovery),使得本节点可以被网络上的其它节点发现
//同时将主机(host)连接到所有已经发现的对等端(peer)
err = SetupDiscovery(ctx, host)
if err != nil {
panic(err)
}
network := &Network{
Host: host,
GeneralChannel: generalChannel,
MiningChannel: miningChannel,
FullNodesChannel: fullNodesChannel,
Blockchain: chain,
Blocks: make(chan *blockchain.Block, 200),
Transactions: make(chan *blockchain.Transaction, 200),
Miner: miner,
}
// 5、回调,将节点(network)实例传回
callback(network)
// 6、向全网请求区块信息,以补全本地区块链
// 每一个节点均有区块链的一个完整副本
err = RequestBlocks(network)
// 7、启用协程,处理节点事件
goHandleEvents(network)
// 8、如果是矿工节点,启用协程,不断发送ping命令给全节点
if miner {
// 矿工事件循环,以不断地发送一个ping给全节点,目的是得到新的交易,为它挖矿,并添加到区块链
go network.MinersEventLoop()
}
if err != nil {
panic(err)
}
// 9、运行UI界面,将在Run函数体中启动协程,循环接收并处理全网节点publish的消息
iferr = ui.Run(network); err != nil {
log.Error("运行文字UI发生错误: %s", err)
}
}
```
- 重要更新说明
- linechain发布
- linechain新版设计
- 引言一
- 引言二
- 引言三
- vs-code设置及开发环境设置
- BoltDB数据库应用
- 关于Go语言、VS-code的一些Tips
- 区块链的架构
- 网络通信与区块链
- 单元测试
- 比特币脚本语言
- 关于区块链的一些概念
- 区块链组件
- 区块链第一版:基本原型
- 区块链第二版:增加工作量证明
- 区块链第三版:持久化
- 区块链第四版:交易
- 区块链第五版:实现钱包
- 区块链第六版:实现UTXO集
- 区块链第七版:网络
- 阶段小结
- 区块链第八版:P2P
- P2P网络架构
- 区块链网络层
- P2P区块链最简体验
- libp2p建立P2P网络的关键概念
- 区块链结构层设计与实现
- 用户交互层设计与实现
- 网络层设计与实现
- 建立节点发现机制
- 向区块链网络请求区块信息
- 向区块链网络发布消息
- 运行区块链
- LineChain
- 系统运行流程
- Multihash
- 区块链网络的节点发现机制深入探讨
- DHT
- Bootstrap
- 连接到所有引导节点
- Advertise
- 搜索其它peers
- 连接到搜到的其它peers
- 区块链网络的消息订发布-订阅机制深入探讨
- LineChain:适用于智能合约编程的脚本语言支持
- LineChain:解决分叉问题
- LineChain:多重签名
- libp2p升级到v0.22版本
- 以太坊基础
- 重温以太坊的树结构
- 世界状态树
- (智能合约)账户存储树
- 交易树
- 交易收据树
- 小结
- 以太坊的存储结构
- 以太坊状态数据库
- MPT
- 以太坊POW共识算法
- 智能合约存储
- Polygon Edge
- block结构
- transaction数据结构
- 数据结构小结
- 关于本区块链的一些说明
- UML工具-PlantUML
- libp2p介绍
- JSON-RPC
- docker制作:启动多个应用系统
- Dockerfile
- docker-entrypoint.sh
- supervisord.conf
- docker run
- nginx.conf
- docker基础操作整理
- jupyter计算交互环境
- git技巧一
- git技巧二
- 使用github项目的最佳实践
- windows下package管理工具