🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# Storm SQL 示例 本文通过处理 Apache 日志的例子来展示如何使用 strom SQL. 本文使用 "how-to" 风格书写, 因此可以根据步骤, 一步一步的学习如何使用 Storm SQL. ## 准备 本文假定 Apache Zookeeper、Apache Storm、Apache Kafka 都本地安装并且正确配置运行. 方便起见, 本文假设 Apache Kafka 0.10.0 是通过 `brew` 安装. 我们将使用下列工具为输入数据源生成 JSON 数据. 因为他们都是 Python 工程, 本页假设已经安装了 Python 2.7、`pip`、`virtualenv`. 如果你使用 Python 3, 需要在生成数据的时候修改一些与 Python 3 不兼容的代码. * [https://github.com/kiritbasu/Fake-Apache-Log-Generator](https://github.com/kiritbasu/Fake-Apache-Log-Generator) * [https://github.com/rory/apache-log-parser](https://github.com/rory/apache-log-parser) ## 创建 topic 在本页, 我们会使用3个 topic, `apache-logs`, `apache-errorlogs`, `apache-slowlogs`. 请根据你的环境来创建 topic. 对于使用 brew 安装的 Apache Kafka 0.10.0, ``` kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 kafka-topics --create --topic apache-errorlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 kafka-topics --create --topic apache-slowlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 ``` ## 灌入数据 让我们提供数据给输入 topic. 在本页中我们将生成假的 Apache 日志, 转换为 JSON 格式, 并把 JSON 灌入 Kafka topic. 开始创建你的工作目录, 用于克隆项目并且设置 virtualenv. 在你的工作目录, 命令 `virtualenv env` 把 evn 目录设置为 virtualenv 目录, 然后激活虚拟环境. ``` $ virtualenv env $ source env/bin/activate ``` 完成例子以后, 可以随时 `deactivate`, 退出 python 虚拟环境. ### 安装和修改 Fake-Apache-Log-Generator `Fake-Apache-Log-Generator` 对包不可见, 我们还需要修改一下脚本. ``` $ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git $ cd Fake-Apache-Log-Generator ``` 打开 `apache-fake-log-gen.py`, 将 `while (flag):` 语句替换成下面的语句: ``` elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec seconds=random.randint(30,300) increment = datetime.timedelta(seconds=seconds) otime += increment ip = faker.ipv4() dt = otime.strftime('%d/%b/%Y:%H:%M:%S') tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z') vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2]) uri = random.choice(resources) if uri.find("apps")>0: uri += `random.randint(1000,10000)` resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04]) byt = int(random.gauss(5000,50)) referer = faker.uri() useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )() f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent)) log_lines = log_lines - 1 flag = False if log_lines == 0 else True ``` 要确保 elapsed_us 包含在假日志中. 为了方便, 你可以跳过克隆项目这一步, 直接从这里下载修改过的文件: [apache-fake-log-gen.py (gist)](https://gist.github.com/HeartSaVioR/79fd4e461604fabecf535ffece47e6c2) ### 安装 apache-log-parser 并编写转换脚本 `apache-log-parser` 模块可以通过 `pip` 命令安装. ``` $ pip install apache-log-parser ``` 因为 `apache-log-parser` 是一个 python 库, 为了转换日志我们需要写编写一个小脚本. 我们创建一个文件 `parse-fake-log-gen-to-json-with-incrementing-id.py` 包含以下内容: ``` import sys import apache_log_parser import json auto_incr_id = 1 parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"' line_parser = apache_log_parser.make_parser(parser_format) while True: # we'll use pipe line = sys.stdin.readline() if not line: break parsed_dict = line_parser(line) parsed_dict['id'] = auto_incr_id auto_incr_id += 1 # works only python 2, but I don't care cause it's just a test module :) parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')} print json.dumps(parsed_dict) ``` ### 将转换后的 JSON Apache Log 灌入 Kafka 好了! 我们已经准备好将数据写入 Kafka topic. 下面使用 `kafka-console-producer` 来灌入 JSON. ``` $ python apache-fake-log-gen.py -n 0 | python parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs ``` 打开另一个终端执行下面的命令, 确认数据已经进入 topic. ``` $ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs ``` 如果看到如下的 json, 就说明搞定了: ``` {"TIME_US": "757467", "REQUEST_FIRST_LINE": "GET /wp-content HTTP/1.0", "REQUEST_METHOD": "GET", "RESPONSE_BYTES_CLF": "4988", "TIME_RECEIVED_ISOFORMAT": "2021-06-30T22:02:53", "TIME_RECEIVED_TZ_ISOFORMAT": "2021-06-30T22:02:53-07:00", "REQUEST_HTTP_VER": "1.0", "REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY": "Firefox", "REQUEST_HEADER_USER_AGENT__IS_MOBILE": false, "REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING": "3.6.13", "REQUEST_URL_FRAGMENT": "", "REQUEST_HEADER_USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13", "REQUEST_URL_SCHEME": "", "REQUEST_URL_PATH": "/wp-content", "REQUEST_URL_QUERY_SIMPLE_DICT": {}, "TIME_RECEIVED_UTC_ISOFORMAT": "2021-07-01T05:02:53+00:00", "REQUEST_URL_QUERY_DICT": {}, "STATUS": "200", "REQUEST_URL_NETLOC": "", "REQUEST_URL_QUERY_LIST": [], "REQUEST_URL_QUERY": "", "REQUEST_URL_USERNAME": null, "REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING": "", "REQUEST_URL_HOSTNAME": null, "REQUEST_HEADER_USER_AGENT__OS__FAMILY": "Linux", "REQUEST_URL": "/wp-content", "ID": 904128, "REQUEST_HEADER_REFERER": "http://white.com/terms/", "REQUEST_URL_PORT": null, "REQUEST_URL_PASSWORD": null, "TIME_RECEIVED": "[30/Jun/2021:22:02:53 -0700]", "REMOTE_IP": "88.203.90.62"} ``` ## 例子: 过滤错误日志 在这个例子中, 我们将从所有的日志中过滤出 error 日志并且存储到另一个 topic 中. 将会用到 `project` 和 `filter` 特性. 脚本文件的内容如下: ``` CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs' CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4 ``` 把文件保存为 `apache_log_error_filtering.sql`. 让我们过一遍这个脚本. 第一个语句定义了一个表 `APACHE_LOGS` 代表输入流. `LOCATION` 从句指定了 ZkHost (`localhost:2181`), brokers路径(`/brokers`) 和 topic (`apache-logs`). 注意 Kafka 数据源必须定义一个主键. 这就是为什么我们为 JSON 数据设置了一个整数 id. 同样, 第二个语句指定了表 `APACHE_ERROR_LOGS` 代表输出流. `TBLPROPERTIES` 从句指定了[KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs)的配置, 从句对于 Kafka sink表 是必须的. 最后的语句定义了一个 topology. Storm SQL 只会在 DML 语句上定义和运行 topology. DDL 语句定义输入数据源、输出数据源、以及可以被 DML 语句引用的用户定义函数(user defined function). L我们先看 where 语句. 由于我们想过滤 error 日志, 我们使用状态码除以 100, 验证得到的商是否等于或者大于4.(简单的说就是 statu_code >= 400) 由于JSON中的状态码是字符串格式(因此在 APACHE_LOGS 表中是 VARCHAR 格式). 我们在应用除法之前, 使用 CAST(STATUS AS INT) 先把状态码转换为整数. 现在我们只有 error 日志了. 我们转换一些列以和输出流想匹配. 在这个语句中, 我们使用 CAST(STATUS AS INT) 转化为整数类型, 然后使用 1000 除 TIME_US 将毫秒转换成秒. 最后, insert 语句将过滤和转换后的行(tuples)存入输出流. 要运行这个例子, 用户需要包含数据源(本例中是 `storm-sql-kafka`) 和的所有依赖到 class path 中. 当用户运行 `storm sql` 命令的时候 Storm SQL 的依赖会被自动处理. 用户可以在提交阶段包含数据源依赖, 如下: ``` $ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" ``` 上面的命令提交 SQL 语句到 StormSQL. storm sql 命令的选项是 `storm sql [script file] [topology name]`. 如果用户使用了不同版本的 Storm 或者 Kafka ,需要修改每个 artifacts 的版本号与之对应. 如果你的语句通过了验证阶段, 会在 Storm UI 页面上显示 topology. 你可以在控制台上看到下面的输出: ``` $ kafka-console-consumer --zookeeper localhost:2181 --topic apache-error-logs ``` 输出类似下面的内容: ``` {"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222} {"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851} ... ``` 你可以运行 Storm SQL runner, 将 topology 名称替换为 `--explain` 来查看逻辑执行计划. ``` $ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" ``` 输入类似下面的内容: ``` LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8 LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7 LogicalFilter(condition=[>=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6 EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5 ``` 如果 Storm SQL 应用了查询优化, 你可能看到的输出会和上面的不一样. 我们正在执行第一个 Storm SQL topology! 如果你看到了足够多的输出和日志, 请杀掉 topology. 为了简洁, 我们不再解释我们已经看到的东西. ## 例子: 过滤访问慢的日志 在这个例子中我们将过滤访问慢的日志, 把他们存储到另一个 topic. 用到的特性有 `project`、`filter`、`User Defined Function (UDF)`. 这个例子与上一个例子 `filtering error logs` 非常相似, 我们主要看如何定义 `User Defined Function (UDF)`. 脚本文件的内容如下: ``` CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apachelogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apacheslowlogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2' INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) >= 100 ``` 内容保存为文件 `apache_log_slow_filtering.sql`. 由于前两个语句和上一个例子相似, 我们直接跳过. 第三个语句定义了一个 `User defined function`. 我们使用 `org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2` 定义了一个 `GET_TIME`. `GetTime2` 函数的实现如下: ``` package org.apache.storm.sql.runtime.functions.scalar.datetime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; public class GetTime2 { public static Long evaluate(String dateString, String dateFormat) { try { DateTimeFormatter df = DateTimeFormat.forPattern(dateFormat).withZoneUTC(); return df.parseDateTime(dateString).getMillis(); } catch (Exception ex) { throw new RuntimeException(ex); } } } ``` 由于这个类定义了静态方法 `evaluate` 可以用于 UDF. SQL 的参数和返回的类型取决于 Storm SQL 依赖哪种风格. 注意, 这个类应该放在 classpath 路径下, 因此为了定义 UDF, 你需要创建一个包含 UDF 类的 jar 文件, 并在执行 `storm sql` 命令的时候使用 `-- jar` 选项. 最后一个语句和过滤错误日志的例子相似. 唯一新鲜的东西就是我们调用了 `GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ')` 将字符串格式的时间转换为 unix timestamp (BIGINT). 执行: ``` $ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" ``` 可以在控制台看到下面的输出: ``` $ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slow-logs 输出类似下面的内容: ``` {"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579} {"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957} ... ``` 好了! 假设我们有通过远程 IP 查询 geo 信息的 UDF, 我们可以通过 geo 位置做过滤, 或者将 geo 位置添加到转换结果中. ## Summary 我们通读了几个 Storm SQL 的简单的用例来学习 Storm SQL 的特性. 如果还没有看过[Storm SQL integration](storm-sql.html) 和 [Storm SQL language](storm-sql-reference.html), 你需要阅读这些章节来查看所有支持的特性. 注意, Storm SQL 运行在小批量和非强类型的 Trident 库之上. Sink 实际并不检查类型. (你可能注意到一些输出字段的类型与输出表模式的定义不同). 当 Storm SQL 的后端 API 修改为核心(tuple by tuple, low-level, high-level) 时, Storm SQL 的行为会相应改变.