The error is as follows:
- D:\software_install\java\bin\java.exe "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.3\lib\idea_rt.jar=58672:C:\Program Files\JetBrains\IntelliJ IDEA 2021.2.3\bin" -Dfile.encoding=UTF-8 -classpath D:\software_install\java\jre\lib\charsets.jar;D:\software_install\java\jre\lib\deploy.jar;D:\software_install\java\jre\lib\ext\access-bridge-64.jar;D:\software_install\java\jre\lib\ext\cldrdata.jar;D:\software_install\java\jre\lib\ext\dnsns.jar;D:\software_install\java\jre\lib\ext\jaccess.jar;D:\software_install\java\jre\lib\ext\jfxrt.jar;D:\software_install\java\jre\lib\ext\localedata.jar;D:\software_install\java\jre\lib\ext\nashorn.jar;D:\software_install\java\jre\lib\ext\sunec.jar;D:\software_install\java\jre\lib\ext\sunjce_provider.jar;D:\software_install\java\jre\lib\ext\sunmscapi.jar;D:\software_install\java\jre\lib\ext\sunpkcs11.jar;D:\software_install\java\jre\lib\ext\zipfs.jar;D:\software_install\java\jre\lib\javaws.jar;D:\software_install\java\jre\lib\jce.jar;D:\software_install\java\jre\lib\jfr.jar;D:\software_install\java\jre\lib\jfxswt.jar;D:\software_install\java\jre\lib\jsse.jar;D:\software_install\java\jre\lib\management-agent.jar;D:\software_install\java\jre\lib\plugin.jar;D:\software_install\java\jre\lib\resources.jar;D:\software_install\java\jre\lib\rt.jar;D:\IT\Flink117_Test_01\target\classes;D:\software_install\maven_repo\org\apache\flink\flink-streaming-java\1.17.0\flink-streaming-java-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-core\1.17.0\flink-core-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-annotations\1.17.0\flink-annotations-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-metrics-core\1.17.0\flink-metrics-core-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-shaded-asm-9\9.3-16.1\flink-shaded-asm-9-9.3-16.1.jar;D:\software_install\maven_repo\org\apache\commons\commons-lang3\3.12.0\commons-lang3-3.12.0.jar;D:\software_install\maven_repo\org\apache\commons\co
mmons-text\1.10.0\commons-text-1.10.0.jar;D:\software_install\maven_repo\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;D:\software_install\maven_repo\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;D:\software_install\maven_repo\org\objenesis\objenesis\2.1\objenesis-2.1.jar;D:\software_install\maven_repo\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;D:\software_install\maven_repo\org\apache\commons\commons-compress\1.21\commons-compress-1.21.jar;D:\software_install\maven_repo\org\apache\flink\flink-file-sink-common\1.17.0\flink-file-sink-common-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-runtime\1.17.0\flink-runtime-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-rpc-core\1.17.0\flink-rpc-core-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-rpc-akka-loader\1.17.0\flink-rpc-akka-loader-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-queryable-state-client-java\1.17.0\flink-queryable-state-client-java-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-hadoop-fs\1.17.0\flink-hadoop-fs-1.17.0.jar;D:\software_install\maven_repo\commons-io\commons-io\2.11.0\commons-io-2.11.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-shaded-zookeeper-3\3.7.1-16.1\flink-shaded-zookeeper-3-3.7.1-16.1.jar;D:\software_install\maven_repo\org\javassist\javassist\3.24.0-GA\javassist-3.24.0-GA.jar;D:\software_install\maven_repo\org\xerial\snappy\snappy-java\1.1.8.3\snappy-java-1.1.8.3.jar;D:\software_install\maven_repo\org\lz4\lz4-java\1.8.0\lz4-java-1.8.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-java\1.17.0\flink-java-1.17.0.jar;D:\software_install\maven_repo\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;D:\software_install\maven_repo\org\apache\flink\flink-shaded-guava\30.1.1-jre-16.1\flink-shaded-guava-30.1.1-jre-16.1.jar;D:\software_install\maven_repo\org\apache\commons\commons-math3\3.6.1\commons-math3-3.6.1.jar;D:\software_inst
all\maven_repo\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;D:\software_install\maven_repo\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;D:\software_install\maven_repo\org\apache\flink\flink-clients\1.17.0\flink-clients-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-optimizer\1.17.0\flink-optimizer-1.17.0.jar;D:\software_install\maven_repo\commons-cli\commons-cli\1.5.0\commons-cli-1.5.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-runtime-web\1.17.0\flink-runtime-web-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-shaded-netty\4.1.82.Final-16.1\flink-shaded-netty-4.1.82.Final-16.1.jar;D:\software_install\maven_repo\org\apache\flink\flink-shaded-jackson\2.13.4-16.1\flink-shaded-jackson-2.13.4-16.1.jar;D:\software_install\maven_repo\org\apache\flink\flink-connector-files\1.17.0\flink-connector-files-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-shaded-force-shading\16.1\flink-shaded-force-shading-16.1.jar;D:\software_install\maven_repo\org\apache\flink\flink-connector-kafka\1.17.0\flink-connector-kafka-1.17.0.jar;D:\software_install\maven_repo\org\apache\flink\flink-connector-base\1.17.0\flink-connector-base-1.17.0.jar;D:\software_install\maven_repo\org\apache\kafka\kafka-clients\3.2.3\kafka-clients-3.2.3.jar;D:\software_install\maven_repo\com\github\luben\zstd-jni\1.5.2-1\zstd-jni-1.5.2-1.jar test01.TestReadKafka
- SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
- SLF4J: Defaulting to no-operation (NOP) logger implementation
- SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
- at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
- at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
- at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
- at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
- at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
- at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
- at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267)
- at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
- at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
- at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
- at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
- at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
- at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
- at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
- at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
- at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
- at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
- at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
- at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
- at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
- at akka.dispatch.OnComplete.internal(Future.scala:300)
- at akka.dispatch.OnComplete.internal(Future.scala:297)
- at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
- at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
- at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
- at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
- at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
- at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
- at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
- at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
- at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:622)
- at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
- at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
- at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
- at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
- at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
- at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
- at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
- at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
- at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
- at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
- at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
- at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
- at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
- at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
- at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
- at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
- at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
- Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
- at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
- at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102)
- at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:301)
- at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:618)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
- at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
- at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
- at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
- at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
- at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
- at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
- at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
- at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
- at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
- at akka.actor.Actor.aroundReceive(Actor.scala:537)
- at akka.actor.Actor.aroundReceive$(Actor.scala:535)
- at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
- at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
- at akka.actor.ActorCell.invoke(ActorCell.scala:547)
- at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
- at akka.dispatch.Mailbox.run(Mailbox.scala:231)
- at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
- ... 4 more
- Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafkasource -> Sink: Print to Std. Out' (operator cbc357ccb763df2852fee8c4fc7d55f2).
- at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600)
- at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
- at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374)
- at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:387)
- at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
- at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
- at java.util.concurrent.FutureTask.run(FutureTask.java:266)
- at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
- at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
- at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
- at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
- at java.lang.Thread.run(Thread.java:750)
- Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
- at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
- at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
- at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
- ... 7 more
- Caused by: java.lang.RuntimeException: Failed to get metadata for topics [test-flink-topic].
- at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
- at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
- at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
- at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
- ... 7 more
- Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
- at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
- at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
- at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
- at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
- ... 10 more
- Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
-
- Process finished with exit code 1
Solution:
Edit server.properties under /usr/local/wyh/kafka/kafka_2.12-2.8.1/config. The listeners setting is commented out by default, so uncomment it and set it to the broker host's address:
listeners=PLAINTEXT://192.168.126.xxx:9092
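After changing listeners and restarting the broker, it can help to confirm from the machine running the Flink job that the configured address is resolvable and accepts TCP connections. The following is a minimal sketch using only the JDK. The class name BrokerReachability and the method canConnect are hypothetical names chosen for illustration, and the default host, port, and timeout are placeholders. It only tests that the address can be reached; it does not inspect the metadata the broker advertises to clients.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of Flink or Kafka.
class BrokerReachability {

    /** Returns true if host resolves and a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        InetSocketAddress address = new InetSocketAddress(host, port);
        if (address.isUnresolved()) {
            // The host name could not be resolved from this machine.
            return false;
        }
        try (Socket socket = new Socket()) {
            socket.connect(address, timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or otherwise failed.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the host and port from the listeners line as arguments;
        // the defaults below are placeholders.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If this prints false for the address in the listeners line, the describeTopics timeout is likely a network or name-resolution problem (firewall, wrong IP, broker not running) rather than something in the Flink job itself.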