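I'm running my Flink cluster on EKS, and I need to authenticate with WebIdentityTokenCredentialsProvider. According to the documentation, there are two ways to do this:
Presto
I tried setting it in the Flink conf: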
presto.s3.credentials-provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
But it throws an error:
Caused by: java.lang.NoSuchMethodException: com.amazonaws.auth.WebIdentityTokenCredentialsProvider.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
at java.lang.Class.getConstructor0(Class.java:3082) ~[?:1.8.0_292]
at java.lang.Class.getConstructor(Class.java:1825) ~[?:1.8.0_292]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getCustomAWSCredentialsProvider(PrestoS3FileSystem.java:845) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.createAwsCredentialsProvider(PrestoS3FileSystem.java:833) ~[?:?]
at com.facebook.presto.hive.s3.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:244) ~[?:?]
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:64) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage.createCheckpointStorage(FileSystemCheckpointStorage.java:323) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:321) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:240) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:452) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:315) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:335) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:191) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:134) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:346) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:323) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_292]
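The trace suggests that PrestoS3FileSystem.getCustomAWSCredentialsProvider instantiates the configured class reflectively, via Class.getConstructor with a (java.net.URI, org.apache.hadoop.conf.Configuration) signature, and WebIdentityTokenCredentialsProvider declares no such constructor. The stdlib-only sketch below reproduces that lookup. It is a model, not Presto's code: Configuration is a hypothetical stand-in for the Hadoop class, and NoArgProvider and UriConfProvider are hypothetical names. A real wrapper shaped like UriConfProvider (implementing AWSCredentialsProvider and delegating to WebIdentityTokenCredentialsProvider) would presumably pass the lookup, but that is an assumption.

```java
import java.lang.reflect.Constructor;
import java.net.URI;

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration,
// so this sketch compiles without Hadoop on the classpath.
class Configuration {}

// Models a provider that, like WebIdentityTokenCredentialsProvider,
// offers only a no-arg constructor.
class NoArgProvider {
    public NoArgProvider() {}
}

// Hypothetical wrapper exposing the (URI, Configuration) constructor
// that the reflective lookup expects; it just delegates.
class UriConfProvider {
    private final NoArgProvider delegate;

    public UriConfProvider(URI uri, Configuration conf) {
        this.delegate = new NoArgProvider();
    }
}

public class ConstructorLookupDemo {
    // Mirrors the lookup implied by the stack trace:
    // cls.getConstructor(URI.class, Configuration.class).
    static boolean hasUriConfConstructor(Class<?> cls) {
        try {
            Constructor<?> ctor = cls.getConstructor(URI.class, Configuration.class);
            return ctor != null;
        } catch (NoSuchMethodException e) {
            // This is the exception reported in the trace above.
            return false;
        }
    }

    public static void main(String[] args) {
        // prints "NoArgProvider: false"
        System.out.println("NoArgProvider: " + hasUriConfConstructor(NoArgProvider.class));
        // prints "UriConfProvider: true"
        System.out.println("UriConfProvider: " + hasUriConfConstructor(UriConfProvider.class));
    }
}
```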
As a result, I fell back to the Hadoop solution:
Hadoop
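With Hadoop, I can authenticate successfully, but it always writes 0-length data to the checkpoints, and I haven't found an actual error. There is only a warning in the logs: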
2022-10-05 17:24:34,285 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 1 of job 1a68f74acf0ccf403693e2f228fa62a6 expired before completing.
2022-10-05 17:24:34,287 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 1a68f74acf0ccf403693e2f228fa62a6. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000) [flink-dist_2.12-1.14.5.jar:1.14.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_292]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_292]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
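I also followed some suggestions I found on Google and set the parallelism to 1, but that didn't help at all.
Ideally I'd like to use Presto, but for now I'd accept either approach as long as it fixes the checkpointing problem. Thanks!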
Answer:
There were actually two separate problems:
For s3a
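It wasn't writing checkpoints because I was using the Apache Beam portable runner, where --checkpointing_interval defaults to -1, which means checkpointing is disabled. After setting it to a positive value (in milliseconds), the problem was resolved.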
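A tiny sketch of the semantics described above: without the flag the interval stays at -1 and checkpointing is off; with an explicit interval it is on. The parsing helper is hypothetical and does not use Beam's own option parsing (the real flag is consumed by Beam and passed on to Flink); "--runner=PortableRunner" and 60000 ms are example values only, and treating any positive value as "enabled" is a simplification.

```java
import java.util.Arrays;
import java.util.List;

public class CheckpointFlagDemo {
    // Default quoted in the answer: -1 means checkpointing is disabled.
    static final long DEFAULT_CHECKPOINTING_INTERVAL_MS = -1L;
    static final String FLAG = "--checkpointing_interval=";

    // Hypothetical helper: reads --checkpointing_interval from pipeline
    // arguments, falling back to the default when the flag is absent.
    static long checkpointingInterval(List<String> args) {
        for (String arg : args) {
            if (arg.startsWith(FLAG)) {
                return Long.parseLong(arg.substring(FLAG.length()));
            }
        }
        return DEFAULT_CHECKPOINTING_INTERVAL_MS;
    }

    // Simplified rule: only a positive interval turns checkpointing on.
    static boolean checkpointingEnabled(long intervalMs) {
        return intervalMs > 0;
    }

    public static void main(String[] args) {
        List<String> withoutFlag = Arrays.asList("--runner=PortableRunner");
        List<String> withFlag = Arrays.asList("--runner=PortableRunner", "--checkpointing_interval=60000");

        // prints "-1 ms -> enabled=false", then "60000 ms -> enabled=true"
        for (List<String> pipelineArgs : Arrays.asList(withoutFlag, withFlag)) {
            long interval = checkpointingInterval(pipelineArgs);
            System.out.println(interval + " ms -> enabled=" + checkpointingEnabled(interval));
        }
    }
}
```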
For s3p
Thanks to vignesh kumar kathiresan from the Flink Slack channel!
The solution is to configure
presto.s3.use-instance-credentials: "false"
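and use DefaultAWSCredentialsProviderChain (the default when no custom credentials provider is set). DefaultAWSCredentialsProviderChain tries several candidate providers in turn; WebIdentityTokenCredentialsProvider is one of them, so it gets picked up.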
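Given that everything on the AWS side was already configured correctly, this worked for me. For anyone just starting out, though: besides the S3 permissions, you also need kms:GenerateDataKey and kms:Decrypt.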
