企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
```python #!/usr/bin/python3 # coding=gbk ''' Note: 获取文件的改变 Author:redsun Data:2019/7/10 ''' import os import time import urllib3 def GetFileEndChange(File_Path): ''' 从文件尾部开始检查文件追加的内容 :param File_Path: 路径 :return: 输出尾部增加的内容 ''' if os.path.isfile(File_Path): with open(File_Path, encoding='gbk') as f: f.seek(0, 2) cur = f.tell() temp = 0 text = '' HelpInfoHead('Change', File_Path, cur) var = 1 firstdatacheck = '' # 改写的代码STATR while var == 1: # while True: f.seek(cur) ch = f.readlines() CheckFile() #监控文件检测是否存在 # 如果没有读到数据,跳出循环 if not ch: # if HelpInfoChangeEnd('Change',temp) == 'no': HelpInfoChangeEnd('Change', temp) if 1 == 1: print('监控中...') time.sleep(3) # break else: print(123) else: for line in ch: text = line print(text) print('查看每次的值'+firstdatacheck) # firstdatacheck=tuidata(text, firstdatacheck) # 执行每条的推送判断开始 # 按照规范处理text 26;20200810;0.091011;TA;多头开仓;手数=5;开仓价格=3772.8;Service_str1;20200810;0.091 tt = text.split(';') if not firstdatacheck is None: if tt[0] != firstdatacheck: firstdatacheck = tt[0] # 赋值 # print('执行第二条件') print('当前的firstdatacheck值' + firstdatacheck) if tt[1] == tt[8]: sec = abs(float(tt[2]) - float(tt[9])) print('执行秒的差值1' + str(sec)) if sec < 0.0015: print('执行推送1') pushdata(text) else: print('不推送3-时间大于15分钟') else: print('不推送2-年月日不同11') else: print('不推送1-第一个参数没变') else: firstdatacheck = tt[0] # 赋值 if tt[1] == tt[8]: sec = abs(float(tt[2]) - float(tt[9])) if sec < 0.0015: print('执行推送2') pushdata(text) else: print('不推送33-时间大于15分钟') else: print('不推送22-年月日不同') HelpInfoContent(text, temp) # 执行每条的推送判断开始 temp += 1 cur = f.tell() # 改写的代码END else: print('文件'+File_Path+'不存在') time.sleep(3) # GetFileEndChange(docname()) GetFileEndChange('Service.log'); def CheckFile(): if os.path.isfile('Service.log'): print('文件存在继续健康执行....') else: print('文件Service.log不存在') time.sleep(3) GetFileEndChange('Service.log'); # 推送消息 def pushdata(text): print('**********************************************************************') print(text) print('**********************************************************************') http = urllib3.PoolManager() url = 'http://www.test.com/api/User/push?data='+text http.request('GET', url) print('发送成功') def HelpInfoChangeEnd(FuctionName,times=-1): ''' 提示信息——尾部 :return: no 结束 yes 继续 ''' if times == 0: raw = 1 elif times == -1 : raw = 'no' else: print('Check {times} times. '.format(times=times)) raw = 'yes' print('No {FuctionName} in {clock}'.format(FuctionName= FuctionName, clock = GetTime()) ) if raw == 'no' or raw == 'NO': print('check finish !') print('----------------------------------------------------------') sign = 'no' else: sign = 'yes' return sign def HelpInfoHead(FuctionName, File_Path, bits = -1): ''' 提示信息——头部 :param FuctionName:函数名 :param file_name: 文件名 :param bits: 当前文件结束地址 :return: ''' file_name = os.path.basename(File_Path) print(FuctionName) print('----------------------------------------------------------') print('Start Listen The File ({file_name}) Info ...'.format(file_name=file_name)) if bits != -1: print('The File Ends in {bits} bits . '.format(bits=bits)) print('//') def HelpInfoContent(text, times = 0): print('**********************************************************') if times != 0: print('Check {times} times. '.format(times=times)) print('Changed at {times} content: \n {text}'.format( times=GetTime(), text=text)) print('**********************************************************') def GetTime(): ''' 返回当前时间 :return: ''' return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) def docname(): return time.strftime("%Y%m%d", time.localtime(time.time()))+'_Backup.log' #GetFileEndChange(docname()) GetFileEndChange('Service.log'); ```