久久久精品一区ed2k-女人被男人叉到高潮的视频-中文字幕乱码一区久久麻豆樱花-俄罗斯熟妇真实视频

SparkSQLJoin原理分析

Spark SQL Join原理分析

1. Join問(wèn)題綜述:

Join有inner,leftouter,rightouter,fullouter,leftsemi,leftanti六種類型,對(duì)單獨(dú)版本的Join操作,可以將問(wèn)題表述為:

創(chuàng)新互聯(lián)公司是一家專業(yè)提供額濟(jì)納企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計(jì)制作、成都網(wǎng)站制作H5高端網(wǎng)站建設(shè)、小程序制作等業(yè)務(wù)。10年已為額濟(jì)納眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進(jìn)行中。

IterA,IterB為兩個(gè)Iterator,根據(jù)規(guī)則A將兩個(gè)Iterator中相應(yīng)的Row進(jìn)行合并,然后按照規(guī)則B對(duì)合并后Row進(jìn)行過(guò)濾。
比如Inner_join,它的合并規(guī)則A為:對(duì)IterA中每一條記錄,生成一個(gè)key,并利用該key從IterB的Map集合中獲取到相應(yīng)記錄,并將它們進(jìn)行合并;而對(duì)于規(guī)則B可以為任意過(guò)濾條件,比如IterA和IterB任何兩個(gè)字段進(jìn)行比較操作。

對(duì)于IterA和IterB,當(dāng)我們利用iterA中key去IterB中進(jìn)行一一匹配時(shí),我們稱IterA為streamedIter,IterB為BuildIter或者hashedIter。即我們流式遍歷streamedIter中每一條記錄,去hashedIter中去查找相應(yīng)匹配的記錄。

而這個(gè)查找過(guò)程中,即為Build過(guò)程,每一次Build操作的結(jié)果即為一條JoinRow(A,B),其中JoinRow(A)來(lái)自streamedIter,JoinRow(B)來(lái)自BuildIter,此時(shí)這個(gè)過(guò)程為BuildRight,而如果JoinRow(B)來(lái)自streamedIter,JoinRow(A)來(lái)自BuildIter,即為BuildLeft,

有點(diǎn)拗口!那么為什么要去區(qū)分BuildLeftBuildRight呢?對(duì)于leftouterrightouter,leftsemi,leftanti,它們的Build類型是確定,即left*BuildRight,right*BuildLeft類型,但是對(duì)于inner操作,BuildLeftBuildRight兩種都可以,而且選擇不同,可能有很大性能區(qū)別:

BuildIter也稱為hashedIter,即需要將BuildIter構(gòu)建為一個(gè)內(nèi)存Hash,從而加速Build的匹配過(guò)程;此時(shí)如果BuildIter和streamedIter大小相差較大,顯然利用小的來(lái)建立Hash,內(nèi)存占用要小很多!

總結(jié)一下:Join即由下面幾部分組成:

trait Join {
  val joinType: JoinType //Join類型
  val streamedPlan: SparkPlan //用于生成streamedIter
  val buildPlan: SparkPlan //用于生成hashedIter

  val buildSide: BuildSide //BuildLeft或BuildRight
  val buildKeys: Seq[Expression] //用于從streamedIter中生成buildKey的表達(dá)式
  val streamedKeys: Seq[Expression] //用于從hashedIter中生成streamedKey的表達(dá)式

  val condition: Option[Expression]//對(duì)joinRow進(jìn)行過(guò)濾
}

注:對(duì)于fullouter,IterA和IterB同時(shí)為streamedIter和hashedIter,即先IterA=streamedIter,IterB=hashedIter進(jìn)行l(wèi)eftouter,然后再用先IterB=streamedIter,IterA=hashedIter進(jìn)行l(wèi)eftouter,再把兩次結(jié)果進(jìn)行合并。

1.1 幾種Join的實(shí)現(xiàn)

1.1.1 InnerJoin
  1. 利用streamIter中每個(gè)srow,從hashedIter中查找匹配項(xiàng);
  2. 如果匹配成功,即構(gòu)建多個(gè)JoinRow,否則返回empty

    streamIter.flatMap{ srow =>
        val joinRow = new JoinedRow
        joinRow.withLeft(srow)
        val matches = hashedIter.get(buildKeys(srow))
        if (matches != null) {
            matches.map(joinRow.withRight(_)).filter(condition)
        } else {
            Seq.empty
        }
    }
1.1.2 LeftOutJoin
  1. leftIter即為streamIter,而RightIter即為hashedIter,不可以改變
  2. 利用streamIter中每個(gè)srow,從hashedIter中查找匹配項(xiàng);
  3. 如果匹配成功,即構(gòu)建多個(gè)JoinRow,否則返回JoinRow的Build部分為Null

    val nullRow = new NullRow()
    streamIter.flatMap{ srow =>
        val joinRow = new JoinedRow
        joinRow.withLeft(srow)
        val matches = hashedIter.get(buildKeys(srow))
        if (matches != null) {
            matches.map(joinRow.withRight(_)).filter(condition)
        } else {
            Seq(joinRow.withRight(nullRow))
        }
    }
1.1.3 RightOutJoin
  1. RightIter即為streamIter,而LeftIter即為hashedIter,不可以改變
  2. 利用streamIter中每個(gè)srow,從hashedIter中查找匹配項(xiàng);
  3. 如果匹配成功,即構(gòu)建多個(gè)JoinRow,否則返回JoinRow的Build部分為Null

    val nullRow = new NullRow()
    streamIter.flatMap{ srow =>
        val joinRow = new JoinedRow
        joinRow.withRight(srow)//注意與LeftOutJoin的區(qū)別
        val matches = hashedIter.get(buildKeys(srow))
        if (matches != null) {
            matches.map(joinRow.withLeft(_)).filter(condition)
        } else {
            Seq(joinRow.withLeft(nullRow))
        }
    }
1.1.4 LeftSemi
  1. leftIter即為streamIter,而RightIter即為hashedIter,不可以改變
  2. 利用streamIter中每個(gè)srow,從hashedIter中查找匹配項(xiàng);
  3. 如果匹配成功,即返回srow,否則返回empty
  4. 它不是返回JoinRow,而是返回srow

    streamIter.filter{ srow =>
        val matches = hashedIter.get(buildKeys(srow))
        if(matches == null) {
            false //沒有找到匹配項(xiàng)
        } else{
            if(condition.isEmpty == false) { //需要對(duì)`假想`后joinrow進(jìn)行判斷
                    val joinRow = new JoinedRow
                    joinRow.withLeft(srow)
                    ! matches.map(joinRow.withLeft(_)).filter(condition).isEmpty
            } else {
                true
            }
        }
    }

    LeftSemi從邏輯上來(lái)說(shuō),它即為In判斷。

1.1.5 LeftAnti
  1. leftIter即為streamIter,而RightIter即為hashedIter,不可以改變
  2. 利用streamIter中每個(gè)srow,從hashedIter中查找匹配項(xiàng);
  3. 它匹配邏輯為L(zhǎng)eftSemi基本相反,即相當(dāng)于No In判斷。
  4. 如果匹配不成功,即返回srow,否則返回empty
  5. 它不是返回JoinRow,而是返回srow

    streamIter.filter{ srow =>
        val matches = hashedIter.get(buildKeys(srow))
        if(matches == null) {
            true //沒有找到匹配項(xiàng)
        } else{
            if(condition.isEmpty == false) { //需要對(duì)`假想`后joinrow進(jìn)行判斷
                    val joinRow = new JoinedRow
                    joinRow.withLeft(srow)
                    matches.map(joinRow.withLeft(_)).filter(condition).isEmpty
            } else {
                false
            }
        }
    }

1.2 HashJoin與SortJoin

上面描述的Join是需要將BuildIter在內(nèi)存中構(gòu)建為hashedIter,從而加速匹配過(guò)程,因此我們也將這個(gè)Join稱為HashJoin。但是建立一個(gè)Hash表需要占用大量的內(nèi)存。
那么問(wèn)題來(lái):如果我們的Iter太大,無(wú)法建立Hash表怎么吧?在分布式Join計(jì)算下,Join過(guò)程中發(fā)生在Shuffle階段,如果一個(gè)數(shù)據(jù)集的Key存在數(shù)據(jù)偏移,很容易出現(xiàn)一個(gè)BuildIter超過(guò)內(nèi)存大小,無(wú)法完成Hash表的建立,進(jìn)而導(dǎo)致HashJoin失敗,那么怎么辦?

在HashJoin過(guò)程中,針對(duì)BuildIter建立hashedIter是為了加速匹配過(guò)程中。匹配查找除了建立Hash表這個(gè)方法以外,將streamedIter和BuildIter進(jìn)行排序,也是一個(gè)加速匹配過(guò)程,即我們這里說(shuō)的sortJoin。

排序不也是需要內(nèi)存嗎?是的,首先排序占用內(nèi)存比建立一個(gè)hash表要小很多,其次排序如果內(nèi)存不夠,可以將一部分?jǐn)?shù)據(jù)Spill到磁盤,而Hash為全內(nèi)存,如果內(nèi)存不夠,將會(huì)導(dǎo)致整個(gè)Shuffle失敗。

下面以InnerJoin的SortJoin實(shí)現(xiàn)為例子,講述它與HashJoin的區(qū)別:

  1. streamIter和BuildIter都需要為有序。
  2. 利用streamIter中每個(gè)srow,從BuildIter中順序查找,由于兩邊都是有序的,所以查找代價(jià)很小。

    val buildIndex = 0
    streamIter.flatMap{ srow =>
        val joinRow = new JoinedRow
        joinRow.withLeft(srow)
        //順序查找
        val matches = BuildIter.search(buildKeys(srow), buildIndex)
        if (matches != null) {
            matches.map(joinRow.withRight(_)).filter(condition)
            buildIndex += matches.length
        } else {
            Seq.empty
        }
    }

對(duì)于FullOuterJoin,如果采用HashJoin方式來(lái)實(shí)現(xiàn),代價(jià)較大,需要建立雙向的Hash表,而基于SortJoin,它的代價(jià)與其他幾種Join相差不大,因此`FullOuter默認(rèn)都是基于SortJon來(lái)實(shí)現(xiàn)。

2. Spark中的Join實(shí)現(xiàn)

Spark針對(duì)Join提供了分布式實(shí)現(xiàn),但是Join操作本質(zhì)上也是單機(jī)進(jìn)行,怎么理解?如果要對(duì)兩個(gè)數(shù)據(jù)集進(jìn)行分布式Join,Spark會(huì)先對(duì)兩個(gè)數(shù)據(jù)集進(jìn)行Exchange,即進(jìn)行ShuffleMap操作,將Key相同數(shù)據(jù)分到一個(gè)分區(qū)中,然后在ShuffleFetch過(guò)程中利用HashJoin/SortJoin單機(jī)版算法來(lái)對(duì)兩個(gè)分區(qū)進(jìn)行Join操作。

另外如果Build端的整個(gè)數(shù)據(jù)集(非一個(gè)iter)大小較小,可以將它進(jìn)行Broadcast操作,從而節(jié)約Shuffle的開銷。

因此Spark支持ShuffledHashJoinExec,SortMergeJoinExec,BroadcastHashJoinExec三種Join算法,那么它怎么進(jìn)行選擇的呢?

  • 如果build-dataset支持Broadcastable,并且它的大小小于spark.sql.autoBroadcastJoinThreshold,默認(rèn)10M,那么優(yōu)先進(jìn)行BroadcastHashJoinExec
  • 如果dataset支持Sort,并且spark.sql.join.preferSortMergeJoin為True,那么優(yōu)先選擇SortMergeJoinExec
  • 如果dataset不支持Sort,那么只能選擇ShuffledHashJoinExec
    • 如果Join同時(shí)支持BuildRight和BuildLeft,那么根據(jù)兩邊數(shù)據(jù)大小,優(yōu)先選擇數(shù)據(jù)量小的進(jìn)行Hash。

這一塊邏輯都在org.apache.spark.sql.execution.JoinSelection 中描述。ps:Spark也對(duì)Without joining keys的Join進(jìn)行支持,但是不在我們這次討論范圍中。

BroadcastHashJoinExec

val p = spark.read.parquet("/Users/p.parquet")
val p1 = spark.read.parquet("/Users/p1.parquet")
p.joinWith(p1, p("to_module") === p1("to_module"),"inner")

此時(shí)由于p和p1的大小都較小,它會(huì)默認(rèn)選擇BroadcastHashJoinExec
== Physical Plan ==
BroadcastHashJoin [_1#269.to_module], [_2#270.to_module], Inner, BuildRight
    :- Project p
    :- Project p1

SortMergeJoinExec

val p = spark.read.parquet("/Users/p.parquet")
val p1 = spark.read.parquet("/Users/p1.parquet")
p.joinWith(p1, p("to_module") === p1("to_module"),"fullouter")

fullouterJoin不支持Broadcast和ShuffledHashJoinExec,因此為ShuffledHashJoinExec

== Physical Plan ==
SortMergeJoin [_1#273.to_module], [_2#274.to_module], FullOuter
    :- Project p
    :- Project p1

由于ShuffledHashJoinExec一般情況下,不會(huì)被選擇,它的條件比較苛責(zé)。

//首先不能進(jìn)行Broadcast!
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.statistics.isBroadcastable ||
    plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold(10M)
}
//其次spark.sql.join.preferSortMergeJoin必須設(shè)置false
//然后build端可以放的進(jìn)內(nèi)存!
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.statistics.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
 //最后build端和stream端大小必須相差3倍!否則使用sort性能要好。
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.statistics.sizeInBytes * 3 <= b.statistics.sizeInBytes
}
//或者RowOrdering.isOrderable(leftKeys)==false

本文標(biāo)題:SparkSQLJoin原理分析
URL地址:http://sd-ha.com/article12/jichgc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動(dòng)網(wǎng)站建設(shè)、網(wǎng)站策劃App開發(fā)、網(wǎng)站收錄、手機(jī)網(wǎng)站建設(shè)、云服務(wù)器

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁(yè)設(shè)計(jì)