I am using Flink to process some data, and I found that StreamExecutionEnvironment has many overloads of fromCollection.

Part of my pom.xml:
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.version>2.12.16</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.14.4</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <!-- SQL Server-->
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>sqljdbc4</artifactId>
        <version>4.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>1.14.4</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>1.14.4</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>1.14.4</version>
    </dependency>
</dependencies>
It seems Scala cannot determine which fromCollection overload to use.
package com.mycode.learnflink.ts.sync

import com.mycode.learnflink.model.datasourcesync.domain.RiverWaterRegime
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

class SyncWater {
  var cachedRiverWaterRegimes: List[RiverWaterRegime] = List()
  private val BATCH_SIZE = 1000
  private var size = 0

  def flinkProcess(list: List[RiverWaterRegime]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(6)
    val source: DataStreamSource[String] = env.fromCollection(list) // Error: Cannot resolve overloaded method 'fromCollection'
  }
}
What can I do to get rid of this error?
Answer:
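In your case, the import should come from the Scala API package, org.apache.flink.streaming.api.scala, not the Java one. The Java StreamExecutionEnvironment.fromCollection overloads expect a java.util.Collection or java.util.Iterator, so none of them accepts a Scala List. The Scala version takes a Seq and needs an implicit TypeInformation, which createTypeInformation supplies. It also returns a Scala DataStream of the list's element type, so the DataStreamSource[String] annotation has to go as well.
Here is an example: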
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

class SyncWater {
  // Placeholder for the real model class, so the example compiles on its own.
  type RiverWaterRegime = Int
  private val BATCH_SIZE = 1000
  private var size = 0

  def flinkProcess(list: List[RiverWaterRegime]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(6)
    // Resolves to the Scala overload that takes a Seq; the result is a Scala DataStream.
    val source = env.fromCollection(list)
  }
}
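For completeness, here is a minimal sketch of the same fix with an explicit result type and a job that actually runs. It assumes Flink 1.14 with the Scala API on the classpath. The RiverWaterRegime case class, its fields, the sample records, and the job name are all hypothetical stand-ins, since the real model class is not shown in the question.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for the asker's model class; the real fields are not shown in the question.
case class RiverWaterRegime(stationId: String, waterLevel: Double)

object SyncWaterSketch {
  // Made-up sample records, for illustration only.
  val sample: List[RiverWaterRegime] = List(
    RiverWaterRegime("station-a", 1.25),
    RiverWaterRegime("station-b", 2.5)
  )

  def flinkProcess(list: List[RiverWaterRegime]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(6)

    // Scala API: accepts a Seq and returns DataStream[T], not the Java DataStreamSource.
    // The wildcard import above brings the implicit TypeInformation into scope.
    val source: DataStream[RiverWaterRegime] = env.fromCollection(list)

    // Print each record to stdout, then submit the job (name is arbitrary).
    source.print()
    env.execute("sync-water-sketch")
  }

  def main(args: Array[String]): Unit = flinkProcess(sample)
}
```

Because the Flink dependencies in the pom.xml above are marked provided, running this from an IDE will likely require including provided-scope dependencies on the run classpath.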
