运行Flink作业报错:Job execution failed.

avatar
作者
猴君
阅读量:0

项目场景:

一个将本地文本文件写入到MySQL的Flink作业


问题描述

运行作业时报错:Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

因为错误导致Flink作业不能完成

报错信息: Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)     at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)     at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)     at akka.dispatch.OnComplete.internal(Future.scala:264)     at akka.dispatch.OnComplete.internal(Future.scala:261)     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     at java.lang.reflect.Method.invoke(Method.java:498)     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)     at akka.actor.ActorCell.invoke(ActorCell.scala:561)     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)     at akka.dispatch.Mailbox.run(Mailbox.scala:225)     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)     ... 4 more Caused by: java.io.IOException: unable to open JDBC writer     at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:56)     at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:115)     at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49)     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)     at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)     at java.lang.Thread.run(Thread.java:748) Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure  The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.     at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)     at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)     at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)     at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448)     at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241)     at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)     at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:121)     at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54)     ... 12 more Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure  The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)     at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)     at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)     at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)     at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)     at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:378)     at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:205)     at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1433)     at com.mysql.cj.NativeSession.connect(NativeSession.java:133)     at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:948)     at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:818)     ... 17 more Caused by: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0     at sun.security.ssl.InputRecord.checkRecordVersion(InputRecord.java:552)     at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:565)     at sun.security.ssl.InputRecord.read(InputRecord.java:529)     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)     at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)     at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)     at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)     at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347)     at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:191)     at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101)     at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:369)     ... 22 more  Process finished with exit code 1 ###################################################################################### 原代码: package com.lcvc  import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder} import java.sql.PreparedStatement  case class Purchase(customerId: Int, productId: Int,quantity: Int)  object SinkToMysql {   def main(args: Array[String]): Unit = {     val env = StreamExecutionEnvironment.getExecutionEnvironment      // 读取数据源     val input = env.readTextFile("input/purchases.log")     val purchases = input.map { line =>         val fields = line.split(",")         Purchase(fields(0).toInt,fields(1).toInt,fields(2).toInt)     }     // 创建 JDBC sink     val sink = JdbcSink.sink(         "INSERT INTO customer (customerId, productId, total) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE total = total + VALUES(total)",       new JdbcStatementBuilder[Purchase]{         override def accept(t: PreparedStatement,u: Purchase) = {           t.setInt(1,u.customerId)           t.setInt(2,u.productId)           t.setInt(3,u.quantity)         }       },       new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()           .withUrl("jdbc:mysql://master:3306/eco")           .withDriverName("com.mysql.cj.jdbc.Driver")           .withUsername("root")           .withPassword("Password123456$")           .build()     )     // 发送作业到sink     purchases.addSink(sink)      // 启动Flink作业     env.execute()   } }

原因分析:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

Flink 在运行作业时抛出的一个异常。这通常意味着在执行 Flink 作业时发生了某种错误,导致作业无法成功完成

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

Flink 在执行作业时发生了错误,并且由于配置了 NoRestartBackoffTimeStrategy,所以不会尝试重启该作业

Caused by: java.io.IOException: unable to open JDBC writer

一个 Java IO 异常,表示在尝试打开 JDBC writer 时发生了问题。这通常意味着程序在连接数据库或进行写操作时遇到了困难

Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

尝试与 MySQL 数据库通信时失败了

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.

成功地将最后一个数据包发送到服务器,这发生在0毫秒前,

JDBC 驱动程序未能从服务器收到任何响应数据包

Caused by: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0

使用 SSL 进行安全连接时,客户端和服务器之间的 SSL 协议版本不兼容

根据报错来看问题是Flink作业和MySQL数据库的连接有问题

我的数据库版本是5.7.18,而我pom.xml文件里却写着8.0.31

为了兼容pom文件里的版本所以代码用的驱动类是com.mysql.cj.jdbc.Driver

而MySQL 8.0以下的用的是com.mysql.jdbc.Driver,这就因为兼容性问题导致无法连接数据库


解决方案:

pom.xml文件修改:

使用和集群数据库相同或相近版本的依赖

具体类代码修改:

useSSL=false:禁用SSL连接,防止出现SSL相关警告

修改后的源码:

package com.lcvc

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import java.sql.PreparedStatement

case class Purchase(customerId: Int, productId: Int,quantity: Int)

object SinkToMysql {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 读取数据源
    val input = env.readTextFile("input/purchases.log")
    val purchases = input.map { line =>
        val fields = line.split(",")
        Purchase(fields(0).toInt,fields(1).toInt,fields(2).toInt)
    }
    // 创建 JDBC sink
    val sink = JdbcSink.sink(
        "INSERT INTO customer (customerId, productId, total) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE total = total + VALUES(total)",
      new JdbcStatementBuilder[Purchase]{
        override def accept(t: PreparedStatement,u: Purchase) = {
          t.setInt(1,u.customerId)
          t.setInt(2,u.productId)
          t.setInt(3,u.quantity)
        }
      },
      new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withUrl("jdbc:mysql://master:3306/eco?useSSL=false")
//          .withDriverName("com.mysql.cj.jdbc.Driver")
          .withDriverName("com.mysql.jdbc.Driver")
          .withUsername("root")
          .withPassword("Password123$")
          .build()
    )
    // 发送作业到sink
    purchases.addSink(sink)

    // 启动Flink作业
    env.execute()
  }
}
 

问题解决

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!