I am using Spark 1.6.
The source data is as follows:
val baseDF = hiveContext.sql(newSql)
ID ID2 C1 C2 C3 C4 C5 .....C33
CM1 a 1 1 1 0 0
CM2 a 1 1 0 1 0
CM3 a 1 0 1 1 1
CM4 a 1 1 1 1 1
CM5 a 1 1 1 1 1
1k2 b 0 0 1 1 1
1K3 b 1 1 1 1 1
1K1 b 0 0 0 0 1
Expected output:
ID ID2 C1 C2 C3 C4 C5 .....C33
CM1 a 1 1 1 0 0
CM2 a 0 0 0 1 0
CM3 a 0 0 0 0 1
CM4 a 0 0 0 0 0
CM5 a 0 0 0 0 0
1K1 b 0 0 0 0 1
1k2 b 0 0 1 1 0
1K3 b 1 1 0 0 0
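The logic: group by ID2; then, for each column Cn, the row with the smallest ID among the rows where Cn is 1 keeps its 1, and every other row is set to 0.
The Cn columns go up to C33.
A case class with that many fields would exceed the case class field limit (22 fields in Scala 2.10).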
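The rule above can be sketched in plain Scala, without Spark, to pin down the expected behaviour. This is only an illustration under assumptions: `FirstOneDemo`, `Rec` and `keepFirstOne` are hypothetical names, every row is assumed to carry the same number of flags, and IDs are compared as plain strings. The sample rows follow the first table in the question (with `1k2` written as `1K2`).

```scala
object FirstOneDemo {
  // Hypothetical record type: the flags C1..Cn live in one Vector,
  // so no 33-field case class is needed.
  case class Rec(id: String, id2: String, flags: Vector[Int])

  def keepFirstOne(records: Seq[Rec]): Seq[Rec] =
    records.groupBy(_.id2).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      val sorted = group.sortBy(_.id) // plain string order on ID
      // For each column: position of the first row (in ID order) holding a 1, or -1 if none does.
      val firstRow = sorted.head.flags.indices.map(c => sorted.indexWhere(_.flags(c) == 1))
      sorted.zipWithIndex.map { case (rec, rowIdx) =>
        // A row keeps its 1 only when it is that first row for the column.
        rec.copy(flags = rec.flags.indices.map(c => if (firstRow(c) == rowIdx) 1 else 0).toVector)
      }
    }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      Rec("CM1", "a", Vector(1, 1, 1, 0, 0)),
      Rec("CM2", "a", Vector(1, 1, 0, 1, 0)),
      Rec("CM3", "a", Vector(1, 0, 1, 1, 1)),
      Rec("CM4", "a", Vector(1, 1, 1, 1, 1)),
      Rec("CM5", "a", Vector(1, 1, 1, 1, 1)),
      Rec("1K2", "b", Vector(0, 0, 1, 1, 1)),
      Rec("1K3", "b", Vector(1, 1, 1, 1, 1)),
      Rec("1K1", "b", Vector(0, 0, 0, 0, 1))
    )
    // One line per row: ID, ID2, then the rewritten flags.
    keepFirstOne(sample).foreach { r =>
      println((r.id +: r.id2 +: r.flags.map(_.toString)).mkString(" "))
    }
  }
}
```

Keeping the flags in a collection rather than in one field per column is what avoids the case class limit; the same idea carries over to Spark by working with `Row` and an explicit schema.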
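This is what I have tried so far. The result is wrong; after thinking it over, I believe I used the wrong approach and should be grouping instead.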
import scala.collection.mutable
import scala.collection.mutable.{HashMap, MultiMap, Set}
import scala.collection.immutable.TreeSet

case class testGoods(ID: String, ID2: String, C1: String, C2: String)

// ID2 -> set of (ID, C1, C2)
val cartMap = new HashMap[String, Set[(String, String, String)]] with MultiMap[String, (String, String, String)]
val baseDF = hiveContext.sql(newSql)
val testRDD = baseDF.mapPartitions(partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID = record.getString(0)
    if (ID != null && ID != "null") {
      val ID2 = record.getString(1)
      val C1 = record.getString(2)
      val C2 = record.getString(3)
      cartMap.addBinding(ID2, (ID, C1, C2))
    }
  }
  cartMap.iterator
})
val recordList = new mutable.ListBuffer[testGoods]()
val testRDD1 = testRDD.mapPartitions(partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID2 = record._1
    val recordRow = record._2
    // Sort the (ID, C1, C2) tuples of this ID2
    val sortedRecordRow = TreeSet[(String, String, String)]() ++ recordRow
    val dic = new mutable.HashMap[String, String]
    for (v <- sortedRecordRow) {
      val ID = v._1
      val C1 = v._2
      val C2 = v._3
      if (dic.contains(ID2)) {
        val goodsValue = dic.get(ID2)
        if ("1".equals(goodsValue)) {
          recordList.append(new testGoods(ID, ID2, "0", C2))
        } else {
          dic.put(ID2, C1)
          recordList.append(new testGoods(ID, ID2, C1, C2))
        }
      } else {
        dic.put(ID2, C1)
        recordList.append(new testGoods(ID, ID2, C1, C2))
      }
    }
  }
  recordList.iterator
})
val searchToItemNewDF = hiveContext.createDataFrame(testRDD1).repartition(1)
  .rdd.map { r => r.mkString("\t") }
  .saveAsTextFile("/data/testRDD1")
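From what I found online, most people seem to implement this kind of thing with groupBy + agg, so I tried that as well:
baseDF.groupBy("ID2").agg(collect_list($"ID"), collect_list($"ID2"))
But after trying for a long time I still could not carry over the logic from the mapPartitions version above.
How can I implement this with a DataFrame?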
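One possible DataFrame-only route is a window function instead of groupBy + agg: for each flag column, compute the smallest ID in the ID2 group whose flag is 1, then keep the 1 only on that row. The sketch below is written under assumptions rather than taken from the thread: `FirstOneWindow` and `keepFirstOne` are hypothetical names, the flag columns are assumed to compare equal to the integer 1 (compare against "1" instead if they are stored as strings), and ID is assumed to be unique within a group and ordered as a plain string. In Spark 1.6 window functions need a HiveContext, which the question already uses.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min, when}

object FirstOneWindow {
  // flagCols: the names of the 0/1 columns, e.g. C1..C33 (passed in by the caller).
  def keepFirstOne(df: DataFrame, flagCols: Seq[String]): DataFrame = {
    val byGroup = Window.partitionBy("ID2")
    val rewritten: Seq[Column] = flagCols.map { c =>
      // Smallest ID in the group whose flag is 1; null when the group has no 1 in this column.
      val firstId = min(when(col(c) === 1, col("ID"))).over(byGroup)
      // Keep the 1 only on that row; every other row becomes 0.
      when(col(c) === 1 && col("ID") === firstId, 1).otherwise(0).as(c)
    }
    val allCols: Seq[Column] = col("ID") +: col("ID2") +: rewritten
    df.select(allCols: _*)
  }
}
```

With the names from the question, a call could look like `FirstOneWindow.keepFirstOne(baseDF, (1 to 33).map(i => s"C$i"))`. Because the column list is built at runtime, no case class is involved and the 33 columns are not a problem.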
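uj5u.com helpful user replied:
I don't understand what you are trying to do. You need a more detailed example. Or is my Chinese just not good enough?
uj5u.com helpful user replied:
Sorry, I did not explain it clearly, so I have added test data. The goal is to group by ID2 first, then sort the IDs within each group in ascending order,
then look at each of the columns C1 ... C33 separately: the first row (in ID order) that has a 1 keeps it, and every other row is set to 0.
Source data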
ID ID2 C1 C2 ....C33
CM1 a 1 0
CM2 a 1 0
1K13 f 0 0
CM4 a 1 1
CM5 a 1 1
1K14 f 0 1
1K2 b 0 1
1K3 b 1 1
1K11 f 0 0
1K12 f 0 0
1K1 b 1 0
CM3 a 1 0
Target output
ID ID2 C1 C2
CM1 a 1 0
CM2 a 0 0
CM3 a 0 1
CM4 a 0 0
CM5 a 0 0
1K1 b 1 0
1K2 b 0 1
1K3 b 0 0
1K11 f 0 0
1K12 f 0 0
1K13 f 0 0
1K14 f 0 1
uj5u.com helpful user replied:
import scala.util.Random
import org.apache.spark.sql.{Row, SparkSession}

case class TestGoods(ID: String, ID2: String, C1: Int, C2: Int)
case class CompressedRows(id2: String, ids: Array[String], indexs: Array[Int])

def main(args: Array[String]): Unit = {
  // Assumption: the post does not show how `session` is created; a local Spark 2.x session is used here.
  val session = SparkSession.builder().master("local[*]").appName("first-one-per-group").getOrCreate()
  import session.implicits._
  // Generate 100 random test rows spread over 20 ID2 groups.
  val random = new Random(10)
  val generator: () => Int = () => if (random.nextBoolean()) 1 else 0
  val datasource = (1 to 100).map(idx => TestGoods(s"ID-${idx}", s"ID2-${idx % 20}", generator(), generator()))
  val df = session.sparkContext.makeRDD(datasource).toDF("id", "id2", "c1", "c2")
  df.show(false)
  df.groupByKey { case Row(_, id2: String, _, _) => id2 }
    .mapGroups {
      case (id2, rows) =>
        // cs(n) = (index of the first row whose column n is 1, whether such a row was found)
        val cs: Array[(Int, Boolean)] = (1 to 2).map(_ => (-1, false)).toArray
        // Sort the rows of the group by ID.
        val sorted = rows.toList.sortBy(row => row.getString(0))
        sorted.indices.foreach { idx =>
          sorted(idx) match {
            case Row(_, _, c1: Int, c2: Int) =>
              if (!cs(0)._2 && c1 == 1) cs(0) = (idx -> true)
              if (!cs(1)._2 && c2 == 1) cs(1) = (idx -> true)
          }
        }
        // A column with no 1 in the group falls back to the index of the last row.
        val compressedIdx = cs.map(r => if (r._2) r._1 else sorted.length - 1)
        CompressedRows(id2, sorted.map(r => r(0).toString).toArray, compressedIdx)
    }.show()
}
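uj5u.com helpful user replied:
Thanks to yangguo_2011 for the reply. I am on Spark 1.6, so I cannot run it as is, but I now have a rough idea of how to solve the problem.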
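For readers in the same position on Spark 1.6, where `groupByKey`/`mapGroups` on a DataFrame are not available, the same grouping idea can be approximated with the RDD API and an explicit schema. This is a sketch under assumptions, not the code the thread ended up with: `FirstOneRdd` and `keepFirstOne` are hypothetical names, column 0 is taken to be ID and column 1 to be ID2 (both non-null strings), and every later column is treated as a 0/1 flag. Note that `groupBy` on an RDD holds each whole group in memory.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object FirstOneRdd {
  def keepFirstOne(df: DataFrame): DataFrame = {
    val rows = df.rdd
      .groupBy(_.getString(1)) // key by ID2
      .flatMap { case (_, group) =>
        val sorted = group.toVector.sortBy(_.getString(0)) // ascending ID, plain string order
        // seen(n) becomes true once a 1 has been kept for flag column n in this group.
        val seen = new Array[Boolean](sorted.head.length - 2)
        sorted.map { row =>
          val flags = (2 until row.length).map { i =>
            val isOne = Option(row.get(i)).exists(_.toString == "1")
            val keep = isOne && !seen(i - 2)
            if (keep) seen(i - 2) = true
            if (keep) 1 else 0
          }
          Row.fromSeq(row.getString(0) +: row.getString(1) +: flags)
        }
      }
    // Reuse the original column names; the rewritten flags are emitted as integers.
    val schema = StructType(
      df.schema.fields.take(2) ++
        df.schema.fields.drop(2).map(f => StructField(f.name, IntegerType, nullable = false)))
    df.sqlContext.createDataFrame(rows, schema)
  }
}
```

Building rows with `Row.fromSeq` and a `StructType` sidesteps the case class limit mentioned in the question, since no class with 33 fields is ever declared.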
