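The code is shown below. The Spark job itself runs normally, and HBase can be connected to normally. However, when I connect to HBase inside Spark's foreachPartition, no error is reported, yet no data is written to HBase either. I have verified that the code inside foreachPartition works without problems when it is run on its own!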
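// Imports this fragment relies on.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

// sc is the Spark context created elsewhere in the program.
HiveContext hv = new HiveContext(sc);
DataFrame data = hv.sql("select * from url_pending limit 10");
data.show(2);
data.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
    @Override
    public void call(Iterator<Row> rows) {
        // Built inside call() so that each partition creates its own configuration and connection.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://archive.cloudera.com:9000/hbase");
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.zookeeper.quorum", "GDLT10471");
        // try-with-resources closes the table and then the connection, even if the write fails.
        try (HConnection connection = HConnectionManager.createConnection(conf);
             HTableInterface table = connection.getTable("url_matched")) {
            table.setAutoFlush(true);
            List<Put> list = new ArrayList<Put>();
            while (rows.hasNext()) {
                Row row = rows.next();
                String url = row.getString(0);
                String phone = row.getString(1);
                // A random UUID is used as the row key.
                Put put = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("url"), Bytes.toBytes(url));
                put.add(Bytes.toBytes("baseData"), Bytes.toBytes("phone"), Bytes.toBytes(phone));
                // On a cluster this line is written to the executor's stdout log, not the driver console.
                System.out.println("URL:" + url + " phone:" + phone);
                list.add(put);
            }
            table.put(list);
        } catch (IOException e) {
            // On a cluster the stack trace goes to the executor's stderr log; the task still counts as successful.
            e.printStackTrace();
        }
    }
});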
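One possible reason that no error shows up is the catch block above: it only prints the stack trace, and when the job runs on a cluster that output ends up in the executor logs rather than on the driver console, so the task still finishes "successfully". The following is a minimal sketch of that effect in plain Java, without Spark or HBase. The names `BatchSink`, `PartitionWriteSketch`, `writeSwallowing` and `writeRethrowing` are hypothetical and exist only for this illustration; `BatchSink` stands in for the HBase table.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the HBase table; not part of any real API.
interface BatchSink {
    void write(List<String> batch) throws IOException;
}

final class PartitionWriteSketch {

    // Same pattern as the catch block in the question: the failure is only
    // printed, so the caller sees a normal return.
    static boolean writeSwallowing(Iterator<String> rows, BatchSink sink) {
        List<String> batch = drain(rows);
        try {
            sink.write(batch);
            return true;
        } catch (IOException e) {
            e.printStackTrace(); // on a cluster this would land in the executor's stderr log
            return false;
        }
    }

    // Alternative: rethrow as an unchecked exception so the failure reaches the caller.
    static void writeRethrowing(Iterator<String> rows, BatchSink sink) {
        List<String> batch = drain(rows);
        try {
            sink.write(batch);
        } catch (IOException e) {
            throw new UncheckedIOException("write of " + batch.size() + " rows failed", e);
        }
    }

    // Collects all rows of one partition into a list.
    private static List<String> drain(Iterator<String> rows) {
        List<String> batch = new ArrayList<String>();
        while (rows.hasNext()) {
            batch.add(rows.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        // A sink that always fails, simulating an unreachable HBase cluster.
        BatchSink failing = batch -> { throw new IOException("simulated connection failure"); };

        boolean ok = writeSwallowing(Arrays.asList("a", "b").iterator(), failing);
        System.out.println("swallowing variant returned: " + ok);

        try {
            writeRethrowing(Arrays.asList("a", "b").iterator(), failing);
        } catch (UncheckedIOException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }
}
```

In a Spark job, an exception thrown out of the function passed to foreachPartition fails the task, so the error becomes visible on the driver once the retries are exhausted. Whether this is what happens in the job above is only a guess; checking the executor stdout and stderr logs for the printed rows and for any stack trace would confirm or rule it out.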
