企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
​ 参考地址: [python seek、re.search实现文件新增内容监控、关键字检索](https://blog.csdn.net/qq_17328759/article/details/95413478?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242 "python seek、re.search实现文件新增内容监控、关键字检索") [python 文件新增内容监控、关键字检索升级版](https://blog.csdn.net/qq_17328759/article/details/96151600 "python 文件新增内容监控、关键字检索升级版") 本文章针对windows上面运行的结果对上面的两个脚本进行了优化和适配,修改后的代码如下所示 ## 脚本实现了一下功能: 1. GetFileEndChange方法实现了对文件追加内容的监控 2. GetFileKeyInfo方法实现了对文件关键字的检索 3. GetFileNewKeyData方法实现了对文件追加的内容关键字的检索 4. 对交互的优化,实现了检索文件、检索次数的统计 ## 应用场景: 日志文件管理 ~~~python #!/usr/bin/python3 # coding=gbk ''' Note: 获取文件的改变 Author:redsun Data:2019/7/10 ''' import re import time import os def GetFileEndChange(File_Path): ''' 从文件尾部开始检查文件追加的内容 :param File_Path: 路径 :return: 输出尾部增加的内容 ''' with open( File_Path, encoding='utf-8' ) as f: f.seek(0,2) cur = f.tell() temp = 0 text = '' HelpInfoHead('Change', File_Path, cur) while True: f.seek(cur) ch = f.readlines() # 如果没有读到数据,跳出循环 if not ch: if HelpInfoEnd('Change',temp) == 'no': break else: for line in ch: text += line HelpInfoContent(text, temp) temp += 1 cur = f.tell() def GetFileKeyInfo(File_Path, KeyWord): ''' 检索文件中与关键字相关的行,并输出 :param File_Path: 文件路径 :param KeyWord: 关键字 :return: 输出关键字所在的行 ''' with open(File_Path, encoding='utf-8') as f: lines = f.readlines() if len(lines) == 0: print("================日志文件为空================") else: count = 0 text = '' HelpInfoHead('KeyInfo', File_Path) for line in lines: rs = re.search(KeyWord, line) if rs: count += 1 text += line HelpInfoContent(text) print('[命中{count}次]'.format(count=count)) HelpInfoEnd('KeyInfo') def GetFileNewKeyData(File_Path, KeyWord): ''' 检索文件尾部新增内容关键字匹配,并输出结果 :param File_Path: 文件路径 :param KeyWord: 关键字 :return: ''' with open(File_Path, encoding='utf-8') as f: f.seek(0, 2) cur = f.tell() temp = 0 count = 0 text = '' HelpInfoHead('NewKeyData', File_Path, cur) while True: f.seek(cur) ch = f.readlines() # 如果没有读到数据,跳出循环 if not ch: if HelpInfoEnd('NewKeyData', temp) == 'no': break else: for line in ch: rs = re.search(KeyWord, line) if rs: count += 1 text += line HelpInfoContent(text) print('[命中{count}次]'.format(count=count)) temp += 1 cur = f.tell() def HelpInfoHead(FuctionName, File_Path, bits = -1): ''' 提示信息——头部 :param FuctionName:函数名 :param file_name: 文件名 :param bits: 当前文件结束地址 :return: ''' file_name = os.path.basename(File_Path) print(FuctionName) print('----------------------------------------------------------') print('Start Listen The File ({file_name}) Info ...'.format(file_name=file_name)) if bits != -1: print('The File Ends in {bits} bits . '.format(bits=bits)) print('//') def HelpInfoEnd(FuctionName,times=-1): ''' 提示信息——尾部 :return: no 结束 yes 继续 ''' if times == 0: raw = input('Wait Code (not "no") To Start Check:') elif times == -1 : raw = 'no' else: print('Check {times} times. '.format(times=times)) raw = input('Continue Check The File Key Info (yes/no):') print('No {FuctionName} in {clock}'.format(FuctionName= FuctionName, clock = GetTime()) ) if raw == 'no' or raw == 'NO': print('check finish !') print('----------------------------------------------------------') sign = 'no' else: sign = 'yes' return sign def HelpInfoContent(text, times = 0): print('**********************************************************') if times != 0: print('Check {times} times. '.format(times=times)) print('Changed at {times} content: \n {text}'.format( times=GetTime(), text=text)) # count = 1 # for line in text: # print('{count} \t {line}'.format(count = count, line = line)) # count += 1 print('**********************************************************') def GetTime(): ''' 返回当前时间 :return: ''' return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) ~~~ ![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动") ## 效果: ![在这里插入图片描述](https://img-blog.csdnimg.cn/20190711022158484.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzE3MzI4NzU5,size_16,color_FFFFFF,t_70)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")​编辑 \### 关键词检索升级版修改: ~~~python #!/usr/bin/python3 # coding=gbk ''' Note: 文件内容检索、监控 Author:Qred Date:2019/7/11 ''' import re import time import os def GetFileNewKeyData ( File_Path, KeyWord , type = -1): ''' 检索文件。新增内容关键字匹配/全文关键字匹配,并输出结果 :param File_Path: 文件路径 :param KeyWord: 关键字 :param type: 检索类型 -1 新增内容检索 0 全文检索 :return: ''' with open(File_Path, encoding='utf-8') as f: # 获取文件结束位置 f.seek(0, 2) End = f.tell() # 设置文件检索的起始位置 if type == -1 : cur = End temp = 1 # 检索次数 elif type == 0 : # f.seek(0, 0) cur = 0 #f.tell() temp = -1 # 检索次数 count = 0 # 关键词匹配次数 text = '' # 结果 HelpInfoHead('NewKeyData', File_Path, End) while True: f.seek(cur) ch = f.readlines() # print ch # 如果没有读到数据,跳出循环 if not ch: if HelpInfoEnd('NewKeyData', temp) == 'no': break else: for line in ch: rs = re.search(KeyWord, line) if rs: count += 1 text += str(count) + '\t' + line HelpInfoContent(text, temp) print('[命中{count}次]'.format(count=count)) if temp == -1: temp -= 1 temp += 1 cur = f.tell() return count,text ''' 下面是为了优化输出效果做的调整 ''' def HelpInfoHead ( FuctionName, File_Path, bits=-1 ): ''' 提示信息——头部 :param FuctionName:函数名 :param file_name: 文件名 :param bits: 当前文件结束地址 :return: ''' file_name = os.path.basename(File_Path) print(FuctionName) print('----------------------------------------------------------') print('Start Listen The File ({file_name}) Info ...'.format(file_name=file_name)) if bits != -1: print('The File Ends in {bits} bits . '.format(bits=bits)) print('//') def HelpInfoEnd ( FuctionName, times=-1 ): ''' 提示信息——尾部 :return: no 结束 yes 继续 ''' if times == -1: raw = 'no' else: print('Check {times} times. '.format(times=times)) raw = raw_input('Continue Check The File Key Info (yes/no): ') print('No {FuctionName} in {clock}'.format(FuctionName=FuctionName, clock=GetTime())) if raw == 'no' or raw == 'NO': print('check finish !') print('----------------------------------------------------------') sign = 'no' else: sign = 'yes' return sign def HelpInfoContent ( text, times=0 ): ''' 输出检索出的配置的内容 :param text: 匹配的内容 :param times: 检索的次数 :return: ''' print('**********************************************************') if times != 0: print('Check {times} times. '.format(times=times)) print('Changed at {times} content: \n{text}'.format( times=GetTime(), text=text)) print('**********************************************************') def GetTime (): ''' 返回当前时间 :return: ''' return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) GetFileNewKeyData ( '业主公众号菜单栏.txt', '公示' , 0) ~~~ ![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动") 结果如下所示: ![](https://img-blog.csdnimg.cn/20201225154555319.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM0MDUwMzYw,size_16,color_FFFFFF,t_70)![](data:image/gif;base64,R0lGODlhAQABAPABAP///wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw== "点击并拖拽以移动")​编辑 ​