First, the environment:
3 servers: master, slave01, slave02
The Hadoop cluster, HBase cluster, and Spark cluster are all deployed on these same 3 machines.
Here is the code:
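import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat // old "mapred" API, matching JobConf
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object SparkOnHBase {
  // Serializes a Scan for TableInputFormat (not used in the write path below)
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FileAna").setMaster("spark://master:7077")
      .set("spark.driver.host", "192.168.1.139")
      // Ship the application jar and the HBase client jars to the executors
      .setJars(List("/home/pang/woozoomws/spark-service.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/hbase-common-1.2.2.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/hbase-client-1.2.2.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/hbase-protocol-1.2.2.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/htrace-core-3.1.0-incubating.jar",
        "/home/pang/woozoomws/spark-service/lib/hbase/hbase-server-1.2.2.jar"))
    val sc = new SparkContext(conf)

    // HBase connection settings are read from hbase-site.xml on the classpath
    val hbaseConf = HBaseConfiguration.create()
    val jobConf = new JobConf(hbaseConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "MissionItem")

    // Turns (id, name, age) into a Put keyed by id, with columns data:name and data:age
    def convert(triple: (Int, String, Int)) = {
      val p = new Put(Bytes.toBytes(triple._1))
      p.addColumn(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      p.addColumn(Bytes.toBytes("data"), Bytes.toBytes("age"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, p)
    }

    // step 3: read RDD data from somewhere and convert
    val rawData = List((1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38))
    val localData = sc.parallelize(rawData).map(convert)
    // step 4: use `saveAsHadoopDataset` to save RDD to HBase
    localData.saveAsHadoopDataset(jobConf)
  }
}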
When I run it directly in the Scala SDK, it stalls for a long time after log lines like the following:
16/08/25 00:02:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.247:55383 (size: 28.2 KB, free: 366.3 MB)
In the web UI I can see a large number of messages like this:
16/08/25 22:50:23 INFO client.RpcRetryingCaller: Call exception, tries=10, retries=35, started=38411 ms ago, cancelled=false, msg=row 'MissionItem,,99999999999999' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=master,16020,1472135364343, seqNum=0
But code like the following works without any problem:
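import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

// Plain HBase client code, no Spark involved; HTable is deprecated in HBase 1.x but still usable
val hbaseConf = HBaseConfiguration.create()
val table = new HTable(hbaseConf, TableName.valueOf("MissionItem"))
try {
  for (i <- 0 until 100) {
    val put = new Put(Bytes.toBytes(String.valueOf(i)))
    put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("x"), Bytes.toBytes(String.valueOf(i)))
    table.put(put)
  }
} finally table.close()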
Can anyone tell me what is going on?
uj5u.com user reply:
I'd suggest using foreachPartition and writing to HBase with Put.
uj5u.com user reply:
I'm running into the same problem. Did the original poster ever solve it?
uj5u.com user reply:
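Well, there's a much nicer API now: add the hbase-spark dependency, use HBaseContext.bulkLoad to generate HFiles, then load them into the HBase table with LoadIncrementalHFiles.doBulkLoad. It's very, very fast!
uj5u.com user reply: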
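I ran into this problem too. After checking, I found that every call to saveAsHadoopDataset leaves its ZooKeeper session open, so the number of ZooKeeper sessions eventually reaches the maximum (60 by default). You can raise ZooKeeper's maximum number of connections, but the sessions never being released is the real problem! I still haven't found a solution, and I don't think raising the ZooKeeper connection limit is the best fix.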
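A minimal sketch of the foreachPartition approach suggested in an earlier reply, written so that each partition opens its own HBase Connection and always closes it; closing the Connection should also close the ZooKeeper session it holds, which is the leak described in the reply above. It assumes the HBase 1.x client API (ConnectionFactory, Table) and that hbase-site.xml, or at least hbase.zookeeper.quorum, is visible on the executors' classpath. The names ForeachPartitionWriter, toPut and write are hypothetical, not from the thread.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// Hypothetical helper object; not an API from HBase or Spark
object ForeachPartitionWriter {
  // Same Put as convert() in the question: row key = id, columns data:name and data:age
  def toPut(triple: (Int, String, Int)): Put = {
    val p = new Put(Bytes.toBytes(triple._1))
    p.addColumn(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
    p.addColumn(Bytes.toBytes("data"), Bytes.toBytes("age"), Bytes.toBytes(triple._3))
    p
  }

  // Opens one Connection per partition on the executor and always closes it,
  // so its ZooKeeper session is released when the partition finishes.
  // Assumes hbase-site.xml (or hbase.zookeeper.quorum) is on the executor classpath.
  def write(rdd: RDD[(Int, String, Int)], tableName: String): Unit = {
    rdd.foreachPartition { rows =>
      val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
      try {
        val table = connection.getTable(TableName.valueOf(tableName))
        try {
          rows.foreach(row => table.put(toPut(row)))
        } finally {
          table.close()
        }
      } finally {
        connection.close()
      }
    }
  }
}
```

With the question's data this would be called as ForeachPartitionWriter.write(sc.parallelize(rawData), "MissionItem"). Writing one Put at a time keeps the sketch short; table.put with a java.util.List[Put], or a BufferedMutator, would batch the writes.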
uj5u.com user reply:
Could you post code that actually runs successfully with this approach?
uj5u.com user reply:
You mean the Cloudera dependency? I can't download it from Maven, and I couldn't find the jar anywhere. Do you have it?
uj5u.com user reply:
Add the following entry inside the <repositories> element of your pom:
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
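A rough sketch of the bulk-load route from the earlier reply (the one a follow-up asked for code for): HBaseContext.bulkLoad writes HFiles to a staging directory, then LoadIncrementalHFiles.doBulkLoad moves them into the table. It assumes the hbase-spark artifact (e.g. from the Cloudera repository above) is on the classpath, that its HBaseContext.bulkLoad takes (rdd, tableName, flatMap, stagingDir) with defaults for the remaining parameters, and that LoadIncrementalHFiles is the HBase 1.x class in org.apache.hadoop.hbase.mapreduce. Signatures differ between hbase-spark versions, so check yours. BulkLoadSketch, toCells and the staging path are hypothetical.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object
object BulkLoadSketch {
  // Emits one (row/family/qualifier, value) cell per column, the shape bulkLoad's flatMap expects
  def toCells(triple: (Int, String, Int)): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val row = Bytes.toBytes(triple._1)
    val family = Bytes.toBytes("data")
    Iterator(
      (new KeyFamilyQualifier(row, family, Bytes.toBytes("name")), Bytes.toBytes(triple._2)),
      (new KeyFamilyQualifier(row, family, Bytes.toBytes("age")), Bytes.toBytes(triple._3)))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BulkLoadSketch"))
    val conf = HBaseConfiguration.create()
    val tableName = TableName.valueOf("MissionItem")
    val stagingDir = "/tmp/missionitem-hfiles" // hypothetical HDFS staging path

    val rdd = sc.parallelize(List((1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38)))

    // Step 1: generate HFiles for the table under stagingDir
    val hbaseContext = new HBaseContext(sc, conf)
    hbaseContext.bulkLoad[(Int, String, Int)](rdd, tableName, toCells _, stagingDir)

    // Step 2: hand the generated HFiles over to the table's regions
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(tableName)
    val locator = connection.getRegionLocator(tableName)
    val admin = connection.getAdmin
    try {
      new LoadIncrementalHFiles(conf).doBulkLoad(new Path(stagingDir), admin, table, locator)
    } finally {
      admin.close()
      locator.close()
      table.close()
      connection.close()
    }
    sc.stop()
  }
}
```

Because HBase only moves finished HFiles into place instead of going through the normal write path, this avoids per-row Puts entirely, which is presumably where the "very, very fast" in the reply comes from.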
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/47509.html
Tags: Spark
