From e1baf4269dd4174aab676466fbc29aebd9f862bf Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 21 Jan 2026 09:56:09 +0900 Subject: [PATCH] [C++] Extract the internals of the RunEndEncode kernel out of compute/ --- cpp/src/arrow/CMakeLists.txt | 1 - cpp/src/arrow/c/CMakeLists.txt | 23 +- cpp/src/arrow/c/bridge_test.cc | 70 +- cpp/src/arrow/compute/kernel.cc | 22 + cpp/src/arrow/compute/kernel.h | 4 + .../compute/kernels/ree_util_internal.cc | 142 ---- .../arrow/compute/kernels/ree_util_internal.h | 388 ----------- .../compute/kernels/vector_run_end_encode.cc | 419 +++--------- cpp/src/arrow/json/from_string.cc | 14 + cpp/src/arrow/json/from_string_test.cc | 45 ++ cpp/src/arrow/util/ree_util.cc | 484 ++++++++++---- cpp/src/arrow/util/ree_util.h | 191 ++---- cpp/src/arrow/util/ree_util_internal.h | 627 ++++++++++++++++++ 13 files changed, 1229 insertions(+), 1201 deletions(-) delete mode 100644 cpp/src/arrow/compute/kernels/ree_util_internal.cc delete mode 100644 cpp/src/arrow/compute/kernels/ree_util_internal.h create mode 100644 cpp/src/arrow/util/ree_util_internal.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index d9f04a627bc..3cf163b120d 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -772,7 +772,6 @@ if(ARROW_COMPUTE) compute/kernels/hash_aggregate_numeric.cc compute/kernels/hash_aggregate_pivot.cc compute/kernels/pivot_internal.cc - compute/kernels/ree_util_internal.cc compute/kernels/scalar_arithmetic.cc compute/kernels/scalar_boolean.cc compute/kernels/scalar_compare.cc diff --git a/cpp/src/arrow/c/CMakeLists.txt b/cpp/src/arrow/c/CMakeLists.txt index a7f722aacc9..81a81cd3f11 100644 --- a/cpp/src/arrow/c/CMakeLists.txt +++ b/cpp/src/arrow/c/CMakeLists.txt @@ -15,28 +15,7 @@ # specific language governing permissions and limitations # under the License. -# TODO(GH-37221): Remove compute dependency for REE requirements on bridge_test -set(ARROW_TEST_LINK_LIBS "") - -if(ARROW_TEST_LINKAGE STREQUAL "static") - list(APPEND ARROW_TEST_LINK_LIBS ${ARROW_TEST_STATIC_LINK_LIBS}) -else() - list(APPEND ARROW_TEST_LINK_LIBS ${ARROW_TEST_SHARED_LINK_LIBS}) -endif() - -if(ARROW_COMPUTE) - if(ARROW_TEST_LINKAGE STREQUAL "static") - list(APPEND ARROW_TEST_LINK_LIBS arrow_compute_static arrow_compute_testing) - else() - list(APPEND ARROW_TEST_LINK_LIBS arrow_compute_shared arrow_compute_testing) - endif() -endif() - -add_arrow_test(bridge_test - PREFIX - "arrow-c" - STATIC_LINK_LIBS - ${ARROW_TEST_LINK_LIBS}) +add_arrow_test(bridge_test PREFIX "arrow-c") add_arrow_test(dlpack_test) add_arrow_benchmark(bridge_benchmark) diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index c6a5e01e038..fd60eaecb3a 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -31,6 +31,7 @@ #include "arrow/c/bridge.h" #include "arrow/c/helpers.h" #include "arrow/c/util_internal.h" +#include "arrow/json/from_string.h" #include "arrow/memory_pool.h" #include "arrow/testing/builder.h" #include "arrow/testing/extension_type.h" @@ -45,13 +46,9 @@ #include "arrow/util/logging_internal.h" #include "arrow/util/macros.h" #include "arrow/util/range.h" +#include "arrow/util/ree_util.h" #include "arrow/util/thread_pool.h" -// TODO(GH-37221): Remove these ifdef checks when compute dependency is removed -#ifdef ARROW_COMPUTE -# include "arrow/compute/api_vector.h" -#endif - namespace arrow { using internal::ArrayDeviceExportTraits; @@ -472,7 +469,6 @@ TEST_F(TestSchemaExport, Union) { {ARROW_FLAG_NULLABLE}); } -#ifdef ARROW_COMPUTE TEST_F(TestSchemaExport, RunEndEncoded) { TestNested(run_end_encoded(int16(), uint8()), {"+r", "s", "C"}, {"", "run_ends", "values"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE}); @@ -484,7 +480,6 @@ TEST_F(TestSchemaExport, RunEndEncoded) { {"", "run_ends", "values", "item"}, {ARROW_FLAG_NULLABLE, 0, ARROW_FLAG_NULLABLE, ARROW_FLAG_NULLABLE}); } -#endif std::string GetIndexFormat(Type::type type_id) { switch (type_id) { @@ -1080,21 +1075,10 @@ TEST_F(TestArrayExport, Union) { TestNested(type, data); } -#ifdef ARROW_COMPUTE -Result> REEFromJSON(const std::shared_ptr& ree_type, - const std::string& json) { - auto ree_type_ptr = checked_cast(ree_type.get()); - auto array = ArrayFromJSON(ree_type_ptr->value_type(), json); - ARROW_ASSIGN_OR_RAISE( - auto datum, - RunEndEncode(array, compute::RunEndEncodeOptions{ree_type_ptr->run_end_type()})); - return datum.make_array(); -} - TEST_F(TestArrayExport, RunEndEncoded) { auto factory = []() { - return REEFromJSON(run_end_encoded(int32(), int8()), - "[1, 2, 2, 3, null, null, null, 4]"); + return ArrayFromJSON(run_end_encoded(int32(), int8()), + "[1, 2, 2, 3, null, null, null, 4]"); }; TestNested(factory); } @@ -1102,13 +1086,12 @@ TEST_F(TestArrayExport, RunEndEncoded) { TEST_F(TestArrayExport, RunEndEncodedSliced) { auto factory = []() -> Result> { ARROW_ASSIGN_OR_RAISE(auto ree_array, - REEFromJSON(run_end_encoded(int32(), int8()), - "[1, 2, 2, 3, null, null, null, 4]")); + json::ArrayFromJSONString(run_end_encoded(int32(), int8()), + "[1, 2, 2, 3, null, null, null, 4]")); return ree_array->Slice(1, 5); }; TestNested(factory); } -#endif TEST_F(TestArrayExport, Dictionary) { { @@ -1432,16 +1415,14 @@ class TestDeviceArrayExport : public ::testing::Test { return [=]() { return ToDevice(mm, *ArrayFromJSON(type, json)->data()); }; } -#ifdef ARROW_COMPUTE static std::function>()> JSONREEArrayFactory( const std::shared_ptr& mm, std::shared_ptr type, const char* json) { return [=]() -> Result> { - ARROW_ASSIGN_OR_RAISE(auto result, REEFromJSON(type, json)); + ARROW_ASSIGN_OR_RAISE(auto result, json::ArrayFromJSONString(type, json)); return ToDevice(mm, *result->data()); }; } -#endif template void TestWithArrayFactory(ArrayFactory&& factory, ExportCheckFunc&& check_func) { @@ -1680,7 +1661,6 @@ TEST_F(TestDeviceArrayExport, Union) { TestNested(mm, type, data); } -#ifdef ARROW_COMPUTE TEST_F(TestDeviceArrayExport, RunEndEncoded) { std::shared_ptr device = std::make_shared(1); auto mm = device->default_memory_manager(); @@ -1689,7 +1669,6 @@ TEST_F(TestDeviceArrayExport, RunEndEncoded) { const char* data = "[1, null, 2, 2, 4, 5]"; TestNested(JSONREEArrayFactory(mm, type, data)); } -#endif TEST_F(TestDeviceArrayExport, Extension) { std::shared_ptr device = std::make_shared(1); @@ -2186,14 +2165,12 @@ TEST_F(TestSchemaImport, Map) { CheckImport(expected); } -#ifdef ARROW_COMPUTE TEST_F(TestSchemaImport, RunEndEncoded) { FillPrimitive(AddChild(), "s", "run_ends"); FillPrimitive(AddChild(), "I", "values"); FillRunEndEncoded("+r"); CheckImport(run_end_encoded(int16(), uint32())); } -#endif TEST_F(TestSchemaImport, Dictionary) { FillPrimitive(AddChild(), "u"); @@ -3175,14 +3152,14 @@ TEST_F(TestArrayImport, Struct) { CheckImport(expected); } -#ifdef ARROW_COMPUTE TEST_F(TestArrayImport, RunEndEncoded) { FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5); FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5); FillRunEndEncoded(9, 0); - ASSERT_OK_AND_ASSIGN(auto expected, - REEFromJSON(run_end_encoded(int16(), float32()), - "[0.0, 1.5, -2.0, -2.0, 3.0, 3.0, 3.0, 4.0, 4.0]")); + ASSERT_OK_AND_ASSIGN( + auto expected, + json::ArrayFromJSONString(run_end_encoded(int16(), float32()), + "[0.0, 1.5, -2.0, -2.0, 3.0, 3.0, 3.0, 4.0, 4.0]")); ASSERT_OK(expected->ValidateFull()); CheckImport(expected); } @@ -3193,32 +3170,35 @@ TEST_F(TestArrayImport, RunEndEncodedWithOffset) { FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5); FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5); FillRunEndEncoded(7, 0); - ASSERT_OK_AND_ASSIGN(auto expected, - REEFromJSON(ree_type, "[-2.0, -2.0, -2.0, -2.0, 3.0, 3.0, 3.0]")); + ASSERT_OK_AND_ASSIGN( + auto expected, + json::ArrayFromJSONString(ree_type, "[-2.0, -2.0, -2.0, -2.0, 3.0, 3.0, 3.0]")); CheckImport(expected); // Offset in parent FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5); FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5); FillRunEndEncoded(5, 2); - ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0, 3.0]")); + ASSERT_OK_AND_ASSIGN( + expected, json::ArrayFromJSONString(ree_type, "[-2.0, -2.0, 3.0, 3.0, 3.0]")); CheckImport(expected); // Length in parent that cuts last run FillPrimitive(AddChild(), 5, 0, 0, run_ends_buffers5); FillPrimitive(AddChild(), 5, 0, 0, primitive_buffers_no_nulls5); FillRunEndEncoded(4, 2); - ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]")); + ASSERT_OK_AND_ASSIGN(expected, + json::ArrayFromJSONString(ree_type, "[-2.0, -2.0, 3.0, 3.0]")); CheckImport(expected); // Offset in both children and parent FillPrimitive(AddChild(), 3, 0, 2, run_ends_buffers5); FillPrimitive(AddChild(), 3, 0, 2, primitive_buffers_no_nulls5); FillRunEndEncoded(4, 2); - ASSERT_OK_AND_ASSIGN(expected, REEFromJSON(ree_type, "[-2.0, -2.0, 3.0, 3.0]")); + ASSERT_OK_AND_ASSIGN(expected, + json::ArrayFromJSONString(ree_type, "[-2.0, -2.0, 3.0, 3.0]")); CheckImport(expected); } -#endif TEST_F(TestArrayImport, SparseUnion) { auto type = sparse_union({field("strs", utf8()), field("ints", int8())}, {43, 42}); @@ -3749,12 +3729,10 @@ TEST_F(TestSchemaRoundtrip, Union) { TestWithTypeFactory([&]() { return dense_union({f1, f2}, type_codes); }); } -#ifdef ARROW_COMPUTE TEST_F(TestSchemaRoundtrip, RunEndEncoded) { TestWithTypeFactory([]() { return run_end_encoded(int16(), float32()); }); TestWithTypeFactory([]() { return run_end_encoded(int32(), list(float32())); }); } -#endif TEST_F(TestSchemaRoundtrip, Dictionary) { for (auto index_ty : all_dictionary_index_types()) { @@ -4104,13 +4082,12 @@ TEST_F(TestArrayRoundtrip, Union) { } } -#ifdef ARROW_COMPUTE TEST_F(TestArrayRoundtrip, RunEndEncoded) { { auto factory = []() -> Result> { - ARROW_ASSIGN_OR_RAISE(auto ree_array, - REEFromJSON(run_end_encoded(int32(), int8()), - "[1, 2, 2, 3, null, null, null, 4]")); + ARROW_ASSIGN_OR_RAISE( + auto ree_array, json::ArrayFromJSONString(run_end_encoded(int32(), int8()), + "[1, 2, 2, 3, null, null, null, 4]")); return ree_array->Slice(1, 5); }; TestWithArrayFactory(factory); @@ -4130,7 +4107,6 @@ TEST_F(TestArrayRoundtrip, RunEndEncoded) { TestWithArrayFactory(factory); } } -#endif TEST_F(TestArrayRoundtrip, Dictionary) { { diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc index addbb29edd2..1d64fe7b275 100644 --- a/cpp/src/arrow/compute/kernel.cc +++ b/cpp/src/arrow/compute/kernel.cc @@ -32,6 +32,7 @@ #include "arrow/util/hash_util.h" #include "arrow/util/logging_internal.h" #include "arrow/util/macros.h" +#include "arrow/util/ree_util.h" namespace arrow { @@ -276,6 +277,27 @@ std::shared_ptr FixedSizeBinaryLike() { return std::make_shared(); } +class REEValueMatcher : public TypeMatcher { + public: + REEValueMatcher() {} + + bool Matches(const DataType& type) const override { + return ree_util::internal::IsValueTypeSupported(type); + } + + bool Equals(const TypeMatcher& other) const override { + if (this == &other) { + return true; + } + auto casted = dynamic_cast(&other); + return casted != nullptr; + } + + std::string ToString() const override { return "ree-value"; } +}; + +std::shared_ptr REEValue() { return std::make_shared(); } + class RunEndIntegerMatcher : public TypeMatcher { public: ~RunEndIntegerMatcher() override = default; diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 0d4f9d6ff43..5a00b5de9b5 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -144,6 +144,10 @@ ARROW_EXPORT std::shared_ptr LargeBinaryLike(); // Match any fixed binary type ARROW_EXPORT std::shared_ptr FixedSizeBinaryLike(); +/// \brief Match any type supported as a value type for run-end encoding +/// (any non-nested type except Null) +ARROW_EXPORT std::shared_ptr REEValue(); + // \brief Match any primitive type (boolean or any type representable as a C // Type) ARROW_EXPORT std::shared_ptr Primitive(); diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.cc b/cpp/src/arrow/compute/kernels/ree_util_internal.cc deleted file mode 100644 index 4e425079d3d..00000000000 --- a/cpp/src/arrow/compute/kernels/ree_util_internal.cc +++ /dev/null @@ -1,142 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include -#include - -#include "arrow/compute/kernels/ree_util_internal.h" - -#include "arrow/buffer.h" -#include "arrow/memory_pool.h" -#include "arrow/result.h" -#include "arrow/type.h" -#include "arrow/type_traits.h" -#include "arrow/util/logging_internal.h" - -namespace arrow { -namespace compute { -namespace internal { -namespace ree_util { - -Result> AllocateValuesBuffer(int64_t length, const DataType& type, - MemoryPool* pool, - int64_t data_buffer_size) { - if (type.bit_width() == 1) { - // Make sure the bitmap is initialized (avoids Valgrind errors). - return AllocateEmptyBitmap(length, pool); - } else if (is_fixed_width(type.id())) { - return AllocateBuffer(length * type.byte_width(), pool); - } else { - DCHECK(is_base_binary_like(type.id())); - return AllocateBuffer(data_buffer_size, pool); - } -} - -Result> PreallocateRunEndsArray( - const std::shared_ptr& run_end_type, int64_t physical_length, - MemoryPool* pool) { - DCHECK(is_run_end_type(run_end_type->id())); - ARROW_ASSIGN_OR_RAISE( - auto run_ends_buffer, - AllocateBuffer(physical_length * run_end_type->byte_width(), pool)); - return ArrayData::Make(run_end_type, physical_length, - {NULLPTR, std::move(run_ends_buffer)}, /*null_count=*/0); -} - -Result> PreallocateValuesArray( - const std::shared_ptr& value_type, bool has_validity_buffer, int64_t length, - MemoryPool* pool, int64_t data_buffer_size) { - std::vector> values_data_buffers; - std::shared_ptr validity_buffer = NULLPTR; - if (has_validity_buffer) { - ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateEmptyBitmap(length, pool)); - } - ARROW_ASSIGN_OR_RAISE(auto values_buffer, AllocateValuesBuffer(length, *value_type, - pool, data_buffer_size)); - if (is_base_binary_like(value_type->id())) { - const int offset_byte_width = offset_bit_width(value_type->id()) / 8; - ARROW_ASSIGN_OR_RAISE(auto offsets_buffer, - AllocateBuffer((length + 1) * offset_byte_width, pool)); - // Ensure the first offset is zero - memset(offsets_buffer->mutable_data(), 0, offset_byte_width); - offsets_buffer->ZeroPadding(); - values_data_buffers = {std::move(validity_buffer), std::move(offsets_buffer), - std::move(values_buffer)}; - } else { - values_data_buffers = {std::move(validity_buffer), std::move(values_buffer)}; - } - auto data = ArrayData::Make(value_type, length, std::move(values_data_buffers), - kUnknownNullCount); - DCHECK(!(has_validity_buffer && length > 0) || data->buffers[0]); - return data; -} - -Result> PreallocateREEArray( - std::shared_ptr ree_type, bool has_validity_buffer, - int64_t logical_length, int64_t physical_length, MemoryPool* pool, - int64_t data_buffer_size) { - ARROW_ASSIGN_OR_RAISE( - auto run_ends_data, - PreallocateRunEndsArray(ree_type->run_end_type(), physical_length, pool)); - ARROW_ASSIGN_OR_RAISE(auto values_data, PreallocateValuesArray( - ree_type->value_type(), has_validity_buffer, - physical_length, pool, data_buffer_size)); - - return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR}, - {std::move(run_ends_data), std::move(values_data)}, - /*null_count=*/0); -} - -void WriteSingleRunEnd(ArrayData* run_ends_data, int64_t run_end) { - DCHECK_GT(run_end, 0); - DCHECK(is_run_end_type(run_ends_data->type->id())); - auto* output_run_ends = run_ends_data->template GetMutableValues(1); - switch (run_ends_data->type->id()) { - case Type::INT16: - *reinterpret_cast(output_run_ends) = static_cast(run_end); - break; - case Type::INT32: - *reinterpret_cast(output_run_ends) = static_cast(run_end); - break; - default: - DCHECK_EQ(run_ends_data->type->id(), Type::INT64); - *reinterpret_cast(output_run_ends) = static_cast(run_end); - break; - } -} - -Result> MakeNullREEArray( - const std::shared_ptr& run_end_type, int64_t logical_length, - MemoryPool* pool) { - auto ree_type = std::make_shared(run_end_type, null()); - const int64_t physical_length = logical_length > 0 ? 1 : 0; - ARROW_ASSIGN_OR_RAISE(auto run_ends_data, - PreallocateRunEndsArray(run_end_type, physical_length, pool)); - if (logical_length > 0) { - WriteSingleRunEnd(run_ends_data.get(), logical_length); - } - auto values_data = ArrayData::Make(null(), physical_length, {NULLPTR}, - /*null_count=*/physical_length); - return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR}, - {std::move(run_ends_data), std::move(values_data)}, - /*null_count=*/0); -} - -} // namespace ree_util -} // namespace internal -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.h b/cpp/src/arrow/compute/kernels/ree_util_internal.h deleted file mode 100644 index 3e2bf8af87e..00000000000 --- a/cpp/src/arrow/compute/kernels/ree_util_internal.h +++ /dev/null @@ -1,388 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -// Useful operations to implement kernels handling run-end encoded arrays - -#include -#include -#include -#include - -#include "arrow/array/data.h" -#include "arrow/compute/exec.h" -#include "arrow/compute/kernel.h" -#include "arrow/compute/visibility.h" -#include "arrow/result.h" -#include "arrow/status.h" -#include "arrow/type_traits.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/logging.h" - -namespace arrow { -namespace compute { -namespace internal { -namespace ree_util { - -template -struct ReadWriteValue {}; - -// Numeric and primitive C-compatible types -template -class ReadWriteValue> { - public: - using ValueRepr = typename ArrowType::c_type; - - private: - const uint8_t* input_validity_; - const uint8_t* input_values_; - - // Needed only by the writing functions - uint8_t* output_validity_; - uint8_t* output_values_; - - public: - explicit ReadWriteValue(const ArraySpan& input_values_array, - ArrayData* output_values_array_data) - : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data - : NULLPTR), - input_values_(input_values_array.buffers[1].data), - output_validity_((out_has_validity_buffer && output_values_array_data) - ? output_values_array_data->buffers[0]->mutable_data() - : NULLPTR), - output_values_(output_values_array_data - ? output_values_array_data->buffers[1]->mutable_data() - : NULLPTR) {} - - [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const { - bool valid = true; - if constexpr (in_has_validity_buffer) { - valid = bit_util::GetBit(input_validity_, read_offset); - } - if constexpr (std::is_same_v) { - *out = bit_util::GetBit(input_values_, read_offset); - } else { - *out = (reinterpret_cast(input_values_))[read_offset]; - } - return valid; - } - - /// Pre-conditions guaranteed by the callers: - /// - i and j are valid indices into the values buffer - /// - the values in i and j are valid - bool CompareValuesAt(int64_t i, int64_t j) const { - if constexpr (std::is_same_v) { - return bit_util::GetBit(input_values_, i) == bit_util::GetBit(input_values_, j); - } else { - return (reinterpret_cast(input_values_))[i] == - (reinterpret_cast(input_values_))[j]; - } - } - - /// \brief Ensure padding is zeroed in validity bitmap. - void ZeroValidityPadding(int64_t length) const { - ARROW_DCHECK(output_values_); - if constexpr (out_has_validity_buffer) { - ARROW_DCHECK(output_validity_); - const int64_t validity_buffer_size = bit_util::BytesForBits(length); - output_validity_[validity_buffer_size - 1] = 0; - } - } - - void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const { - if constexpr (out_has_validity_buffer) { - bit_util::SetBitTo(output_validity_, write_offset, valid); - } - if (valid) { - if constexpr (std::is_same_v) { - bit_util::SetBitTo(output_values_, write_offset, value); - } else { - (reinterpret_cast(output_values_))[write_offset] = value; - } - } - } - - void WriteRun(int64_t write_offset, int64_t run_length, bool valid, - ValueRepr value) const { - if constexpr (out_has_validity_buffer) { - bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid); - } - if (valid) { - if constexpr (std::is_same_v) { - bit_util::SetBitsTo(reinterpret_cast(output_values_), write_offset, - run_length, value); - } else { - auto* output_values_c = reinterpret_cast(output_values_); - std::fill(output_values_c + write_offset, - output_values_c + write_offset + run_length, value); - } - } - } - - bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; } -}; - -// FixedSizeBinary, Decimal128 -template -class ReadWriteValue> { - public: - // Every value is represented as a pointer to byte_width_ bytes - using ValueRepr = const uint8_t*; - - private: - const uint8_t* input_validity_; - const uint8_t* input_values_; - - // Needed only by the writing functions - uint8_t* output_validity_; - uint8_t* output_values_; - - const size_t byte_width_; - - public: - ReadWriteValue(const ArraySpan& input_values_array, ArrayData* output_values_array_data) - : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data - : NULLPTR), - input_values_(input_values_array.buffers[1].data), - output_validity_((out_has_validity_buffer && output_values_array_data) - ? output_values_array_data->buffers[0]->mutable_data() - : NULLPTR), - output_values_(output_values_array_data - ? output_values_array_data->buffers[1]->mutable_data() - : NULLPTR), - byte_width_(input_values_array.type->byte_width()) {} - - [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const { - bool valid = true; - if constexpr (in_has_validity_buffer) { - valid = bit_util::GetBit(input_validity_, read_offset); - } - *out = input_values_ + (read_offset * byte_width_); - return valid; - } - - bool CompareValuesAt(int64_t i, int64_t j) const { - return 0 == memcmp(input_values_ + (i * byte_width_), - input_values_ + (j * byte_width_), byte_width_); - } - - /// \brief Ensure padding is zeroed in validity bitmap. - void ZeroValidityPadding(int64_t length) const { - ARROW_DCHECK(output_values_); - if constexpr (out_has_validity_buffer) { - ARROW_DCHECK(output_validity_); - const int64_t validity_buffer_size = bit_util::BytesForBits(length); - output_validity_[validity_buffer_size - 1] = 0; - } - } - - void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const { - if constexpr (out_has_validity_buffer) { - bit_util::SetBitTo(output_validity_, write_offset, valid); - } - if (valid) { - memcpy(output_values_ + (write_offset * byte_width_), value, byte_width_); - } - } - - void WriteRun(int64_t write_offset, int64_t run_length, bool valid, - ValueRepr value) const { - if constexpr (out_has_validity_buffer) { - bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid); - } - if (valid) { - uint8_t* ptr = output_values_ + (write_offset * byte_width_); - for (int64_t i = 0; i < run_length; ++i) { - memcpy(ptr, value, byte_width_); - ptr += byte_width_; - } - } - } - - bool Compare(ValueRepr lhs, ValueRepr rhs) const { - return memcmp(lhs, rhs, byte_width_) == 0; - } -}; - -// Binary, String... -template -class ReadWriteValue> { - public: - using ValueRepr = std::string_view; - using offset_type = typename ArrowType::offset_type; - - private: - const uint8_t* input_validity_; - const offset_type* input_offsets_; - const uint8_t* input_values_; - - // Needed only by the writing functions - uint8_t* output_validity_; - offset_type* output_offsets_; - uint8_t* output_values_; - - public: - ReadWriteValue(const ArraySpan& input_values_array, ArrayData* output_values_array_data) - : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data - : NULLPTR), - input_offsets_(input_values_array.template GetValues(1, 0)), - input_values_(input_values_array.buffers[2].data), - output_validity_((out_has_validity_buffer && output_values_array_data) - ? output_values_array_data->buffers[0]->mutable_data() - : NULLPTR), - output_offsets_( - output_values_array_data - ? output_values_array_data->template GetMutableValues(1, 0) - : NULLPTR), - output_values_(output_values_array_data - ? output_values_array_data->buffers[2]->mutable_data() - : NULLPTR) {} - - [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const { - bool valid = true; - if constexpr (in_has_validity_buffer) { - valid = bit_util::GetBit(input_validity_, read_offset); - } - if (valid) { - const offset_type offset0 = input_offsets_[read_offset]; - const offset_type offset1 = input_offsets_[read_offset + 1]; - *out = std::string_view(reinterpret_cast(input_values_ + offset0), - offset1 - offset0); - } - return valid; - } - - bool CompareValuesAt(int64_t i, int64_t j) const { - const offset_type len_i = input_offsets_[i + 1] - input_offsets_[i]; - const offset_type len_j = input_offsets_[j + 1] - input_offsets_[j]; - return len_i == len_j && - memcmp(input_values_ + input_offsets_[i], input_values_ + input_offsets_[j], - static_cast(len_i)); - } - - /// \brief Ensure padding is zeroed in validity bitmap. - void ZeroValidityPadding(int64_t length) const { - ARROW_DCHECK(output_values_); - if constexpr (out_has_validity_buffer) { - ARROW_DCHECK(output_validity_); - const int64_t validity_buffer_size = bit_util::BytesForBits(length); - output_validity_[validity_buffer_size - 1] = 0; - } - } - - void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const { - if constexpr (out_has_validity_buffer) { - bit_util::SetBitTo(output_validity_, write_offset, valid); - } - const offset_type offset0 = output_offsets_[write_offset]; - const offset_type offset1 = - offset0 + (valid ? static_cast(value.size()) : 0); - output_offsets_[write_offset + 1] = offset1; - if (valid) { - memcpy(output_values_ + offset0, value.data(), value.size()); - } - } - - void WriteRun(int64_t write_offset, int64_t run_length, bool valid, - ValueRepr value) const { - if constexpr (out_has_validity_buffer) { - bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid); - } - if (valid) { - int64_t i = write_offset; - offset_type offset = output_offsets_[i]; - while (i < write_offset + run_length) { - memcpy(output_values_ + offset, value.data(), value.size()); - offset += static_cast(value.size()); - i += 1; - output_offsets_[i] = offset; - } - } else { - offset_type offset = output_offsets_[write_offset]; - offset_type* begin = output_offsets_ + write_offset + 1; - std::fill(begin, begin + run_length, offset); - } - } - - bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; } -}; - -Result> AllocateValuesBuffer(int64_t length, const DataType& type, - MemoryPool* pool, - int64_t data_buffer_size); - -Result> PreallocateRunEndsArray( - const std::shared_ptr& run_end_type, int64_t physical_length, - MemoryPool* pool); - -/// \brief Preallocate the physical values array for a run-end encoded array -/// -/// data_buffer_size is passed here pre-calculated so this function doesn't have -/// to be template-specialized for each type. -/// -/// The null_count is left as kUnknownNullCount (or 0 if length is 0) and, if -/// after writing the values, the caller knows the null count, it can be set. -/// -/// \post if has_validity_buffer and length > 0, then data.buffer[0] != NULLPTR -/// -/// \param has_validity_buffer a validity buffer must be allocated -/// \param length the length of the values array -/// \param data_buffer_size the size of the data buffer for string and binary types -ARROW_COMPUTE_EXPORT Result> PreallocateValuesArray( - const std::shared_ptr& value_type, bool has_validity_buffer, int64_t length, - MemoryPool* pool, int64_t data_buffer_size); - -/// \brief Preallocate the ArrayData for the run-end encoded version -/// of the flat input array -/// -/// The top-level null_count is set to 0 (REEs keep all the data in child -/// arrays). The null_count of the values array (child_data[1]) is left as -/// kUnknownNullCount (or 0 if physical_length is 0) and, if after writing -/// the values, the caller knows the null count, it can be set. -/// -/// \post if has_validity_buffer and physical_length > 0, then -/// data.child_data[1].buffer[0] != NULLPTR -/// -/// \param data_buffer_size the size of the data buffer for string and binary types -ARROW_COMPUTE_EXPORT Result> PreallocateREEArray( - std::shared_ptr ree_type, bool has_validity_buffer, - int64_t logical_length, int64_t physical_length, MemoryPool* pool, - int64_t data_buffer_size); - -/// \brief Writes a single run-end to the first slot of the pre-allocated -/// run-end encoded array in out -/// -/// Pre-conditions: -/// - run_ends_data is of a valid run-ends type -/// - run_ends_data has at least one slot -/// - run_end > 0 -/// - run_ends fits in the run-end type without overflow -void WriteSingleRunEnd(ArrayData* run_ends_data, int64_t run_end); - -ARROW_COMPUTE_EXPORT Result> MakeNullREEArray( - const std::shared_ptr& run_end_type, int64_t logical_length, - MemoryPool* pool); - -} // namespace ree_util -} // namespace internal -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc index bc8b25de4ec..bdee0248bb9 100644 --- a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc +++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc @@ -20,12 +20,12 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/kernel.h" #include "arrow/compute/kernels/common_internal.h" -#include "arrow/compute/kernels/ree_util_internal.h" #include "arrow/compute/registry_internal.h" #include "arrow/type_traits.h" #include "arrow/util/checked_cast.h" #include "arrow/util/logging_internal.h" #include "arrow/util/ree_util.h" +#include "arrow/util/ree_util_internal.h" namespace arrow { namespace compute { @@ -41,214 +41,27 @@ struct RunEndEncodingState : public KernelState { std::shared_ptr run_end_type; }; -template -class RunEndEncodingLoop { - public: - using RunEndCType = typename RunEndType::c_type; - - private: - using ReadWriteValue = ree_util::ReadWriteValue; - using ValueRepr = typename ReadWriteValue::ValueRepr; - - private: - const int64_t input_length_; - const int64_t input_offset_; - ReadWriteValue read_write_value_; - // Needed only by WriteEncodedRuns() - RunEndCType* output_run_ends_; - - public: - explicit RunEndEncodingLoop(const ArraySpan& input_array, - ArrayData* output_values_array_data, - RunEndCType* output_run_ends) - : input_length_(input_array.length), - input_offset_(input_array.offset), - read_write_value_(input_array, output_values_array_data), - output_run_ends_(output_run_ends) { - DCHECK_GT(input_array.length, 0); - } - - /// \brief Give a pass over the input data and count the number of runs - /// - /// \return a tuple with the number of non-null run values, the total number of runs, - /// and the data buffer size for string and binary types - ARROW_NOINLINE std::tuple CountNumberOfRuns() const { - int64_t read_offset = input_offset_; - ValueRepr current_run; - bool current_run_valid = read_write_value_.ReadValue(¤t_run, read_offset); - read_offset += 1; - int64_t num_valid_runs = current_run_valid ? 1 : 0; - int64_t num_output_runs = 1; - int64_t data_buffer_size = 0; - if constexpr (is_base_binary_like(ValueType::type_id)) { - data_buffer_size = current_run_valid ? current_run.size() : 0; - } - for (; read_offset < input_offset_ + input_length_; read_offset += 1) { - ValueRepr value; - const bool valid = read_write_value_.ReadValue(&value, read_offset); - - const bool open_new_run = - valid != current_run_valid || !read_write_value_.Compare(value, current_run); - if (open_new_run) { - // Open the new run - current_run = value; - current_run_valid = valid; - // Count the new run - num_output_runs += 1; - num_valid_runs += valid ? 1 : 0; - if constexpr (is_base_binary_like(ValueType::type_id)) { - data_buffer_size += valid ? current_run.size() : 0; - } - } - } - return std::make_tuple(num_valid_runs, num_output_runs, data_buffer_size); - } - - ARROW_NOINLINE int64_t WriteEncodedRuns() { - DCHECK(output_run_ends_); - int64_t read_offset = input_offset_; - int64_t write_offset = 0; - ValueRepr current_run; - bool current_run_valid = read_write_value_.ReadValue(¤t_run, read_offset); - read_offset += 1; - for (; read_offset < input_offset_ + input_length_; read_offset += 1) { - ValueRepr value; - const bool valid = read_write_value_.ReadValue(&value, read_offset); - - const bool open_new_run = - valid != current_run_valid || !read_write_value_.Compare(value, current_run); - if (open_new_run) { - // Close the current run first by writing it out - read_write_value_.WriteValue(write_offset, current_run_valid, current_run); - const int64_t run_end = read_offset - input_offset_; - output_run_ends_[write_offset] = static_cast(run_end); - write_offset += 1; - // Open the new run - current_run_valid = valid; - current_run = value; - } - } - read_write_value_.WriteValue(write_offset, current_run_valid, current_run); - DCHECK_EQ(input_length_, read_offset - input_offset_); - output_run_ends_[write_offset] = static_cast(input_length_); - return write_offset + 1; - } -}; - -ARROW_NOINLINE Status ValidateRunEndType(const std::shared_ptr& run_end_type, - int64_t input_length) { - int64_t run_end_max = std::numeric_limits::max(); - switch (run_end_type->id()) { - case Type::INT16: - run_end_max = std::numeric_limits::max(); - break; - case Type::INT32: - run_end_max = std::numeric_limits::max(); - break; - default: - DCHECK_EQ(run_end_type->id(), Type::INT64); - break; - } - if (input_length < 0 || input_length > run_end_max) { - return Status::Invalid( - "Cannot run-end encode Arrays with more elements than the " - "run end type can hold: ", - run_end_max); - } - return Status::OK(); -} - -template -class RunEndEncodeImpl { - private: - KernelContext* ctx_; - const ArraySpan& input_array_; - ExecResult* output_; - - public: - using RunEndCType = typename RunEndType::c_type; - - RunEndEncodeImpl(KernelContext* ctx, const ArraySpan& input_array, ExecResult* out) - : ctx_{ctx}, input_array_{input_array}, output_{out} {} - - Status Exec() { - const int64_t input_length = input_array_.length; - - auto run_end_type = TypeTraits::type_singleton(); - auto ree_type = std::make_shared( - run_end_type, input_array_.type->GetSharedPtr()); - if (input_length == 0) { - ARROW_ASSIGN_OR_RAISE( - auto output_array_data, - ree_util::PreallocateREEArray(std::move(ree_type), has_validity_buffer, - /*logical_length=*/input_length, - /*physical_length=*/0, ctx_->memory_pool(), - /*data_buffer_size=*/0)); - output_->value = std::move(output_array_data); - return Status::OK(); - } - - // First pass: count the number of runs - int64_t num_valid_runs = 0; - int64_t num_output_runs = 0; - int64_t data_buffer_size = 0; // for string and binary types - RETURN_NOT_OK(ValidateRunEndType(run_end_type, input_length)); - - RunEndEncodingLoop counting_loop( - input_array_, - /*output_values_array_data=*/NULLPTR, - /*output_run_ends=*/NULLPTR); - std::tie(num_valid_runs, num_output_runs, data_buffer_size) = - counting_loop.CountNumberOfRuns(); - const auto physical_null_count = num_output_runs - num_valid_runs; - DCHECK(!has_validity_buffer || physical_null_count > 0) - << "has_validity_buffer is expected to imply physical_null_count > 0"; - - ARROW_ASSIGN_OR_RAISE( - auto output_array_data, - ree_util::PreallocateREEArray( - std::move(ree_type), has_validity_buffer, /*logical_length=*/input_length, - /*physical_length=*/num_output_runs, ctx_->memory_pool(), data_buffer_size)); - - // Initialize the output pointers - auto* output_run_ends = - output_array_data->child_data[0]->template GetMutableValues(1, 0); - auto* output_values_array_data = output_array_data->child_data[1].get(); - // Set the null_count on the physical array - output_values_array_data->null_count = physical_null_count; - - // Second pass: write the runs - RunEndEncodingLoop writing_loop( - input_array_, output_values_array_data, output_run_ends); - [[maybe_unused]] int64_t num_written_runs = writing_loop.WriteEncodedRuns(); - DCHECK_EQ(num_written_runs, num_output_runs); - - output_->value = std::move(output_array_data); - return Status::OK(); - } -}; - ARROW_NOINLINE Status RunEndEncodeNullArray(const std::shared_ptr& run_end_type, KernelContext* ctx, const ArraySpan& input_array, ExecResult* output) { const int64_t input_length = input_array.length; - DCHECK(input_array.type->id() == Type::NA); + ARROW_DCHECK(input_array.type->id() == Type::NA); if (input_length == 0) { ARROW_ASSIGN_OR_RAISE( auto output_array_data, - ree_util::MakeNullREEArray(run_end_type, 0, ctx->memory_pool())); + ree_util::internal::MakeNullREEArray(run_end_type, 0, ctx->memory_pool())); output->value = std::move(output_array_data); return Status::OK(); } // Abort if run-end type cannot hold the input length - RETURN_NOT_OK(ValidateRunEndType(run_end_type, input_array.length)); + RETURN_NOT_OK(ree_util::internal::ValidateRunEndType(run_end_type, input_array.length)); - ARROW_ASSIGN_OR_RAISE( - auto output_array_data, - ree_util::MakeNullREEArray(run_end_type, input_length, ctx->memory_pool())); + ARROW_ASSIGN_OR_RAISE(auto output_array_data, + ree_util::internal::MakeNullREEArray(run_end_type, input_length, + ctx->memory_pool())); output->value = std::move(output_array_data); return Status::OK(); @@ -257,19 +70,18 @@ ARROW_NOINLINE Status RunEndEncodeNullArray(const std::shared_ptr& run struct RunEndEncodeExec { template static Status DoExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) { - DCHECK(span.values[0].is_array()); + ARROW_DCHECK(span.values[0].is_array()); const auto& input_array = span.values[0].array; + auto run_end_type = TypeTraits::type_singleton(); + if constexpr (ValueType::type_id == Type::NA) { - return RunEndEncodeNullArray(TypeTraits::type_singleton(), ctx, - input_array, result); + return RunEndEncodeNullArray(run_end_type, ctx, input_array, result); } else { - const bool has_validity_buffer = input_array.GetNullCount() > 0; - if (has_validity_buffer) { - return RunEndEncodeImpl(ctx, input_array, result) - .Exec(); - } - return RunEndEncodeImpl(ctx, input_array, result) - .Exec(); + ARROW_ASSIGN_OR_RAISE( + auto encoded, + ree_util::RunEndEncodeArray(input_array, run_end_type, ctx->memory_pool())); + result->value = std::move(encoded); + return Status::OK(); } } @@ -312,7 +124,8 @@ class RunEndDecodingLoop { using RunEndCType = typename RunEndType::c_type; private: - using ReadWriteValue = ree_util::ReadWriteValue; + using ReadWriteValue = + arrow::ree_util::internal::ReadWriteValue; using ValueRepr = typename ReadWriteValue::ValueRepr; const ArraySpan& input_array_; @@ -376,7 +189,7 @@ class RunEndDecodingLoop { write_offset += run_length; output_valid_count += valid ? run_length : 0; } - DCHECK(write_offset == ree_array_span.length()); + ARROW_DCHECK(write_offset == ree_array_span.length()); return output_valid_count; } }; @@ -407,10 +220,10 @@ class RunEndDecodeImpl { } } - ARROW_ASSIGN_OR_RAISE( - auto output_array_data, - ree_util::PreallocateValuesArray(ree_type->value_type(), has_validity_buffer, - length, ctx_->memory_pool(), data_buffer_size)); + ARROW_ASSIGN_OR_RAISE(auto output_array_data, + arrow::ree_util::internal::PreallocateValuesArray( + ree_type->value_type(), has_validity_buffer, length, + ctx_->memory_pool(), data_buffer_size)); int64_t output_null_count = 0; if (length > 0) { @@ -438,7 +251,7 @@ Status RunEndDecodeNullREEArray(KernelContext* ctx, const ArraySpan& input_array struct RunEndDecodeExec { template static Status DoExec(KernelContext* ctx, const ExecSpan& span, ExecResult* result) { - DCHECK(span.values[0].is_array()); + ARROW_DCHECK(span.values[0].is_array()); auto& input_array = span.values[0].array; if constexpr (ValueType::type_id == Type::NA) { return RunEndDecodeNullREEArray(ctx, input_array, result); @@ -478,60 +291,6 @@ struct RunEndDecodeExec { } }; -template -static ArrayKernelExec GenerateREEKernelExec(Type::type type_id) { - switch (type_id) { - case Type::NA: - return Functor::template Exec; - case Type::BOOL: - return Functor::template Exec; - case Type::UINT8: - case Type::INT8: - return Functor::template Exec; - case Type::UINT16: - case Type::INT16: - case Type::HALF_FLOAT: - return Functor::template Exec; - case Type::UINT32: - case Type::INT32: - case Type::FLOAT: - case Type::DATE32: - case Type::TIME32: - case Type::INTERVAL_MONTHS: - case Type::DECIMAL32: - return Functor::template Exec; - case Type::UINT64: - case Type::INT64: - case Type::DOUBLE: - case Type::DATE64: - case Type::TIMESTAMP: - case Type::TIME64: - case Type::DURATION: - case Type::INTERVAL_DAY_TIME: - case Type::DECIMAL64: - return Functor::template Exec; - case Type::INTERVAL_MONTH_DAY_NANO: - return Functor::template Exec; - case Type::DECIMAL128: - return Functor::template Exec; - case Type::DECIMAL256: - return Functor::template Exec; - case Type::FIXED_SIZE_BINARY: - return Functor::template Exec; - case Type::STRING: - return Functor::template Exec; - case Type::BINARY: - return Functor::template Exec; - case Type::LARGE_STRING: - return Functor::template Exec; - case Type::LARGE_BINARY: - return Functor::template Exec; - default: - DCHECK(false); - return FailFunctor::Exec; - } -} - static const FunctionDoc run_end_encode_doc( "Run-end encode array", ("Return a run-end encoded version of the input array."), {"array"}, "RunEndEncodeOptions"); @@ -552,39 +311,41 @@ void RegisterVectorRunEndEncode(FunctionRegistry* registry) { // cannot be encoded as a single run in the output. This is a conscious trade-off as // trying to solve this corner-case would complicate the implementation, // require reallocations, and could create surprising behavior for users of this API. - auto add_kernel = [&function](Type::type type_id) { - auto sig = KernelSignature::Make({InputType(match::SameTypeId(type_id))}, - OutputType(RunEndEncodeExec::ResolveOutputType)); - auto exec = GenerateREEKernelExec(type_id); - VectorKernel kernel(sig, exec, RunEndEncodeInit); - // A REE has null_count=0, so no need to allocate a validity bitmap for them. - kernel.null_handling = NullHandling::OUTPUT_NOT_NULL; - DCHECK_OK(function->AddKernel(std::move(kernel))); + + // Runtime dispatcher for flexible REE value type matcher + auto runtime_exec = [](KernelContext* ctx, const ExecSpan& span, ExecResult* result) { + const auto& input_array = span.values[0].array; + const Type::type type_id = input_array.type->id(); + + // Dispatch to the appropriate exec based on input type + switch (type_id) { +#define DISPATCH_REE_ENCODE_CASE(TypeClass, TYPE_ENUM) \ + case Type::TYPE_ENUM: \ + return RunEndEncodeExec::Exec(ctx, span, result); + ARROW_REE_SUPPORTED_TYPES(DISPATCH_REE_ENCODE_CASE) +#undef DISPATCH_REE_ENCODE_CASE + case Type::NA: + return RunEndEncodeExec::Exec(ctx, span, result); + default: + return Status::NotImplemented("run_end_encode does not support type ", + input_array.type->ToString()); + } }; - add_kernel(Type::NA); - add_kernel(Type::BOOL); - for (const auto& ty : NumericTypes()) { - add_kernel(ty->id()); - } - add_kernel(Type::HALF_FLOAT); - add_kernel(Type::DATE32); - add_kernel(Type::DATE64); - add_kernel(Type::TIME32); - add_kernel(Type::TIME64); - add_kernel(Type::TIMESTAMP); - add_kernel(Type::DURATION); - for (const auto& ty : IntervalTypes()) { - add_kernel(ty->id()); - } - for (const auto& type_id : DecimalTypeIds()) { - add_kernel(type_id); - } - add_kernel(Type::FIXED_SIZE_BINARY); - add_kernel(Type::STRING); - add_kernel(Type::BINARY); - add_kernel(Type::LARGE_STRING); - add_kernel(Type::LARGE_BINARY); + // Use flexible matcher for REE value types (any non-nested type except Null) + auto sig = KernelSignature::Make({InputType(match::REEValue())}, + OutputType(RunEndEncodeExec::ResolveOutputType)); + VectorKernel kernel(sig, runtime_exec, RunEndEncodeInit); + // A REE has null_count=0, so no need to allocate a validity bitmap for them. + kernel.null_handling = NullHandling::OUTPUT_NOT_NULL; + DCHECK_OK(function->AddKernel(std::move(kernel))); + + // Also support Null type explicitly + sig = KernelSignature::Make({InputType(Type::NA)}, + OutputType(RunEndEncodeExec::ResolveOutputType)); + VectorKernel null_kernel(sig, runtime_exec, RunEndEncodeInit); + null_kernel.null_handling = NullHandling::OUTPUT_NOT_NULL; + DCHECK_OK(function->AddKernel(std::move(null_kernel))); DCHECK_OK(registry->AddFunction(std::move(function))); } @@ -593,41 +354,45 @@ void RegisterVectorRunEndDecode(FunctionRegistry* registry) { auto function = std::make_shared("run_end_decode", Arity::Unary(), run_end_decode_doc); - auto add_kernel = [&function](Type::type type_id) { - for (const auto& run_end_type_id : {Type::INT16, Type::INT32, Type::INT64}) { - auto exec = GenerateREEKernelExec(type_id); - auto input_type_matcher = match::RunEndEncoded(match::SameTypeId(run_end_type_id), - match::SameTypeId(type_id)); - auto sig = KernelSignature::Make({InputType(std::move(input_type_matcher))}, - OutputType(RunEndDecodeExec::ResolveOutputType)); - VectorKernel kernel(sig, exec); - DCHECK_OK(function->AddKernel(std::move(kernel))); + // Runtime dispatcher for flexible REE value type matcher + auto runtime_exec = [](KernelContext* ctx, const ExecSpan& span, ExecResult* result) { + const auto& input_array = span.values[0].array; + const auto& ree_type = checked_cast(*input_array.type); + const Type::type value_type_id = ree_type.value_type()->id(); + + // Dispatch to the appropriate exec based on value type + switch (value_type_id) { +#define DISPATCH_REE_DECODE_CASE(TypeClass, TYPE_ENUM) \ + case Type::TYPE_ENUM: \ + return RunEndDecodeExec::Exec(ctx, span, result); + ARROW_REE_SUPPORTED_TYPES(DISPATCH_REE_DECODE_CASE) +#undef DISPATCH_REE_DECODE_CASE + case Type::NA: + return RunEndDecodeExec::Exec(ctx, span, result); + default: + return Status::NotImplemented("run_end_decode does not support value type ", + ree_type.value_type()->ToString()); } }; - add_kernel(Type::NA); - add_kernel(Type::BOOL); - for (const auto& ty : NumericTypes()) { - add_kernel(ty->id()); - } - add_kernel(Type::HALF_FLOAT); - add_kernel(Type::DATE32); - add_kernel(Type::DATE64); - add_kernel(Type::TIME32); - add_kernel(Type::TIME64); - add_kernel(Type::TIMESTAMP); - add_kernel(Type::DURATION); - for (const auto& ty : IntervalTypes()) { - add_kernel(ty->id()); - } - for (const auto& type_id : DecimalTypeIds()) { - add_kernel(type_id); + for (const auto& run_end_type_id : {Type::INT16, Type::INT32, Type::INT64}) { + // Use flexible matcher for REE value types (any non-nested type except Null) + auto input_type_matcher = + match::RunEndEncoded(match::SameTypeId(run_end_type_id), match::REEValue()); + auto sig = KernelSignature::Make({InputType(std::move(input_type_matcher))}, + OutputType(RunEndDecodeExec::ResolveOutputType)); + VectorKernel kernel(sig, runtime_exec); + DCHECK_OK(function->AddKernel(std::move(kernel))); + + // Also register a kernel for null value type explicitly + auto null_input_matcher = match::RunEndEncoded(match::SameTypeId(run_end_type_id), + match::SameTypeId(Type::NA)); + auto null_sig = + KernelSignature::Make({InputType(std::move(null_input_matcher))}, + OutputType(RunEndDecodeExec::ResolveOutputType)); + VectorKernel null_kernel(null_sig, runtime_exec); + DCHECK_OK(function->AddKernel(std::move(null_kernel))); } - add_kernel(Type::FIXED_SIZE_BINARY); - add_kernel(Type::STRING); - add_kernel(Type::BINARY); - add_kernel(Type::LARGE_STRING); - add_kernel(Type::LARGE_BINARY); DCHECK_OK(registry->AddFunction(std::move(function))); } diff --git a/cpp/src/arrow/json/from_string.cc b/cpp/src/arrow/json/from_string.cc index e35a362f5a2..2147933bcdb 100644 --- a/cpp/src/arrow/json/from_string.cc +++ b/cpp/src/arrow/json/from_string.cc @@ -33,11 +33,13 @@ #include "arrow/chunked_array.h" #include "arrow/json/from_string.h" #include "arrow/scalar.h" +#include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/checked_cast.h" #include "arrow/util/decimal.h" #include "arrow/util/float16.h" #include "arrow/util/logging_internal.h" +#include "arrow/util/ree_util.h" #include "arrow/util/value_parsing.h" #include "arrow/json/rapidjson_defs.h" @@ -977,6 +979,18 @@ Status GetConverter(const std::shared_ptr& type, Result> ArrayFromJSONString(const std::shared_ptr& type, std::string_view json_string) { + // Parse REE types by decoding values then encoding + if (type->id() == Type::RUN_END_ENCODED) { + const auto& ree_type = checked_cast(*type); + ARROW_ASSIGN_OR_RAISE(auto values_array, + ArrayFromJSONString(ree_type.value_type(), json_string)); + ARROW_ASSIGN_OR_RAISE( + auto ree_data, + ree_util::RunEndEncodeArray(values_array->data(), ree_type.run_end_type(), + default_memory_pool())); + return MakeArray(ree_data); + } + std::shared_ptr converter; RETURN_NOT_OK(GetConverter(type, &converter)); diff --git a/cpp/src/arrow/json/from_string_test.cc b/cpp/src/arrow/json/from_string_test.cc index 654450462e3..6b9af61608c 100644 --- a/cpp/src/arrow/json/from_string_test.cc +++ b/cpp/src/arrow/json/from_string_test.cc @@ -30,6 +30,7 @@ #include #include "arrow/array.h" +#include "arrow/array/array_run_end.h" #include "arrow/array/builder_decimal.h" #include "arrow/array/builder_nested.h" #include "arrow/array/builder_primitive.h" @@ -1511,6 +1512,50 @@ TEST(TestDictArrayFromJSONString, Errors) { "[1]")); // dict value isn't string } +TEST(TestREEArrayFromJSONString, Basics) { + // Test basic REE array creation from JSON + auto ree_type = run_end_encoded(int32(), int8()); + ASSERT_OK_AND_ASSIGN( + auto ree_array, ArrayFromJSONString(ree_type, "[1, 2, 2, 3, null, null, null, 4]")); + ASSERT_OK(ree_array->ValidateFull()); + ASSERT_EQ(ree_array->type()->id(), Type::RUN_END_ENCODED); + ASSERT_EQ(ree_array->length(), 8); + + // Verify the encoding + const auto& ree_data = checked_cast(*ree_array); + ASSERT_EQ(ree_data.values()->length(), 5); // 5 distinct runs +} + +TEST(TestREEArrayFromJSONString, DifferentRunEndTypes) { + // Test with int16 run ends + auto ree_type16 = run_end_encoded(int16(), utf8()); + ASSERT_OK_AND_ASSIGN( + auto array16, ArrayFromJSONString(ree_type16, R"(["a", "a", "b", "c", "c", "c"])")); + ASSERT_OK(array16->ValidateFull()); + ASSERT_EQ(array16->length(), 6); + + // Test with int64 run ends + auto ree_type64 = run_end_encoded(int64(), float64()); + ASSERT_OK_AND_ASSIGN(auto array64, + ArrayFromJSONString(ree_type64, "[1.5, 1.5, 2.5, null, null]")); + ASSERT_OK(array64->ValidateFull()); + ASSERT_EQ(array64->length(), 5); +} + +TEST(TestREEArrayFromJSONString, EmptyAndSingleValue) { + auto ree_type = run_end_encoded(int32(), int32()); + + // Empty array + ASSERT_OK_AND_ASSIGN(auto empty_array, ArrayFromJSONString(ree_type, "[]")); + ASSERT_OK(empty_array->ValidateFull()); + ASSERT_EQ(empty_array->length(), 0); + + // Single value + ASSERT_OK_AND_ASSIGN(auto single_array, ArrayFromJSONString(ree_type, "[42]")); + ASSERT_OK(single_array->ValidateFull()); + ASSERT_EQ(single_array->length(), 1); +} + TEST(TestChunkedArrayFromJSONString, Basics) { auto type = int32(); ASSERT_OK_AND_ASSIGN(auto chunked_array, ChunkedArrayFromJSONString(type, {})); diff --git a/cpp/src/arrow/util/ree_util.cc b/cpp/src/arrow/util/ree_util.cc index 461d6804b8c..0030dae23fb 100644 --- a/cpp/src/arrow/util/ree_util.cc +++ b/cpp/src/arrow/util/ree_util.cc @@ -15,14 +15,21 @@ // specific language governing permissions and limitations // under the License. -#include -#include - -#include "arrow/builder.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/logging_internal.h" #include "arrow/util/ree_util.h" +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/memory_pool.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/logging.h" + namespace arrow { namespace ree_util { @@ -47,122 +54,6 @@ int64_t LogicalNullCount(const ArraySpan& span) { return null_count; } -} // namespace - -int64_t LogicalNullCount(const ArraySpan& span) { - const auto type_id = RunEndsArray(span).type->id(); - if (type_id == Type::INT16) { - return LogicalNullCount(span); - } - if (type_id == Type::INT32) { - return LogicalNullCount(span); - } - DCHECK_EQ(type_id, Type::INT64); - return LogicalNullCount(span); -} - -namespace internal { - -/// \pre 0 <= i < array_span.length() -template -int64_t FindPhysicalIndexImpl(PhysicalIndexFinder& self, int64_t i) { - DCHECK_LT(i, self.array_span.length); - const int64_t run_ends_size = ree_util::RunEndsArray(self.array_span).length; - DCHECK_LT(self.last_physical_index, run_ends_size); - // This access to self.run_ends[last_physical_index] is always safe because: - // 1. 0 <= i < array_span.length() implies there is at least one run and the initial - // value 0 will be safe to index with. - // 2. last_physical_index > 0 is always the result of a valid call to - // internal::FindPhysicalIndex. - if (ARROW_PREDICT_TRUE(self.array_span.offset + i < - self.run_ends[self.last_physical_index])) { - // The cached value is an upper-bound, but is it the least upper-bound? - if (self.last_physical_index == 0 || - self.array_span.offset + i >= self.run_ends[self.last_physical_index - 1]) { - return self.last_physical_index; - } - // last_physical_index - 1 is a candidate for the least upper-bound, - // so search for the least upper-bound in the range that includes it. - const int64_t j = ree_util::internal::FindPhysicalIndex( - self.run_ends, /*run_ends_size=*/self.last_physical_index, i, - self.array_span.offset); - DCHECK_LT(j, self.last_physical_index); - return self.last_physical_index = j; - } - - // last_physical_index is not an upper-bound, and the logical index i MUST be - // in the runs that follow it. Since i is a valid logical index, we know that at least - // one extra run is present. - DCHECK_LT(self.last_physical_index + 1, run_ends_size); - const int64_t min_physical_index = self.last_physical_index + 1; - - const int64_t j = ree_util::internal::FindPhysicalIndex( - /*run_ends=*/self.run_ends + min_physical_index, - /*run_ends_size=*/run_ends_size - min_physical_index, i, self.array_span.offset); - DCHECK_LT(min_physical_index + j, run_ends_size); - return self.last_physical_index = min_physical_index + j; -} - -int64_t FindPhysicalIndexImpl16(PhysicalIndexFinder& self, int64_t i) { - return FindPhysicalIndexImpl(self, i); -} - -int64_t FindPhysicalIndexImpl32(PhysicalIndexFinder& self, int64_t i) { - return FindPhysicalIndexImpl(self, i); -} - -int64_t FindPhysicalIndexImpl64(PhysicalIndexFinder& self, int64_t i) { - return FindPhysicalIndexImpl(self, i); -} - -} // namespace internal - -int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t absolute_offset) { - const auto type_id = RunEndsArray(span).type->id(); - if (type_id == Type::INT16) { - return internal::FindPhysicalIndex(span, i, absolute_offset); - } - if (type_id == Type::INT32) { - return internal::FindPhysicalIndex(span, i, absolute_offset); - } - DCHECK_EQ(type_id, Type::INT64); - return internal::FindPhysicalIndex(span, i, absolute_offset); -} - -int64_t FindPhysicalLength(const ArraySpan& span) { - auto type_id = RunEndsArray(span).type->id(); - if (type_id == Type::INT16) { - return internal::FindPhysicalLength(span); - } - if (type_id == Type::INT32) { - return internal::FindPhysicalLength(span); - } - DCHECK_EQ(type_id, Type::INT64); - return internal::FindPhysicalLength(span); -} - -std::pair FindPhysicalRange(const ArraySpan& span, int64_t offset, - int64_t length) { - const auto& run_ends_span = RunEndsArray(span); - auto type_id = run_ends_span.type->id(); - if (type_id == Type::INT16) { - auto* run_ends = run_ends_span.GetValues(1); - return internal::FindPhysicalRange(run_ends, run_ends_span.length, length, - offset); - } - if (type_id == Type::INT32) { - auto* run_ends = run_ends_span.GetValues(1); - return internal::FindPhysicalRange(run_ends, run_ends_span.length, length, - offset); - } - DCHECK_EQ(type_id, Type::INT64); - auto* run_ends = run_ends_span.GetValues(1); - return internal::FindPhysicalRange(run_ends, run_ends_span.length, length, - offset); -} - -namespace { - template Status ValidateRunEndEncodedChildren(const RunEndEncodedType& type, int64_t logical_length, @@ -227,6 +118,18 @@ Status ValidateRunEndEncodedChildren(const RunEndEncodedType& type, } // namespace +int64_t LogicalNullCount(const ArraySpan& span) { + const auto type_id = RunEndsArray(span).type->id(); + if (type_id == Type::INT16) { + return LogicalNullCount(span); + } + if (type_id == Type::INT32) { + return LogicalNullCount(span); + } + ARROW_DCHECK_EQ(type_id, Type::INT64); + return LogicalNullCount(span); +} + Status ValidateRunEndEncodedChildren(const RunEndEncodedType& type, int64_t logical_length, const std::shared_ptr& run_ends_data, @@ -240,11 +143,346 @@ Status ValidateRunEndEncodedChildren(const RunEndEncodedType& type, return ValidateRunEndEncodedChildren( type, logical_length, run_ends_data, values_data, null_count, logical_offset); default: - DCHECK_EQ(type.run_end_type()->id(), Type::INT64); + ARROW_DCHECK_EQ(type.run_end_type()->id(), Type::INT64); return ValidateRunEndEncodedChildren( type, logical_length, run_ends_data, values_data, null_count, logical_offset); } } +int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t absolute_offset) { + const auto type_id = RunEndsArray(span).type->id(); + if (type_id == Type::INT16) { + return internal::FindPhysicalIndex(span, i, absolute_offset); + } + if (type_id == Type::INT32) { + return internal::FindPhysicalIndex(span, i, absolute_offset); + } + ARROW_DCHECK_EQ(type_id, Type::INT64); + return internal::FindPhysicalIndex(span, i, absolute_offset); +} + +int64_t FindPhysicalLength(const ArraySpan& span) { + auto type_id = RunEndsArray(span).type->id(); + if (type_id == Type::INT16) { + return internal::FindPhysicalLength(span); + } + if (type_id == Type::INT32) { + return internal::FindPhysicalLength(span); + } + ARROW_DCHECK_EQ(type_id, Type::INT64); + return internal::FindPhysicalLength(span); +} + +std::pair FindPhysicalRange(const ArraySpan& span, int64_t offset, + int64_t length) { + const auto& run_ends_span = RunEndsArray(span); + auto type_id = run_ends_span.type->id(); + if (type_id == Type::INT16) { + auto* run_ends = run_ends_span.GetValues(1); + return internal::FindPhysicalRange(run_ends, run_ends_span.length, length, + offset); + } + if (type_id == Type::INT32) { + auto* run_ends = run_ends_span.GetValues(1); + return internal::FindPhysicalRange(run_ends, run_ends_span.length, length, + offset); + } + ARROW_DCHECK_EQ(type_id, Type::INT64); + auto* run_ends = run_ends_span.GetValues(1); + return internal::FindPhysicalRange(run_ends, run_ends_span.length, length, + offset); +} + +namespace { + +template +Result> RunEndEncodeImplExec(const ArraySpan& input_array, + MemoryPool* pool) { + using RunEndCType = typename RunEndType::c_type; + const int64_t input_length = input_array.length; + + auto run_end_type = TypeTraits::type_singleton(); + auto ree_type = + std::make_shared(run_end_type, input_array.type->GetSharedPtr()); + + if (input_length == 0) { + return internal::PreallocateREEArray(std::move(ree_type), has_validity_buffer, + /*logical_length=*/input_length, + /*physical_length=*/0, pool, + /*data_buffer_size=*/0); + } + + // First pass: count the number of runs + int64_t num_valid_runs = 0; + int64_t num_output_runs = 0; + int64_t data_buffer_size = 0; // for string and binary types + RETURN_NOT_OK(internal::ValidateRunEndType(run_end_type, input_length)); + + internal::RunEndEncodingLoop counting_loop( + input_array, + /*output_values_array_data=*/NULLPTR, + /*output_run_ends=*/NULLPTR); + std::tie(num_valid_runs, num_output_runs, data_buffer_size) = + counting_loop.CountNumberOfRuns(); + const auto physical_null_count = num_output_runs - num_valid_runs; + ARROW_DCHECK(!has_validity_buffer || physical_null_count > 0) + << "has_validity_buffer is expected to imply physical_null_count > 0"; + + ARROW_ASSIGN_OR_RAISE(auto output_array_data, + internal::PreallocateREEArray( + std::move(ree_type), has_validity_buffer, + /*logical_length=*/input_length, + /*physical_length=*/num_output_runs, pool, data_buffer_size)); + + // Initialize the output pointers + auto* output_run_ends = + output_array_data->child_data[0]->template GetMutableValues(1, 0); + auto* output_values_array_data = output_array_data->child_data[1].get(); + // Set the null_count on the physical array + output_values_array_data->null_count = physical_null_count; + + // Second pass: write the runs + internal::RunEndEncodingLoop writing_loop( + input_array, output_values_array_data, output_run_ends); + [[maybe_unused]] int64_t num_written_runs = writing_loop.WriteEncodedRuns(); + ARROW_DCHECK_EQ(num_written_runs, num_output_runs); + + return output_array_data; +} + +template +Result> RunEndEncodeDispatchValueType(const ArraySpan& input, + MemoryPool* pool) { + const bool has_validity_buffer = input.GetNullCount() > 0; + switch (input.type->id()) { + case Type::NA: + if (input.length == 0) { + return internal::MakeNullREEArray(TypeTraits::type_singleton(), 0, + pool); + } + RETURN_NOT_OK(internal::ValidateRunEndType(TypeTraits::type_singleton(), + input.length)); + return internal::MakeNullREEArray(TypeTraits::type_singleton(), + input.length, pool); + +#define REE_ENCODE_CASE(TYPE_CLASS, TYPE_ENUM) \ + case Type::TYPE_ENUM: \ + return has_validity_buffer \ + ? RunEndEncodeImplExec(input, pool) \ + : RunEndEncodeImplExec(input, pool); + + ARROW_REE_SUPPORTED_TYPES(REE_ENCODE_CASE) + +#undef REE_ENCODE_CASE + + default: + return Status::NotImplemented("RunEndEncode not implemented for type ", + input.type->ToString()); + } +} + +} // namespace + +Result> RunEndEncodeArray( + const ArraySpan& input, const std::shared_ptr& run_end_type, + MemoryPool* pool) { + switch (run_end_type->id()) { + case Type::INT16: + return RunEndEncodeDispatchValueType(input, pool); + case Type::INT32: + return RunEndEncodeDispatchValueType(input, pool); + case Type::INT64: + return RunEndEncodeDispatchValueType(input, pool); + default: + return Status::Invalid("Invalid run end type: ", run_end_type->ToString()); + } +} + +Result> RunEndEncodeArray( + const std::shared_ptr& input, + const std::shared_ptr& run_end_type, MemoryPool* pool) { + ArraySpan span(*input); + return RunEndEncodeArray(span, run_end_type, pool); +} + +namespace internal { + +/// \pre 0 <= i < array_span.length() +template +int64_t FindPhysicalIndexImpl(PhysicalIndexFinder& self, int64_t i) { + ARROW_DCHECK_LT(i, self.array_span.length); + const int64_t run_ends_size = ree_util::RunEndsArray(self.array_span).length; + ARROW_DCHECK_LT(self.last_physical_index, run_ends_size); + // This access to self.run_ends[last_physical_index] is always safe because: + // 1. 0 <= i < array_span.length() implies there is at least one run and the initial + // value 0 will be safe to index with. + // 2. last_physical_index > 0 is always the result of a valid call to + // internal::FindPhysicalIndex. + if (ARROW_PREDICT_TRUE(self.array_span.offset + i < + self.run_ends[self.last_physical_index])) { + // The cached value is an upper-bound, but is it the least upper-bound? + if (self.last_physical_index == 0 || + self.array_span.offset + i >= self.run_ends[self.last_physical_index - 1]) { + return self.last_physical_index; + } + // last_physical_index - 1 is a candidate for the least upper-bound, + // so search for the least upper-bound in the range that includes it. + const int64_t j = ree_util::internal::FindPhysicalIndex( + self.run_ends, /*run_ends_size=*/self.last_physical_index, i, + self.array_span.offset); + ARROW_DCHECK_LT(j, self.last_physical_index); + return self.last_physical_index = j; + } + + // last_physical_index is not an upper-bound, and the logical index i MUST be + // in the runs that follow it. Since i is a valid logical index, we know that at least + // one extra run is present. + ARROW_DCHECK_LT(self.last_physical_index + 1, run_ends_size); + const int64_t min_physical_index = self.last_physical_index + 1; + + const int64_t j = ree_util::internal::FindPhysicalIndex( + /*run_ends=*/self.run_ends + min_physical_index, + /*run_ends_size=*/run_ends_size - min_physical_index, i, self.array_span.offset); + ARROW_DCHECK_LT(min_physical_index + j, run_ends_size); + return self.last_physical_index = min_physical_index + j; +} + +int64_t FindPhysicalIndexImpl16(PhysicalIndexFinder& self, int64_t i) { + return FindPhysicalIndexImpl(self, i); +} + +int64_t FindPhysicalIndexImpl32(PhysicalIndexFinder& self, int64_t i) { + return FindPhysicalIndexImpl(self, i); +} + +int64_t FindPhysicalIndexImpl64(PhysicalIndexFinder& self, int64_t i) { + return FindPhysicalIndexImpl(self, i); +} + +Result> AllocateValuesBuffer(int64_t length, const DataType& type, + MemoryPool* pool, + int64_t data_buffer_size) { + if (type.bit_width() == 1) { + return AllocateEmptyBitmap(length, pool); + } else if (is_fixed_width(type.id())) { + return AllocateBuffer(length * type.byte_width(), pool); + } else { + ARROW_DCHECK(is_base_binary_like(type.id())); + return AllocateBuffer(data_buffer_size, pool); + } +} + +Result> PreallocateRunEndsArray( + const std::shared_ptr& run_end_type, int64_t physical_length, + MemoryPool* pool) { + ARROW_DCHECK(is_run_end_type(run_end_type->id())); + ARROW_ASSIGN_OR_RAISE( + auto run_ends_buffer, + AllocateBuffer(physical_length * run_end_type->byte_width(), pool)); + return ArrayData::Make(run_end_type, physical_length, + {NULLPTR, std::move(run_ends_buffer)}, /*null_count=*/0); +} + +Result> PreallocateValuesArray( + const std::shared_ptr& value_type, bool has_validity_buffer, int64_t length, + MemoryPool* pool, int64_t data_buffer_size) { + std::vector> values_data_buffers; + std::shared_ptr validity_buffer = NULLPTR; + if (has_validity_buffer) { + ARROW_ASSIGN_OR_RAISE(validity_buffer, AllocateEmptyBitmap(length, pool)); + } + ARROW_ASSIGN_OR_RAISE(auto values_buffer, AllocateValuesBuffer(length, *value_type, + pool, data_buffer_size)); + if (is_base_binary_like(value_type->id())) { + const int offset_byte_width = offset_bit_width(value_type->id()) / 8; + ARROW_ASSIGN_OR_RAISE(auto offsets_buffer, + AllocateBuffer((length + 1) * offset_byte_width, pool)); + memset(offsets_buffer->mutable_data(), 0, offset_byte_width); + offsets_buffer->ZeroPadding(); + values_data_buffers = {std::move(validity_buffer), std::move(offsets_buffer), + std::move(values_buffer)}; + } else { + values_data_buffers = {std::move(validity_buffer), std::move(values_buffer)}; + } + auto data = ArrayData::Make(value_type, length, std::move(values_data_buffers), + kUnknownNullCount); + ARROW_DCHECK(!(has_validity_buffer && length > 0) || data->buffers[0]); + return data; +} + +Result> PreallocateREEArray( + std::shared_ptr ree_type, bool has_validity_buffer, + int64_t logical_length, int64_t physical_length, MemoryPool* pool, + int64_t data_buffer_size) { + ARROW_ASSIGN_OR_RAISE( + auto run_ends_data, + PreallocateRunEndsArray(ree_type->run_end_type(), physical_length, pool)); + ARROW_ASSIGN_OR_RAISE(auto values_data, PreallocateValuesArray( + ree_type->value_type(), has_validity_buffer, + physical_length, pool, data_buffer_size)); + return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR}, + {std::move(run_ends_data), std::move(values_data)}, + /*null_count=*/0); +} + +void WriteSingleRunEnd(ArrayData* run_ends_data, int64_t run_end) { + ARROW_DCHECK_GT(run_end, 0); + ARROW_DCHECK(is_run_end_type(run_ends_data->type->id())); + auto* output_run_ends = run_ends_data->template GetMutableValues(1); + switch (run_ends_data->type->id()) { + case Type::INT16: + *reinterpret_cast(output_run_ends) = static_cast(run_end); + break; + case Type::INT32: + *reinterpret_cast(output_run_ends) = static_cast(run_end); + break; + default: + ARROW_DCHECK_EQ(run_ends_data->type->id(), Type::INT64); + *reinterpret_cast(output_run_ends) = static_cast(run_end); + break; + } +} + +Result> MakeNullREEArray( + const std::shared_ptr& run_end_type, int64_t logical_length, + MemoryPool* pool) { + auto ree_type = std::make_shared(run_end_type, null()); + const int64_t physical_length = logical_length > 0 ? 1 : 0; + ARROW_ASSIGN_OR_RAISE(auto run_ends_data, + PreallocateRunEndsArray(run_end_type, physical_length, pool)); + if (logical_length > 0) { + WriteSingleRunEnd(run_ends_data.get(), logical_length); + } + auto values_data = ArrayData::Make(null(), physical_length, {NULLPTR}, + /*null_count=*/physical_length); + return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR}, + {std::move(run_ends_data), std::move(values_data)}, + /*null_count=*/0); +} + +Status ValidateRunEndType(const std::shared_ptr& run_end_type, + int64_t input_length) { + int64_t run_end_max = std::numeric_limits::max(); + switch (run_end_type->id()) { + case Type::INT16: + run_end_max = std::numeric_limits::max(); + break; + case Type::INT32: + run_end_max = std::numeric_limits::max(); + break; + default: + ARROW_DCHECK_EQ(run_end_type->id(), Type::INT64); + break; + } + if (input_length < 0 || input_length > run_end_max) { + return Status::Invalid( + "Cannot run-end encode Arrays with more elements than the " + "run end type can hold: ", + run_end_max); + } + return Status::OK(); +} + +} // namespace internal } // namespace ree_util } // namespace arrow diff --git a/cpp/src/arrow/util/ree_util.h b/cpp/src/arrow/util/ree_util.h index 5c759f2e80d..3f9de0574a4 100644 --- a/cpp/src/arrow/util/ree_util.h +++ b/cpp/src/arrow/util/ree_util.h @@ -17,14 +17,20 @@ #pragma once -#include #include #include +#include +#include +#include #include "arrow/array/data.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" #include "arrow/type_traits.h" -#include "arrow/util/checked_cast.h" #include "arrow/util/macros.h" +#include "arrow/util/visibility.h" namespace arrow { namespace ree_util { @@ -48,6 +54,7 @@ const RunEndCType* RunEnds(const ArraySpan& span) { /// All the checks complete in O(1) time. Consequently, this function: /// - DOES NOT check that run_ends is sorted and all-positive /// - DOES NOT check the actual contents of the run_ends and values arrays +ARROW_EXPORT Status ValidateRunEndEncodedChildren(const RunEndEncodedType& type, int64_t logical_length, const std::shared_ptr& run_ends_data, @@ -55,156 +62,9 @@ Status ValidateRunEndEncodedChildren(const RunEndEncodedType& type, int64_t null_count, int64_t logical_offset); /// \brief Compute the logical null count of an REE array +ARROW_EXPORT int64_t LogicalNullCount(const ArraySpan& span); -namespace internal { - -/// \brief Uses binary-search to find the physical offset given a logical offset -/// and run-end values -/// -/// \return the physical offset or run_ends_size if the physical offset is not -/// found in run_ends -template -int64_t FindPhysicalIndex(const RunEndCType* run_ends, int64_t run_ends_size, int64_t i, - int64_t absolute_offset) { - assert(absolute_offset + i >= 0); - auto it = std::upper_bound(run_ends, run_ends + run_ends_size, absolute_offset + i); - int64_t result = std::distance(run_ends, it); - assert(result <= run_ends_size); - return result; -} - -/// \brief Uses binary-search to calculate the range of physical values (and -/// run-ends) necessary to represent the logical range of values from -/// offset to length -/// -/// \return a pair of physical offset and physical length -template -std::pair FindPhysicalRange(const RunEndCType* run_ends, - int64_t run_ends_size, int64_t length, - int64_t offset) { - const int64_t physical_offset = - FindPhysicalIndex(run_ends, run_ends_size, 0, offset); - // The physical length is calculated by finding the offset of the last element - // and adding 1 to it, so first we ensure there is at least one element. - if (length == 0) { - return {physical_offset, 0}; - } - const int64_t physical_index_of_last = FindPhysicalIndex( - run_ends + physical_offset, run_ends_size - physical_offset, length - 1, offset); - - assert(physical_index_of_last < run_ends_size - physical_offset); - return {physical_offset, physical_index_of_last + 1}; -} - -/// \brief Uses binary-search to calculate the number of physical values (and -/// run-ends) necessary to represent the logical range of values from -/// offset to length -template -int64_t FindPhysicalLength(const RunEndCType* run_ends, int64_t run_ends_size, - int64_t length, int64_t offset) { - auto [_, physical_length] = - FindPhysicalRange(run_ends, run_ends_size, length, offset); - // GH-37107: This is a workaround for GCC 7. GCC 7 doesn't ignore - // variables in structured binding automatically from unused - // variables when one of these variables are used. - // See also: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=81767 - ARROW_UNUSED(_); - return physical_length; -} - -/// \brief Find the physical index into the values array of the REE ArraySpan -/// -/// This function uses binary-search, so it has a O(log N) cost. -template -int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t absolute_offset) { - const int64_t run_ends_size = RunEndsArray(span).length; - return FindPhysicalIndex(RunEnds(span), run_ends_size, i, absolute_offset); -} - -/// \brief Find the physical length of an REE ArraySpan -/// -/// The physical length of an REE is the number of physical values (and -/// run-ends) necessary to represent the logical range of values from -/// offset to length. -/// -/// Avoid calling this function if the physical length can be established in -/// some other way (e.g. when iterating over the runs sequentially until the -/// end). This function uses binary-search, so it has a O(log N) cost. -template -int64_t FindPhysicalLength(const ArraySpan& span) { - return FindPhysicalLength( - /*run_ends=*/RunEnds(span), - /*run_ends_size=*/RunEndsArray(span).length, - /*length=*/span.length, - /*offset=*/span.offset); -} - -template -struct PhysicalIndexFinder; - -// non-inline implementations for each run-end type -ARROW_EXPORT int64_t FindPhysicalIndexImpl16(PhysicalIndexFinder& self, - int64_t i); -ARROW_EXPORT int64_t FindPhysicalIndexImpl32(PhysicalIndexFinder& self, - int64_t i); -ARROW_EXPORT int64_t FindPhysicalIndexImpl64(PhysicalIndexFinder& self, - int64_t i); - -/// \brief Stateful version of FindPhysicalIndex() that caches the result of -/// the previous search and uses it to optimize the next search. -/// -/// When new queries for the physical index of a logical index come in, -/// binary search is performed again but the first candidate checked is the -/// result of the previous search (cached physical index) instead of the -/// midpoint of the run-ends array. -/// -/// If that test fails, internal::FindPhysicalIndex() is called with one of the -/// partitions defined by the cached index. If the queried logical indices -/// follow an increasing or decreasing pattern, this first test is much more -/// effective in (1) finding the answer right away (close logical indices belong -/// to the same runs) or (2) discarding many more candidates than probing -/// the midpoint would. -/// -/// The most adversarial case (i.e. alternating between 0 and length-1 queries) -/// only adds one extra binary search probe when compared to always starting -/// binary search from the midpoint without any of these optimizations. -/// -/// \tparam RunEndCType The numeric type of the run-ends array. -template -struct PhysicalIndexFinder { - const ArraySpan array_span; - const RunEndCType* run_ends; - int64_t last_physical_index = 0; - - explicit PhysicalIndexFinder(const ArrayData& data) - : array_span(data), - run_ends(RunEndsArray(array_span).template GetValues(1)) { - assert(CTypeTraits::ArrowType::type_id == - ::arrow::internal::checked_cast(*data.type) - .run_end_type() - ->id()); - } - - /// \brief Find the physical index into the values array of the REE array. - /// - /// \pre 0 <= i < array_span.length() - /// \param i the logical index into the REE array - /// \return the physical index into the values array - int64_t FindPhysicalIndex(int64_t i) { - if constexpr (std::is_same_v) { - return FindPhysicalIndexImpl16(*this, i); - } else if constexpr (std::is_same_v) { - return FindPhysicalIndexImpl32(*this, i); - } else { - static_assert(std::is_same_v, "Unsupported RunEndCType."); - return FindPhysicalIndexImpl64(*this, i); - } - } -}; - -} // namespace internal - /// \brief Find the physical index into the values array of the REE ArraySpan /// /// This function uses binary-search, so it has a O(log N) cost. @@ -230,7 +90,36 @@ ARROW_EXPORT std::pair FindPhysicalRange(const ArraySpan& span int64_t offset, int64_t length); -// Publish PhysicalIndexFinder outside of the internal namespace. +/// \brief Run-end encode an array +/// +/// This function encodes an array into run-end encoded format. It uses an optimized +/// two-pass algorithm: first counting runs, then preallocating buffers and writing. +/// +/// \param[in] input The array to encode as ArrayData +/// \param[in] run_end_type The integer type for run ends (int16, int32, or int64) +/// \param[in] pool Memory pool for allocations +/// \return A RunEndEncodedArrayData +ARROW_EXPORT +Result> RunEndEncodeArray( + const ArraySpan& input, const std::shared_ptr& run_end_type, + MemoryPool* pool); + +/// \brief Run-end encode an array (convenience overload accepting ArrayData) +ARROW_EXPORT +Result> RunEndEncodeArray( + const std::shared_ptr& input, + const std::shared_ptr& run_end_type, MemoryPool* pool); + +} // namespace ree_util +} // namespace arrow + +// Include internal utilities needed by template implementations below +#include "arrow/util/ree_util_internal.h" + +namespace arrow { +namespace ree_util { + +// Publish PhysicalIndexFinder from internal namespace template using PhysicalIndexFinder = internal::PhysicalIndexFinder; diff --git a/cpp/src/arrow/util/ree_util_internal.h b/cpp/src/arrow/util/ree_util_internal.h new file mode 100644 index 00000000000..2670b14bb3b --- /dev/null +++ b/cpp/src/arrow/util/ree_util_internal.h @@ -0,0 +1,627 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/array/data.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" + +namespace arrow { +namespace ree_util { + +// Macro listing all value types supported by run-end encoding (for internal use) +#define ARROW_REE_SUPPORTED_TYPES(ACTION) \ + ACTION(Boolean, BOOL) \ + ACTION(Int8, INT8) \ + ACTION(Int16, INT16) \ + ACTION(Int32, INT32) \ + ACTION(Int64, INT64) \ + ACTION(UInt8, UINT8) \ + ACTION(UInt16, UINT16) \ + ACTION(UInt32, UINT32) \ + ACTION(UInt64, UINT64) \ + ACTION(Float, FLOAT) \ + ACTION(Double, DOUBLE) \ + ACTION(HalfFloat, HALF_FLOAT) \ + ACTION(Date32, DATE32) \ + ACTION(Date64, DATE64) \ + ACTION(Time32, TIME32) \ + ACTION(Time64, TIME64) \ + ACTION(Timestamp, TIMESTAMP) \ + ACTION(Duration, DURATION) \ + ACTION(MonthInterval, INTERVAL_MONTHS) \ + ACTION(DayTimeInterval, INTERVAL_DAY_TIME) \ + ACTION(MonthDayNanoInterval, INTERVAL_MONTH_DAY_NANO) \ + ACTION(Decimal32, DECIMAL32) \ + ACTION(Decimal64, DECIMAL64) \ + ACTION(Decimal128, DECIMAL128) \ + ACTION(Decimal256, DECIMAL256) \ + ACTION(FixedSizeBinary, FIXED_SIZE_BINARY) \ + ACTION(String, STRING) \ + ACTION(Binary, BINARY) \ + ACTION(LargeString, LARGE_STRING) \ + ACTION(LargeBinary, LARGE_BINARY) + +namespace internal { + +/// Check if a type is supported for REE values (non-nested types except Null) +inline bool IsValueTypeSupported(Type::type type_id) { + return !is_nested(type_id) && type_id != Type::NA; +} + +inline bool IsValueTypeSupported(const DataType& type) { + return IsValueTypeSupported(type.id()); +} + +/// \brief Uses binary-search to find the physical offset given a logical offset +/// and run-end values +/// +/// \return the physical offset or run_ends_size if the physical offset is not +/// found in run_ends +template +int64_t FindPhysicalIndex(const RunEndCType* run_ends, int64_t run_ends_size, int64_t i, + int64_t absolute_offset) { + assert(absolute_offset + i >= 0); + auto it = std::upper_bound(run_ends, run_ends + run_ends_size, absolute_offset + i); + int64_t result = std::distance(run_ends, it); + assert(result <= run_ends_size); + return result; +} + +/// \brief Uses binary-search to calculate the range of physical values (and +/// run-ends) necessary to represent the logical range of values from +/// offset to length +/// +/// \return a pair of physical offset and physical length +template +std::pair FindPhysicalRange(const RunEndCType* run_ends, + int64_t run_ends_size, int64_t length, + int64_t offset) { + const int64_t physical_offset = + FindPhysicalIndex(run_ends, run_ends_size, 0, offset); + // The physical length is calculated by finding the offset of the last element + // and adding 1 to it, so first we ensure there is at least one element. + if (length == 0) { + return {physical_offset, 0}; + } + const int64_t physical_index_of_last = FindPhysicalIndex( + run_ends + physical_offset, run_ends_size - physical_offset, length - 1, offset); + + assert(physical_index_of_last < run_ends_size - physical_offset); + return {physical_offset, physical_index_of_last + 1}; +} + +/// \brief Uses binary-search to calculate the number of physical values (and +/// run-ends) necessary to represent the logical range of values from +/// offset to length +template +int64_t FindPhysicalLength(const RunEndCType* run_ends, int64_t run_ends_size, + int64_t length, int64_t offset) { + auto [_, physical_length] = + FindPhysicalRange(run_ends, run_ends_size, length, offset); + // GH-37107: This is a workaround for GCC 7. GCC 7 doesn't ignore + // variables in structured binding automatically from unused + // variables when one of these variables are used. + // See also: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=81767 + ARROW_UNUSED(_); + return physical_length; +} + +/// \brief Find the physical index into the values array of the REE ArraySpan +/// +/// This function uses binary-search, so it has a O(log N) cost. +template +int64_t FindPhysicalIndex(const ArraySpan& span, int64_t i, int64_t absolute_offset) { + const int64_t run_ends_size = ree_util::RunEndsArray(span).length; + return FindPhysicalIndex(ree_util::RunEnds(span), run_ends_size, i, + absolute_offset); +} + +/// \brief Find the physical length of an REE ArraySpan +/// +/// The physical length of an REE is the number of physical values (and +/// run-ends) necessary to represent the logical range of values from +/// offset to length. +/// +/// Avoid calling this function if the physical length can be established in +/// some other way (e.g. when iterating over the runs sequentially until the +/// end). This function uses binary-search, so it has a O(log N) cost. +template +int64_t FindPhysicalLength(const ArraySpan& span) { + return FindPhysicalLength( + /*run_ends=*/ree_util::RunEnds(span), + /*run_ends_size=*/ree_util::RunEndsArray(span).length, + /*length=*/span.length, + /*offset=*/span.offset); +} + +template +struct PhysicalIndexFinder; + +// non-inline implementations for each run-end type +ARROW_EXPORT int64_t FindPhysicalIndexImpl16(PhysicalIndexFinder& self, + int64_t i); +ARROW_EXPORT int64_t FindPhysicalIndexImpl32(PhysicalIndexFinder& self, + int64_t i); +ARROW_EXPORT int64_t FindPhysicalIndexImpl64(PhysicalIndexFinder& self, + int64_t i); + +/// \brief Stateful version of FindPhysicalIndex() that caches the result of +/// the previous search and uses it to optimize the next search. +/// +/// When new queries for the physical index of a logical index come in, +/// binary search is performed again but the first candidate checked is the +/// result of the previous search (cached physical index) instead of the +/// midpoint of the run-ends array. +/// +/// If that test fails, internal::FindPhysicalIndex() is called with one of the +/// partitions defined by the cached index. If the queried logical indices +/// follow an increasing or decreasing pattern, this first test is much more +/// effective in (1) finding the answer right away (close logical indices belong +/// to the same runs) or (2) discarding many more candidates than probing +/// the midpoint would. +/// +/// The most adversarial case (i.e. alternating between 0 and length-1 queries) +/// only adds one extra binary search probe when compared to always starting +/// binary search from the midpoint without any of these optimizations. +/// +/// \tparam RunEndCType The numeric type of the run-ends array. +template +struct PhysicalIndexFinder { + const ArraySpan array_span; + const RunEndCType* run_ends; + int64_t last_physical_index = 0; + + explicit PhysicalIndexFinder(const ArrayData& data) + : array_span(data), + run_ends(ree_util::RunEndsArray(array_span).template GetValues(1)) { + assert(CTypeTraits::ArrowType::type_id == + ::arrow::internal::checked_cast(*data.type) + .run_end_type() + ->id()); + } + + /// \brief Find the physical index into the values array of the REE array. + /// + /// \pre 0 <= i < array_span.length() + /// \param i the logical index into the REE array + /// \return the physical index into the values array + int64_t FindPhysicalIndex(int64_t i) { + if constexpr (std::is_same_v) { + return FindPhysicalIndexImpl16(*this, i); + } else if constexpr (std::is_same_v) { + return FindPhysicalIndexImpl32(*this, i); + } else { + static_assert(std::is_same_v, "Unsupported RunEndCType."); + return FindPhysicalIndexImpl64(*this, i); + } + } +}; + +template +struct ReadWriteValue {}; + +// Numeric and primitive C-compatible types +template +class ReadWriteValue> { + public: + using ValueRepr = typename ArrowType::c_type; + + private: + const uint8_t* input_validity_; + const uint8_t* input_values_; + + // Needed only by the writing functions + uint8_t* output_validity_; + uint8_t* output_values_; + + public: + explicit ReadWriteValue(const ArraySpan& input_values_array, + ArrayData* output_values_array_data) + : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data + : NULLPTR), + input_values_(input_values_array.buffers[1].data), + output_validity_((out_has_validity_buffer && output_values_array_data) + ? output_values_array_data->buffers[0]->mutable_data() + : NULLPTR), + output_values_(output_values_array_data + ? output_values_array_data->buffers[1]->mutable_data() + : NULLPTR) {} + + [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const { + bool valid = true; + if constexpr (in_has_validity_buffer) { + valid = bit_util::GetBit(input_validity_, read_offset); + } + if constexpr (std::is_same_v) { + *out = bit_util::GetBit(input_values_, read_offset); + } else { + *out = (reinterpret_cast(input_values_))[read_offset]; + } + return valid; + } + + bool CompareValuesAt(int64_t i, int64_t j) const { + if constexpr (std::is_same_v) { + return bit_util::GetBit(input_values_, i) == bit_util::GetBit(input_values_, j); + } else { + return (reinterpret_cast(input_values_))[i] == + (reinterpret_cast(input_values_))[j]; + } + } + + void ZeroValidityPadding(int64_t length) const { + ARROW_DCHECK(output_values_); + if constexpr (out_has_validity_buffer) { + ARROW_DCHECK(output_validity_); + const int64_t validity_buffer_size = bit_util::BytesForBits(length); + output_validity_[validity_buffer_size - 1] = 0; + } + } + + void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const { + if constexpr (out_has_validity_buffer) { + bit_util::SetBitTo(output_validity_, write_offset, valid); + } + if (valid) { + if constexpr (std::is_same_v) { + bit_util::SetBitTo(output_values_, write_offset, value); + } else { + (reinterpret_cast(output_values_))[write_offset] = value; + } + } + } + + void WriteRun(int64_t write_offset, int64_t run_length, bool valid, + ValueRepr value) const { + if constexpr (out_has_validity_buffer) { + bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid); + } + if (valid) { + if constexpr (std::is_same_v) { + bit_util::SetBitsTo(reinterpret_cast(output_values_), write_offset, + run_length, value); + } else { + auto* output_values_c = reinterpret_cast(output_values_); + std::fill(output_values_c + write_offset, + output_values_c + write_offset + run_length, value); + } + } + } + + bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; } +}; + +// FixedSizeBinary, Decimal128 +template +class ReadWriteValue> { + public: + using ValueRepr = const uint8_t*; + + private: + const uint8_t* input_validity_; + const uint8_t* input_values_; + + // Needed only by the writing functions + uint8_t* output_validity_; + uint8_t* output_values_; + + const size_t byte_width_; + + public: + ReadWriteValue(const ArraySpan& input_values_array, ArrayData* output_values_array_data) + : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data + : NULLPTR), + input_values_(input_values_array.buffers[1].data), + output_validity_((out_has_validity_buffer && output_values_array_data) + ? output_values_array_data->buffers[0]->mutable_data() + : NULLPTR), + output_values_(output_values_array_data + ? output_values_array_data->buffers[1]->mutable_data() + : NULLPTR), + byte_width_(input_values_array.type->byte_width()) {} + + [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const { + bool valid = true; + if constexpr (in_has_validity_buffer) { + valid = bit_util::GetBit(input_validity_, read_offset); + } + *out = input_values_ + (read_offset * byte_width_); + return valid; + } + + bool CompareValuesAt(int64_t i, int64_t j) const { + return 0 == memcmp(input_values_ + (i * byte_width_), + input_values_ + (j * byte_width_), byte_width_); + } + + void ZeroValidityPadding(int64_t length) const { + ARROW_DCHECK(output_values_); + if constexpr (out_has_validity_buffer) { + ARROW_DCHECK(output_validity_); + const int64_t validity_buffer_size = bit_util::BytesForBits(length); + output_validity_[validity_buffer_size - 1] = 0; + } + } + + void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const { + if constexpr (out_has_validity_buffer) { + bit_util::SetBitTo(output_validity_, write_offset, valid); + } + if (valid) { + memcpy(output_values_ + (write_offset * byte_width_), value, byte_width_); + } + } + + void WriteRun(int64_t write_offset, int64_t run_length, bool valid, + ValueRepr value) const { + if constexpr (out_has_validity_buffer) { + bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid); + } + if (valid) { + uint8_t* ptr = output_values_ + (write_offset * byte_width_); + for (int64_t i = 0; i < run_length; ++i) { + memcpy(ptr, value, byte_width_); + ptr += byte_width_; + } + } + } + + bool Compare(ValueRepr lhs, ValueRepr rhs) const { + return memcmp(lhs, rhs, byte_width_) == 0; + } +}; + +// Binary, String... +template +class ReadWriteValue> { + public: + using ValueRepr = std::string_view; + using offset_type = typename ArrowType::offset_type; + + private: + const uint8_t* input_validity_; + const offset_type* input_offsets_; + const uint8_t* input_values_; + + // Needed only by the writing functions + uint8_t* output_validity_; + offset_type* output_offsets_; + uint8_t* output_values_; + + public: + ReadWriteValue(const ArraySpan& input_values_array, ArrayData* output_values_array_data) + : input_validity_(in_has_validity_buffer ? input_values_array.buffers[0].data + : NULLPTR), + input_offsets_(input_values_array.template GetValues(1, 0)), + input_values_(input_values_array.buffers[2].data), + output_validity_((out_has_validity_buffer && output_values_array_data) + ? output_values_array_data->buffers[0]->mutable_data() + : NULLPTR), + output_offsets_( + output_values_array_data + ? output_values_array_data->template GetMutableValues(1, 0) + : NULLPTR), + output_values_(output_values_array_data + ? output_values_array_data->buffers[2]->mutable_data() + : NULLPTR) {} + + [[nodiscard]] bool ReadValue(ValueRepr* out, int64_t read_offset) const { + bool valid = true; + if constexpr (in_has_validity_buffer) { + valid = bit_util::GetBit(input_validity_, read_offset); + } + if (valid) { + const offset_type offset0 = input_offsets_[read_offset]; + const offset_type offset1 = input_offsets_[read_offset + 1]; + *out = std::string_view(reinterpret_cast(input_values_ + offset0), + offset1 - offset0); + } + return valid; + } + + bool CompareValuesAt(int64_t i, int64_t j) const { + const offset_type len_i = input_offsets_[i + 1] - input_offsets_[i]; + const offset_type len_j = input_offsets_[j + 1] - input_offsets_[j]; + return len_i == len_j && + memcmp(input_values_ + input_offsets_[i], input_values_ + input_offsets_[j], + static_cast(len_i)) == 0; + } + + void ZeroValidityPadding(int64_t length) const { + ARROW_DCHECK(output_values_); + if constexpr (out_has_validity_buffer) { + ARROW_DCHECK(output_validity_); + const int64_t validity_buffer_size = bit_util::BytesForBits(length); + output_validity_[validity_buffer_size - 1] = 0; + } + } + + void WriteValue(int64_t write_offset, bool valid, ValueRepr value) const { + if constexpr (out_has_validity_buffer) { + bit_util::SetBitTo(output_validity_, write_offset, valid); + } + const offset_type offset0 = output_offsets_[write_offset]; + const offset_type offset1 = + offset0 + (valid ? static_cast(value.size()) : 0); + output_offsets_[write_offset + 1] = offset1; + if (valid) { + memcpy(output_values_ + offset0, value.data(), value.size()); + } + } + + void WriteRun(int64_t write_offset, int64_t run_length, bool valid, + ValueRepr value) const { + if constexpr (out_has_validity_buffer) { + bit_util::SetBitsTo(output_validity_, write_offset, run_length, valid); + } + if (valid) { + int64_t i = write_offset; + offset_type offset = output_offsets_[i]; + while (i < write_offset + run_length) { + memcpy(output_values_ + offset, value.data(), value.size()); + offset += static_cast(value.size()); + i += 1; + output_offsets_[i] = offset; + } + } else { + offset_type offset = output_offsets_[write_offset]; + offset_type* begin = output_offsets_ + write_offset + 1; + std::fill(begin, begin + run_length, offset); + } + } + + bool Compare(ValueRepr lhs, ValueRepr rhs) const { return lhs == rhs; } +}; + +template +class RunEndEncodingLoop { + public: + using RunEndCType = typename RunEndType::c_type; + + private: + using ReadWriteValueType = ReadWriteValue; + using ValueRepr = typename ReadWriteValueType::ValueRepr; + + private: + const int64_t input_length_; + const int64_t input_offset_; + ReadWriteValueType read_write_value_; + // Needed only by WriteEncodedRuns() + RunEndCType* output_run_ends_; + + public: + explicit RunEndEncodingLoop(const ArraySpan& input_array, + ArrayData* output_values_array_data, + RunEndCType* output_run_ends) + : input_length_(input_array.length), + input_offset_(input_array.offset), + read_write_value_(input_array, output_values_array_data), + output_run_ends_(output_run_ends) { + ARROW_DCHECK_GT(input_array.length, 0); + } + + /// \brief Give a pass over the input data and count the number of runs + /// + /// \return a tuple with the number of non-null run values, the total number of runs, + /// and the data buffer size for string and binary types + ARROW_NOINLINE std::tuple CountNumberOfRuns() const { + int64_t read_offset = input_offset_; + ValueRepr current_run; + bool current_run_valid = read_write_value_.ReadValue(¤t_run, read_offset); + read_offset += 1; + int64_t num_valid_runs = current_run_valid ? 1 : 0; + int64_t num_output_runs = 1; + int64_t data_buffer_size = 0; + if constexpr (is_base_binary_like(ValueType::type_id)) { + data_buffer_size = current_run_valid ? current_run.size() : 0; + } + for (; read_offset < input_offset_ + input_length_; read_offset += 1) { + ValueRepr value; + const bool valid = read_write_value_.ReadValue(&value, read_offset); + + const bool open_new_run = + valid != current_run_valid || !read_write_value_.Compare(value, current_run); + if (open_new_run) { + // Open the new run + current_run = value; + current_run_valid = valid; + // Count the new run + num_output_runs += 1; + num_valid_runs += valid ? 1 : 0; + if constexpr (is_base_binary_like(ValueType::type_id)) { + data_buffer_size += valid ? current_run.size() : 0; + } + } + } + return std::make_tuple(num_valid_runs, num_output_runs, data_buffer_size); + } + + ARROW_NOINLINE int64_t WriteEncodedRuns() { + ARROW_DCHECK(output_run_ends_); + int64_t read_offset = input_offset_; + int64_t write_offset = 0; + ValueRepr current_run; + bool current_run_valid = read_write_value_.ReadValue(¤t_run, read_offset); + read_offset += 1; + for (; read_offset < input_offset_ + input_length_; read_offset += 1) { + ValueRepr value; + const bool valid = read_write_value_.ReadValue(&value, read_offset); + + const bool open_new_run = + valid != current_run_valid || !read_write_value_.Compare(value, current_run); + if (open_new_run) { + // Close the current run first by writing it out + read_write_value_.WriteValue(write_offset, current_run_valid, current_run); + const int64_t run_end = read_offset - input_offset_; + output_run_ends_[write_offset] = static_cast(run_end); + write_offset += 1; + // Open the new run + current_run_valid = valid; + current_run = value; + } + } + read_write_value_.WriteValue(write_offset, current_run_valid, current_run); + ARROW_DCHECK_EQ(input_length_, read_offset - input_offset_); + output_run_ends_[write_offset] = static_cast(input_length_); + return write_offset + 1; + } +}; + +// Allocation and helper functions +ARROW_EXPORT Result> AllocateValuesBuffer( + int64_t length, const DataType& type, MemoryPool* pool, int64_t data_buffer_size); + +ARROW_EXPORT Result> PreallocateRunEndsArray( + const std::shared_ptr& run_end_type, int64_t physical_length, + MemoryPool* pool); + +ARROW_EXPORT Result> PreallocateValuesArray( + const std::shared_ptr& value_type, bool has_validity_buffer, int64_t length, + MemoryPool* pool, int64_t data_buffer_size); + +ARROW_EXPORT Result> PreallocateREEArray( + std::shared_ptr ree_type, bool has_validity_buffer, + int64_t logical_length, int64_t physical_length, MemoryPool* pool, + int64_t data_buffer_size); + +ARROW_EXPORT void WriteSingleRunEnd(ArrayData* run_ends_data, int64_t run_end); + +ARROW_EXPORT Result> MakeNullREEArray( + const std::shared_ptr& run_end_type, int64_t logical_length, + MemoryPool* pool); + +ARROW_EXPORT Status ValidateRunEndType(const std::shared_ptr& run_end_type, + int64_t input_length); + +} // namespace internal +} // namespace ree_util +} // namespace arrow