各位大牛們,我現在有這樣一個問題,我從redis中取資料,redis中的資料是安partition存盤的,我需要讓不同patition中的資料在各自的partition中分別處理,應該如何做啊。應該如何處理呢?
uj5u.com熱心網友回復:
你可以在foreachPartition算子內進行判斷,然后根據條件執行指定的方法。例如
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
@Override
public void call(Iterator<String> it) throws Exception {
if( xxxxx ) { // 條件。比如遍歷該磁區的資料去取某個特征
new RealForeachPartitionFunc1().call(it); // 執行真正的foreachPartition算子
} else {
new RealForeachPartitionFunc2().call(it);
}
}
});
uj5u.com熱心網友回復:
自定義磁區,rdd.partitionBy(/*自定義的磁區*/new TestPartitioner()).foreachPartition {/*對磁區內資料的操作代碼*/}class TestPartitioner extends Partitioner {
//redis磁區個數
override def numPartitions: Int = ???
override def getPartition(key: Any): Int = {
/*redis中的磁區規則*/
}
}
uj5u.com熱心網友回復:
自定義磁區
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/69072.html
標籤:Spark
上一篇:hp服務器報錯提示,求解決
