企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
Listen 和 Notify 是PG很有意思的一个功能,可以用来进行多应用间的通信。它们可以在SQL中使用,也可以用C、JDBC里面的API调用。下面介绍一下其使用方法和内核实现。 ## 使用方法 用一个简单的例子,来看一下Listen/Notify如何使用。假设我们有两个应用A和B,部署在不同的机器上:A机器处理前端用户请求,同时需要将一些可以异步执行的任务,分配给后台服务器B。B接收到任务并处理完成后反馈给A结果。 ~~~ --分别在A和B两台机器上初始化PG连接,它们互相监听对方消息,A负责派发任务给B --机器A初始化与PG的连接 session A: listen workA; commit; --机器B初始化与PG的连接 session B: listen workerB; commit; --A派发任务给B session A: begin; notify workerB, 'do job 1001'; commit; --B接受消息 session B: begin; Asynchronous notification "worker1" with payload "1001" received from server process with PID 29826. commit; --B解析消息(可用脚本或应用实现),然后完成任务,发送反馈给A session B: .... begin; notify workA 'job 1001 done'; commit; ~~~ 利用上面的步骤,A和B两个机器通过PG完成了通信。在上面的过程中,需要注意的是: 1. B要想接受到消息,必须在A Notify之前运行了Listen命令; 2. A需要使用事务commit操作来触发消息发送; 3. 消息是异步发送到B的,即无论B的状态如何,消息都会先到达PG的消息队列(每个PG实例只有一个唯一的存放所有消息的队列);B要查看消息,如果使用的是psql客户端,则需要先发送带有事务操作的命令(如begin、commit或rollback)给PG; 4. A 如果连续发送多个消息,B会一次性收到这些消息; 5. 在C代码里面,你可以使用如下的调用来获取所有已到达的消息,如果没有消息到达,则进入睡眠状态。 ~~~ while (1) { sock = PQsocket(conn); /* Monitor socket. Sleep if there is no message */ select(sock + 1, &input_mask, NULL, NULL, NULL) /* Now check for input */ PQconsumeInput(conn); /* Loop until all notifications currently received have been handled */ while ((notify = PQnotifies(conn)) != NULL) { /* Received some message, print it out */ fprintf(stderr, "ASYNC NOTIFY of '%s' received from backend PID %d\n", notify->relname, notify->be_pid); PQfreemem(notify); } } ~~~ ## 内核实现 Listen/Notify的实现其实比较简单。主要的数据结构是一个消息队列(`asyncQueueControl->tail`和`asyncQueueControl->head`分别指向队列尾和队列头)和一个进程状态数组(`asyncQueueControl->backend`),如下图所示: ![消息通知机制](https://box.kancloud.cn/2015-09-24_5603982365c2e.png "消息通知机制") 消息队列里面存放了所有进程的所有通知消息,而状态数组存放了所有执行了Listen命令、准备接收异步消息的进程的状态信息。状态数组中含有每个进程已经读取到的消息在队列里面的位置指针。如果有了新消息,进程就从此指针往后取,直到读取全部消息。 当一个连接的后台进程接收到Listen命令时,先将Listen的信息记录下来,然后在事务提交时,执行Listen操作,即把本进程放入状态数组(参见`Exec_ListenPreCommit`函数)。 执行Notify命令时,Async_Notify函数负责把通知放入pendingNotifies链表。在事务Commit操作前后,执行下面的逻辑: 1. 调用PreCommit_Notify函数,将pendingNotifies链表中的消息,放入全局消息队列; 2. 执行Commit操作; 3. 利用调用链`ProcessCompletedNotifies->SignalBackends->SendProcSignal->kill`,向其他所有状态数组中的进程,发出通知信号。 另一方面,每个进程在接收到信号后,利用函数`HandleNotifyInterrupt`处理信号。如果当前进程处于事务中,则不立即处理消息,等到事务提交完毕,调用`prepare_for_client_read`读取下一个用户命令时,利用`ProcessIncomingNotify`处理消息;否则,立即调用`ProcessIncomingNotify`处理消息。`ProcessIncomingNotify`最终调用`NotifyMyFrontEnd`发送消息到客户端: ~~~ ProcessIncomingNotify --> asyncQueueReadAllNotifications() --> NotifyMyFrontEnd ~~~ 注意,客户端收到消息后,并不立即显示出来,而是需要用API进行获取。例如,psql就是在执行下一个命令时(如begin、commit),会顺便把收到的消息显示出来的。 ## 总结 Listen/Notify是一个轻量级的应用间通信机制,有了它,具有访问数据库能力的应用可以轻易的利用PG实现互操作。当然,由于消息队列是存放在内存里面的,在发生实例宕机等问题时,消息将丢失,对可靠性要求高的应用,需要自己进行消息持久化(如利用PG存储消息,进行持久化)。