人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動(dòng)態(tài)

Z-Order加速Hudi大規(guī)模數(shù)據(jù)集方案分析

發(fā)布日期:2022-07-15 19:38 | 文章來源:站長之家

1. 背景

多維分析是大數(shù)據(jù)分析的一個(gè)典型場(chǎng)景,這種分析一般帶有過濾條件。對(duì)于此類查詢,尤其是在高基字段的過濾查詢,理論上只我們對(duì)原始數(shù)據(jù)做合理的布局,結(jié)合相關(guān)過濾條件,查詢引擎可以過濾掉大量不相關(guān)數(shù)據(jù),只需讀取很少部分需要的數(shù)據(jù)。例如我們?cè)谌霂熘皩?duì)相關(guān)字段做排序,這樣生成的每個(gè)文件相關(guān)字段的min-max值是不存在交叉的,查詢引擎下推過濾條件給數(shù)據(jù)源結(jié)合每個(gè)文件的min-max統(tǒng)計(jì)信息,即可過濾掉大量不相干數(shù)據(jù)。 上述技術(shù)即我們通常所說的data clustering 和 data skip。直接排序可以在單個(gè)字段上產(chǎn)生很好的效果,如果多字段直接排序那么效果會(huì)大大折扣的,Z-Order可以較好的解決多字段排序問題。

本文基于Apache Spark 以及 Apache Hudi 結(jié)合Z-order技術(shù)介紹如何更好的對(duì)原始數(shù)據(jù)做布局, 減少不必要的I/O,進(jìn)而提升查詢速度。具體提案可參考HudiRFC-28:Support Z-order curve

2. Z-Order介紹

Z-Order是一種可以將多維數(shù)據(jù)壓縮到一維的技術(shù),在時(shí)空索引以及圖像方面使用較廣。Z曲線可以以一條無限長的一維曲線填充任意維度的空間,對(duì)于數(shù)據(jù)庫的一條數(shù)據(jù)來說,我們可以將其多個(gè)要排序的字段看作是數(shù)據(jù)的多個(gè)維度,z曲線可以通過一定的規(guī)則將多維數(shù)據(jù)映射到一維數(shù)據(jù)上,構(gòu)建z-value 進(jìn)而可以基于該一維數(shù)據(jù)進(jìn)行排序。z-value的映射規(guī)則保證了排序后那些在多維維度臨近的數(shù)據(jù)在一維曲線上仍然可以彼此臨近。

wiki定義:假設(shè)存在一個(gè)二維坐標(biāo)對(duì)(x, y),這些坐標(biāo)對(duì)于于一個(gè)二維平面上,使用Z排序,我們可以將這些坐標(biāo)對(duì)壓縮到一維。

當(dāng)前在delta lake的商業(yè)版本實(shí)現(xiàn)了基于Z-Order的data Clustering技術(shù),開源方面Spark/Hive/Presto 均未有對(duì)Z-Order的支持。

3. 具體實(shí)現(xiàn)

我們接下來分2部分介紹如何在Hudi中使用Z-Order:

  • z-value的生成和排序
  • 與Hudi結(jié)合

3.1 z-value的生成和排序

這部分是Z-Order策略的核心,這部分邏輯是公用的,同樣適用其他框架。

Z-Order的關(guān)鍵在于z-value的映射規(guī)則。wiki上給出了基于位交叉的技術(shù),每個(gè)維度值的比特位交叉出現(xiàn)在最終的z-value里。例如假設(shè)我們想計(jì)算二維坐標(biāo)(x=97, y=214)的z-value,我們可以按如下步驟進(jìn)行

第一步:將每一維數(shù)據(jù)用bits表示

x value:01100001
y value:11010110

第二步:從y的最左側(cè)bit開始,我們將x和y按位做交叉,即可得到z 值,如下所示

z-value: 1011011000101001

對(duì)于多維數(shù)據(jù),我們可以采用同樣的方法對(duì)每個(gè)維度的bit位做按位交叉形成 z-value,一旦我們生成z-values 我們即可用該值做排序,基于z值的排序自然形成z階曲線對(duì)多個(gè)參與生成z值的維度都有良好的聚合效果。

上述生成z-value的方法看起來非常好,但在實(shí)際生產(chǎn)環(huán)境上我們要使用位交叉技術(shù)產(chǎn)生z-value 還需解決如下問題:

  • 上述介紹是基于多個(gè)unsigned int類型的遞增數(shù)據(jù),通過位交叉生成z-value的。實(shí)際上的數(shù)據(jù)類型多種多樣,如何處理其他類型數(shù)據(jù)
  • 不同類型的維度值轉(zhuǎn)成bit位表示,長度不一致如何處理
  • 如何選擇數(shù)據(jù)類型合理的保存z-value,以及相應(yīng)的z值排序策略

針對(duì)上述問題,我們采用兩種策略生成z值。

3.1.1 基于映射策略的z值生成方法

第一個(gè)問題:對(duì)不同的數(shù)據(jù)類型采用不同的轉(zhuǎn)換策略

  • 無符號(hào)類型整數(shù):直接轉(zhuǎn)換成bits位表示

  • Int類型的數(shù)據(jù): 直接轉(zhuǎn)成二進(jìn)制表示會(huì)有問題,因?yàn)閖ava里面負(fù)數(shù)的二進(jìn)制表示最高位(符號(hào)位)為1,而正整數(shù)的二進(jìn)制表示最高位為0(如下圖所示), 直接轉(zhuǎn)換后會(huì)出現(xiàn)負(fù)數(shù)大于正數(shù)的現(xiàn)象。

十進(jìn)制二進(jìn)制
00000 0000
10000 0001
20000 0010
1260111 1110
1270111 1111
-1281000 0000
-1271000 0001
-1261000 0010
-21111 1110
-11111 1111

對(duì)于這個(gè)問題,我們可以直接將二進(jìn)制的最高位反轉(zhuǎn),就可以保證轉(zhuǎn)換后的詞典順序和原值相同。如下圖

十進(jìn)制二進(jìn)制最高位反轉(zhuǎn)最高位反轉(zhuǎn)后十進(jìn)制
00000 00001000 0000128
10000 00011000 0001129
20000 00101000 0010130
1260111 11101111 1110254
1270111 11111111 1111255
-1281000 00000000 00000
-1271000 00010000 00011
-1261000 00100000 00102
-21111 11100111 1110126
-11111 11110111 1111127
  • Long類型的數(shù)據(jù):轉(zhuǎn)換方式和Int類型一樣,轉(zhuǎn)成二進(jìn)制形式并將最高位反轉(zhuǎn)
  • Double、Float類型的數(shù)據(jù):轉(zhuǎn)成Long類型,之后轉(zhuǎn)成二進(jìn)制形式并將最高位反轉(zhuǎn)
  • Decimal/Date/TimeStamp類型數(shù)據(jù):轉(zhuǎn)換成long類型,然后直接用二進(jìn)制表示。
  • UTF-8 String類型的數(shù)據(jù):String類型的數(shù)據(jù) 直接用二進(jìn)制表示即可保持原來的自然序, 但是字符串是不定長的無法直接用來做位交叉。 我們采用如下策略處理string類型大于8bytes的字符串截?cái)喑?bytes, 不足8bytes的string 填充成8bytes。
  • null值處理:
    • 數(shù)值類型的null直接變成該數(shù)值類型的最大值,之后按上述步驟轉(zhuǎn)換;
    • String類型null直接變成空字符串之后再做轉(zhuǎn)換;

第二個(gè)問題:生成的二進(jìn)制值統(tǒng)一按64位對(duì)齊即可

第三個(gè)問題:可以用Array[Byte]來保存z值(參考Amazon的DynamoDB 可以限制該數(shù)組的長度位1024)。對(duì)于 Array[Byte]類型的數(shù)據(jù)排序,hbase的rowkey 排序器可以直接拿來解決這個(gè)問題

基于映射策略的z值生成方法,方便快捷很容易理解,但是有一定缺陷:

參與生成z-value的字段理論上需要是從0開始的正整數(shù),這樣才能生成很好的z曲線。 真實(shí)的數(shù)據(jù)集中 是不可能有這么完美的情況出現(xiàn)的, zorder的效果將會(huì)打折扣。比如x 字段取值(0, 1, 2), y字段取值(100, 200, 300), 用x, y生成的z-value只是完整z曲線的一部分,對(duì)其做z值排序的效果和直接用x排序的效果是一樣的; 再比如x的基數(shù)值遠(yuǎn)遠(yuǎn)低于y的基數(shù)值時(shí)采用上述策略排序效果基本和按y值排序是一樣的,真實(shí)效果還不如先按x排序再按y排序。

String類型的處理, 上述策略對(duì)string類型是取前8個(gè)字節(jié)的參與z值計(jì)算, 這將導(dǎo)致精度丟失。 當(dāng)出現(xiàn)字符串都是相同字符串前綴的情況就無法處理了,比如"https://www.baidu.com" , "https://www.google.com" 這兩個(gè)字符串前8個(gè)字節(jié)完全一樣, 對(duì)這樣的數(shù)據(jù)截取前8個(gè)字節(jié)參與z值計(jì)算沒有任何意義。

上述策略出現(xiàn)缺陷的主要原因是數(shù)據(jù)的分布并不總是那么好導(dǎo)致。有一種簡單的方案可以解決上述問題: 對(duì)參與z值計(jì)算的所有維度值做全局Rank,用Rank值代替其原始值參與到z值計(jì)算中,由于Rank值一定是從0開始的正整數(shù),完全符合z值構(gòu)建條件,較好的解決上述問題。 在實(shí)驗(yàn)中我們發(fā)現(xiàn)這種用Rank值的方法確實(shí)很有效,但是z值生成效率極低,計(jì)算引擎做全局Rank的代價(jià)是非常高的,基于Rank的方法效率瓶頸在于要做全局Rank計(jì)算,那么我們可不可以對(duì)原始數(shù)據(jù)做采樣減少數(shù)據(jù)量,用采樣后的數(shù)據(jù)計(jì)算z值呢,答案是肯定的。

/** Generates z-value*/
val newRDD = df.rdd.map { row =>
  val values = zFields.map { case (index, field) =>
    field.dataType match {
      case LongType =>
        ZOrderingUtil.longTo8Byte(row.getLong(index))
      case DoubleType =>
        ZOrderingUtil.doubleTo8Byte(row.getDouble(index))
      case IntegerType =>
        ZOrderingUtil.intTo8Byte(row.getInt(index))
      case FloatType =>
        ZOrderingUtil.doubleTo8Byte(row.getFloat(index).toDouble)
      case StringType =>
        ZOrderingUtil.utf8To8Byte(row.getString(index))
      case DateType =>
        ZOrderingUtil.longTo8Byte(row.getDate(index).getTime)
      case TimestampType =>
        ZOrderingUtil.longTo8Byte(row.getTimestamp(index).getTime)
      case ByteType =>
        ZOrderingUtil.byteTo8Byte(row.getByte(index))
      case ShortType =>
        ZOrderingUtil.intTo8Byte(row.getShort(index).toInt)
      case d: DecimalType =>
        ZOrderingUtil.longTo8Byte(row.getDecimal(index).longValue())
      case _ =>
        null
    }
  }.filter(v => v != null).toArray
  val zValues = ZOrderingUtil.interleaveMulti8Byte(values)
  Row.fromSeq(row.toSeq ++ Seq(zValues))
}.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)))

3.1.2 基于RangeBounds的z-value生成策略

在介紹基于RangeBounds的z-value生成策略之前先看看Spark的排序過程,Spark排序大致分為2步

  • 對(duì)輸入數(shù)據(jù)的key做sampling來估計(jì)key的分布,按指定的分區(qū)數(shù)切分成range并排序。計(jì)算出來的rangeBounds是一個(gè)長度為numPartition - 1 的數(shù)組,該數(shù)組里面每個(gè)元素表示一個(gè)分區(qū)內(nèi)key值的上界/下界。
  • shuffle write 過程中,每個(gè)輸入的key應(yīng)該分到哪個(gè)分區(qū)內(nèi),由第一步計(jì)算出來的rangeBounds來確定。每個(gè)分區(qū)內(nèi)的數(shù)據(jù)雖然沒有排序,但是注意rangeBounds是有序的因此分區(qū)之間宏觀上看是有序的,故只需對(duì)每個(gè)分區(qū)內(nèi)數(shù)據(jù)做好排序即可保證數(shù)據(jù)全局有序。

參考Spark的排序過程,我們可以這樣做

  • 對(duì)每個(gè)參與Z-Order的字段篩選規(guī)定個(gè)數(shù)(類比分區(qū)數(shù))的Range并對(duì)進(jìn)行排序,并計(jì)算出每個(gè)字段的RangeBounds;
  • 實(shí)際映射過程中每個(gè)字段映射為該數(shù)據(jù)所在rangeBounds的中的下標(biāo),然后參與z-value的計(jì)算??梢钥闯鲇捎趨^(qū)間下標(biāo)是從0開始遞增的正整數(shù),完全滿足z值生成條件;并且String類型的字段映射問題也被一并解決了?;赗angeBounds的z值生成方法,很好的解決了第一種方法所面臨的缺陷。由于多了一步采樣生成RangeBounds的過程,其效率顯然不如第一種方案,我們實(shí)現(xiàn)了上述兩種z值生成方法以供選擇。
/** Generates z-value */
val indexRdd = internalRdd.mapPartitionsInternal { iter =>
  val bounds = boundBroadCast.value
  val origin_Projections = sortingExpressions.map { se =>
    UnsafeProjection.create(Seq(se), outputAttributes)
  }
  iter.map { unsafeRow =>
    val interleaveValues = origin_Projections.zip(origin_lazyGeneratedOrderings).zipWithIndex.map { case ((rowProject, lazyOrdering), index) =>
      val row = rowProject(unsafeRow)
      val decisionBound = new DecisionBound(sampleRdd, lazyOrdering)
      if (row.isNullAt(0)) {
        bounds(index).length + 1
      } else {
        decisionBound.getBound(row, bounds(index).asInstanceOf[Array[InternalRow]])
      }
    }.toArray.map(ZOrderingUtil.toBytes(_))
    val zValues = ZOrderingUtil.interleaveMulti4Byte(interleaveValues)
    val mutablePair = new MutablePair[InternalRow, Array[Byte]]()
    mutablePair.update(unsafeRow, zValues)
  }
}.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)

3.2 與Hudi結(jié)合

與Hudi的結(jié)合大致分為兩部分

3.2.1 表數(shù)據(jù)的Z排序重組

這塊相對(duì)比較簡單,借助Hudi內(nèi)部的Clustering機(jī)制結(jié)合上述z值的生成排序策略我們可以直接完成Hudi表數(shù)據(jù)的數(shù)據(jù)重組,這里不再詳細(xì)介紹。

3.2.2 收集保存統(tǒng)計(jì)信息

這塊其實(shí)RFC27已經(jīng)在做了,感覺有點(diǎn)重復(fù)工作我們簡單介紹下我們的實(shí)現(xiàn),數(shù)據(jù)完成z重組后,我們需要對(duì)重組后的每個(gè)文件都收集參與z值計(jì)算的各個(gè)字段的min/max/nullCount 的統(tǒng)計(jì)信息。對(duì)于統(tǒng)計(jì)信息收集,可以通過讀取Parquet文件或者通過SparkSQL收集

  • 讀取Parquet文件收集統(tǒng)計(jì)信息
/** collect statistic info*/
val sc = df.sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(conf)
val numParallelism = inputFiles.size/3
val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
try {
  val description = s"Listing parquet column statistics"
  sc.setJobDescription(description)
  sc.parallelize(inputFiles, numParallelism).mapPartitions { paths =>
    val hadoopConf = serializableConfiguration.value
    paths.map(new Path(_)).flatMap { filePath =>
      val blocks = ParquetFileReader.readFooter(hadoopConf, filePath).getBlocks().asScala
      blocks.flatMap(b => b.getColumns().asScala.
        map(col => (col.getPath().toDotString(),
          FileStats(col.getStatistics().minAsString(), col.getStatistics().maxAsString(), col.getStatistics.getNumNulls.toInt))))
        .groupBy(x => x._1).mapValues(v => v.map(vv => vv._2)).
        mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max, value.map(_.num_nulls).max)).toSeq.
        map(x => ColumnFileStats(filePath.getName(), x._1, x._2.minVal, x._2.maxVal, x._2.num_nulls))
    }.filter(p => cols.contains(p.colName))
  }.collect()
} finally {
  sc.setJobDescription(previousJobDescription)
}
  • 通過SparkSQL方式收集統(tǒng)計(jì)信息
/** collect statistic info*/
val inputFiles = df.inputFiles
val conf = df.sparkSession.sparkContext.hadoopConfiguration
val values = cols.flatMap(c => Seq( min(col(c)).as(c + "_minValue"), max(col(c)).as(c + "_maxValue"), count(c).as(c + "_noNullCou

之后將這些信息保存在Hudi表里面的hoodie目錄下的index目錄下,然后供Spark查詢使用。

3.2.3 應(yīng)用到Spark查詢

為將統(tǒng)計(jì)信息應(yīng)用Spark查詢,需修改HudiIndex的文件過濾邏輯,將DataFilter轉(zhuǎn)成對(duì)Index表的過濾,選出候選要讀取的文件,返回給查詢引擎,具體步驟如下。

  • 將索引表加載到 IndexDataFrame
  • 使用原始查詢過濾器為 IndexDataFrame 構(gòu)建數(shù)據(jù)過濾器
  • 查詢 IndexDataFrame 選擇候選文件
  • 使用這些候選文件來重建 HudiMemoryIndex

通過min/max值和null計(jì)數(shù)信息為 IndexDataFrame 構(gòu)建數(shù)據(jù)過濾器,由于z排序后參與z值計(jì)算的各個(gè)字段在每個(gè)文件里面的min/max值很大概率不交叉,因此對(duì)Index表的過濾可以過濾掉大量的文件。

/** convert filter */
def createZindexFilter(condition: Expression): Expression = {
  val minValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_minValue").expr
  val maxValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_maxValue").expr
  val num_nulls = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_num_nulls").expr
  condition match {
    case EqualTo(attribute: AttributeReference, value: Literal) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
    case EqualTo(value: Literal, attribute: AttributeReference) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
    case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(equalNullSafe.left)
      EqualTo(num_nulls(colName), equalNullSafe.right)
.......

4. 測(cè)試結(jié)果

我們采用databrick的測(cè)試樣例https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.53c258ccmqvYfy 進(jìn)行了測(cè)試

測(cè)試數(shù)據(jù)量和資源使用大小和databrick保持一致。唯一區(qū)別是我們只生成了10000個(gè)文件,原文是100w個(gè)文件。 測(cè)試結(jié)果表明zorder加速比還說很可觀的,另外Z-Order的效果隨著文件數(shù)的增加會(huì)越來越好,我們后續(xù)也會(huì)在100w文件級(jí)別測(cè)試。

表名稱時(shí)間(s)
conn_random_parquet89.3
conn_zorder19.4
conn_zorder_only_ip18.2

以上就是Z-Order加速Hudi大規(guī)模數(shù)據(jù)集方案分析的詳細(xì)內(nèi)容,更多關(guān)于Z-Order加速Hudi大規(guī)模數(shù)據(jù)集的資料請(qǐng)關(guān)注本站其它相關(guān)文章!

美國服務(wù)器租用

版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。

實(shí)時(shí)開通

自選配置、實(shí)時(shí)開通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問服務(wù)

1對(duì)1客戶咨詢顧問

在線
客服

在線客服:7*24小時(shí)在線

客服
熱線

400-630-3752
7*24小時(shí)客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部