主函式
SparkConf sparkConf = new SparkConf().setAppName("SparkMain");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
BaseBean.initConfig(ctx);
JavaRDD<String> datas = ctx.textFile(input2, 1);
//這里獲取到的是一個個的json串
JavaPairRDD<String, TopicBean> dataMap = datas.mapToPair(f -> BaseUtil.toDataTuple(f));
dataMap.saveAsTextFile(output);
BaseBean類的initConfig(ctx)方法
public static Map<String, String> regionMap = new HashMap<String, String>();
public static void initConfig(JavaSparkContext ctx) throws Exception {
JavaRDD<String> rdds = ctx.textFile("/data/conf/regionConf.txt");
List<String> regionss = rdds.collect();
initMap(regionMap, regionss);
regionMap.put("中國&市&市", "110100");
ctx.broadcast(regionMap);
}
public static void initMap(Map<String, String> map, List<String> ips) throws Exception {
for (String ip : ips) {
String str = new String(ip.getBytes(),"UTF-8");
if(!BaseUtil.isEmpty(str)){
String[] strs = str.split("=");
if(null != strs && strs.length == 2){
map.put(strs[0], strs[1]);
if(strs[0].endsWith("地區")){
map.put(strs[0].substring(0, strs[0].length() - 2) + "市", strs[1]);
}else if(strs[0].endsWith("自治州")){
map.put(strs[0].substring(0, strs[0].length() - 3) + "市", strs[1]);
}else if(strs[0].endsWith("盟")){
map.put(strs[0].substring(0, strs[0].length() - 1) + "市", strs[1]);
}else if(strs[0].endsWith("&市")){
map.put(strs[0].substring(0, strs[0].length() - 1), strs[1]);
}else if(strs[0].endsWith("市")){
map.put(strs[0] + "市", strs[1]);
}
}
}
}
}
BaseUtil類的toDataTuple方法
public static Tuple2<String, TopicBean> toDataTuple(String str) throws Exception {
if(null == str || "".equals(str.trim()) || "null".equalsIgnoreCase(str)){
return new Tuple2<String, TopicBean>("0", null);
}
TopicBean topicBean = GSON.fromJson(str, TopicBean.class);
String l1 = topicBean.getL1();
if(!BaseUtil.isEmpty(l1)){
if(l1.contains(".")){
return new Tuple2<String, TopicBean>(l1, topicBean);
}else{
String s15 = topicBean.getS15();
String s16 = topicBean.getS16();
String address = s15 + "&" + s16 + "&" + l1;
if(BaseBean.getOtherCountry(address) != 0){
topicBean.setL1(String.valueOf(BaseBean.getOtherCountry(address)));
}else{
topicBean.setL1(BaseUtil.converNull(BaseBean.regionMap.get(address)));
}
return new Tuple2<String, TopicBean>(topicBean.getL1(), topicBean);
}
}else{
topicBean.setL1("0");
return new Tuple2<String, TopicBean>("0", topicBean);
}
}
為什么我最后這個regionMap永遠都是空的啊,前面不是init了嗎
uj5u.com熱心網友回復:
ctx.broadcast(regionMap);有個回傳值,那個才是集群認得的廣播出的大變數,集群上計算的時候用那個變數的get才能找到你廣播的變數轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/53468.html
標籤:Spark
上一篇:spark on yarn:啟動時候報錯Error initializing SparkContext,求各位大神指教。
