在java中得到SparkSession sparkSession = c.getSparkSession();
sparkSession的查詢陳述句:
sb.append("SELECT base_user.uuid AS owner_id,base_user.name AS owner_name,base_user.sex AS sex,"
+"base_user.birth AS birth,base_user.nation AS nation,base_user.origin AS origin,base_user.company AS company,COALESCE(car.car_totalassets,0) AS car_totalassets,"
+"COALESCE(house.estate_totalassets,0) AS estate_totalassets,house.first_estate_date,"
+"now() AS analysis_time,base_user.iden_type,base_user.iden_num,base_user.delete_flag,");
sb.append(sql);
sb.append(" FROM mdc.base_user AS base_user"
+" LEFT JOIN (SELECT owner_id,COUNT (owner_id) AS car_totalassets FROM plc.park_car AS park_car WHERE park_car.delete_flag = 1"
+" GROUP BY owner_id) AS car ON car.owner_id = base_user.uuid LEFT JOIN (SELECT user_uuid, MIN (create_time) AS first_estate_date,"
+" COUNT (user_uuid) AS estate_totalassets FROM mdc.base_house_user_rel AS base_house_user_rel WHERE base_house_user_rel.delete_flag = 1"
+" GROUP BY user_uuid ) AS house ON house.user_uuid = base_user.uuid LEFT JOIN "
+" op.owner_portrait_info AS owner_portrait_info ON owner_portrait_info.owner_id = base_user.uuid WHERE"
+" base_user.delete_flag = 1");
通過查詢到一個 dataset的結果,Dataset<Row> da = sparkSession.sql(sb.toString());
集成了carbondata,
現在我想將這個結果更新至carbondata的關系資料庫中,類似與這種:sparkSession.sql("update op.owner_portrait_info set (owner_name) = ('test1') where id = ?");如何實作Dataset批量的更新操作,請問有沒有好的實作思路?
uj5u.com熱心網友回復:
DataFrame.write.jdbc(),記得把jdbc驅動放在spark/jars檔案夾下轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/53464.html
標籤:Spark
