# 前言
作为最简准备,旨在熟悉libp2p library的使用方法。一个节点的区块链每隔5秒广播本地的区块链,启动监听,并在对端节点连接到后,将本地的区块链数据以json字符串形式写入网络,对端节点读取后,解析为区块链,如果解析而来的区块链长度长于本地的区块链,则直接更新本地区块链,否则丢弃。
libp2p很特别,采用mutiaddress,我们可以自定义系统的网络地址形式(称为网络协议),比如,在本文中,我们设计为:/ip4/127.0.0.1/tcp/listenPort,这个地址用于服务器监听,其中listenPort为服务器监听端口;另外一个地址是:/ipfs/ID,其中ID为全网唯一,唯一标识本服务器,用于被其它节点发现,例如:/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N
实际改造我们的区块链时候,会有更细粒度的控制。
# 导入所需的包
其中大部分的包来自于`go-libp2p`:
~~~go
"github.com/davecgh/go-spew/spew"
golog "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
gologging "github.com/whyrusleeping/go-logging"
~~~
`spew`包是为了能够友好地打印区块链数据。
# 创建一个LibP2P节点主机
~~~go
// makeBasicHost 创建一个LibP2P主机
//randseed:一个随机数,提供随机数创建主机,程序会更健壮
//listenPort:监听端口
// secio:是否对数据流进行加密,推荐打开
func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) {
// 如果seed为0,使用真实的密码随机源,
//否则,使用确定性随机性源,以使生成的密钥在多次运行中保持不变
var r io.Reader
if randseed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(randseed))
}
// 为主机产生一对钥匙. 我们将使用它 来获得一个合法的主机ID
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
//选项
opts := []libp2p.Option{
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),//监听地址和端口
libp2p.Identity(priv),
}
//默认是开启的
if !secio {
opts = append(opts, libp2p.NoSecurity)
}
//利用相关参数,创建主机basicHost,获得全网唯一的IPFS ID,唯一标识本服务器节点
basicHost, err := libp2p.New(context.Background(), opts...)
if err != nil {
return nil, err
}
// 建立主机的Multiaddr,libp2p使用一种独特的Mutliaddr,而非传统的IP+端口,用于节点之间互相发现
hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))//服务器的ipfs地址,用于被其它节点发现
// 现在,我们可以通过封装两个地址来构建一个可抵达主机的完整的Multiaddr
addr := basicHost.Addrs()[0]
fullAddr := addr.Encapsulate(hostAddr)
log.Printf("I am %s\n", fullAddr)
if secio {
log.Printf("现在运行命令: \"go run main.go -l %d -d %s -secio\" 在一个不同的终端\n", listenPort+1, fullAddr)
} else {
log.Printf("现在运行命令: \"go run main.go -l %d -d %s\" 在一个不同的终端\n", listenPort+1, fullAddr)
}
return basicHost, nil
}
~~~
# 流处理
我们需要让我们的主机处理传入的数据流。既要处理读取,也要处理写操作。
在流处理中,本地需要对Blockchain使用互斥锁进行读写保护,此外,将数据写入到网络上也需要使用互斥锁
~~~go
func handleStream(s net.Stream) {
log.Println("Got a new stream!")
// bufio为非阻塞的读和写操作创建一个读写缓冲流
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
go readData(rw)
go writeData(rw)
//流将保持打开直到你关闭它(或者其它方关闭它)
}
~~~
# 读取
~~~go
func readData(rw *bufio.ReadWriter) {
for {//无限循环,永不停歇地读取外面进来的数据
//我们使用`ReadString`解析从其它节点发送过来的新的区块链(JSON字符串)。
str, err := rw.ReadString('\n')//以'\n'为分隔符读取。返回的是字符串拷贝。
if err != nil {//只有一种情况会返回err:没有遇到分隔符。
log.Fatal(err)
}
if str == "" {//没有读取到任何数据
return
}
if str != "\n" {
chain := make([]Block, 0)//make只用于映射、切片和程道,不返回指针,这里创建一个[]Block类型的切片
if err := json.Unmarshal([]byte(str), &chain); err != nil {//json转为结构对象
log.Fatal(err)
}
mutex.Lock()//独占互斥锁,保护的是Blockchain
if len(chain) > len(Blockchain) {//如果流中解析的区块链长度大于本地区块链的长度,替换本地区块链为读取的区块链
Blockchain = chain
bytes, err := json.MarshalIndent(Blockchain, "", " ")//缩进一个空格,对象Blockchain转为json
if err != nil {
log.Fatal(err)
}
fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
}
mutex.Unlock()
}
}
}
~~~
# 写
我们用一个Go例程启动函数,它每隔5秒广播我们的区块链的最新状态给我们的对等体。如果长度比他们的短,它们会接受并扔掉。如果更长,他们会接受的。无论是哪种方式,所有的对等体都在不断地通过网络的最新状态来更新他们的链链。
~~~go
func writeData(rw *bufio.ReadWriter) {
go func() {//每隔5秒广播我们的区块链的最新状态给我们的对等体
for {
time.Sleep(5 * time.Second)
mutex.Lock()//互斥锁,保护Blockchain
bytes, err := json.Marshal(Blockchain)//本地Blockchain转为json字符串
if err != nil {
log.Println(err)
}
mutex.Unlock()
mutex.Lock()//互斥锁,独占rw
rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
rw.Flush()//将缓存的数据真正写入到网络上
mutex.Unlock()
}
}()
stdReader := bufio.NewReader(os.Stdin)//从终端读取待发送到信息(心率数)
for {//无限循环,随时读取终端填写的心律数据,发送到网上
fmt.Print("> ")
sendData, err := stdReader.ReadString('\n')
if err != nil {
log.Fatal(err)
}
sendData = strings.Replace(sendData, "\n", "", -1)
bpm, err := strconv.Atoi(sendData)//将读取的字符串转为数字(心率为数字)
if err != nil {
log.Fatal(err)
}
newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)//创建区块
if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
mutex.Lock()//凡是读或写Blockchain,均需要开启互斥锁
Blockchain = append(Blockchain, newBlock)//将新区块加入到区块链
mutex.Unlock()
}
mutex.Lock()
bytes, err := json.Marshal(Blockchain)//读取本地区块,转为json
if err != nil {
log.Println(err)
}
mutex.Unlock()
//使用spew.Dump 这个函数可以以非常美观和方便阅读的方式将 struct、slice 等数据打印在控制台里,方便我们调试
spew.Dump(Blockchain)
mutex.Lock()//互斥锁,独占rw
rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
rw.Flush()//将本地区块数据写入到网络
mutex.Unlock()
}
}
~~~
* `secio` 是否允许安全传输。我们会一直把这个开关打开的。
* `target` 指定想要连接的host地址,这里我们其实扮演的节点去连接其他host。
* `listenF`打开指定端口让其他节点连接,这里我们扮演的host。
我们用一个Go例程启动函数,它每隔5秒广播我们的区块链的最新状态给我们的对等体。如果长度比他们的短,他们会接受并扔掉。如果更长,他们会接受并更新本地的区块链。无论是哪种方式,所有的对等体都在不断地通过网络的最新状态来更新他们的链链。
我们进行一些字符串操作,以确保输入的BPM是一个整数,并且格式正确,可以添加为新块。我们通过我们的标准BangLink函数(见上面的“Blockchain stuff”部分)。然后,我们`Marshal`它,使它看起来漂亮,打印到我们的控制台,用`spew.Dump`验证。然后我们用`rw.WriteString`将它广播到我们的连接的对等体。
创建了我们的处理程序和读写逻辑来处理输入和输出的块链。通过这些函数,我们已经为每个对等点创建了一种方法,以连续地相互检查其块链的状态,并且在同一时间,它们都被更新到最新状态(最长的有效块链)。
# main
~~~go
t := time.Now()
genesisBlock := Block{}
genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""}
Blockchain = append(Blockchain, genesisBlock)
// LibP2P 使用 golog记录消息日志. 我们可以控制日志的详细程度
golog.SetAllLoggers(gologging.INFO) // 变更为 DEBUG 可以获得额外的信息
// 从命令行解析出选项
listenF := flag.Int("l", 0, "等待到来的连接")
target := flag.String("d", "", "连接的目标节点")
secio := flag.Bool("secio", false, "启用 secio")
seed := flag.Int64("seed", 0, "设定产生ID的随机种子")
flag.Parse()
if *listenF == 0 {
log.Fatal("请提供绑定的端口 -l")
}
// 创建一个主机
ha, err := makeBasicHost(*listenF, *secio, *seed)
if err != nil {
log.Fatal(err)
}
//target为我们指定要连接的另一主机的地址,这意味着如果使用此标志,我们将充当主机的对等方
if *target == "" {//只充当主机
log.Println("listening for connections")
// 将流处理器设定在主机: A。 /p2p/1.0.0 是一个用户自定义的协议
ha.SetStreamHandler("/p2p/1.0.0", handleStream)
select {} // 一直挂起
} else {//作为主机的对等方,连接到target主机
ha.SetStreamHandler("/p2p/1.0.0", handleStream)//对等端仍然要开启监听,接受其它节点的连接
//**下面的代码,是作为对等端B主机所做的工作:连接到A主机节点,执行读和写,实现两个节点之间的网络通信**
// 下面的代码从目标节点的mutiaddress中展开获得节点的ID
ipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil {
log.Fatalln(err)
}
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
peerid, err := peer.IDB58Decode(pid)//将58位的string转化为id
if err != nil {
log.Fatalln(err)
}
// 从目标主机解封装 /ipfs/<peerID>
// /ip4/<a.b.c.d>/ipfs/<peer> 变成 /ip4/<a.b.c.d>
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
// 我们有了一个节点ID和targetAddr,将它添加到peerstore
// 这样LibP2就知道如何联系到它
ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
log.Println("正在打开B到A的网络流...")
// 创建一个新的从主机B到A的流
// 它应当被主机A通过我们上面设定的处理器进行处理
// 因为我们使用相同的 /p2p/1.0.0 协议
s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
if err != nil {
log.Fatalln(err)
}
// 创建一个新的缓冲流,这样读和写将不会阻塞
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
// 创建一个线程读和写数据
go writeData(rw)
go readData(rw)
select {} // 永远挂起
}
~~~
我们设置所有的命令行标志。
* `secio` 我们以前覆盖并允许安全流。我们将确保在运行程序时始终使用这个标志。
* `target` 让我们指定要连接的另一主机的地址,这意味着如果使用此标志,我们将充当主机的对等方。
* `listenF`打开了我们希望允许连接的端口,这意味着我们作为主机。我们既可以是主机(接收连接),也可以是对等体(连接到其他主机)。这就是这个系统真正成为P2P的原因!
* `seed` 是可选的随机播种器,用来构造我们的地址,其他节点可以用来连接我们。
然后,我们创建了一个新的主机,我们之前创建了`makeBasicHost`函数。如果我们只充当主机(即,我们没有连接到其他主机),我们指定如果`*target==“”`,则使用我们之前创建的`setStreamHandle`函数启动处理程序,这是我们的监听器代码的结束。
如果我们确实想要连接到另一个主机,我们移动到`else`部分。我们再次设置我们的处理程序,因为我们作为一个主机和一个连接的对等体。
接下来的几行解构了我们提供给目标的字符串,这样我们就可以找到我们想要连接的主机。这也被称为解封装。
我们最终得到要连接的主机的`peerID`和目标地址`targetAddr`,并将该记录添加到“存储”中,以便跟踪我们与谁连接。
然后,我们使用`ha.NewStream`创建想要连接到的对等体连接。我们希望能够接收和发送数据流(我们的区块链),因此就像我们在处理程序中做的那样,我们创建一个`ReadWriter`,并为`readData`和`writeData`创建单独的Go例程。最后我们通过空的`select`来阻塞程序,这样程序不会停止。
- 重要更新说明
- linechain发布
- linechain新版设计
- 引言一
- 引言二
- 引言三
- vs-code设置及开发环境设置
- BoltDB数据库应用
- 关于Go语言、VS-code的一些Tips
- 区块链的架构
- 网络通信与区块链
- 单元测试
- 比特币脚本语言
- 关于区块链的一些概念
- 区块链组件
- 区块链第一版:基本原型
- 区块链第二版:增加工作量证明
- 区块链第三版:持久化
- 区块链第四版:交易
- 区块链第五版:实现钱包
- 区块链第六版:实现UTXO集
- 区块链第七版:网络
- 阶段小结
- 区块链第八版:P2P
- P2P网络架构
- 区块链网络层
- P2P区块链最简体验
- libp2p建立P2P网络的关键概念
- 区块链结构层设计与实现
- 用户交互层设计与实现
- 网络层设计与实现
- 建立节点发现机制
- 向区块链网络请求区块信息
- 向区块链网络发布消息
- 运行区块链
- LineChain
- 系统运行流程
- Multihash
- 区块链网络的节点发现机制深入探讨
- DHT
- Bootstrap
- 连接到所有引导节点
- Advertise
- 搜索其它peers
- 连接到搜到的其它peers
- 区块链网络的消息订发布-订阅机制深入探讨
- LineChain:适用于智能合约编程的脚本语言支持
- LineChain:解决分叉问题
- LineChain:多重签名
- libp2p升级到v0.22版本
- 以太坊基础
- 重温以太坊的树结构
- 世界状态树
- (智能合约)账户存储树
- 交易树
- 交易收据树
- 小结
- 以太坊的存储结构
- 以太坊状态数据库
- MPT
- 以太坊POW共识算法
- 智能合约存储
- Polygon Edge
- block结构
- transaction数据结构
- 数据结构小结
- 关于本区块链的一些说明
- UML工具-PlantUML
- libp2p介绍
- JSON-RPC
- docker制作:启动多个应用系统
- Dockerfile
- docker-entrypoint.sh
- supervisord.conf
- docker run
- nginx.conf
- docker基础操作整理
- jupyter计算交互环境
- git技巧一
- git技巧二
- 使用github项目的最佳实践
- windows下package管理工具