💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] # 1. Parquet介绍 Apache Parquet 是 Hadoop 生态圈中一种<ins>新型列式存储格式</ins>,它可以兼容Hadoop 生态圈中大多数计算框架(Mapreduce、Spark 等),被多种查询引擎支持 (Hive、Impala、Drill 等),<ins>并且它是语言和平台无关的</ins>。Parquet 最初是由Twitter 和 Cloudera 合作开发完成并开源,2015 年 5 月从 Apache 的孵化器里毕业成为 Apache 顶级项目。<br/> Parquet 最初的灵感来自 Google 于 2010 年发表的 Dremel 论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能,在Dremel 论文中还介绍了 Google 如何使用这种存储格式实现并行查询的,如果对此感兴趣可以参考论文和开源实现 Drill。<br/> **数据模型** Parquet 支持嵌套的数据模型,类似于 Protocol Buffers,每一个数据模型的 schema 包含多个字段,<ins>每一个字段有三个属性:重复次数、数据类型和字段 名,重复次数可以是以下三种:required(只出现 1 次),repeated(出现 0 次或多次),optional(出现 0 次或 1 次)</ins>。<br/> 每一个字段的数据类型可以分成两种:group(复杂类型)和 primitive(基本类型)。<br/> **存储方式:列式存储** 列式存储优点: ➢ 按需读取列 ➢ 压缩编码可以降低磁盘存储空间 <br/> **文件结构** Parquet 文件是以二进制方式存储的,是不可以直接读取和修改的,Parquet文件是自解析的,文件中包括该文件的数据和元数据。在 HDFS 文件系统和Parquet 文件中存在如下几个概念: <mark>HDFS 块(Block)</mark>:它是 HDFS 上的最小的副本单位,HDFS 会把一个 Block 存储在本地的一个文件并且维护分散在不同的机器上的多个副本。 <mark>HDFS 文件(File)</mark>:一个 HDFS 的文件,包括数据和元数据,数据分散存储在多个 Block 中。 <mark>行组(Row Group)</mark>:按照行将数据物理上划分为多个单元,每一个行组包含一定的行数,在一个 HDFS 文件中至少存储一个行组,Parquet 读写的时候会将 整个行组缓存在内存中,所以如果每一个行组的大小是由内存的大小决定的。 <mark>列块(Column Chunk)</mark>:在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。不同的列块可使用不同的算法进行压缩。 <mark>页(Page)</mark>:每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。<br/> 通常情况下,在存储 Parquet 数据的时候会按照 HDFS 的 Block 大小设置行组的大小,由于一般情况下每一个 Mapper 任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个 Mapper 任务处理,增大任务执行并行度。Parquet 文件的格式如下图所示。 ![](https://img.kancloud.cn/8a/37/8a3773b8115dad636a0c2d299a73aef7_571x449.png) 上图展示了一个 Parquet 文件的结构,一个文件中可以存储多个行组,文件的首位都是该文件的 Magic Code,用于校验它是否是一个 Parquet 文件,Footer length 存储了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和当前文件的 Schema 信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet 中,有三种类型的页:<ins>数据页、字典页和索引页</ins>。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前 Parquet 中还不支持索引页,但是在后面的版本中增加。<br/> **数据类型** ``` BOOLEAN: 1 bit boolean INT32: 32 bit signed ints INT64: 64 bit signed ints INT96: 96 bit signed ints FLOAT: IEEE 32-bit floating point values DOUBLE: IEEE 64-bit floating point values BYTE_ARRAY: arbitrarily long byte arrays. 也可以全部指定类型为binary二进制 ``` <br/> # 2. Java读写Parquet 在 *`pom.xml`* 中引入下面的依赖 ```xml <!--Parquet--> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-hadoop</artifactId> <version>1.8.2</version> </dependency> ``` Java代码: ```java import org.apache.hadoop.fs.Path; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroup; import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; import java.io.IOException; /** * @Author Leo * @Date 2019/5/7 11:49 **/ public class ParquetOps { public static void main(String[] args) { try { write(); read(); } catch (IOException e) { e.printStackTrace(); } } private static void write() throws IOException { Path file = new Path("/tmp/user-parquet/1.parquet"); String schemaStr = "message User{\n" + " required binary name (UTF8);\n" + " required int32 age;\n" + " repeated group family{\n" + " repeated binary father (UTF8);\n" + " repeated binary mother (UTF8);\n" + " optional binary sister (UTF8);\n" + " }\n" + "}\n"; MessageType schema = MessageTypeParser.parseMessageType(schemaStr); ParquetWriter<Group> writer = ExampleParquetWriter.builder(file) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withType(schema).build(); SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema); Group group1 = groupFactory.newGroup(); group1.add("name", "jason"); group1.add("age", 9); Group cGroup1 = group1.addGroup("family"); cGroup1.add("father", "XXX"); cGroup1.add("mother", "XXX"); Group group2 = groupFactory.newGroup(); group2.add("name", "tom"); group2.add("age", 18); //添加子组 group2.addGroup("family") .append("father", "ZZZ") .append("mother", "ZZZ");//append与add返回值不同 writer.write(group1); writer.write(group2); writer.close(); } private static void read() throws IOException { Path file = new Path( "/tmp/user-parquet/1.parquet"); ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), file); ParquetReader<Group> reader = builder.build(); SimpleGroup group = (SimpleGroup) reader.read(); System.out.println("schema:" + group.getType().toString()); while (group != null) { System.out.println("username:" + group.getString(0, 0)); System.out.println("age:" + group.getInteger(1, 0)); System.out.println("family.father:" + group.getGroup(2, 0).getString(0, 0)); System.out.println(group.toString()); group = (SimpleGroup) reader.read(); } } } ``` <br/> # 3. 在Hive中使用Parquet ```sql create external table parquet_table( name string, age int) stored as parquet; 0: jdbc:hive2://hadoop101:10000> select * from parquet_table; +---------------------+--------------------+--+ | parquet_table.name | parquet_table.age | +---------------------+--------------------+--+ +---------------------+--------------------+--+ 0: jdbc:hive2://hadoop101:10000> show create table parquet_table; +----------------------------------------------------+--+ | createtab_stmt | +----------------------------------------------------+--+ | CREATE EXTERNAL TABLE `parquet_table`( | | `name` string, | | `age` int) | | ROW FORMAT SERDE | | 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' | | STORED AS INPUTFORMAT | | 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' | | OUTPUTFORMAT | | 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' | | LOCATION | | 'hdfs://hadoop101:9000/home/hadoop/hive/warehouse/hivebook.db/parquet_table' | | TBLPROPERTIES ( | | 'transient_lastDdlTime'='1609154516') | +----------------------------------------------------+--+ ```