项目场景:
一个将本地文本文件写入到MySQL的Flink作业
问题描述
运行作业时报错:Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
因为错误导致Flink作业不能完成
报错信息: Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081) at akka.dispatch.OnComplete.internal(Future.scala:264) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 more Caused by: java.io.IOException: unable to open JDBC writer at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:56) at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.open(JdbcBatchingOutputFormat.java:115) at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.open(GenericJdbcSinkFunction.java:49) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174) at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64) at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828) at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:448) at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:241) at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:121) at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ... 12 more Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61) at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105) at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151) at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167) at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:378) at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:205) at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1433) at com.mysql.cj.NativeSession.connect(NativeSession.java:133) at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:948) at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:818) ... 17 more Caused by: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0 at sun.security.ssl.InputRecord.checkRecordVersion(InputRecord.java:552) at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:565) at sun.security.ssl.InputRecord.read(InputRecord.java:529) at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983) at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385) at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413) at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397) at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:191) at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:369) ... 22 more Process finished with exit code 1 ###################################################################################### 原代码: package com.lcvc import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder} import java.sql.PreparedStatement case class Purchase(customerId: Int, productId: Int,quantity: Int) object SinkToMysql { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 读取数据源 val input = env.readTextFile("input/purchases.log") val purchases = input.map { line => val fields = line.split(",") Purchase(fields(0).toInt,fields(1).toInt,fields(2).toInt) } // 创建 JDBC sink val sink = JdbcSink.sink( "INSERT INTO customer (customerId, productId, total) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE total = total + VALUES(total)", new JdbcStatementBuilder[Purchase]{ override def accept(t: PreparedStatement,u: Purchase) = { t.setInt(1,u.customerId) t.setInt(2,u.productId) t.setInt(3,u.quantity) } }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://master:3306/eco") .withDriverName("com.mysql.cj.jdbc.Driver") .withUsername("root") .withPassword("Password123456$") .build() ) // 发送作业到sink purchases.addSink(sink) // 启动Flink作业 env.execute() } }
原因分析:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Flink 在运行作业时抛出的一个异常。这通常意味着在执行 Flink 作业时发生了某种错误,导致作业无法成功完成
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Flink 在执行作业时发生了错误,并且由于配置了
NoRestartBackoffTimeStrategy
,所以不会尝试重启该作业Caused by: java.io.IOException: unable to open JDBC writer
一个 Java IO 异常,表示在尝试打开 JDBC writer 时发生了问题。这通常意味着程序在连接数据库或进行写操作时遇到了困难
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
尝试与 MySQL 数据库通信时失败了
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
成功地将最后一个数据包发送到服务器,这发生在0毫秒前,
JDBC 驱动程序未能从服务器收到任何响应数据包
Caused by: javax.net.ssl.SSLException: Unsupported record version Unknown-0.0
使用 SSL 进行安全连接时,客户端和服务器之间的 SSL 协议版本不兼容
根据报错来看问题是Flink作业和MySQL数据库的连接有问题
我的数据库版本是5.7.18,而我pom.xml文件里却写着8.0.31
为了兼容pom文件里的版本所以代码用的驱动类是com.mysql.cj.jdbc.Driver
而MySQL 8.0以下的用的是com.mysql.jdbc.Driver,这就因为兼容性问题导致无法连接数据库
解决方案:
pom.xml文件修改:
使用和集群数据库相同或相近版本的依赖
具体类代码修改:
useSSL=false:禁用SSL连接,防止出现SSL相关警告
修改后的源码:
package com.lcvc
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcSink, JdbcStatementBuilder}
import java.sql.PreparedStatementcase class Purchase(customerId: Int, productId: Int,quantity: Int)
object SinkToMysql {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取数据源
val input = env.readTextFile("input/purchases.log")
val purchases = input.map { line =>
val fields = line.split(",")
Purchase(fields(0).toInt,fields(1).toInt,fields(2).toInt)
}
// 创建 JDBC sink
val sink = JdbcSink.sink(
"INSERT INTO customer (customerId, productId, total) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE total = total + VALUES(total)",
new JdbcStatementBuilder[Purchase]{
override def accept(t: PreparedStatement,u: Purchase) = {
t.setInt(1,u.customerId)
t.setInt(2,u.productId)
t.setInt(3,u.quantity)
}
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://master:3306/eco?useSSL=false")
// .withDriverName("com.mysql.cj.jdbc.Driver")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("Password123$")
.build()
)
// 发送作业到sink
purchases.addSink(sink)// 启动Flink作业
env.execute()
}
}
问题解决