# Storm SQL 示例
本文通过处理 Apache 日志的例子来展示如何使用 strom SQL. 本文使用 "how-to" 风格书写, 因此可以根据步骤, 一步一步的学习如何使用 Storm SQL.
## 准备
本文假定 Apache Zookeeper、Apache Storm、Apache Kafka 都本地安装并且正确配置运行. 方便起见, 本文假设 Apache Kafka 0.10.0 是通过 `brew` 安装.
我们将使用下列工具为输入数据源生成 JSON 数据. 因为他们都是 Python 工程, 本页假设已经安装了 Python 2.7、`pip`、`virtualenv`. 如果你使用 Python 3, 需要在生成数据的时候修改一些与 Python 3 不兼容的代码.
* [https://github.com/kiritbasu/Fake-Apache-Log-Generator](https://github.com/kiritbasu/Fake-Apache-Log-Generator)
* [https://github.com/rory/apache-log-parser](https://github.com/rory/apache-log-parser)
## 创建 topic
在本页, 我们会使用3个 topic, `apache-logs`, `apache-errorlogs`, `apache-slowlogs`. 请根据你的环境来创建 topic.
对于使用 brew 安装的 Apache Kafka 0.10.0,
```
kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-errorlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-slowlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
```
## 灌入数据
让我们提供数据给输入 topic. 在本页中我们将生成假的 Apache 日志, 转换为 JSON 格式, 并把 JSON 灌入 Kafka topic.
开始创建你的工作目录, 用于克隆项目并且设置 virtualenv.
在你的工作目录, 命令 `virtualenv env` 把 evn 目录设置为 virtualenv 目录, 然后激活虚拟环境.
```
$ virtualenv env
$ source env/bin/activate
```
完成例子以后, 可以随时 `deactivate`, 退出 python 虚拟环境.
### 安装和修改 Fake-Apache-Log-Generator
`Fake-Apache-Log-Generator` 对包不可见, 我们还需要修改一下脚本.
```
$ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
$ cd Fake-Apache-Log-Generator
```
打开 `apache-fake-log-gen.py`, 将 `while (flag):` 语句替换成下面的语句:
```
elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec
seconds=random.randint(30,300)
increment = datetime.timedelta(seconds=seconds)
otime += increment
ip = faker.ipv4()
dt = otime.strftime('%d/%b/%Y:%H:%M:%S')
tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z')
vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2])
uri = random.choice(resources)
if uri.find("apps")>0:
uri += `random.randint(1000,10000)`
resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04])
byt = int(random.gauss(5000,50))
referer = faker.uri()
useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )()
f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent))
log_lines = log_lines - 1
flag = False if log_lines == 0 else True
```
要确保 elapsed_us 包含在假日志中.
为了方便, 你可以跳过克隆项目这一步, 直接从这里下载修改过的文件: [apache-fake-log-gen.py (gist)](https://gist.github.com/HeartSaVioR/79fd4e461604fabecf535ffece47e6c2)
### 安装 apache-log-parser 并编写转换脚本
`apache-log-parser` 模块可以通过 `pip` 命令安装.
```
$ pip install apache-log-parser
```
因为 `apache-log-parser` 是一个 python 库, 为了转换日志我们需要写编写一个小脚本. 我们创建一个文件 `parse-fake-log-gen-to-json-with-incrementing-id.py` 包含以下内容:
```
import sys
import apache_log_parser
import json
auto_incr_id = 1
parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
line_parser = apache_log_parser.make_parser(parser_format)
while True:
# we'll use pipe
line = sys.stdin.readline()
if not line:
break
parsed_dict = line_parser(line)
parsed_dict['id'] = auto_incr_id
auto_incr_id += 1
# works only python 2, but I don't care cause it's just a test module :)
parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')}
print json.dumps(parsed_dict)
```
### 将转换后的 JSON Apache Log 灌入 Kafka
好了! 我们已经准备好将数据写入 Kafka topic. 下面使用 `kafka-console-producer` 来灌入 JSON.
```
$ python apache-fake-log-gen.py -n 0 | python parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs
```
打开另一个终端执行下面的命令, 确认数据已经进入 topic.
```
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs
```
如果看到如下的 json, 就说明搞定了:
```
{"TIME_US": "757467", "REQUEST_FIRST_LINE": "GET /wp-content HTTP/1.0", "REQUEST_METHOD": "GET", "RESPONSE_BYTES_CLF": "4988", "TIME_RECEIVED_ISOFORMAT": "2021-06-30T22:02:53", "TIME_RECEIVED_TZ_ISOFORMAT": "2021-06-30T22:02:53-07:00", "REQUEST_HTTP_VER": "1.0", "REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY": "Firefox", "REQUEST_HEADER_USER_AGENT__IS_MOBILE": false, "REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING": "3.6.13", "REQUEST_URL_FRAGMENT": "", "REQUEST_HEADER_USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13", "REQUEST_URL_SCHEME": "", "REQUEST_URL_PATH": "/wp-content", "REQUEST_URL_QUERY_SIMPLE_DICT": {}, "TIME_RECEIVED_UTC_ISOFORMAT": "2021-07-01T05:02:53+00:00", "REQUEST_URL_QUERY_DICT": {}, "STATUS": "200", "REQUEST_URL_NETLOC": "", "REQUEST_URL_QUERY_LIST": [], "REQUEST_URL_QUERY": "", "REQUEST_URL_USERNAME": null, "REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING": "", "REQUEST_URL_HOSTNAME": null, "REQUEST_HEADER_USER_AGENT__OS__FAMILY": "Linux", "REQUEST_URL": "/wp-content", "ID": 904128, "REQUEST_HEADER_REFERER": "http://white.com/terms/", "REQUEST_URL_PORT": null, "REQUEST_URL_PASSWORD": null, "TIME_RECEIVED": "[30/Jun/2021:22:02:53 -0700]", "REMOTE_IP": "88.203.90.62"}
```
## 例子: 过滤错误日志
在这个例子中, 我们将从所有的日志中过滤出 error 日志并且存储到另一个 topic 中. 将会用到 `project` 和 `filter` 特性.
脚本文件的内容如下:
```
CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs'
CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4
```
把文件保存为 `apache_log_error_filtering.sql`.
让我们过一遍这个脚本.
第一个语句定义了一个表 `APACHE_LOGS` 代表输入流. `LOCATION` 从句指定了 ZkHost (`localhost:2181`), brokers路径(`/brokers`) 和 topic (`apache-logs`). 注意 Kafka 数据源必须定义一个主键. 这就是为什么我们为 JSON 数据设置了一个整数 id.
同样, 第二个语句指定了表 `APACHE_ERROR_LOGS` 代表输出流. `TBLPROPERTIES` 从句指定了[KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs)的配置, 从句对于 Kafka sink表 是必须的.
最后的语句定义了一个 topology. Storm SQL 只会在 DML 语句上定义和运行 topology. DDL 语句定义输入数据源、输出数据源、以及可以被 DML 语句引用的用户定义函数(user defined function).
L我们先看 where 语句. 由于我们想过滤 error 日志, 我们使用状态码除以 100, 验证得到的商是否等于或者大于4.(简单的说就是 statu_code >= 400) 由于JSON中的状态码是字符串格式(因此在 APACHE_LOGS 表中是 VARCHAR 格式). 我们在应用除法之前, 使用 CAST(STATUS AS INT) 先把状态码转换为整数. 现在我们只有 error 日志了.
我们转换一些列以和输出流想匹配. 在这个语句中, 我们使用 CAST(STATUS AS INT) 转化为整数类型, 然后使用 1000 除 TIME_US 将毫秒转换成秒.
最后, insert 语句将过滤和转换后的行(tuples)存入输出流.
要运行这个例子, 用户需要包含数据源(本例中是 `storm-sql-kafka`) 和的所有依赖到 class path 中. 当用户运行 `storm sql` 命令的时候 Storm SQL 的依赖会被自动处理. 用户可以在提交阶段包含数据源依赖, 如下:
```
$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
```
上面的命令提交 SQL 语句到 StormSQL. storm sql 命令的选项是 `storm sql [script file] [topology name]`. 如果用户使用了不同版本的 Storm 或者 Kafka ,需要修改每个 artifacts 的版本号与之对应.
如果你的语句通过了验证阶段, 会在 Storm UI 页面上显示 topology.
你可以在控制台上看到下面的输出:
```
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-error-logs
```
输出类似下面的内容:
```
{"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222} {"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851} ...
```
你可以运行 Storm SQL runner, 将 topology 名称替换为 `--explain` 来查看逻辑执行计划.
```
$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
```
输入类似下面的内容:
```
LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7
LogicalFilter(condition=[>=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6
EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5
```
如果 Storm SQL 应用了查询优化, 你可能看到的输出会和上面的不一样.
我们正在执行第一个 Storm SQL topology! 如果你看到了足够多的输出和日志, 请杀掉 topology.
为了简洁, 我们不再解释我们已经看到的东西.
## 例子: 过滤访问慢的日志
在这个例子中我们将过滤访问慢的日志, 把他们存储到另一个 topic. 用到的特性有 `project`、`filter`、`User Defined Function (UDF)`. 这个例子与上一个例子 `filtering error logs` 非常相似, 我们主要看如何定义 `User Defined Function (UDF)`.
脚本文件的内容如下:
```
CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apachelogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apacheslowlogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2'
INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) >= 100
```
内容保存为文件 `apache_log_slow_filtering.sql`.
由于前两个语句和上一个例子相似, 我们直接跳过.
第三个语句定义了一个 `User defined function`. 我们使用 `org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2` 定义了一个 `GET_TIME`.
`GetTime2` 函数的实现如下:
```
package org.apache.storm.sql.runtime.functions.scalar.datetime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
public class GetTime2 {
public static Long evaluate(String dateString, String dateFormat) {
try {
DateTimeFormatter df = DateTimeFormat.forPattern(dateFormat).withZoneUTC();
return df.parseDateTime(dateString).getMillis();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
```
由于这个类定义了静态方法 `evaluate` 可以用于 UDF. SQL 的参数和返回的类型取决于 Storm SQL 依赖哪种风格.
注意, 这个类应该放在 classpath 路径下, 因此为了定义 UDF, 你需要创建一个包含 UDF 类的 jar 文件, 并在执行 `storm sql` 命令的时候使用 `-- jar` 选项.
最后一个语句和过滤错误日志的例子相似. 唯一新鲜的东西就是我们调用了 `GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ')` 将字符串格式的时间转换为 unix timestamp (BIGINT).
执行:
```
$ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
```
可以在控制台看到下面的输出:
```
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slow-logs
输出类似下面的内容:
```
{"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579} {"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957} ... ```
好了! 假设我们有通过远程 IP 查询 geo 信息的 UDF, 我们可以通过 geo 位置做过滤, 或者将 geo 位置添加到转换结果中.
## Summary
我们通读了几个 Storm SQL 的简单的用例来学习 Storm SQL 的特性. 如果还没有看过[Storm SQL integration](storm-sql.html) 和 [Storm SQL language](storm-sql-reference.html), 你需要阅读这些章节来查看所有支持的特性.
注意, Storm SQL 运行在小批量和非强类型的 Trident 库之上. Sink 实际并不检查类型. (你可能注意到一些输出字段的类型与输出表模式的定义不同).
当 Storm SQL 的后端 API 修改为核心(tuple by tuple, low-level, high-level) 时, Storm SQL 的行为会相应改变.
- Storm 基础
- 概念
- Scheduler(调度器)
- Configuration
- Guaranteeing Message Processing
- 守护进程容错
- 命令行客户端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 综述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 语言参考
- Storm SQL 内部实现
- Flux
- Storm 安装和部署
- 设置Storm集群
- 本地模式
- 疑难解答
- 在生产集群上运行 Topology
- Maven
- 安全地运行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 资源感知调度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各种内部行为的 Metrics
- Windows 用户指南
- Storm 中级
- 序列化
- 常见 Topology 模式
- Clojure DSL
- 使用没有jvm的语言编辑storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 状态管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 调试
- 动态日志级别设置
- Storm Logs
- 动态员工分析
- 拓扑事件检查器
- Storm 与外部系统, 以及其它库的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高级
- 针对 Storm 定义一个不是 JVM 的 DSL
- 多语言协议
- Storm 内部实现
- 翻译进度