多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
DHT全称叫分布式哈希表(Distributed Hash Table),是一种分布式存储表,是分布式计算系统中的一类,用来将一个关键值(key)的集合分散到所有在分布式系统中的节点,并且可以有效地将消息转送到唯一一个拥有查询者提供的关键值的节点(Peers)。这里的节点类似散列表中的存储位置。分布式散列表通常是为了拥有极大节点数量的系统,而且在系统的节点常常会加入或离开(例如网络断线)而设计的。在一个结构性的覆盖网络(overlay network)中,参加的节点需要与系统中一小部分的节点沟通,这也需要使用分布式散列表。分布式散列表可以用以创建更复杂的服务,例如分布式文件系统、点对点技术文件分享系统、合作的网页缓存、多播、任播、域名系统以及即时通信等。 研究分布式散列表的主要动机是为了开发点对点系统。这些系统使用不同的方法来解决如何**找到拥有某资料的节点**的问题。任一个节点只需要与系统中的部分节点沟通。一般来说,若系统有n个节点,那么只有log n个节点是必须的 ![](https://img.kancloud.cn/64/27/6427ffb5c5a9907cdf3323cb32ee35d0_2880x1207.png) Kademlia算法是一种分布式存储及路由的算法,IpfsDHT是Kademlia算法的一个实现。 我们不仅仅是创建一个新的DHT,而且要求每一个端维护它自己的本地DHT副本,这样以来,DHT的引导节点可以关闭,不会影响后续的对等端发现。 ``` kademliaDHT, err := dht.New(ctx, host) ``` 其中上下文ctx来自主协程(上下文context是golang用于不同协程之间传递信息的机制,是golang的chan的一种实现): ``` ctx, cancel := context.WithCancel(context.Background()) ``` 将主协程上下文作为参数传入,构建kademliaDHT对象。 dht包是p2plib库中的一个package,包含了ipfs的DHT的实现细节。 通过New函数(不是方法),构建一个kademliaDHT对象。 New 使用指定的主机和选项创建一个新的 DHT。 请注意,连接到 DHT 对等点并不一定意味着它也在 DHT 路由表中。 如果路由表有超过 "minRTRefreshThreshold" 的对等点,我们仅在以下情况下将对等点视为路由表候选者: 路由表成功地从它那里得到一个查询响应,或者它向路由表发送了一个查询。 kademliaDHT对象有个ID,其值被赋值为host的ID。(peer.ID是在节点主机创建过程中的用节点的公钥构建的,这是一个Mutihash值)。 ## **Kademlia** Kademlia是一种通过分布式哈希表实现的协议算法,它是由Petar Maymounkov与David Mazières为非集中式P2P计算机网络而设计的。 Kademlia基于两个节点之间的距离计算,该距离是两个网络节点ID号的异或( XOR distance ),计算的结果最终作为整型数值返回。关键字和节点ID有同样的格式和长度,因此,可以使用同样的方法计算关键字和节点ID之间的距离。节点ID一般是一个大的随机数(或字符串),选择该数值的时候所追求的一个目标就是它的唯一性(希望在整个网络中该节点ID是唯一的)。异或距离跟实际上的地理位置没有任何关系,只与ID相关。因此很可能来自德国和澳大利亚的节点由于选择了相似的随机ID而成为邻居。选择异或是因为通过它计算的距离享有几何距离公式的一些特征,尤其体现在以下几点:节点和它本身之间的异或距离是0;异或距离是对称的:即从A到B的异或距离与从B到A的异或距离是等同的;异或距离符合三角不等式:三个顶点A B C,AC异或距离小于或等于AB异或距离和BC异或距离之和。由于以上的这些属性,在实际的节点距离的度量过程中计算量将大大降低。Kademlia搜索的每一次迭代将距目标至少更近1 bit。一个基本的具有2的n次方个节点的Kademlia网络在最坏的情况下只需花n步就可找到被搜索的节点或值。 **Kademlia**路由表由多个列表组成,每个列表对应节点ID的一位(例如:假如节点ID共有128位,则节点的路由表将包含128个列表),包含多个条目,条目中包含定位其他节点所必要的一些数据。列表条目中的这些数据通常是由其他节点的IP地址,端口和节点ID组成。每个列表对应于与节点相距特定范围距离的一些节点,节点的第n个列表中所找到的节点的第n位与该节点的第n位肯定不同,而前n-1位相同,这就意味着很容易使用网络中远离该节点的一半节点来填充第一个列表(第一位不同的节点最多有一半),而用网络中四分之一的节点来填充第二个列表(比第一个列表中的那些节点离该节点更近一位),依次类推。如果ID有128个二进制位,则网络中的每个节点按照不同的异或距离把其他所有的节点分成了128类,ID的每一位对应于其中的一类。随着网络中的节点被某节点发现,它们被逐步加入到该节点的相应的列表中,这个过程中包括向节点列表中存信息和从节点列表中取信息的操作,甚至还包括当时协助其他节点寻找相应键对应值的操作。这个过程中发现的所有节点都将被加入到节点的列表之中,因此节点对整个网络的感知是动态的,这使得网络一直保持着频繁地更新,增强了抵御错误和攻击的能力。 在**Kademlia**相关的论文中,列表也称为K桶,其中K是一个系统变量,如20,每一个K桶是一个最多包含K个条目的列表,也就是说,网络中所有节点的一个列表(对应于某一位,与该节点相距一个特定的距离)最多包含20个节点。随着对应的bit位变低(即对应的异或距离越来越短),K桶包含的可能节点数迅速下降(这是由于K桶对应的异或距离越近,节点数越少),因此,对应于更低bit位的K桶显然包含网络中所有相关部分的节点。由于网络中节点的实际数量远远小于可能ID号的数量,所以对应那些短距离的某些K桶可能一直是空的(如果异或距离只有1,可能的数量就最大只能为1,这个异或距离为1的节点如果没有发现,则对应于异或距离为1的K桶则是空的)。 ## IpfsDHT 我们看看IpfsDHT的结构: ``` // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base Routing module. type IpfsDHT struct { host host.Host // 节点主机,提供网络服务 self peer.ID //本地peer(自己,即host的peer ID) selfKey kb.ID //根据peer ID计算的hash peerstore peerstore.Peerstore // Peer 注册表 datastore ds.Datastore // 本地数据,host的地址和密钥存储 库 routingTable *kb.RoutingTable // 不同距离节点的路由表 // provider Store 存储和管理此 Dht peer的提供者记录。 providerStore providers.ProviderStore // 管理路由表刷新 rtRefreshManager *rtrefresh.RtRefreshManager birth time.Time // 本peer启动的时间 Validator record.Validator //验证器,用于验证节点是否合法 ctx context.Context //创建它的协程的上下文 proc goprocess.Process //进程管理,详细见后面分析 protoMessenger *pb.ProtocolMessenger//协议消息发送者,用于将DHT消息传给peers并处理来自它们的响应,详细见后面分析 msgSender pb.MessageSender //消息发送接口实现者 plk sync.Mutex 填充或修改DHT表互斥锁,将当前已连接的节点加入DHT表,或者断开连接更新DHT表,均为互斥操作 stripedPutLocks [256]sync.Mutex //用于将一个值存储到本地peer的库中 // 我们用它查询DHT protocols. 我们只把那些讲这些协议的peers加入到路由表中 // 缺省情况下,protocols=[]protocol.ID{v1proto},其中v1proto="/ipfs//kad/1.0.0" protocols []protocol.ID protocolsStrs []string //我们可以响应的DHT protocols. //一般情况下,serverProtocols=protocols serverProtocols []protocol.ID //DHT内部操作的模式,详细见后面的分析 auto ModeOpt mode mode //与auto值相同 modeLk sync.Mutex //mode读取或设置互斥锁 bucketSize int //K桶大小,这是Kademlia列表的一种称呼,默认为20 alpha int // 每个path的并发参数 beta int // 最接近目标的peers数量,这些peers必须响应一个query path才能终止 queryPeerFilter QueryFilterFunc //查询peers的过滤器 routingTablePeerFilter RouteTableFilterFunc //节点路由协议过滤器,用于检查一个节点是否符合指定的路由表协议 rtPeerDiversityFilter peerdiversity.PeerIPGroupFilter //节点分组过滤器,一般而言,除非节点在白名单中,否则只有一个群组内的节点才能能通讯 autoRefresh bool //是否自动刷新,一般默认为自动刷新DHT路由表 //如果所有其他修复路由表的尝试都失败(或者,例如,这是该节点第一次连接到网络),则返回一组引导peers的函数。 bootstrapPeers func() []peer.AddrInfo maxRecordAge time.Duration //节点key在本地库中保存最长时间,超过该时间将删除,默认36小时 //允许禁用 dht 子系统。 这些只应该设置在“分叉”的 DHT 上(例如,具有自定义协议和/或专用网络的 DHT)。 enableProviders, enableValues bool disableFixLowPeers bool// 是否修正最低节点数量。如果节点数量低于临界值(默认值为10个),将尝试获取更多的节点加入到DHT表中 fixLowPeersChan chan struct{} addPeerToRTChan chan addPeerRTReq//添加peer到RT(路由表) refreshFinishedCh chan struct{} //刷新结束通知 rtFreezeTimeout time.Duration //冻结时间,默认1分钟 // 测试配置 testAddressUpdateProcessing bool } ``` ### 1 、proc goprocess.Process proc goprocess.Process:进程,仿照 UNIX 进程组思想,并且重 由 sync.WaitGroup 和 go.net/context.Context 通知。 进程(Process)是goprocess中的基本工作单元。 它定义了一个具有生命周期的计算: - 运行(在调用 Close 之前), - 关闭(在调用 Close 至少一次之后), - 关闭(在 Close 返回之后,并且所有teardown都完成了)。 goprocess 引入了一种在 go 中管理进程生命周期的方法。 它很像 go.net/context(它实际上使用了一个 Context),但它更像是一个 Context-WaitGroup 混合体。 goprocess 是关于能够启动和停止工作单元,这些工作单元可能会收到来自许多客户端的关闭信号。 把它想象成一个 UNIX 进程树,但里面是 go。 goprocess 力求对您的对象产生最小的影响,因此您可以将它与嵌入或组合一起使用。 goprocess 的核心是 Process 接口。 见开源项目:https://github.com/jbenet/goprocess ### 2、protoMessenger *pb.ProtocolMessenger ``` type ProtocolMessenger struct {     m MessageSender } ``` ProtocolMessenger 可用于向peers发送 DHT 消息并处理它们的响应。 这将有线协议格式(wire protocol)与 DHT 协议实现、有线协议格式与 routing.Routing 接口的实现进行分离。这里是指发送DHT协议格式的消息。 注意:ProtocolMessenger 的 MessageSender 仍然需要处理一些有线协议(wire protocol)细节,例如使用 varint-delineated protobufs。 wire protocol: * 是一种传输数据的机制; * 不要从字面上将其理解为物理层上的协议; * 因为代码层面的数据(链表、队列、二叉树)都是结构化的,但网络层看到的都是二进制流,所以把结构化的数据序列化为二进制流发送出去,并且对方也能以同样的格式反序列化出来,这就是wire protocol。 英文原文解释: In a network, a wire protocol is the mechanism for transmitting data from point a to point b. The term is a bit confusing, because it sounds like layer 1 of the network, which physically places the bits "onto the wire." In some cases, it may refer to layer 1; however, it generally refers to higher layers, including Ethernet and ATM (layer 2) and even higher layer distributed object protocols such as SOAP, CORBA or RMI. See OSI model, communications protocol, data link protocol and distributed objects. ### 3、pb.MessageSender MessageSender为接口对象, 处理发送有线协议消息到指定的peer: ``` type MessageSender interface { // SendRequest sends a peer a message and waits for its response SendRequest(ctx context.Context, p peer.ID, pmes *Message) (*Message, error) // SendMessage sends a peer a message without waiting on a response SendMessage(ctx context.Context, p peer.ID, pmes *Message) error } ``` 其中,SendRequest为同步函数,SendMessage为异步函数。 从实际构建IpfsDHT对象来看,我们先创建MessageSender,然后将其作为参数创建ProtocolMessenger。 ### 4、auto ModeOpt const ( // ModeAuto utilizes EvtLocalReachabilityChanged events sent over the event bus to dynamically switch the DHT // between Client and Server modes based on network conditions ModeAuto ModeOpt = iota // ModeClient operates the DHT as a client only, it cannot respond to incoming queries ModeClient // ModeServer operates the DHT as a server, it can both send and respond to queries ModeServer // ModeAutoServer operates in the same way as ModeAuto, but acts as a server when reachability is unknown ModeAutoServer ) 在默认情况下,其值为0,但实际上一个DHT要么是ModeClient,要么是ModeServer,一般不设为ModeAuto。 在我们的区块链中,我们使用dht.New(ctx, host)来构建节点,这种情况下使用默认的config,mode会被设为ModeClient,意味着我们不响应来自其它节点的DHT请求,只发送DHT消息给网络。