合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
使用Lua脚本可以实现更复杂的数据处理逻辑,go-mysql-transfer支持Lua5.1语法 # **示例** t_user表,数据如下: ![](https://img.kancloud.cn/72/5f/725f91f8dd6beb960b8de8872b9512f3_600x89.jpg) ## **示例一** 引入Lua脚本: ``` rule: - schema: eseap #数据库名称 table: t_user #表名称 order_by_column: id #排序字段,存量数据同步时不能为空 lua_file_path: lua/t_user_es.lua #lua脚本文件 es_index: user_index #Elasticsearch Index名称,可以为空,默认使用表(Table)名称 es_mappings: #索引映射,可以为空,为空时根据数据类型自行推导ES推导 - field: id #映射后的ES字段名称 type: keyword #ES字段类型 - field: userName #映射后的ES字段名称 type: keyword #ES字段类型 - field: password #映射后的ES字段名称 type: keyword #ES字段类型 - field: createTime #映射后的ES字段名称 type: date #ES字段类型 format: yyyy-MM-dd HH:mm:ss #日期格式,type为date此项有意义 - field: remark #映射后的ES字段名称 type: text #ES字段类型 analyzer: ik_smart #ES分词器,type为text此项有意义 - field: source #映射后的ES字段名称 type: keyword #ES字段类型 ``` 其中, es_mappings 表示索引的mappings(映射关系),不定义es_mappings则根据字段的值自动创建mappings(映射关系)。根据es_mappings 生成的mappings如下: ![](https://img.kancloud.cn/08/57/08573c3be355c9dfadda240a18a009c7_433x480.png) Lua脚本: ``` local ops = require("esOps") --加载elasticsearch操作模块 local row = ops.rawRow() --当前数据库的一行数据,table类型,key为列名称 local action = ops.rawAction() --当前数据库事件,包括:insert、update、delete local id = row["ID"] --获取ID列的值 local userName = row["USER_NAME"] --获取USER_NAME列的值 local password = row["PASSWORD"] --获取USER_NAME列的值 local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值 local remark = row["REMARK"] --获取REMARK列的值 local result = {} -- 定义一个table,作为结果集 result["id"] = id result["userName"] = userName result["password"] = password result["createTime"] = createTime result["remark"] = remark result["source"] = "binlog" -- 数据来源 if action == "insert" then -- 只监听新增事件 ops.INSERT("t_user",id,result) -- 新增,参数1为index名称,string类型;参数2为要插入的数据主键;参数3为要插入的数据,tablele类型或者json字符串 end ``` 同步到Elasticsearch的数据如下: ![](https://img.kancloud.cn/de/5d/de5dbc46fcbb9cebd95d6e04643acef3_432x276.png) ## **示例二** 引入Lua脚本: ``` schema: eseap #数据库名称 table: t_user #表名称 lua_file_path: lua/t_user_es2.lua #lua脚本文件 ``` 未明确定义index名称、mappings,es会根据值自动创建一个名为t_user的index。 使用如下脚本: ``` local ops = require("esOps") --加载elasticsearch操作模块 local row = ops.rawRow() --当前数据库的一行数据,table类型,key为列名称 local action = ops.rawAction() --当前数据库事件,包括:insert、update、delete local id = row["ID"] --获取ID列的值 local userName = row["USER_NAME"] --获取USER_NAME列的值 local password = row["PASSWORD"] --获取USER_NAME列的值 local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值 local result = {} -- 定义一个table,作为结果集 result["id"] = id result["userName"] = userName result["password"] = password result["createTime"] = createTime result["remark"] = remark result["source"] = "binlog" -- 数据来源 if action == "insert" then -- 只监听新增事件 ops.INSERT("t_user",id,result) -- 新增,参数1为index名称,string类型;参数2为要插入的数据主键;参数3为要插入的数据,tablele类型或者json字符串 end ``` 同步到Elasticsearch的数据如下: ![](https://img.kancloud.cn/b8/ec/b8ec66ee15cc7ee366623fadf8552258_439x272.png) # **esOps模块** 提供的方法如下: 1. INSERT: 插入操作,如:ops.INSERT(index,id,result)。参数index为索引名称,字符串类型;参数index为要插入数据的主键;参数result为要插入的数据,可以为table类型或者json字符串 2. UPDATE: 修改操作,如:ops.UPDATE(index,id,result)。参数index为索引名称,字符串类型;参数index为要修改数据的主键;参数result为要修改的数据,可以为table类型或者json字符串 3. DELETE: 删除操作,如:ops.DELETE(index,id)。参数index为索引名称,字符串类型;参数id为要删除的数据主键,类型不限;