Apache Arrow 优点 :
高性能数据处理: Arrow 使用列式内存布局,这特别适合于数据分析和查询操作,因为它允许对数据进行高效批量处理,减少CPU缓存未命中,从而提升处理速度。
零拷贝数据共享: Arrow 允许不同系统和进程之间直接共享内存中的数据而无需复制,这对于提高数据密集型应用的效率至关重要,减少了内存使用和CPU开销。
跨平台兼容性: Arrow 是一个跨语言开发平台,支持C++, Java, Python等多种编程语言,促进了不同软件组件间的互操作性。
标准化数据格式: 定义了一套统一的数据格式规范,使得数据可以在不同系统间无缝传递,降低了数据转换的成本和复杂性。
优化大数据处理: 特别是在与大数据框架(如Spark、Pandas)集成时,Arrow 可显著加速数据加载、处理和分析的速度,例如,与PySpark集成后数据处理速度提升高达53倍。
集成广泛: 被众多数据处理工具和库采用,如Pandas、Parquet、Drill、Spark等,形成了强大的生态系统。
Apache Arrow 缺点 :
内存消耗: 列式存储相对于行式存储可能需要更多的内存,尤其是在处理稀疏数据或宽表时,因为每一列都需要分配连续的内存空间。
不适合所有场景: 对于需要频繁随机访问记录或更新操作的场景,Arrow 的列式存储可能不如传统的行式存储高效。
学习曲线: 对于新用户来说,理解和掌握Arrow的数据结构和API可能需要一定时间,尤其是当他们习惯于使用其他数据处理模型时。
生态成熟度: 虽然Arrow的生态系统正在快速发展,但在某些特定领域或小众技术栈中,相关支持和工具可能不够丰富或成熟。
实现复杂性: 对于开发者来说,实现Arrow的高效利用可能涉及到复杂的内存管理和优化策略,这在某些情况下可能会增加开发难度。
#define ARROW_COMPUTE #include <arrow/compute/api.h> #include "arrow/pretty_print.h" #include <arrow/api.h> #include <arrow/csv/api.h> #include <arrow/json/api.h> #include <arrow/io/api.h> #include <arrow/table.h> #include <arrow/pretty_print.h> #include <arrow/result.h> #include <arrow/status.h> #include <arrow/ipc/api.h> #include <parquet/arrow/reader.h> #include <parquet/arrow/writer.h> #include <parquet/exception.h> #include <memory> #include <iostream> template <typename T> using numbuildT = arrow::NumericBuilder<T>; struct ArrowUtil { static arrow::Status read_csv(char const* file_name, std::shared_ptr<arrow::Table>& tb); static arrow::Status read_ipc(char const* file_name, std::shared_ptr<arrow::Table>& tb); static arrow::Status read_parquet(char const* file_name, std::shared_ptr<arrow::Table>& tb); static arrow::Status read_json(char const* file_name, std::shared_ptr<arrow::Table>& tb); static arrow::Status write_ipc(arrow::Table const& tb, char const* file_name); static arrow::Status write_parquet(arrow::Table const& tb, char const* file_name); template <typename T, typename buildT, typename arrayT> inline static std::shared_ptr<arrow::Array> chunked_array_to_array(std::shared_ptr<arrow::ChunkedArray> const& array_a) { buildT int64_builder; int64_builder.Resize(array_a->length()); std::vector<T> int64_values; int64_values.reserve(array_a->length()); for (int i = 0; i < array_a->num_chunks(); ++i) { auto inner_arr = array_a->chunk(i); auto int_a = std::static_pointer_cast<arrayT>(inner_arr); for (int j = 0; j < int_a->length(); ++j) { int64_values.push_back(int_a->Value(j)); } } int64_builder.AppendValues(int64_values); std::shared_ptr<arrow::Array> array_a_res; int64_builder.Finish(&array_a_res); return array_a_res; } template <typename T, typename arrayT> inline static std::vector<T> chunked_array_to_vector(std::shared_ptr<arrow::ChunkedArray> const& array_a) { std::vector<T> int64_values; int64_values.reserve(array_a->length()); for (int i = 0; i < array_a->num_chunks(); ++i) { auto inner_arr = array_a->chunk(i); auto int_a = std::static_pointer_cast<arrayT>(inner_arr); for (int j = 0; j < int_a->length(); ++j) { int64_values.push_back(int_a->Value(j)); } } return int64_values; } inline static std::vector<std::string> chunked_array_to_str_vector(std::shared_ptr<arrow::ChunkedArray> const& array_a) { std::vector<std::string> int64_values; int64_values.reserve(array_a->length()); for (int i = 0; i < array_a->num_chunks(); ++i) { auto inner_arr = array_a->chunk(i); auto int_a = std::static_pointer_cast<arrow::StringArray>(inner_arr); for (int j = 0; j < int_a->length(); ++j) { int64_values.push_back(int_a->Value(j).data()); } } return int64_values; } inline static std::shared_ptr<arrow::Array> chunked_array_to_str_array(std::shared_ptr<arrow::ChunkedArray> const& array_a) { arrow::StringBuilder int64_builder; int64_builder.Resize(array_a->length()); std::vector<std::string> int64_values; int64_values.reserve(array_a->length()); for (int i = 0; i < array_a->num_chunks(); ++i) { auto inner_arr = array_a->chunk(i); auto int_a = std::static_pointer_cast<arrow::StringArray>(inner_arr); for (int j = 0; j < int_a->length(); ++j) { int64_values.push_back(int_a->Value(j).data()); } } int64_builder.AppendValues(int64_values); std::shared_ptr<arrow::Array> array_a_res; int64_builder.Finish(&array_a_res); return array_a_res; } }; arrow::Status ArrowUtil::read_csv(char const* file_name, std::shared_ptr<arrow::Table>& tb) { ARROW_ASSIGN_OR_RAISE(auto input_file, arrow::io::ReadableFile::Open(file_name)); ARROW_ASSIGN_OR_RAISE(auto csv_reader, arrow::csv::TableReader::Make( arrow::io::default_io_context(), input_file, arrow::csv::ReadOptions::Defaults(), arrow::csv::ParseOptions::Defaults(), arrow::csv::ConvertOptions::Defaults())); ARROW_ASSIGN_OR_RAISE(auto table, csv_reader->Read()); tb = table; return arrow::Status::OK(); } arrow::Status ArrowUtil::read_ipc(char const* file_name, std::shared_ptr<arrow::Table>& tb) { ARROW_ASSIGN_OR_RAISE(auto input_file, arrow::io::ReadableFile::Open(file_name)); ARROW_ASSIGN_OR_RAISE(auto ipc_reader, arrow::ipc::RecordBatchFileReader::Open(input_file)); std::vector<std::shared_ptr<arrow::RecordBatch>> batches; batches.reserve(ipc_reader->num_record_batches()); for (int i = 0; i < ipc_reader->num_record_batches(); ++i) { ARROW_ASSIGN_OR_RAISE(auto a_record, ipc_reader->ReadRecordBatch(i)); batches.emplace_back(std::move(a_record)); } arrow::Table::FromRecordBatches(ipc_reader->schema(), std::move(batches)).Value(&tb); return arrow::Status::OK(); } arrow::Status ArrowUtil::read_parquet(char const* file_name, std::shared_ptr<arrow::Table>& tb) { std::shared_ptr<arrow::io::ReadableFile> infile; PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(file_name, arrow::default_memory_pool())); std::unique_ptr<parquet::arrow::FileReader> reader; PARQUET_THROW_NOT_OK( parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader)); std::shared_ptr<arrow::Table> table; PARQUET_THROW_NOT_OK(reader->ReadTable(&table)); tb = table; return arrow::Status::OK(); } arrow::Status ArrowUtil::read_json(char const* file_name, std::shared_ptr<arrow::Table>& tb) { std::shared_ptr<arrow::io::ReadableFile> infile; PARQUET_ASSIGN_OR_THROW(infile, arrow::io::ReadableFile::Open(file_name, arrow::default_memory_pool())); ARROW_ASSIGN_OR_RAISE(auto reader, arrow::json::TableReader::Make(arrow::default_memory_pool(), infile, arrow::json::ReadOptions::Defaults(), arrow::json::ParseOptions::Defaults())); ARROW_ASSIGN_OR_RAISE(auto res_tb, reader->Read()); tb = res_tb; return arrow::Status::OK(); } arrow::Status ArrowUtil::write_ipc(arrow::Table const& tb, char const* file_name) { ARROW_ASSIGN_OR_RAISE(auto output_file, arrow::io::FileOutputStream::Open(file_name)); ARROW_ASSIGN_OR_RAISE(auto batch_writer, arrow::ipc::MakeFileWriter(output_file, tb.schema())); ARROW_RETURN_NOT_OK(batch_writer->WriteTable(tb)); ARROW_RETURN_NOT_OK(batch_writer->Close()); return arrow::Status::OK(); } arrow::Status ArrowUtil::write_parquet(arrow::Table const& tb, char const* file_name) { std::shared_ptr<arrow::io::FileOutputStream> outfile; PARQUET_ASSIGN_OR_THROW( outfile, arrow::io::FileOutputStream::Open(file_name)); // The last argument to the function call is the size of the RowGroup in // the parquet file. Normally you would choose this to be rather large but // for the example, we use a small value to have multiple RowGroups. PARQUET_THROW_NOT_OK( parquet::arrow::WriteTable(tb, arrow::default_memory_pool(), outfile, 3)); return arrow::Status::OK(); } void testReadCSV() { // 读取CSV文件 char const* csv_path = "./test.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(csv_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); assert(tb_.num_rows() == 2); } void testWriteIpc() { // 读取CSV文件并写入IPC文件 char const* csv_path = "./test.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(csv_path, tb); auto const& tb_ = *tb; char const* write_csv_path = "./test_dst.arrow"; arrow::PrettyPrint(tb_, {}, &std::cerr); auto write_res = ArrowUtil::write_ipc(tb_, write_csv_path); assert(write_res == arrow::Status::OK()); } void testReadIPC() { // 读取Arrow IPC 文件 char const* ipc_path = "./test_dst.arrow"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_ipc(ipc_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); assert(tb_.num_rows() == 2); } void testWriteParquet() { // 写入Parquet文件 char const* csv_path = "./test.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(csv_path, tb); auto const& tb_ = *tb; char const* write_parquet_path = "./test_dst.parquet"; arrow::PrettyPrint(tb_, {}, &std::cerr); auto write_res = ArrowUtil::write_parquet(tb_, write_parquet_path); assert(write_res == arrow::Status::OK()); } void testReadParquet() { // 读取 Parquet char const* parquet_path = "./test_dst.parquet"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_parquet(parquet_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); assert(tb_.num_rows() == 2); } void testReadJson() { // 读取Json文件 char const* json_path = "./test.json"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_json(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); assert(tb_.num_rows() == 2); } void testComputeGreater() { // 比较两列 int 值中 int1 > int2的值, greater函数 char const* json_path = "./comp_gt.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto array_a = tb_.GetColumnByName("int1"); auto array_b = tb_.GetColumnByName("int2"); auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a); auto array_b_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_b); auto compared_datum = arrow::compute::CallFunction("greater", { array_a_res, array_b_res }); auto array_a_gt_b_compute = compared_datum->make_array(); arrow::PrettyPrint(*array_a_gt_b_compute, {}, &std::cerr); auto schema = arrow::schema({ arrow::field("int1", arrow::int64()), arrow::field("int2", arrow::int64()), arrow::field("a>b? (arrow)", arrow::boolean()) }); std::shared_ptr<arrow::Table> my_table = arrow::Table::Make( schema, { array_a_res, array_b_res, array_a_gt_b_compute }, tb_.num_rows()); arrow::PrettyPrint(*my_table, {}, &std::cerr); } void testComputeMinMax() { // 计算int1列的最大值和最小值 char const* json_path = "./comp_gt.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto array_a = tb_.GetColumnByName("int1"); auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a); arrow::compute::ScalarAggregateOptions scalar_aggregate_options; scalar_aggregate_options.skip_nulls = false; auto min_max = arrow::compute::CallFunction("min_max", { array_a_res }, &scalar_aggregate_options); // Unpack struct scalar result (a two-field {"min", "max"} scalar) auto min_value = min_max->scalar_as<arrow::StructScalar>().value[0]; auto max_value = min_max->scalar_as<arrow::StructScalar>().value[1]; assert(min_value->ToString() == "1"); assert(max_value->ToString() == "8"); } #define GTEST_TEST(a, b) void a##_##b() #define ASSERT_EQ(a, b) assert(a == b) GTEST_TEST(RWTests, ComputeMean) { // 计算int1列的平均值 char const* json_path = "../data/comp_gt.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto array_a = tb_.GetColumnByName("int1"); auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a); arrow::compute::ScalarAggregateOptions scalar_aggregate_options; scalar_aggregate_options.skip_nulls = false; auto mean = arrow::compute::CallFunction("mean", { array_a_res }, &scalar_aggregate_options); auto const& mean_value = mean->scalar_as<arrow::Scalar>(); ASSERT_EQ(mean_value.ToString(), "4.5"); } GTEST_TEST(RWTests, ComputeAdd) { // 将第一列的值加3 char const* json_path = "../data/comp_gt.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto array_a = tb_.GetColumnByName("int1"); auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a); arrow::compute::ScalarAggregateOptions scalar_aggregate_options; scalar_aggregate_options.skip_nulls = false; std::shared_ptr<arrow::Scalar> increment = std::make_shared<arrow::Int64Scalar>(3); auto add = arrow::compute::CallFunction("add", { array_a_res, increment }, &scalar_aggregate_options); std::shared_ptr<arrow::Array> incremented_array = add->array_as<arrow::Array>(); arrow::PrettyPrint(*incremented_array, {}, &std::cerr); } GTEST_TEST(RWTests, ComputeAddArray) { // int1和int2两列相加 char const* json_path = "../data/comp_gt.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto array_a = tb_.GetColumnByName("int1"); auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a); auto array_b = tb_.GetColumnByName("int2"); auto array_b_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_b); arrow::compute::ScalarAggregateOptions scalar_aggregate_options; scalar_aggregate_options.skip_nulls = false; auto add = arrow::compute::CallFunction("add", { array_a_res, array_b_res }, &scalar_aggregate_options); std::shared_ptr<arrow::Array> incremented_array = add->array_as<arrow::Array>(); arrow::PrettyPrint(*incremented_array, {}, &std::cerr); } GTEST_TEST(RWTests, ComputeStringEqual) { // 比较s1和s2两列是否相等 char const* json_path = "../data/comp_s_eq.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto array_a = tb_.GetColumnByName("s1"); auto array_a_res = ArrowUtil::chunked_array_to_str_array(array_a); auto array_b = tb_.GetColumnByName("s2"); auto array_b_res = ArrowUtil::chunked_array_to_str_array(array_b); arrow::compute::ScalarAggregateOptions scalar_aggregate_options; scalar_aggregate_options.skip_nulls = false; auto eq_ = arrow::compute::CallFunction("equal", { array_a_res, array_b_res }, &scalar_aggregate_options); std::shared_ptr<arrow::Array> eq_array = eq_->array_as<arrow::Array>(); arrow::PrettyPrint(*eq_array, {}, &std::cerr); } GTEST_TEST(RWTests, ComputeCustom) { // 自己写算法逐个比较相等 char const* json_path = "../data/comp_s_eq.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto arr1 = tb_.GetColumnByName("s1"); auto arr2 = tb_.GetColumnByName("s2"); auto v1 = ArrowUtil::chunked_array_to_str_vector(arr1); auto v2 = ArrowUtil::chunked_array_to_str_vector(arr2); for (std::size_t i = 0; i < v1.size(); ++i) { if (v1[i] != v2[i]) { std::cerr << v1[i] << "!=" << v2[i] << "\n"; } } } GTEST_TEST(RWTests, ComputeCustomDbl) { // 自己写算法比较double值 char const* json_path = "../data/custom_dbl.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto arr1 = tb_.GetColumnByName("dbl1"); auto arr2 = tb_.GetColumnByName("dbl2"); auto v1 = ArrowUtil::chunked_array_to_vector<double, arrow::DoubleArray>(arr1); auto v2 = ArrowUtil::chunked_array_to_vector<double, arrow::DoubleArray>(arr2); for (std::size_t i = 0; i < v1.size(); ++i) { if (v1[i] != v2[i]) { std::cerr << v1[i] << "!=" << v2[i] << "\n"; } } } GTEST_TEST(RWTests, ComputeEqualDbl) { // 使用equal函数比较double值 char const* json_path = "../data/custom_dbl.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto arr1 = tb_.GetColumnByName("dbl1"); auto arr2 = tb_.GetColumnByName("dbl2"); auto dbl_arr1 = ArrowUtil::chunked_array_to_array<double, numbuildT<arrow::DoubleType>, arrow::DoubleArray>(arr1); auto dbl_arr2 = ArrowUtil::chunked_array_to_array<double, numbuildT<arrow::DoubleType>, arrow::DoubleArray>(arr2); arrow::compute::ScalarAggregateOptions scalar_aggregate_options; scalar_aggregate_options.skip_nulls = false; auto eq_ = arrow::compute::CallFunction("equal", { dbl_arr1, dbl_arr2 }, &scalar_aggregate_options); std::shared_ptr<arrow::Array> eq_array = eq_->array_as<arrow::Array>(); arrow::PrettyPrint(*eq_array, {}, &std::cerr); } GTEST_TEST(RWTests, StrStartsWith) { // 计算s1列以是否以 Zha开头的值 char const* json_path = "../data/comp_s_eq.csv"; std::shared_ptr<arrow::Table> tb; ArrowUtil::read_csv(json_path, tb); auto const& tb_ = *tb; arrow::PrettyPrint(tb_, {}, &std::cerr); auto array_a = tb_.GetColumnByName("s1"); auto array_a_res = ArrowUtil::chunked_array_to_str_array(array_a); arrow::compute::MatchSubstringOptions options("Zha"); auto eq_ = arrow::compute::CallFunction("starts_with", { array_a_res }, &options); std::shared_ptr<arrow::Array> eq_array = eq_->array_as<arrow::Array>(); arrow::PrettyPrint(*eq_array, {}, &std::cerr); } using arrow::Int32Builder; using arrow::Int64Builder; using arrow::DoubleBuilder; using arrow::StringBuilder; struct row_data { int32_t col1; int64_t col2; double col3; std::string col4; };//行结构 #define EXIT_ON_FAILURE(expr) \ do { \ arrow::Status status_ = (expr); \ if (!status_.ok()) { \ std::cerr << status_.message() << std::endl; \ return EXIT_FAILURE; \ } \ } while (0); arrow::Status CreateTable(const std::vector<struct row_data>& rows, std::shared_ptr<arrow::Table>* table) { //使用arrow::jemalloc::MemoryPool::default_pool()构建器更有效,因为这可以适当增加底层内存区域的大小. arrow::MemoryPool* pool = arrow::default_memory_pool(); Int32Builder col1_builder(pool); Int64Builder col2_builder(pool); DoubleBuilder col3_builder(pool); StringBuilder col4_builder(pool); //现在我们可以循环我们现有的数据,并将其插入到构建器中。这里的' Append '调用可能会失败(例如,我们无法分配足够的额外内存)。因此我们需要检查它们的返回值。 for (const row_data& row : rows) { ARROW_RETURN_NOT_OK(col1_builder.Append(row.col1)); ARROW_RETURN_NOT_OK(col2_builder.Append(row.col2)); ARROW_RETURN_NOT_OK(col3_builder.Append(row.col3)); ARROW_RETURN_NOT_OK(col4_builder.Append(row.col4)); } //添加空值,末尾值的元素为空 ARROW_RETURN_NOT_OK(col1_builder.AppendNull()); ARROW_RETURN_NOT_OK(col2_builder.AppendNull()); ARROW_RETURN_NOT_OK(col3_builder.AppendNull()); ARROW_RETURN_NOT_OK(col4_builder.AppendNull()); std::shared_ptr<arrow::Array> col1_array; ARROW_RETURN_NOT_OK(col1_builder.Finish(&col1_array)); std::shared_ptr<arrow::Array> col2_array; ARROW_RETURN_NOT_OK(col2_builder.Finish(&col2_array)); std::shared_ptr<arrow::Array> col3_array; ARROW_RETURN_NOT_OK(col3_builder.Finish(&col3_array)); std::shared_ptr<arrow::Array> col4_array; ARROW_RETURN_NOT_OK(col4_builder.Finish(&col4_array)); std::vector<std::shared_ptr<arrow::Field>> schema_vector = { arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()), arrow::field("col3", arrow::float64()), arrow::field("col4", arrow::utf8()) }; auto schema = std::make_shared<arrow::Schema>(schema_vector); //最终的' table '变量是我们可以传递给其他可以使用Apache Arrow内存结构的函数的变量。这个对象拥有所有引用数据的所有权, //因此一旦我们离开构建表及其底层数组的函数的作用域,就不必关心未定义的引用。 *table = arrow::Table::Make(schema, { col1_array, col2_array, col3_array,col4_array }); return arrow::Status::OK(); } arrow::Status TableToVector(const std::shared_ptr<arrow::Table>& table, std::vector<struct row_data>* rows) { //检查表结构是否一致 std::vector<std::shared_ptr<arrow::Field>> schema_vector = { arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()), arrow::field("col3", arrow::float64()), arrow::field("col4", arrow::utf8()) }; auto expected_schema = std::make_shared<arrow::Schema>(schema_vector); if (!expected_schema->Equals(*table->schema())) { // The table doesn't have the expected schema thus we cannot directly // convert it to our target representation. return arrow::Status::Invalid("Schemas are not matching!"); } //获取对应列数据指针 auto col1s = std::static_pointer_cast<arrow::Int32Array>(table->column(0)->chunk(0)); auto col2s = std::static_pointer_cast<arrow::Int64Array>(table->column(1)->chunk(0)); auto col3s = std::static_pointer_cast<arrow::DoubleArray>(table->column(2)->chunk(0)); auto col4s = std::static_pointer_cast<arrow::StringArray>(table->column(3)->chunk(0)); for (int64_t i = 0; i < table->num_rows(); i++) { if (col1s->IsNull(i)) { assert(i == 3);//第四行为null } else { int32_t col1 = col1s->Value(i); int64_t col2 = col2s->Value(i); double col3 = col3s->Value(i); std::string col4 = col4s->GetString(i); rows->push_back({ col1, col2, col3,col4 }); } } return arrow::Status::OK(); } // 行数组和列数组相互转换 int testTableConvertSTL() { //行数组 std::vector<row_data> rows = { {1, 11,1.0, "John"}, {2, 22,2.0, "Tom"}, {3,33, 3.0,"Susan"} }; std::shared_ptr<arrow::Table> table; EXIT_ON_FAILURE(CreateTable(rows, &table)); std::vector<row_data> expected_rows; EXIT_ON_FAILURE(TableToVector(table, &expected_rows)); std::cout << expected_rows.size() << std::endl; assert(rows.size() == expected_rows.size()); return 0; } void test() { // 构建一个int8数组 arrow::Int8Builder builder; arrow::Int16Builder int16builder; int8_t days_raw[5] = { 1, 12, 17, 23, 28 }; int8_t months_raw[5] = { 1, 3, 5, 7, 1 }; int16_t years_raw[5] = { 1990, 2000, 1995, 2000, 1995 }; builder.AppendValues(days_raw, 5); std::shared_ptr<arrow::Array> days = builder.Finish().MoveValueUnsafe(); builder.AppendValues(months_raw, 5); std::shared_ptr<arrow::Array> months = builder.Finish().MoveValueUnsafe(); int16builder.AppendValues(years_raw, 5); std::shared_ptr<arrow::Array> years = int16builder.Finish().MoveValueUnsafe(); // Schema 自定义table // Now, we want a RecordBatch, which has columns and labels for said columns. // This gets us to the 2d data structures we want in Arrow. // These are defined by schema, which have fields -- here we get both those object types // ready. std::shared_ptr<arrow::Field> field_day, field_month, field_year; std::shared_ptr<arrow::Schema> schema; // Every field needs its name and data type. field_day = arrow::field("Day", arrow::int8()); field_month = arrow::field("Month", arrow::int8()); field_year = arrow::field("Year", arrow::int16()); // The schema can be built from a vector of fields, and we do so here. schema = arrow::schema({ field_day, field_month, field_year }); // 打印 // With the schema and Arrays full of data, we can make our RecordBatch! Here, // each column is internally contiguous. This is in opposition to Tables, which we'll // see next. std::shared_ptr<arrow::RecordBatch> rbatch; // The RecordBatch needs the schema, length for columns, which all must match, // and the actual data itself. rbatch = arrow::RecordBatch::Make(schema, days->length(), { days, months, years }); std::cout << rbatch->ToString(); /* Day: [ 1, 12, 17, 23, 28 ] Month: [ 1, 3, 5, 7, 1 ] Year: [ 1990, 2000, 1995, 2000, 1995 ] */ // stl vector容器 arrow::ArrayVector day_vecs{days}; std::shared_ptr<arrow::ChunkedArray> day_chunks = std::make_shared<arrow::ChunkedArray>(day_vecs); testTableConvertSTL(); testReadCSV(); /* col1: string col2: string col3: string ---- col1: [ [ "val1", "val1" ] ] col2: [ [ "val2", "val2" ] ] col3: [ [ "val3", "val3" ] ] */ testWriteIpc(); testReadIPC(); //testComputeGreater(); //testComputeMinMax(); }
Compute Functions — Apache Arrow v17.0.0