Spark SQL----CSV文件

avatar
作者
筋斗云
阅读量:3

Spark SQL----CSV文件


Spark SQL提供了spark.read().csv(“file_name”)将CSV格式的文件或文件目录读入Spark DataFrame,提供了dataframe.write().csv(“path”)将CSV文件写入。函数 option()可用于自定义读或写的行为,例如控制header、分隔符、字符集等的行为。

# spark is from the previous example sc = spark.sparkContext  # A CSV dataset is pointed to by path. # The path can be either a single CSV file or a directory of CSV files path = "examples/src/main/resources/people.csv"  df = spark.read.csv(path) df.show() # +------------------+ # |               _c0| # +------------------+ # |      name;age;job| # |Jorge;30;Developer| # |  Bob;32;Developer| # +------------------+  # Read a csv with delimiter, the default delimiter is "," df2 = spark.read.option("delimiter", ";").csv(path) df2.show() # +-----+---+---------+ # |  _c0|_c1|      _c2| # +-----+---+---------+ # | name|age|      job| # |Jorge| 30|Developer| # |  Bob| 32|Developer| # +-----+---+---------+  # Read a csv with delimiter and a header df3 = spark.read.option("delimiter", ";").option("header", True).csv(path) df3.show() # +-----+---+---------+ # | name|age|      job| # +-----+---+---------+ # |Jorge| 30|Developer| # |  Bob| 32|Developer| # +-----+---+---------+  # You can also use options() to use multiple options df4 = spark.read.options(delimiter=";", header=True).csv(path)  # "output" is a folder which contains multiple csv files and a _SUCCESS file. df3.write.csv("output")  # Read all files in a folder, please make sure only CSV files should present in the folder. folderPath = "examples/src/main/resources" df5 = spark.read.csv(folderPath) df5.show() # Wrong schema because non-CSV files are read # +-----------+ # |        _c0| # +-----------+ # |238val_238| # |  86val_86| # |311val_311| # |  27val_27| # |165val_165| # +-----------+ 

Spark repo中的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。

数据源选项

CSV的数据源选项可以通过以下方式设置:

  • 以下类的.option/.options方法
    • DataFrameReader
    • DataFrameWriter
    • DataStreamReader
    • DataStreamWriter
  • 下面的内置函数
    • from_csv
    • to_csv
    • schema_of_csv
  • CREATE TABLE USING DATA_SOURCE处的OPTIONS子句
Property NameDefaultMeaningScope
sep,为每个字段和值设置分隔符。此分隔符可以是一个或多个字符。read/write
encodingUTF-8对于读取,按给定的编码类型对CSV文件进行解码。对于写入,指定已保存CSV文件的编码(字符集)。CSV内置函数忽略此选项。read/write
quote"设置用于转义带引号的值的单个字符,其中分隔符可以是值的一部分。在读取时,如果你想关闭引号,你需要设置一个空字符串,而不是null。在写入时,如果设置了空字符串,则使用u0000(null字符)。read/write
quoteAllfalse一个flag,指示是否所有值都应始终用引号括起来。默认是仅转义包含引号字符的值。write
escape\在已引用的值中设置用于转义引号的单个字符。read/write
escapeQuotestrue一个flag,指示是否应始终将包含引号的值括在引号中。默认情况是转义所有包含引号字符的值。write
comment设置单个字符,用于跳过以该字符开头的行。缺省情况下,禁用该功能。read
headerfalse对于读取,使用第一行作为列的名称。对于写入,将列的名称作为第一行写入。注意,如果给定的路径是字符串的RDD,这个header选项将删除所有与header相同的行。CSV内置函数忽略此选项。read/write
inferSchemafalse从数据自动推断输入schema。它需要对数据进行一次额外的传递。CSV内置函数忽略此选项。read
preferDatetrue在schema推断(inferSchema)过程中,如果值满足dateFormat选项或默认日期格式,则尝试将包含日期的字符串列推断为Date。对于包含日期和时间戳的混合列,如果未指定时间戳格式,尝试将其推断为TimestampType,否则将其推断成StringType。read
enforceSchematrue如果设置为true,则指定或推断的schema将被强制应用于数据源文件,并且CSV文件中的headers将被忽略。如果该选项设置为false,则在header选项设置为true的情况下,将根据CSV文件中的所有headers验证schema。schema中的字段名和CSV header中的列名根据其位置进行检查,并考虑spark.sql.caseSensitive。尽管默认值为true,但建议禁用enforceSchema选项以避免出现错误的结果。CSV内置函数忽略此选项。read
ignoreLeadingWhiteSpacefalse (用于读取), true (用于写入)一个flag,指示是否应该跳过正在读/写的值的前导空白。read/write
ignoreTrailingWhiteSpacefalse (用于读取), true (用于写入)一个flag,指示是否应该跳过正在读/写的值后面的空白。read/write
nullValue设置null值的字符串表示形式。自2.0.1版本起,此nullValue参数适用于所有支持的类型,包括字符串类型。read/write
nanValueNaN设置非数字值的字符串表示形式。read
positiveInfInf设置正无穷大值的字符串表示形式。read
negativeInf-Inf设置负无穷大值的字符串表示形式。read
dateFormatyyyy-MM-dd设置指示日期格式的字符串。自定义日期格式遵循Datetime Patterns中的格式。这适用于日期类型。read/write
timestampFormatyyyy-MM-dd’T’HH:mm:ss[.SSS][XXX]设置指示时间戳格式的字符串。自定义日期格式遵循Datetime Patterns中的格式。这适用于时间戳类型。read/write
timestampNTZFormatyyyy-MM-dd’T’HH:mm:ss[.SSS]设置表示时间戳的字符串,不带时区格式。自定义日期格式遵循Datetime Patterns中的格式。这适用于没有时区类型的时间戳,请注意,在写入或读取此数据类型时不支持区域偏移和时区组件。read/write
enableDateTimeParsingFallback如果时间解析器策略具有遗留设置,或者没有提供自定义日期或时间戳pattern,则启用。如果值与设置的patterns不匹配,则允许回退到向后兼容的(Spark 1.x和2.0)行为,来解析日期和时间戳。read
maxColumns20480定义一条记录可以包含的列数的硬限制。read
maxCharsPerColumn-1定义要读取的任何给定值所允许的最大字符数。默认值为-1,表示不限制长度read
modePERMISSIVE允许在解析期间处理损坏记录的模式。它支持以下不区分大小写的模式。注意,Spark尝试在列修剪(column pruning)下解析CSV中只需要的列。因此,损坏的记录可以根据所需的字段集而有所不同。此行为可以通过spark.sql.csv.parser.columnPruning.enabled(默认为启用)来控制。 ● PERMISSIVE: 当遇到损坏的记录时,将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中,并将格式错误字段设置为null。若要保留损坏的记录,用户可以在用户定义的schema中设置名为columnNameOfCorruptRecord的字符串类型字段。如果schema没有该字段,则会在解析过程中删除损坏的记录。令牌少于/多于schema的记录不是CSV的损坏记录。当遇到令牌少于schema长度的记录时,将null设置为额外字段。当记录的标记数超过schema的长度时,它会丢弃额外的标记。● DROPMALFORMED: 忽略所有损坏的记录。CSV内置函数不支持此模式。● FAILFAST: 当遇到损坏的记录时引发异常。read
columnNameOfCorruptRecord(spark.sql.columnNameOfCorruptRecord配置的值)允许重命名由PERMISSIVE模式创建的字符串格式不正确的新字段。这将覆盖spark.sql.columnNameOfCorruptRecord。read
multiLinefalse每个文件解析一条记录,它可能跨越多行。CSV内置函数忽略此选项。read
charToEscapeQuoteEscapingescape or \0设置用于转义引号字符的转义的单个字符。当转义符和引号不同时,默认值为转义符,否则为\0。read/write
samplingRatio1.0定义用于schema推断的行数。CSV内置函数忽略此选项。read
emptyValue(用于读取), “” (用于写入)设置空值的字符串表示形式。read/write
localeen-US将locale设置为IETF BCP 47格式的语言标记。例如,这在解析日期和时间戳时使用。read
lineSep\r, \r\n and \n (for reading), \n (for writing)定义应用于解析/写入的行分隔符。最大长度为1个字符。CSV内置函数忽略此选项。read/write
unescapedQuoteHandlingSTOP_AT_DELIMITER定义CsvParser将如何处理带有无转义引号的值。 ● STOP_AT_CLOSING_QUOTE:如果在输入中发现未转义的引号,则累加引号字符并将值作为引号值继续解析,直到找到结束引号。 ● BACK_TO_DELIMITER: 如果在输入中发现未转义的引号,则将该值视为未引号值。这将使解析器累积当前解析值的所有字符,直到找到分隔符。如果值中没有找到分隔符,解析器将继续从输入中积累字符,直到找到分隔符或行结束符。 ● STOP_AT_DELIMITER: 如果在输入中发现未转义的引号,则将该值视为未引号值。这将使解析器累积所有字符,直到在输入中找到分隔符或行结束符。 ● SKIP_VALUE: 如果在输入中找到未转义的引号,将跳过为给定值解析的内容,并生成nullValue中设置的值。 ● RAISE_ERROR:如果在输入中发现未转义的引号,将抛出TextParsingException。read
compression(none)保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名之一(none、bzip2、gzip、lz4、snappy和deflate)。CSV内置函数忽略此选项。write

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!