I've only just started using Spark. Can saveAsNewAPIHadoopDataset be used to do batch puts into HBase, and if so, how?
```
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkContext, SparkConf}

/**
 * User:leen
 * Date:2017/12/20 0020
 * Time:17:34
 */
object HbaseTest2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val tablename = "account"

    // HBase connection settings and the target table for TableOutputFormat
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // TableOutputFormat writes mutations, so the value class is Put
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Each record is "rowkey,name,age"
    val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      // TableOutputFormat ignores the key, so an empty one is fine
      (new ImmutableBytesWritable, put)
    }

    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
    sc.stop()
  }
}
```
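A helpful uj5u.com user replied:
It's best to map the RDD to an RDD[Put] and then use foreachPartition to do the puts in batches. If the data volume is fairly large, bulk load is the recommended approach instead.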
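A rough sketch of the foreachPartition approach, reusing the "rowkey,name,age" layout and the "cf" column family from the question. The names `BatchPutSketch`, `toPut`, `writeBatched` and the default batch size of 1000 are made up for illustration, and this assumes the HBase 1.x+ client API (`ConnectionFactory`, `Table`); adjust to your version.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

object BatchPutSketch {
  // Turn one "rowkey,name,age" line into a Put (layout taken from the question)
  def toPut(line: String): Put = {
    val arr = line.split(',')
    val put = new Put(Bytes.toBytes(arr(0)))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
    put
  }

  // batchSize is an illustrative default, not a recommended value
  def writeBatched(lines: RDD[String], tableName: String, quorum: String,
                   batchSize: Int = 1000): Unit = {
    lines.map(toPut).foreachPartition { puts =>
      // A Connection is not serializable, so open one per partition on the executor
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", quorum)
      val connection = ConnectionFactory.createConnection(conf)
      val table = connection.getTable(TableName.valueOf(tableName))
      try {
        // Send the partition's Puts in chunks of batchSize via Table.put(List[Put])
        puts.grouped(batchSize).foreach(batch => table.put(batch.asJava))
      } finally {
        table.close()
        connection.close()
      }
    }
  }
}
```

With the question's data this would be called as something like `BatchPutSketch.writeBatched(indataRDD, "account", "slave1,slave2,slave3")`.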
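A helpful uj5u.com user replied:
Yes, you can. Either disable autoflush on the Table, or use BufferedMutator directly. BufferedMutator submits asynchronously: once the configured write buffer fills up it flushes automatically, so you just keep handing it Puts. It is also thread-safe.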
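Something along these lines should work for the BufferedMutator route. It's only a sketch: `BufferedMutatorSketch`, `mutatorParams`, `write` and the 4 MB buffer size are placeholders I picked, and it assumes the HBase 1.x+ client API.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutatorParams, ConnectionFactory, Put}
import org.apache.spark.rdd.RDD

object BufferedMutatorSketch {
  // Build the mutator settings; bufferBytes is the client-side write buffer size
  def mutatorParams(tableName: String, bufferBytes: Long): BufferedMutatorParams =
    new BufferedMutatorParams(TableName.valueOf(tableName)).writeBufferSize(bufferBytes)

  // The 4 MB default here is an illustrative value, not a tuned one
  def write(puts: RDD[Put], tableName: String, quorum: String,
            bufferBytes: Long = 4L * 1024 * 1024): Unit = {
    puts.foreachPartition { iter =>
      // One connection and one mutator per partition, created on the executor
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", quorum)
      val connection = ConnectionFactory.createConnection(conf)
      val mutator = connection.getBufferedMutator(mutatorParams(tableName, bufferBytes))
      try {
        // mutate() only buffers; the client sends a batch whenever the buffer fills
        iter.foreach(put => mutator.mutate(put))
        // Push out whatever is still buffered at the end of the partition
        mutator.flush()
      } finally {
        mutator.close()
        connection.close()
      }
    }
  }
}
```

The explicit `flush()` is there so that a failed write shows up inside the `try` block rather than during `close()`.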
Tags: Spark
