数据存储格式——Arrow

avatar
作者
筋斗云
阅读量:0

1.概述

huggingface下载的.arrow数据集读取与使用说明 - 代码天地 (codetd.com)

Arrow数据格式-Arrow究竟是个啥_arrow格式-CSDN博客

Python : Arrow、Pyarrow库、以及与Julia互读_import pyarrow as pa-CSDN博客
  Apache Arrow 是 Apache 基金会全新孵化的一个顶级项目。它设计的目的在于作为一个跨平台的数据层,来加快大数据分析项目的运行速度。

2.内容


  现在大数据处理模型很多,用户在应用大数据分析时,除了将 Hadoop 等大数据平台作为一个存储和批处理平台之外,同样也得关注系统的扩展性和性能。过去开源社区已经发布了很多工具来完善大数据分析的生态系统,这些工具包含了数据分析的各个层面,例如列式存储格式(Parquet,ORC),内存计算模型(Drill,Spark,Impala 和 Storm)以及其强大的 API 接口。而 Arrow 则是最新加入的一员,它提供了一种跨平台应用的内存数据交换格式。

  在数据快速增长和复杂化的情况下,提高大数据分析性能一个重要的途径是对列式数据的设计和处理。列式数据处理借助了向量计算和 SIMD 使我们可以充分挖掘硬件的潜力。而 Apache Drill 其大数据查询引擎无论是在硬盘还是内存中数据都是以列的方式存在的,而 Arrow 就是由 Drill 中的 Value Vector 这一数据格式发展而来。此外,Arrow 也支持关系型和动态数据集。

  Arrow 的诞生为大数据生态带来了很多可能性,有了 Arrow 作为今后标准数据交换格式,各个数据分析的系统和应用之间的交互性可以说是揭开了新的篇章。过去大部分的 CPU 周期都花在了数据的序列化与反序列化上,现在我们则能够实现不同系统之间数据的无缝链接。这意味着使用者在不同系统结合时,不用在数据格式上话费过多的时间。

3.Arrow Group


  Arrow 的内存数据结构如下所示:

  从上图中,我们可以很清晰的看出,传统的内存数据格式,各个字段的分布是以没一行呈现,相同字段并未集中排列在一起。而通过 Arrow 格式化后的内存数据,可以将相同字段集中排列在一起。我们可以很方便的使用 SQL 来操作数据。

  传统的访问各个数据模型中的数据以及使用 Arrow 后的图,如下所示:

  通过上图可以总结出以下观点:

每个系统都有属于自己的内存格式。
70~80% 的 CPU 浪费在序列化和反序列化上。
在多个项目都实现的类似的功能(Copy & Convert)。


  而在看上述使用 Arrow 后,得出以下结论:

所有的系统都使用相同的内存格式。
没有跨系统通信开销。
项目可以贡献功能(比如,Parquet 到 Arrow 的读取)。


4.Arrow 数据格式


  Arrow 列式数据格式如下所示:

persons = [{     name: 'wes',     iq: 180,     addresses: [     {number: 2, street 'a'},     {number: 3, street 'bb'}     ] }, {     name: 'joe', iq: 100, addresses: [ {number: 4, street 'ccc'}, {number: 5, street 'dddd'}, {number: 2, street 'f'} ] }]

如何使用Apache Arrow?

Apache Arrow 可以通过多种方式使用。以下是如何在 Python 中使用 Apache Arrow 的一些示例:

  1. 读取和写入 Arrow 文件

您以使用该库从磁盘或内存pyarrow读取和写入 Arrow 文件(带有.arrow扩展名)。

import pyarrow as pa import pyarrow.feather as feather  # Create a table from a list of dictionaries data = [     {"name": "Alice", "age": 25, "gender": "F"},     {"name": "Bob", "age": 30, "gender": "M"},     {"name": "Charlie", "age": 35, "gender": "M"} ] table = pa.Table.from_pydict(data)  # Write the table to an Arrow file feather.write_feather(table, "data.arrow")  # Read the table from an Arrow file table = feather.read_table("data.arrow")

2. Arrow 和 Pandas 之间的转换

可以使用该pyarrow库在 Arrow 表和 Pandas 数据框之间进行转换。例如:

import pyarrow as pa  import pandas as pd   # 创建 Pandas 数据框 df = pd.DataFrame(data)   # 将数据框转换为 Arrow 表 table = pa.Table.from_pandas(df)   # 将表转换回 Pandas 数据框架 df = table.to_pandas()

3. 将 Arrow 与 Spark 结合使用

pyarrow在 Python 中使用 Spark DataFrame 时,您可以使用该库来启用 Arrow 优化。例如:

import pyarrow as pa  import pyspark.sql.functions as F   # 启用箭头优化 spark.conf.set ( "spark.sql.execution.arrow.pyspark.enabled" , "true" )   # 从 Pandas 数据帧创建 Spark DataFrame  df = Spark.createDataFrame(pd.DataFrame(data))   # 应用一个使用 Spark UDF箭头数据 @F.pandas_udf( "int" , F.PandasUDFType.SCALAR )  def  add_one ( s: pd.Series ) -> pd.Series:      return s + 1   df = df.withColumn( "age_plus_one", add_one(df.age))

4. 使用内存池实现极快的访问

Arrow 的核心功能之一是零拷贝读取。这允许直接访问数据而无需复制内存缓冲区。为了充分利用这一点,我们需要使用 Arrow 内存池。

内存池允许您有效地重用分配的内存并避免不必要的分配或复制。以下是在 Python 中配置内存池的方法:

import pyarrow as pa   # 创建内存池 pool = pa.default_memory_pool()   # 将内存使用限制为 512 MB  pool = pa.proxy_memory_pool(pool, 0.5 * 2 ** 30 )   # 使用此池写入表      with pa.BufferOutputStream(pool=pool)as stream:   writer = pa.RecordBatchStreamWriter(stream, table.schema)     writer.write_table(table)    writer.close()

通过内存池,可以在多次读取或写入中使用相同的内存缓冲区。这减少了总体内存使用量并避免了昂贵的分配。

让我们看一下处理大量数据的批处理管道的真实示例:

# 使用内存池处理批处理 with pa.BufferOutputStream(pool=pool) as out_stream:     for batch in batch_reader:       # 过滤批处理     filtered = filter_func(batch)       # 写入过滤批处理     writer = pa.RecordBatchStreamWriter(out_stream,filtered.schema)      writer. write_batch(过滤)      writer.close()

通过跨批次重用内存池,即使在处理大量数据时,我们也可以最大限度地减少分配。智能内存管理可显着提高性能。

5.利用并行计算

分析工作负载通常需要执行复杂的操作,例如过滤、聚合、排序等。Arrow 允许通过将数据拆分为可以独立处理的块来利用并行性。

pyarrow.compute模块包含可应用于 Arrow 表或记录批次的矢量化函数:

import pyarrow as pa  import pyarrow.compute as pc   # 定义过滤函数 filter_fn = lambda batch: batch.column( 'age' ) > 30   # 并行过滤表   table = pc.filter(table,filter_fn,nthreads = 8)

这使过滤器操作在 8 个线程上并行化。其他函数如reducesortcol_distinct也支持类似的并行化。

让我们看一个更复杂的分析示例:

# 将数据加载到 Arrow 表 table = load_data()   # 在并行中过滤 30 岁以上的年龄 table = pc.filter (table, filter_age, nthreads= 8 )   # 并行聚合 totals = pc.sum(table, value=[ 'salary' , 'bonus' ])   # 将合计奖金除以 10 totals[ 'bonus' ] /= 10    # 写入输出 write_file(totals)

这使用所有可用核心以矢量化方式应用过滤、聚合和算术等多种操作。通过 Arrow 计算模块进行并行分析,可以利用现代硬件实现更快的数据处理。

6. 与不同数据格式无缝互操作

现实世界的数据管道需要使用不同的数据源和格式。Arrow 生态系统对许多常见格式提供一流的支持:

Parquet:针对分析优化的柱状格式:

import pyarrow.parquet as pq   # 将 Parquet 文件读入 Arrow Table  table = pq.read_table( 'data.parquet' )   # 将 Arrow Table 写入 Parquet 文件 pq.write_table(table, 'output.parquet' )

JSON:Web API 和文档的简单文本格式:

import pyarrow as pa  import pyarrow.json as json   # 加载 JSON 文档 table = json.read_json( 'data.json' )   # 将 Arrow 转换为 JSON  json_text = json.write_json(table)

原文链接:https://blog.csdn.net/javazyw/article/details/103063938

https://zhuanlan.zhihu.com/p/655305778

    广告一刻

    为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!