今天给各位分享9000套网站源码分享分享的知识,其中也会对实用的网站源码进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
一背景
在建设准实时数据仓库时,由于列式存储具有较高的查询性能,因此,通常都采用ORC、Parquet数据格式,但是这种格式不能追加数据。而HDFS的数据块大小一般都是128MB或者256MB,如果等文件凑够一个HDFSBlock大小再写入时,就会导致数据延迟增大。因此,难免产生一个非常常见但是很麻烦的问题,即HDFS小文件问题。过多的小文件会增加NameNode的压力,并且影响查询性能,所以我们在使用流式数据入库的之后,一般会对小文件进行合并处理。
1.1产生小文件的现象
即使是使用Iceberg这种数据湖解决方案,也难免产生小文件。因此,Iceberg本身也想到了解决小文件的方案。
如何每次就写入几条数据,Iceberg的每个分区在写文件的时候都会产生新的文件,这就导致底层文件系统里面产生了很多大小才几KB的文件。下面是我们在前面的演示环境中写了3次数据:
–创建Iceberg表\nCREATETABLEsensordata(\nsensor_idSTRING,\ntsBIGINT,\ntemperatureDOUBLE,\ndtSTRING\n)USINGiceberg\nPARTITIONEDBY(dt);\n\n–Append写入1条数据\nINSERTINTOsensordataVALUES(&39;,1635743301,-12.1,&39;);\n–OverWrite写入一条数据\nINSERTOVERWRITEsensordataVALUES(&39;,1635743301,23.6,&39;);\n–Append写入1条数据\nINSERTINTOsensordataVALUES(&39;,1638421701,-22.2,&39;);
虽然每次仅仅写入1条数据,但是却产生了很多小文件。
[bigdata@bigdata185iceberg]$tree\n.\n└──sensordata\n├──data\n│├──dt=2021-12-01\n││├──00000-0-275a936f-4d21-4a82-9346-bceac4381e6c-00001.parquet\n││└──00000-2-1189ac19-b488-4956-8de8-8dd96cd5920a-00001.parquet\n│└──dt=2021-12-02\n│└──00000-1-cc4f552a-28eb-4ff3-a4fa-6c28ce6e5f79-00001.parquet\n└──metadata\n├──0dafa6f3-2dbd-4728-ba9b-af31a3416700-m0.avro\n├──2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m0.avro\n├──2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m1.avro\n├──ad4cd65e-7351-4ad3-baaf-5e5bd99dc257-m0.avro\n├──snap-232980156660427676-1-0dafa6f3-2dbd-4728-ba9b-af31a3416700.avro\n├──snap-4599216928086762873-1-ad4cd65e-7351-4ad3-baaf-5e5bd99dc257.avro\n├──snap-5874199297651146296-1-2b1fbd5a-6241-4f7d-a4a6-3121019b9afb.avro\n├──v1.metadata.json\n├──v2.metadata.json\n├──v3.metadata.json\n├──v4.metadata.json\n└──version-hint.text\n\n5directories,16files\n[bigdata@bigdata185iceberg]$
1.2产生小文件的原因
当我们执行这个INSERT语句时,会发生以下过程:
首先创建一个Parquet格式数据文件-sensordata/data/dt=2021-12-01/00000-5-cbf2920c-3823-41a1-a612-04679b50a999-00001.parquet创建一个指向这个数据文件的清单文件-sensordata/metadata/cd1171e3-d178-42d0-8f0d-634804f97a01-m0.avro创建指向该清单文件的清单列表文件-sensordata/metadata/snap-2251043931717096659-1-cd1171e3-d178-42d0-8f0d-634804f97a01.avro基于当前的元数据文件(v1.metadata.json)创建新的元数据文件,并通过新快照s1跟踪先前的快照s0,指向此清单列表文件-sensordata/metadata/v2.metadata.json最后,当前元数据指针的值version-hint.text在目录中自动更新,现在指向这个新的元数据文件(v2.metadata.json)。
当一个事务commit完成之后,会生成metadata.json和Manifest文件。
优势就是数据以事务原子性的方式写入Iceberg表,但是不足恰好是,每次提交数据都要产生一次快照,这难免就产生的小文件。
1.3快照保留周期过长
Iceberg使用v[number].metadata.json文件跟踪表元数据。对表的每次更改操作都会生成一个新的元数据文件以提供原子性。
默认情况下,旧的元数据文件会保留历史记录。频繁提交的表,特别是在流作业写数据时,需要定期清理元数据文件。
每张表的write.metadata.delete-after-commit.enabled默认值为false,如果不设置为true,历史版本元数据就不会被删除。每张表最大的快照保留数write.metadata.previous-versions-max,默认为100(可以看到100以内的每次Snapshot)。
二解决方案
2.1设置历史快照保留周期
Iceberg每一次操作都会产生多个数据文件(metadata、data、snapshot),需要自行合并清理。
2.1.1建Iceberg新增with设置
通过org.apache.iceberg.actions.RewriteDataFiles来实现小文件合并时,如果仅仅对Iceberg表的数据进行小文件合并,但是不开启write.metadata.delete-after-commit.enabled为true,历史不会被删除,开启后就会实现合并后清除历史文件。因此,建议设置合理的快照保存策略,write.metadata.previous-versions-max(历史文件保留最大值为2,metadata的里面文件数则始终保持为3个)。
配置保留历史数量(比如配置为2,则元数据和数据都保留2份历史数据和1份最新数据)\nwrite.metadata.previous-versions-max=2
2.1.2建表示例
CREATETABLEsensordata_01(\nsensor_idSTRING,\ntsBIGINT,\ntemperatureDOUBLE,\ndtSTRING\n)USINGiceberg\nPARTITIONEDBY(dt)\nTBLPROPERTIES(&39;=&39;,\n&39;=&39;);\n\n–必须在sparksql下执行,FlinkSQL不支持\nALTERTABLEsensordata_01SETTBLPROPERTIES(&39;=&39;);\nALTERTABLEsensordata_01SETTBLPROPERTIES(&39;=&39;);
2.2开启文件压缩程序
Iceberg跟踪表中的每个数据文件。更多的数据文件会导致更多的元数据存储在清单文件中,而小数据文件会导致不必要的元数据量和文件打开成本,从而降低查询数据的效率。
2.2.1Spark
Iceberg可以使用带有操作的Spark并行压缩数据文件rewriteDataFiles。这会将小文件组合成更大的文件,以减少元数据开销和运行时文件打开成本。
Actions.forTable(table).rewriteDataFiles()\n.targetSizeInBytes(128*1024*1024)//128MB\n.execute()
2.2.2Flink
Iceberg可以使用带有操作的Flink并行压缩数据文件rewriteDataFiles。这会将小文件组合成更大的文件,以减少元数据开销和运行时文件打开成本。
Actions.forTable(table)\n.rewriteDataFiles()\n.maxParallelism(1)\n.targetSizeInBytes(128*1024*1024)//128MB\n.execute();
2.3重写Manifest文件
Iceberg在其清单列表和清单文件中使用元数据来加快查询计划并修剪不必要的数据文件。元数据树用作表数据的索引。
元数据树中的清单会按照它们添加的顺序自动压缩,当写入模式与读取过滤器对齐时,查询会更快。例如,在数据到达时写入每小时分区的数据与时间范围查询过滤器保持一致。
2.3.1Spark
当表的写入模式与查询模式不一致时,可以重写元数据以使用rewriteManifests操作将数据文件重新分组到清单中,Spark支持并行重写操作。
table.rewriteManifests()\n.rewriteIf((file)->file.length()<32*1024*1024)//32MB\n.clusterBy((file)->file.partition().get(0,String.class))\n.commit();
2.3.2Flink
\ttable.rewriteManifests\n.rewriteIf((file)=>file.length<32*1024*1024)//32MB\n.clusterBy((file)=>file.partition.get(0,classOf[String]))\n.commit
2.3使快照过期
在每次向Iceberg表写数据时,都会创建一个新的快照,快照可以用于基于时间旅行查询,或者将表回滚到历史上的某一有效快照上。建议定期删除过期快照,以删除不再需要的数据文件,使表元数据的最小且可用。
2.3.1Spark
目前我们的应用场景只需要查询当前数据就可以了,不需要查询历史数据,所以我只保留了最新的快照。在每次压缩程序之后,做了处理,使当前快照时间以前的快照过期。程序会自动删除以前的过期数据文件。过期旧快照会将它们从元数据中删除,因此它们不再可用于时间旅行查询。而数据文件只有在不能被基于时间旅行查询之后,才会被删除。
valsnapshot=table.currentSnapshot\nif(snapshot!=null){\ntable.expireSnapshots.expireOlderThan(snapshot.timestampMillis).commit\n}
2.3.2Flink
过期的旧快照会将它们从元数据中删除,因此它们不再可用于时间旅行查询。
//6清除5分钟前的历史快照\nSnapshotsnapshot=table.currentSnapshot();\nlongoldSnapshot=snapshot.timestampMillis()-TimeUnit.MINUTES.toMillis(5);\n\nif(snapshot!=null){\ntable.expireSnapshots().expireOlderThan(oldSnapshot).commit();\n}
数据文件在不再被可用于时间旅行或回滚的快照引用之前不会被删除。定期过期的快照会删除未使用的数据文件。
三源代码
3.1Spark
(1)使用Spark编写的合并Iceberg表小文件,是快照过期,删除不用的数据文件。
packagecom.yunclass.iceberg.streaming\n\nimportjava.util\n\nimportorg.apache.hadoop.conf.Configuration\nimportorg.apache.iceberg.Table\nimportorg.apache.iceberg.actions.Actions\nimportorg.apache.iceberg.catalog.{Namespace,TableIdentifier}\nimportorg.apache.iceberg.expressions.Expressions\nimportorg.apache.iceberg.hadoop.HadoopCatalog\nimportorg.apache.iceberg.spark.SparkCatalog\nimportorg.apache.spark.SparkConf\nimportorg.apache.spark.sql.SparkSession\n\nobjectCombineTableFiles{\n\ndefmain(args:Array[String]):Unit={\n\n//1设置执行账号\nSystem.setProperty(&34;,&34;)\n//2配置SparkSession\nvalsparkConf=newSparkConf()\n.set(&34;,&34;)\n.set(s&34;,classOf[SparkCatalog].getName)\n.set(s&34;,&34;)\n.set(s&34;,&34;)\n.setMaster(&34;).setAppName(&34;)\nsparkConf.set(&34;,&34;)\n\nvalsparkSession=SparkSession.builder().config(sparkConf).getOrCreate()\n\n//3获取TableLoader\nvalconf=newutil.HashMap[String,String]\nconf.put(&34;,&34;)\nconf.put(&34;,&34;)\nconf.put(&34;,&34;)\nvalhadoopCatalog=newHadoopCatalog(newConfiguration())\nhadoopCatalog.initialize(&34;,conf)\n\nvalidentifier=TableIdentifier.of(Namespace.of(&34;),&34;)\nvaltable=hadoopCatalog.loadTable(identifier)\n\n//调用合并小文件方法\n//combineFiles(sparkSession,table)\ndeleteSnapshot(table)\n}\n\n//合并小文件\ndefcombineFiles(sparkSession:SparkSession,table:Table):Unit={\nActions.forTable(sparkSession,table).rewriteDataFiles()\n.filter(Expressions.equal(&34;,&34;))\n.targetSizeInBytes(128*1024*1024)\n.execute()\n\n//重新manifest文件\ntable.rewriteManifests()\n.rewriteIf((file)=>file.length()<28*1024*1024)\n.clusterBy((file)=>file.partition().get(0,classOf[String]))\n.commit()\n}\n\n//删除快照信息\ndefdeleteSnapshot(table:Table):Unit={\nvalsnapshot=table.currentSnapshot()\nvaloldSnapshot=snapshot.timestampMillis()\nif(snapshot!=null){\ntable.expireSnapshots().expireOlderThan(oldSnapshot).commit()\n}\n}\n}\n
(2)合并小文件之前的数据状态
(3)合并之后的状态,将小于128MB的文件,合并成大数据文件。
3.2Flink
(1)使用Flink编写的合并Iceberg表小文件,是快照过期,删除不用的数据文件。
packagecom.yunclass.iceberg.streaming;\n\nimportcom.sun.javafx.fxml.expression.Expression;\nimportorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;\nimportorg.apache.flink.table.api.Expressions;\nimportorg.apache.hadoop.conf.Configuration;\nimportorg.apache.iceberg.Snapshot;\nimportorg.apache.iceberg.Table;\nimportorg.apache.iceberg.catalog.Catalog;\nimportorg.apache.iceberg.catalog.Namespace;\nimportorg.apache.iceberg.catalog.TableIdentifier;\nimportorg.apache.iceberg.flink.CatalogLoader;\nimportorg.apache.iceberg.flink.actions.Actions;\n\nimportjava.util.HashMap;\nimportjava.util.Map;\n\npublicclassCombineTableFileDemo{\npublicstaticvoidmain(String[]args){\n//1设置执行用户\nSystem.setProperty(&34;,&34;);\n\n//2获取Flink执行环境\nStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();\nenv.setParallelism(1);\n\n//3使用HadoopCatalog模式加载Iceberg表\nMap<String,String>icebergMap=newHashMap<>();\nicebergMap.put(&34;,&34;);\nicebergMap.put(&34;,&34;);\nicebergMap.put(&34;,&34;);\nicebergMap.put(&34;,&34;);\n\n//获取catalogLoader\nCatalogLoaderhadoopCatalog=CatalogLoader.hadoop(&34;,newConfiguration(),icebergMap);\nCatalogcatalog=hadoopCatalog.loadCatalog();\nTableIdentifiertableIdentifier=TableIdentifier.of(Namespace.of(&34;),&34;);\nTabletable=catalog.loadTable(tableIdentifier);\n\n//调用方法\n//combineFiles(env,table);\ndeleteOldSnapshot(table);\n}\n\n//合并小文件\nprivatestaticvoidcombineFiles(StreamExecutionEnvironmentenv,Tabletable){\nActions.forTable(env,table)\n.rewriteDataFiles()\n.maxParallelism(1)\n.targetSizeInBytes(128*1024*1024)\n.execute();\n\n//重写Manifest文件\ntable.rewriteManifests()\n.rewriteIf((file)->file.length()<32*1024*1024)\n.clusterBy((file)->file.partition().get(0,String.class))\n.commit();\n}\n\n//删除过期快照\nprivatestaticvoiddeleteOldSnapshot(Tabletable){\nSnapshotsnapshot=table.currentSnapshot();\nlongoldSnapshot=snapshot.timestampMillis();\nif(snapshot!=null){\ntable.expireSnapshots().expireOlderThan(oldSnapshot).commit();\n}\n}\n}\n
(2)合并小文件之前的数据状态
(3)合并之后的状态,将小于32MB的文件,合并成大Manifest文件。
四总结
关于9000套网站源码分享分享和实用的网站源码的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
