当前位置: 首页 > 資訊 >

Batch Processing (3-1) - MapReduce Reduce-Side Joins and Grouping

Reduce-Side Joins and Grouping

當 MapReuce Job 執行時,它會讀取所有的輸入資料,相較於資料庫來說等於 全表掃描 (full table scan),在資料庫中,如果你查詢的資料量不大,資料庫通常會使用 index 快速定位至你的資料所在 (2020 Day 8),如果查詢涉及 join,資料庫可能會查找多個 index,然而 MapReduce 中沒有 index 這種概念的東西 - 至少不是一般認知中的 index,所以若你想在 MapReduce Job 中過濾出少量的資料,其效能會遠低於使用 index 查找。

在分析查詢的場景下,大部份是從大量資料中做某種程度的聚合,故全表掃描是合理的,再加上 MapReduce 能並行的在多台機器執行,所以我們在談論批次處理中的 join 時,意味著會讀取所有相關的資料集做事,那我們這就來看看 MapReduce join 的細節吧!

例子:分析 user 活動事件

如下圖 10-2,左邊是 user 的活動事件,但我們想在分析時加入年齡維度,所以需要 join 右邊的 user 資料,找到生日。

Srot-merge joins

回憶一下 mapper 做的事,它會從每一筆 record 中提取 key 和 value 出來,因為要做 join,所以我們會有 2 個 mapper ,2 個 mapper 的 key 都是 user ID,整個 join 流程如下圖 10-3:

mapper 會以 key 為基準,完成資料的分區及排序,所以相同 user ID 的資料會相鄰的當做 reducer 的輸入,除了 key 的排序外,MapReduce Job 可以安排 user database 的資料在前面,接著後面的活動 event 再用 timestamp 做排序,這個技巧也稱為 二次排序 (secondary sort)

感謝二次排序,reducer 在處理時,它一次只需要保留第一筆的 user 生日在記憶體中,爾後直接附加生日年份到 event 中,這個算法也稱之為 sort-merge join:mapper 輸出用 key 排序的資料,然後 reducer 將再合併雙方已排序的資料(做 join)。

Group by

除了 join,另一個常在分析用到的功能就是 group by 了,像昨天分析 log 的例子就是以 URL 做為 key, group by 後計算次數,所以,MapReduce 幫我們在 mapper 階段做好 group by 了。

處理傾斜

看到傾斜 2 字就該警覺一下了,因為 MapReduce 的模式是,把所有相同 key 的資料都帶往同一個地方,若有幾個 key 的資料特別多呢?以社群網路舉例,你我的跟隨者可能才 100 人不到,但名人的跟隨者可能是上百萬,這種不成比例的資料也稱為 hot keys

當有單一 reducer 因為 hot key 的關係,比其他 reducer 收到更大量資料,稱為 傾斜 (skew) 或者 hot spots,MapReduce Job 只有在所有 mapper 和 reducer 都完成時才會完成,所以往後 workflow 的 job 都會因為某個 reducer 傾斜的關係,拖慢速度。

當 join 包含 hot keys 時,這裡介紹 2 個不同工具的解決辦法。

Pig 的 skewed join 方法是,它首先會執行一個簡單的 job 檢測哪個 key hot,當執行到 join 時,mapper 會把 hot key 中的資料以隨機方式送到多個 reducer 中,其他 join 的資料,皆會複製一份到所有分配到 hot key 資料的 reducer 中。

Hive 則是以不同方式優化 skewed join,它需要在 table 的 metadata 中明確指定 hot keys,然後把這些資料分散到不同的檔案中,當執行到 join 時,它使用 map-side join 處理 (Day 26 介紹)。


今天講 reduce-side join,明天就講 map-side join 啦。