企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 前言 作为最简准备,旨在熟悉libp2p library的使用方法。一个节点的区块链每隔5秒广播本地的区块链,启动监听,并在对端节点连接到后,将本地的区块链数据以json字符串形式写入网络,对端节点读取后,解析为区块链,如果解析而来的区块链长度长于本地的区块链,则直接更新本地区块链,否则丢弃。 libp2p很特别,采用mutiaddress,我们可以自定义系统的网络地址形式(称为网络协议),比如,在本文中,我们设计为:/ip4/127.0.0.1/tcp/listenPort,这个地址用于服务器监听,其中listenPort为服务器监听端口;另外一个地址是:/ipfs/ID,其中ID为全网唯一,唯一标识本服务器,用于被其它节点发现,例如:/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N 实际改造我们的区块链时候,会有更细粒度的控制。 # 导入所需的包 其中大部分的包来自于`go-libp2p`: ~~~go "github.com/davecgh/go-spew/spew" golog "github.com/ipfs/go-log" libp2p "github.com/libp2p/go-libp2p" crypto "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" net "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" ma "github.com/multiformats/go-multiaddr" gologging "github.com/whyrusleeping/go-logging" ~~~ `spew`包是为了能够友好地打印区块链数据。 # 创建一个LibP2P节点主机 ~~~go // makeBasicHost 创建一个LibP2P主机 //randseed:一个随机数,提供随机数创建主机,程序会更健壮 //listenPort:监听端口 // secio:是否对数据流进行加密,推荐打开 func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) { // 如果seed为0,使用真实的密码随机源, //否则,使用确定性随机性源,以使生成的密钥在多次运行中保持不变 var r io.Reader if randseed == 0 { r = rand.Reader } else { r = mrand.New(mrand.NewSource(randseed)) } // 为主机产生一对钥匙. 我们将使用它 来获得一个合法的主机ID priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r) if err != nil { return nil, err } //选项 opts := []libp2p.Option{ libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),//监听地址和端口 libp2p.Identity(priv), } //默认是开启的 if !secio { opts = append(opts, libp2p.NoSecurity) } //利用相关参数,创建主机basicHost,获得全网唯一的IPFS ID,唯一标识本服务器节点 basicHost, err := libp2p.New(context.Background(), opts...) if err != nil { return nil, err } // 建立主机的Multiaddr,libp2p使用一种独特的Mutliaddr,而非传统的IP+端口,用于节点之间互相发现 hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))//服务器的ipfs地址,用于被其它节点发现 // 现在,我们可以通过封装两个地址来构建一个可抵达主机的完整的Multiaddr addr := basicHost.Addrs()[0] fullAddr := addr.Encapsulate(hostAddr) log.Printf("I am %s\n", fullAddr) if secio { log.Printf("现在运行命令: \"go run main.go -l %d -d %s -secio\" 在一个不同的终端\n", listenPort+1, fullAddr) } else { log.Printf("现在运行命令: \"go run main.go -l %d -d %s\" 在一个不同的终端\n", listenPort+1, fullAddr) } return basicHost, nil } ~~~ # 流处理 我们需要让我们的主机处理传入的数据流。既要处理读取,也要处理写操作。 在流处理中,本地需要对Blockchain使用互斥锁进行读写保护,此外,将数据写入到网络上也需要使用互斥锁 ~~~go func handleStream(s net.Stream) { log.Println("Got a new stream!") // bufio为非阻塞的读和写操作创建一个读写缓冲流 rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) go readData(rw) go writeData(rw) //流将保持打开直到你关闭它(或者其它方关闭它) } ~~~ # 读取 ~~~go func readData(rw *bufio.ReadWriter) { for {//无限循环,永不停歇地读取外面进来的数据 //我们使用`ReadString`解析从其它节点发送过来的新的区块链(JSON字符串)。 str, err := rw.ReadString('\n')//以'\n'为分隔符读取。返回的是字符串拷贝。 if err != nil {//只有一种情况会返回err:没有遇到分隔符。 log.Fatal(err) } if str == "" {//没有读取到任何数据 return } if str != "\n" { chain := make([]Block, 0)//make只用于映射、切片和程道,不返回指针,这里创建一个[]Block类型的切片 if err := json.Unmarshal([]byte(str), &chain); err != nil {//json转为结构对象 log.Fatal(err) } mutex.Lock()//独占互斥锁,保护的是Blockchain if len(chain) > len(Blockchain) {//如果流中解析的区块链长度大于本地区块链的长度,替换本地区块链为读取的区块链 Blockchain = chain bytes, err := json.MarshalIndent(Blockchain, "", " ")//缩进一个空格,对象Blockchain转为json if err != nil { log.Fatal(err) } fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes)) } mutex.Unlock() } } } ~~~ # 写 我们用一个Go例程启动函数,它每隔5秒广播我们的区块链的最新状态给我们的对等体。如果长度比他们的短,它们会接受并扔掉。如果更长,他们会接受的。无论是哪种方式,所有的对等体都在不断地通过网络的最新状态来更新他们的链链。 ~~~go func writeData(rw *bufio.ReadWriter) { go func() {//每隔5秒广播我们的区块链的最新状态给我们的对等体 for { time.Sleep(5 * time.Second) mutex.Lock()//互斥锁,保护Blockchain bytes, err := json.Marshal(Blockchain)//本地Blockchain转为json字符串 if err != nil { log.Println(err) } mutex.Unlock() mutex.Lock()//互斥锁,独占rw rw.WriteString(fmt.Sprintf("%s\n", string(bytes))) rw.Flush()//将缓存的数据真正写入到网络上 mutex.Unlock() } }() stdReader := bufio.NewReader(os.Stdin)//从终端读取待发送到信息(心率数) for {//无限循环,随时读取终端填写的心律数据,发送到网上 fmt.Print("> ") sendData, err := stdReader.ReadString('\n') if err != nil { log.Fatal(err) } sendData = strings.Replace(sendData, "\n", "", -1) bpm, err := strconv.Atoi(sendData)//将读取的字符串转为数字(心率为数字) if err != nil { log.Fatal(err) } newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)//创建区块 if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) { mutex.Lock()//凡是读或写Blockchain,均需要开启互斥锁 Blockchain = append(Blockchain, newBlock)//将新区块加入到区块链 mutex.Unlock() } mutex.Lock() bytes, err := json.Marshal(Blockchain)//读取本地区块,转为json if err != nil { log.Println(err) } mutex.Unlock() //使用spew.Dump 这个函数可以以非常美观和方便阅读的方式将 struct、slice 等数据打印在控制台里,方便我们调试 spew.Dump(Blockchain) mutex.Lock()//互斥锁,独占rw rw.WriteString(fmt.Sprintf("%s\n", string(bytes))) rw.Flush()//将本地区块数据写入到网络 mutex.Unlock() } } ~~~ * `secio` 是否允许安全传输。我们会一直把这个开关打开的。 * `target` 指定想要连接的host地址,这里我们其实扮演的节点去连接其他host。 * `listenF`打开指定端口让其他节点连接,这里我们扮演的host。 我们用一个Go例程启动函数,它每隔5秒广播我们的区块链的最新状态给我们的对等体。如果长度比他们的短,他们会接受并扔掉。如果更长,他们会接受并更新本地的区块链。无论是哪种方式,所有的对等体都在不断地通过网络的最新状态来更新他们的链链。 我们进行一些字符串操作,以确保输入的BPM是一个整数,并且格式正确,可以添加为新块。我们通过我们的标准BangLink函数(见上面的“Blockchain stuff”部分)。然后,我们`Marshal`它,使它看起来漂亮,打印到我们的控制台,用`spew.Dump`验证。然后我们用`rw.WriteString`将它广播到我们的连接的对等体。 创建了我们的处理程序和读写逻辑来处理输入和输出的块链。通过这些函数,我们已经为每个对等点创建了一种方法,以连续地相互检查其块链的状态,并且在同一时间,它们都被更新到最新状态(最长的有效块链)。 # main ~~~go t := time.Now() genesisBlock := Block{} genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""} Blockchain = append(Blockchain, genesisBlock) // LibP2P 使用 golog记录消息日志. 我们可以控制日志的详细程度 golog.SetAllLoggers(gologging.INFO) // 变更为 DEBUG 可以获得额外的信息 // 从命令行解析出选项 listenF := flag.Int("l", 0, "等待到来的连接") target := flag.String("d", "", "连接的目标节点") secio := flag.Bool("secio", false, "启用 secio") seed := flag.Int64("seed", 0, "设定产生ID的随机种子") flag.Parse() if *listenF == 0 { log.Fatal("请提供绑定的端口 -l") } // 创建一个主机 ha, err := makeBasicHost(*listenF, *secio, *seed) if err != nil { log.Fatal(err) } //target为我们指定要连接的另一主机的地址,这意味着如果使用此标志,我们将充当主机的对等方 if *target == "" {//只充当主机 log.Println("listening for connections") // 将流处理器设定在主机: A。 /p2p/1.0.0 是一个用户自定义的协议 ha.SetStreamHandler("/p2p/1.0.0", handleStream) select {} // 一直挂起 } else {//作为主机的对等方,连接到target主机 ha.SetStreamHandler("/p2p/1.0.0", handleStream)//对等端仍然要开启监听,接受其它节点的连接 //**下面的代码,是作为对等端B主机所做的工作:连接到A主机节点,执行读和写,实现两个节点之间的网络通信** // 下面的代码从目标节点的mutiaddress中展开获得节点的ID ipfsaddr, err := ma.NewMultiaddr(*target) if err != nil { log.Fatalln(err) } pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS) if err != nil { log.Fatalln(err) } peerid, err := peer.IDB58Decode(pid)//将58位的string转化为id if err != nil { log.Fatalln(err) } // 从目标主机解封装 /ipfs/<peerID> // /ip4/<a.b.c.d>/ipfs/<peer> 变成 /ip4/<a.b.c.d> targetPeerAddr, _ := ma.NewMultiaddr( fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid))) targetAddr := ipfsaddr.Decapsulate(targetPeerAddr) // 我们有了一个节点ID和targetAddr,将它添加到peerstore // 这样LibP2就知道如何联系到它 ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL) log.Println("正在打开B到A的网络流...") // 创建一个新的从主机B到A的流 // 它应当被主机A通过我们上面设定的处理器进行处理 // 因为我们使用相同的 /p2p/1.0.0 协议 s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0") if err != nil { log.Fatalln(err) } // 创建一个新的缓冲流,这样读和写将不会阻塞 rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s)) // 创建一个线程读和写数据 go writeData(rw) go readData(rw) select {} // 永远挂起 } ~~~ 我们设置所有的命令行标志。 * `secio` 我们以前覆盖并允许安全流。我们将确保在运行程序时始终使用这个标志。 * `target` 让我们指定要连接的另一主机的地址,这意味着如果使用此标志,我们将充当主机的对等方。 * `listenF`打开了我们希望允许连接的端口,这意味着我们作为主机。我们既可以是主机(接收连接),也可以是对等体(连接到其他主机)。这就是这个系统真正成为P2P的原因! * `seed` 是可选的随机播种器,用来构造我们的地址,其他节点可以用来连接我们。 然后,我们创建了一个新的主机,我们之前创建了`makeBasicHost`函数。如果我们只充当主机(即,我们没有连接到其他主机),我们指定如果`*target==“”`,则使用我们之前创建的`setStreamHandle`函数启动处理程序,这是我们的监听器代码的结束。 如果我们确实想要连接到另一个主机,我们移动到`else`部分。我们再次设置我们的处理程序,因为我们作为一个主机和一个连接的对等体。 接下来的几行解构了我们提供给目标的字符串,这样我们就可以找到我们想要连接的主机。这也被称为解封装。 我们最终得到要连接的主机的`peerID`和目标地址`targetAddr`,并将该记录添加到“存储”中,以便跟踪我们与谁连接。 然后,我们使用`ha.NewStream`创建想要连接到的对等体连接。我们希望能够接收和发送数据流(我们的区块链),因此就像我们在处理程序中做的那样,我们创建一个`ReadWriter`,并为`readData`和`writeData`创建单独的Go例程。最后我们通过空的`select`来阻塞程序,这样程序不会停止。