阅读量:2
1、问题描述
学习SparkSql中,将spark中dataframe数据结构保存为jdbc的格式并提交到本地的mysql中,相关代码见文章末尾。
运行代码时报出相关配置文件错误,如下。
根据该报错,发现网络上多数解决方都是基于java开发的解决方案,尝试过多种jar配置途径,都没办法解决该问题。
后续发现时jar包有问题,通过参考其他文章思路,最终通过合适的jar包解决问题。
参考文章:pyspark连接mysql读取数据、写入数据(四种模式)、写入数据模式的调优_pyspark 执行mysql的语句读取数据 写入数据-CSDN博客
2、解决过程
首先产生该问题是没有配置合适的jar包,因此需要在本机的pyspark包中配置相关jar。
位置:E:\programfiles\anaconda\Lib\site-packages\pyspark\jars(这里根据各自的环境安排)
一般来说本地的mysql都是8.x版本的所以jar包一般用最新的即可。下载位置如下:
MySQL :: MySQL Connectors,选择jdbc格式。(需要提前注册oracle账号)
这里选择独立平台,之后在下载页面选择合适的压缩文件。
之后,在压缩包中选择该文件粘贴到之前的位置中去即可。
在这里插入图片描述
在此运行代码,发现此时将不再报错,查看数据库也可以发现数据表已经成功保存到本地mysql.
代码
# -*- coding: UTF-8 -*- """================================================= @Project -> File :PySpark -> 15_dataframe_jdbc @IDE :PyCharm @Author :Strive Yang @Date :2024-07-12 17:05 @Email : yangzy927@qq.com @Desc :将sparksql中的dataframe数据以jdbc的形式保存到本地数据库中 ==================================================""" from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': # 0、构建执行环境入口对象SparkSession spark = SparkSession.builder. \ appName('test'). \ master('local[*]'). \ getOrCreate() sc = spark.sparkContext # 1、读取数据集u.data schema = StructType().add('user_id', StringType(), nullable=True). \ add('movie_id', IntegerType(), nullable=True). \ add('rank', IntegerType(), nullable=True). \ add('ts', StringType(), nullable=True) df = spark.read.format('csv'). \ option('sep', '\t'). \ option('header', False). \ option('encoding', 'utf8'). \ schema(schema=schema). \ load('文件路径') # 将数据写入jdbc中,写入到本地的mysql中 df.write.mode('overwrite').\ format('jdbc').\ option('url','jdbc:mysql://localhost:3306/数据库名?useSSL=false&useUnicode=true').\ option('dbtable','表名').\ option('user','账户').\ option('password','密码').\ save()