From 22b3cba99a973bad421f40f9ccabbe8eafda5f06 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 30 Dec 2025 13:22:25 +0800 Subject: [PATCH 1/2] feat: implement manifest group --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/constants.h | 12 + src/iceberg/delete_file_index.cc | 36 +- src/iceberg/delete_file_index.h | 25 +- src/iceberg/manifest/manifest_adapter.cc | 91 ++-- src/iceberg/manifest/manifest_entry.h | 25 +- src/iceberg/manifest/manifest_group.cc | 366 +++++++++++++++ src/iceberg/manifest/manifest_group.h | 167 +++++++ src/iceberg/manifest/manifest_writer.cc | 12 +- src/iceberg/manifest/meson.build | 1 + src/iceberg/manifest/v2_metadata.cc | 5 +- src/iceberg/manifest/v3_metadata.cc | 5 +- src/iceberg/meson.build | 1 + src/iceberg/table_scan.cc | 28 +- src/iceberg/table_scan.h | 25 +- src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/delete_file_index_test.cc | 6 +- src/iceberg/test/manifest_group_test.cc | 521 +++++++++++++++++++++ src/iceberg/type_fwd.h | 8 + 19 files changed, 1233 insertions(+), 103 deletions(-) create mode 100644 src/iceberg/manifest/manifest_group.cc create mode 100644 src/iceberg/manifest/manifest_group.h create mode 100644 src/iceberg/test/manifest_group_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index ea249e199..54bbaa550 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -42,6 +42,7 @@ set(ICEBERG_SOURCES json_internal.cc manifest/manifest_adapter.cc manifest/manifest_entry.cc + manifest/manifest_group.cc manifest/manifest_list.cc manifest/manifest_reader.cc manifest/manifest_writer.cc diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h index 759ae3cca..a95144393 100644 --- a/src/iceberg/constants.h +++ b/src/iceberg/constants.h @@ -19,6 +19,12 @@ #pragma once +/// \file iceberg/constants.h +/// This file defines constants used commonly and shared across multiple +/// source files. 
It is mostly useful to add constants that are used as +/// default values in the class definitions in the header files without +/// including other headers just for the constant definitions. + #include #include @@ -26,5 +32,11 @@ namespace iceberg { constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; constexpr int64_t kInvalidSnapshotId = -1; +/// \brief Stand-in for the current sequence number that will be assigned when the commit +/// is successful. This is replaced when writing a manifest list by the ManifestFile +/// wrapper +constexpr int64_t kUnassignedSequenceNumber = -1; + +// TODO(gangwu): move other commonly used constants here. } // namespace iceberg diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc index 2a96e9d3b..7c1a12f0a 100644 --- a/src/iceberg/delete_file_index.cc +++ b/src/iceberg/delete_file_index.cc @@ -453,34 +453,32 @@ Result> DeleteFileIndex::FindDV( } Result DeleteFileIndex::BuilderFor( - std::shared_ptr io, std::vector delete_manifests) { + std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector delete_manifests) { ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); - return Builder(std::move(io), std::move(delete_manifests)); + ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null"); + ICEBERG_PRECHECK(!specs_by_id.empty(), "Partition specs cannot be empty"); + return Builder(std::move(io), std::move(schema), std::move(specs_by_id), + std::move(delete_manifests)); } // Builder implementation -DeleteFileIndex::Builder::Builder(std::shared_ptr io, - std::vector delete_manifests) - : io_(std::move(io)), delete_manifests_(std::move(delete_manifests)) {} +DeleteFileIndex::Builder::Builder( + std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector delete_manifests) + : io_(std::move(io)), + schema_(std::move(schema)), + specs_by_id_(std::move(specs_by_id)), + delete_manifests_(std::move(delete_manifests)) {} 
DeleteFileIndex::Builder::~Builder() = default; DeleteFileIndex::Builder::Builder(Builder&&) noexcept = default; DeleteFileIndex::Builder& DeleteFileIndex::Builder::operator=(Builder&&) noexcept = default; -DeleteFileIndex::Builder& DeleteFileIndex::Builder::SpecsById( - std::unordered_map> specs_by_id) { - specs_by_id_ = std::move(specs_by_id); - return *this; -} - -DeleteFileIndex::Builder& DeleteFileIndex::Builder::WithSchema( - std::shared_ptr schema) { - schema_ = std::move(schema); - return *this; -} - DeleteFileIndex::Builder& DeleteFileIndex::Builder::AfterSequenceNumber(int64_t seq) { min_sequence_number_ = seq; return *this; @@ -721,10 +719,6 @@ Status DeleteFileIndex::Builder::AddEqualityDelete( Result> DeleteFileIndex::Builder::Build() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - ICEBERG_PRECHECK(io_ != nullptr, "FileIO is required to load delete files"); - ICEBERG_PRECHECK(schema_ != nullptr, "Schema is required to load delete files"); - ICEBERG_PRECHECK(!specs_by_id_.empty(), - "Partition specs are required to load delete files"); std::vector entries; if (!delete_manifests_.empty()) { diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h index 03cc26613..5444281a0 100644 --- a/src/iceberg/delete_file_index.h +++ b/src/iceberg/delete_file_index.h @@ -268,10 +268,14 @@ class ICEBERG_EXPORT DeleteFileIndex { /// \brief Create a builder for constructing a DeleteFileIndex from manifest files. 
/// /// \param io The FileIO to use for reading manifests + /// \param schema Current table schema + /// \param specs_by_id Partition specs by their IDs /// \param delete_manifests The delete manifests to index /// \return A Builder instance - static Result BuilderFor(std::shared_ptr io, - std::vector delete_manifests); + static Result BuilderFor( + std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector delete_manifests); private: friend class Builder; @@ -318,7 +322,9 @@ class ICEBERG_EXPORT DeleteFileIndex { class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { public: /// \brief Construct a builder from manifest files. - Builder(std::shared_ptr io, std::vector delete_manifests); + Builder(std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector delete_manifests); ~Builder() override; @@ -327,15 +333,6 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { Builder(const Builder&) = delete; Builder& operator=(const Builder&) = delete; - /// \brief Set the partition specs by ID. - Builder& SpecsById( - std::unordered_map> specs_by_id); - - /// \brief Set the table schema. - /// - /// Required for filtering and expression evaluation. - Builder& WithSchema(std::shared_ptr schema); - /// \brief Set the minimum sequence number for delete files. /// /// Only delete files with sequence number > min_sequence_number will be included. 
@@ -384,10 +381,10 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { ManifestEntry&& entry); std::shared_ptr io_; + std::shared_ptr schema_; + std::unordered_map> specs_by_id_; std::vector delete_manifests_; int64_t min_sequence_number_ = 0; - std::unordered_map> specs_by_id_; - std::shared_ptr schema_; std::shared_ptr data_filter_; std::shared_ptr partition_filter_; std::shared_ptr partition_set_; diff --git a/src/iceberg/manifest/manifest_adapter.cc b/src/iceberg/manifest/manifest_adapter.cc index da75a4683..cf0a0515b 100644 --- a/src/iceberg/manifest/manifest_adapter.cc +++ b/src/iceberg/manifest/manifest_adapter.cc @@ -36,6 +36,7 @@ namespace iceberg { namespace { constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L; +constexpr int32_t kBlockSizeInBytesFieldId = 105; Status AppendField(ArrowArray* array, int64_t value) { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, value)); @@ -254,64 +255,63 @@ Status ManifestEntryAdapter::AppendDataFile( auto child_array = array->children[i]; switch (field.field_id()) { - case 134: // content (optional int32) + case DataFile::kContentFieldId: // optional int32 ICEBERG_RETURN_UNEXPECTED( AppendField(child_array, static_cast(file.content))); break; - case 100: // file_path (required string) + case DataFile::kFilePathFieldId: // required string ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.file_path)); break; - case 101: // file_format (required string) + case DataFile::kFileFormatFieldId: // required string ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, ToString(file.file_format))); break; - case 102: { - // partition (required struct) + case DataFile::kPartitionFieldId: { // required struct auto partition_type = internal::checked_pointer_cast(field.type()); ICEBERG_RETURN_UNEXPECTED( AppendPartitionValues(child_array, partition_type, file.partition)); } break; - case 103: // record_count (required int64) + case DataFile::kRecordCountFieldId: // required int64 
ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.record_count)); break; - case 104: // file_size_in_bytes (required int64) + case DataFile::kFileSizeFieldId: // required int64 ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.file_size_in_bytes)); break; - case 105: // block_size_in_bytes (compatible in v1) + case kBlockSizeInBytesFieldId: // compatible with v1 // always 64MB for v1 ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, kBlockSizeInBytesV1)); break; - case 108: // column_sizes (optional map) + case DataFile::kColumnSizesFieldId: // optional map ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.column_sizes)); break; - case 109: // value_counts (optional map) + case DataFile::kValueCountsFieldId: // optional map ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.value_counts)); break; - case 110: // null_value_counts (optional map) + case DataFile::kNullValueCountsFieldId: // optional map ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.null_value_counts)); break; - case 137: // nan_value_counts (optional map) + case DataFile::kNanValueCountsFieldId: // optional map ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.nan_value_counts)); break; - case 125: // lower_bounds (optional map) + case DataFile::kLowerBoundsFieldId: // optional map ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.lower_bounds)); break; - case 128: // upper_bounds (optional map) + case DataFile::kUpperBoundsFieldId: // optional map ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.upper_bounds)); break; - case 131: // key_metadata (optional binary) + case DataFile::kKeyMetadataFieldId: // optional binary if (!file.key_metadata.empty()) { ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.key_metadata)); } else { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1)); } break; - case 132: // split_offsets (optional list) + case DataFile::kSplitOffsetsFieldId: // optional list 
ICEBERG_RETURN_UNEXPECTED(AppendList(child_array, file.split_offsets)); break; - case 135: // equality_ids (optional list) + case DataFile::kEqualityIdsFieldId: // optional list ICEBERG_RETURN_UNEXPECTED(AppendList(child_array, file.equality_ids)); break; - case 140: // sort_order_id (optional int32) + case DataFile::kSortOrderIdFieldId: // optional int32 if (file.sort_order_id.has_value()) { ICEBERG_RETURN_UNEXPECTED( AppendField(child_array, static_cast(file.sort_order_id.value()))); @@ -319,15 +319,14 @@ Status ManifestEntryAdapter::AppendDataFile( ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1)); } break; - case 142: // first_row_id (optional int64) + case DataFile::kFirstRowIdFieldId: // optional int64 if (file.first_row_id.has_value()) { ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.first_row_id.value())); } else { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1)); } break; - case 143: { - // referenced_data_file (optional string) + case DataFile::kReferencedDataFileFieldId: { // optional string ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, GetReferenceDataFile(file)); if (referenced_data_file.has_value()) { ICEBERG_RETURN_UNEXPECTED( @@ -337,7 +336,7 @@ Status ManifestEntryAdapter::AppendDataFile( } break; } - case 144: // content_offset (optional int64) + case DataFile::kContentOffsetFieldId: // optional int64 if (file.content_offset.has_value()) { ICEBERG_RETURN_UNEXPECTED( AppendField(child_array, file.content_offset.value())); @@ -345,7 +344,7 @@ Status ManifestEntryAdapter::AppendDataFile( ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1)); } break; - case 145: // content_size_in_bytes (optional int64) + case DataFile::kContentSizeFieldId: // optional int64 if (file.content_size_in_bytes.has_value()) { ICEBERG_RETURN_UNEXPECTED( AppendField(child_array, file.content_size_in_bytes.value())); @@ -397,18 +396,18 @@ Status ManifestEntryAdapter::AppendInternal(const 
ManifestEntry& entry) { auto array = array_.children[i]; switch (field.field_id()) { - case 0: // status (required int32) + case ManifestEntry::kStatusFieldId: // required int32 ICEBERG_RETURN_UNEXPECTED( AppendField(array, static_cast(static_cast(entry.status)))); break; - case 1: // snapshot_id (optional int64) + case ManifestEntry::kSnapshotIdFieldId: // optional int64 if (entry.snapshot_id.has_value()) { ICEBERG_RETURN_UNEXPECTED(AppendField(array, entry.snapshot_id.value())); } else { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); } break; - case 2: // data_file (required struct) + case ManifestEntry::kDataFileFieldId: // required struct if (entry.data_file) { // Get the data file type from the field auto data_file_type = internal::checked_pointer_cast(field.type()); @@ -418,8 +417,7 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) { return InvalidManifest("Missing required data_file field from manifest entry."); } break; - case 3: { - // sequence_number (optional int64) + case ManifestEntry::kSequenceNumberFieldId: { // optional int64 ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(entry)); if (sequence_num.has_value()) { ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num.value())); @@ -428,7 +426,7 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) { } break; } - case 4: // file_sequence_number (optional int64) + case ManifestEntry::kFileSequenceNumberFieldId: // optional int64 if (entry.file_sequence_number.has_value()) { ICEBERG_RETURN_UNEXPECTED( AppendField(array, entry.file_sequence_number.value())); @@ -538,36 +536,34 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { const auto& field = fields[i]; auto array = array_.children[i]; switch (field.field_id()) { - case 500: // manifest_path + case ManifestFile::kManifestPathFieldId: ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_path)); break; - case 501: // manifest_length + case 
ManifestFile::kManifestLengthFieldId: ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_length)); break; - case 502: // partition_spec_id + case ManifestFile::kPartitionSpecIdFieldId: ICEBERG_RETURN_UNEXPECTED( AppendField(array, static_cast(file.partition_spec_id))); break; - case 517: // content + case ManifestFile::kContentFieldId: ICEBERG_RETURN_UNEXPECTED( AppendField(array, static_cast(static_cast(file.content)))); break; - case 515: { - // sequence_number + case ManifestFile::kSequenceNumberFieldId: { ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(file)); ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num)); break; } - case 516: { - // min_sequence_number + case ManifestFile::kMinSequenceNumberFieldId: { ICEBERG_ASSIGN_OR_RAISE(auto min_sequence_num, GetMinSequenceNumber(file)); ICEBERG_RETURN_UNEXPECTED(AppendField(array, min_sequence_num)); break; } - case 503: // added_snapshot_id + case ManifestFile::kAddedSnapshotIdFieldId: ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.added_snapshot_id)); break; - case 504: // added_files_count + case ManifestFile::kAddedFilesCountFieldId: if (file.added_files_count.has_value()) { ICEBERG_RETURN_UNEXPECTED( AppendField(array, static_cast(file.added_files_count.value()))); @@ -576,7 +572,7 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); } break; - case 505: // existing_files_count + case ManifestFile::kExistingFilesCountFieldId: if (file.existing_files_count.has_value()) { ICEBERG_RETURN_UNEXPECTED(AppendField( array, static_cast(file.existing_files_count.value()))); @@ -585,7 +581,7 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); } break; - case 506: // deleted_files_count + case ManifestFile::kDeletedFilesCountFieldId: if (file.deleted_files_count.has_value()) { ICEBERG_RETURN_UNEXPECTED( 
AppendField(array, static_cast(file.deleted_files_count.value()))); @@ -594,7 +590,7 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); } break; - case 512: // added_rows_count + case ManifestFile::kAddedRowsCountFieldId: if (file.added_rows_count.has_value()) { ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.added_rows_count.value())); } else { @@ -602,7 +598,7 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); } break; - case 513: // existing_rows_count + case ManifestFile::kExistingRowsCountFieldId: if (file.existing_rows_count.has_value()) { ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.existing_rows_count.value())); } else { @@ -610,7 +606,7 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); } break; - case 514: // deleted_rows_count + case ManifestFile::kDeletedRowsCountFieldId: if (file.deleted_rows_count.has_value()) { ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.deleted_rows_count.value())); } else { @@ -618,20 +614,19 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); } break; - case 507: // partitions + case ManifestFile::kPartitionSummaryFieldId: ICEBERG_RETURN_UNEXPECTED(AppendPartitionSummary( array, internal::checked_pointer_cast(field.type()), file.partitions)); break; - case 519: // key_metadata + case ManifestFile::kKeyMetadataFieldId: if (!file.key_metadata.empty()) { ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.key_metadata)); } else { ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1)); } break; - case 520: { - // first_row_id + case ManifestFile::kFirstRowIdFieldId: { ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file)); if 
(first_row_id.has_value()) { ICEBERG_RETURN_UNEXPECTED(AppendField(array, first_row_id.value())); diff --git a/src/iceberg/manifest/manifest_entry.h b/src/iceberg/manifest/manifest_entry.h index 5d35530b8..1e4af0445 100644 --- a/src/iceberg/manifest/manifest_entry.h +++ b/src/iceberg/manifest/manifest_entry.h @@ -308,11 +308,30 @@ struct ICEBERG_EXPORT ManifestEntry { /// null. std::optional snapshot_id; /// Field id: 3 - /// Data sequence number of the file. Inherited when null and status is 1 (added). + /// Data sequence number of the file. + /// + /// Independently of the entry status, it represents the sequence number to which the + /// file should apply. Note the data sequence number may differ from the sequence number + /// of the snapshot in which the underlying file was added. New snapshots can add files + /// that belong to older sequence numbers (e.g. compaction). The data sequence number + /// also does not change when the file is marked as deleted. + /// + /// \note It can return nullopt if the data sequence number is unknown. This may happen + /// while reading a v2 manifest that did not persist the data sequence number for + /// manifest entries with status DELETED (older Iceberg versions). std::optional sequence_number; /// Field id: 4 - /// File sequence number indicating when the file was added. Inherited when null and - /// status is 1 (added). + /// The file sequence number. + /// + /// The file sequence number represents the sequence number of the snapshot in which the + /// underlying file was added. The file sequence number is always assigned at commit and + /// cannot be provided explicitly, unlike the data sequence number. The file sequence + /// number does not change upon assigning and must be preserved in existing and deleted + /// entries. + /// + /// \note It can return nullopt if the file sequence number is unknown. 
This may happen + /// while reading a v2 manifest that did not persist the file sequence number for + /// manifest entries with status EXISTING or DELETED (older Iceberg versions). std::optional file_sequence_number; /// Field id: 2 /// File path, partition tuple, metrics, ... diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc new file mode 100644 index 000000000..05f3062f5 --- /dev/null +++ b/src/iceberg/manifest/manifest_group.cc @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/manifest/manifest_group.h" + +#include + +#include "iceberg/expression/evaluator.h" +#include "iceberg/expression/expression.h" +#include "iceberg/expression/manifest_evaluator.h" +#include "iceberg/expression/projections.h" +#include "iceberg/expression/residual_evaluator.h" +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/table_scan.h" +#include "iceberg/util/content_file_util.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> ManifestGroup::Make( + std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector manifests) { + std::vector data_manifests; + std::vector delete_manifests; + for (auto& manifest : manifests) { + if (manifest.content == ManifestContent::kData) { + data_manifests.push_back(std::move(manifest)); + } else if (manifest.content == ManifestContent::kDeletes) { + delete_manifests.push_back(std::move(manifest)); + } + } + + return ManifestGroup::Make(std::move(io), std::move(schema), std::move(specs_by_id), + std::move(data_manifests), std::move(delete_manifests)); +} + +Result> ManifestGroup::Make( + std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector data_manifests, + std::vector delete_manifests) { + // DeleteFileIndex::Builder validates all input parameters so we skip validation here + ICEBERG_ASSIGN_OR_RAISE( + auto delete_index_builder, + DeleteFileIndex::BuilderFor(io, schema, specs_by_id, std::move(delete_manifests))); + return std::unique_ptr( + new ManifestGroup(std::move(io), std::move(schema), std::move(specs_by_id), + std::move(data_manifests), std::move(delete_index_builder))); +} + +ManifestGroup::ManifestGroup( + std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector data_manifests, + DeleteFileIndex::Builder&& delete_index_builder) + : io_(std::move(io)), + 
schema_(std::move(schema)), + specs_by_id_(std::move(specs_by_id)), + data_manifests_(std::move(data_manifests)), + delete_index_builder_(std::move(delete_index_builder)), + data_filter_(True::Instance()), + file_filter_(True::Instance()), + partition_filter_(True::Instance()), + manifest_entry_predicate_([](const ManifestEntry&) { return true; }) {} + +ManifestGroup::~ManifestGroup() = default; + +ManifestGroup::ManifestGroup(ManifestGroup&&) noexcept = default; +ManifestGroup& ManifestGroup::operator=(ManifestGroup&&) noexcept = default; + +ManifestGroup& ManifestGroup::FilterData(std::shared_ptr filter) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(data_filter_, And::Make(data_filter_, filter)); + delete_index_builder_.DataFilter(std::move(filter)); + return *this; +} + +ManifestGroup& ManifestGroup::FilterFiles(std::shared_ptr filter) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(file_filter_, + And::Make(file_filter_, std::move(filter))); + return *this; +} + +ManifestGroup& ManifestGroup::FilterPartitions(std::shared_ptr filter) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(partition_filter_, + And::Make(partition_filter_, filter)); + delete_index_builder_.PartitionFilter(std::move(filter)); + return *this; +} + +ManifestGroup& ManifestGroup::FilterManifestEntries( + std::function predicate) { + auto old_predicate = std::move(manifest_entry_predicate_); + manifest_entry_predicate_ = [old_predicate = std::move(old_predicate), + predicate = + std::move(predicate)](const ManifestEntry& entry) { + return old_predicate(entry) && predicate(entry); + }; + return *this; +} + +ManifestGroup& ManifestGroup::IgnoreDeleted() { + ignore_deleted_ = true; + return *this; +} + +ManifestGroup& ManifestGroup::IgnoreExisting() { + ignore_existing_ = true; + return *this; +} + +ManifestGroup& ManifestGroup::IgnoreResiduals() { + ignore_residuals_ = true; + delete_index_builder_.IgnoreResiduals(); + return *this; +} + +ManifestGroup& ManifestGroup::Select(std::vector columns) { + columns_ = 
std::move(columns); + return *this; +} + +ManifestGroup& ManifestGroup::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + delete_index_builder_.CaseSensitive(case_sensitive); + return *this; +} + +ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set column_ids) { + columns_to_keep_stats_ = std::move(column_ids); + return *this; +} + +Result>> ManifestGroup::PlanFiles() { + auto create_file_scan_tasks = + [this](std::vector&& entries, + const TaskContext& ctx) -> Result>> { + std::vector> tasks; + tasks.reserve(entries.size()); + + for (auto& entry : entries) { + if (ctx.drop_stats) { + ContentFileUtil::DropAllStats(*entry.data_file); + } else if (!ctx.columns_to_keep_stats.empty()) { + ContentFileUtil::DropUnselectedStats(*entry.data_file, ctx.columns_to_keep_stats); + } + ICEBERG_ASSIGN_OR_RAISE(auto delete_files, ctx.deletes->ForEntry(entry)); + ICEBERG_ASSIGN_OR_RAISE(auto residual, + ctx.residuals->ResidualFor(entry.data_file->partition)); + tasks.push_back(std::make_shared( + std::move(entry.data_file), std::move(delete_files), std::move(residual))); + } + + return tasks; + }; + + ICEBERG_ASSIGN_OR_RAISE(auto tasks, Plan(create_file_scan_tasks)); + + // Convert ScanTask to FileScanTask + std::vector> file_tasks; + file_tasks.reserve(tasks.size()); + for (auto& task : tasks) { + file_tasks.push_back(std::static_pointer_cast(task)); + } + return file_tasks; +} + +Result>> ManifestGroup::Plan( + const CreateTasksFunction& create_tasks) { + std::unordered_map> residual_cache; + auto get_residual_evaluator = [&](int32_t spec_id) -> Result { + if (residual_cache.contains(spec_id)) { + return residual_cache[spec_id].get(); + } + + auto spec_iter = specs_by_id_.find(spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Cannot find partition spec for ID {}", spec_id); + + const auto& spec = spec_iter->second; + ICEBERG_ASSIGN_OR_RAISE( + auto residual_evaluator, + ResidualEvaluator::Make((ignore_residuals_ ? 
True::Instance() : data_filter_), + *spec, *schema_, case_sensitive_)); + residual_cache[spec_id] = std::move(residual_evaluator); + + return residual_cache[spec_id].get(); + }; + + ICEBERG_ASSIGN_OR_RAISE(auto delete_index, delete_index_builder_.Build()); + + bool drop_stats = ManifestReader::ShouldDropStats(columns_); + if (delete_index->has_equality_deletes()) { + columns_ = ManifestReader::WithStatsColumns(columns_); + } + + std::unordered_map> task_context_cache; + auto get_task_context = [&](int32_t spec_id) -> Result { + if (task_context_cache.contains(spec_id)) { + return task_context_cache[spec_id].get(); + } + + auto spec_iter = specs_by_id_.find(spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Cannot find partition spec for ID {}", spec_id); + + const auto& spec = spec_iter->second; + ICEBERG_ASSIGN_OR_RAISE(auto residuals, get_residual_evaluator(spec_id)); + task_context_cache[spec_id] = std::make_unique( + TaskContext{.spec = spec, + .deletes = delete_index.get(), + .residuals = residuals, + .drop_stats = drop_stats, + .columns_to_keep_stats = columns_to_keep_stats_}); + + return task_context_cache[spec_id].get(); + }; + + ICEBERG_ASSIGN_OR_RAISE(auto entry_groups, ReadEntries()); + + std::vector> all_tasks; + for (auto& [spec_id, entries] : entry_groups) { + ICEBERG_ASSIGN_OR_RAISE(auto ctx, get_task_context(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(auto tasks, create_tasks(std::move(entries), *ctx)); + all_tasks.insert(all_tasks.end(), std::make_move_iterator(tasks.begin()), + std::make_move_iterator(tasks.end())); + } + + return all_tasks; +} + +Result> ManifestGroup::Entries() { + ICEBERG_ASSIGN_OR_RAISE(auto entry_groups, ReadEntries()); + + std::vector all_entries; + for (auto& [_, entries] : entry_groups) { + all_entries.insert(all_entries.end(), std::make_move_iterator(entries.begin()), + std::make_move_iterator(entries.end())); + } + + return all_entries; +} + +Result> ManifestGroup::MakeReader( + const ManifestFile& manifest) { + 
auto spec_it = specs_by_id_.find(manifest.partition_spec_id); + if (spec_it == specs_by_id_.end()) { + return InvalidArgument("Partition spec {} not found for manifest {}", + manifest.partition_spec_id, manifest.manifest_path); + } + + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, io_, schema_, spec_it->second)); + + reader->FilterRows(data_filter_) + .FilterPartitions(partition_filter_) + .CaseSensitive(case_sensitive_) + .Select(columns_); + + return reader; +} + +Result>> +ManifestGroup::ReadEntries() { + std::unordered_map> eval_cache; + auto get_manifest_evaluator = [&](int32_t spec_id) -> Result { + if (eval_cache.contains(spec_id)) { + return eval_cache[spec_id].get(); + } + + auto spec_iter = specs_by_id_.find(spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Cannot find partition spec for ID {}", spec_id); + + const auto& spec = spec_iter->second; + auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_); + ICEBERG_ASSIGN_OR_RAISE(auto partition_filter, projector->Project(data_filter_)); + ICEBERG_ASSIGN_OR_RAISE(partition_filter, + And::Make(partition_filter, partition_filter_)); + ICEBERG_ASSIGN_OR_RAISE( + auto manifest_evaluator, + ManifestEvaluator::MakePartitionFilter(std::move(partition_filter), spec, + *schema_, case_sensitive_)); + eval_cache[spec_id] = std::move(manifest_evaluator); + + return eval_cache[spec_id].get(); + }; + + std::unique_ptr data_file_evaluator; + if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) { + // TODO(gangwu): create an Evaluator on the DataFile schema with empty + // partition type + } + + std::unordered_map> result; + + // TODO(gangwu): Parallelize reading manifests + for (const auto& manifest : data_manifests_) { + const int32_t spec_id = manifest.partition_spec_id; + + ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id)); + ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); + if 
(!should_match) { + continue; // Skip this manifest because it doesn't match partition filter + } + + if (ignore_deleted_) { + // only scan manifests that have entries other than deletes + if (!manifest.has_added_files() && !manifest.has_existing_files()) { + continue; + } + } + + if (ignore_existing_) { + // only scan manifests that have entries other than existing + if (!manifest.has_added_files() && !manifest.has_deleted_files()) { + continue; + } + } + + // Read manifest entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest)); + ICEBERG_ASSIGN_OR_RAISE(auto entries, + ignore_deleted_ ? reader->LiveEntries() : reader->Entries()); + + for (auto& entry : entries) { + if (ignore_existing_ && entry.status == ManifestStatus::kExisting) { + continue; + } + + if (data_file_evaluator != nullptr) { + // TODO(gangwu): implement data_file_evaluator to evaluate StructLike on + // top of entry.data_file + } + + if (!manifest_entry_predicate_(entry)) { + continue; + } + + result[spec_id].push_back(std::move(entry)); + } + } + + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/manifest/manifest_group.h b/src/iceberg/manifest/manifest_group.h new file mode 100644 index 000000000..10b552786 --- /dev/null +++ b/src/iceberg/manifest/manifest_group.h @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/manifest_group.h +/// Coordinates reading manifest files and producing scan tasks. + +#include +#include +#include +#include +#include +#include + +#include "iceberg/delete_file_index.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/error_collector.h" + +namespace iceberg { + +/// \brief Context passed to task creation functions. +struct ICEBERG_EXPORT TaskContext { + public: + std::shared_ptr spec; + DeleteFileIndex* deletes; + ResidualEvaluator* residuals; + bool drop_stats; + std::unordered_set columns_to_keep_stats; +}; + +/// \brief Coordinates reading manifest files and producing scan tasks. +class ICEBERG_EXPORT ManifestGroup : public ErrorCollector { + public: + /// \brief Construct a ManifestGroup with a list of manifests. + /// + /// \param io FileIO for reading manifest files. + /// \param schema Current table schema. + /// \param specs_by_id Mapping of partition spec ID to PartitionSpec. + /// \param manifests List of manifest files to process. + static Result> Make( + std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id_, + std::vector manifests); + + /// \brief Construct a ManifestGroup with pre-separated manifests. + /// + /// \param io FileIO for reading manifest files. + /// \param schema Current table schema. + /// \param specs_by_id Mapping of partition spec ID to PartitionSpec. 
+ /// \param data_manifests List of data manifest files. + /// \param delete_manifests List of delete manifest files. + static Result> Make( + std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector data_manifests, + std::vector delete_manifests); + + ~ManifestGroup() override; + + ManifestGroup(ManifestGroup&&) noexcept; + ManifestGroup& operator=(ManifestGroup&&) noexcept; + ManifestGroup(const ManifestGroup&) = delete; + ManifestGroup& operator=(const ManifestGroup&) = delete; + + /// \brief Set a row-level data filter. + ManifestGroup& FilterData(std::shared_ptr filter); + + /// \brief Set a filter that is evaluated against each DataFile's metadata. + ManifestGroup& FilterFiles(std::shared_ptr filter); + + /// \brief Set a partition filter expression. + ManifestGroup& FilterPartitions(std::shared_ptr filter); + + /// \brief Set a custom manifest entry filter predicate. + /// + /// \param predicate A function that returns true if the entry should be included. + ManifestGroup& FilterManifestEntries( + std::function predicate); + + /// \brief Ignore deleted entries in manifests. + ManifestGroup& IgnoreDeleted(); + + /// \brief Ignore existing entries in manifests. + ManifestGroup& IgnoreExisting(); + + /// \brief Ignore residual filter computation. + ManifestGroup& IgnoreResiduals(); + + /// \brief Select specific columns from manifest entries. + /// + /// \param columns Column names to select from manifest entries. + ManifestGroup& Select(std::vector columns); + + /// \brief Set case sensitivity for column name matching. + ManifestGroup& CaseSensitive(bool case_sensitive); + + /// \brief Specify columns that should retain their statistics. + /// + /// \param column_ids Field IDs of columns whose statistics should be preserved. + ManifestGroup& ColumnsToKeepStats(std::unordered_set column_ids); + + /// \brief Plan scan tasks for all matching data files. + Result>> PlanFiles(); + + /// \brief Get all matching manifest entries. 
+ Result> Entries(); + + using CreateTasksFunction = + std::function>>( + std::vector&&, const TaskContext&)>; + + /// \brief Plan tasks using a custom task creation function. + /// + /// \param create_tasks A function that creates ScanTasks from entries and context. + /// \return A list of ScanTask objects, or error on failure. + Result>> Plan( + const CreateTasksFunction& create_tasks); + + private: + ManifestGroup(std::shared_ptr io, std::shared_ptr schema, + std::unordered_map> specs_by_id, + std::vector data_manifests, + DeleteFileIndex::Builder&& delete_index_builder); + + Result>> ReadEntries(); + + Result> MakeReader(const ManifestFile& manifest); + + std::shared_ptr io_; + std::shared_ptr schema_; + std::unordered_map> specs_by_id_; + std::vector data_manifests_; + DeleteFileIndex::Builder delete_index_builder_; + std::shared_ptr data_filter_; + std::shared_ptr file_filter_; + std::shared_ptr partition_filter_; + std::function manifest_entry_predicate_; + std::vector columns_; + std::unordered_set columns_to_keep_stats_; + bool case_sensitive_ = true; + bool ignore_deleted_ = false; + bool ignore_existing_ = false; + bool ignore_residuals_ = false; +}; + +} // namespace iceberg diff --git a/src/iceberg/manifest/manifest_writer.cc b/src/iceberg/manifest/manifest_writer.cc index c30f9f75b..649797fee 100644 --- a/src/iceberg/manifest/manifest_writer.cc +++ b/src/iceberg/manifest/manifest_writer.cc @@ -21,6 +21,7 @@ #include +#include "iceberg/constants.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" #include "iceberg/manifest/v1_metadata_internal.h" @@ -233,11 +234,12 @@ Result ManifestWriter::ToManifestFile() const { .manifest_length = manifest_length, .partition_spec_id = adapter_->partition_spec()->spec_id(), .content = adapter_->content(), - // sequence_number and min_sequence_number with kInvalidSequenceNumber will be - // replace with real sequence number in `ManifestListWriter`. 
- .sequence_number = TableMetadata::kInvalidSequenceNumber, - .min_sequence_number = - min_sequence_number_.value_or(TableMetadata::kInvalidSequenceNumber), + // sequence_number with kUnassignedSequenceNumber will be assigned when committed. + .sequence_number = kUnassignedSequenceNumber, + // if the min_sequence_number_ is missing, then no manifests with a sequence number + // have been written, so the min data sequence number is the one that will be + // assigned when this is committed. pass kUnassignedSequenceNumber to inherit it. + .min_sequence_number = min_sequence_number_.value_or(kUnassignedSequenceNumber), .added_snapshot_id = adapter_->snapshot_id().value_or(kInvalidSnapshotId), .added_files_count = add_files_count_, .existing_files_count = existing_files_count_, diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build index 00f93c599..41e685ffc 100644 --- a/src/iceberg/manifest/meson.build +++ b/src/iceberg/manifest/meson.build @@ -18,6 +18,7 @@ install_headers( [ 'manifest_entry.h', + 'manifest_group.h', 'manifest_list.h', 'manifest_reader.h', 'manifest_writer.h', diff --git a/src/iceberg/manifest/v2_metadata.cc b/src/iceberg/manifest/v2_metadata.cc index 737fa62fe..e5c54fca4 100644 --- a/src/iceberg/manifest/v2_metadata.cc +++ b/src/iceberg/manifest/v2_metadata.cc @@ -17,6 +17,7 @@ * under the License. */ +#include "iceberg/constants.h" #include "iceberg/json_internal.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" @@ -157,7 +158,7 @@ Status ManifestFileAdapterV2::Append(const ManifestFile& file) { } Result ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& file) const { - if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) { + if (file.sequence_number == kUnassignedSequenceNumber) { // if the sequence number is being assigned here, then the manifest must be created by // the current operation. 
to validate this, check that the snapshot id matches the // current commit @@ -173,7 +174,7 @@ Result ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& fil Result ManifestFileAdapterV2::GetMinSequenceNumber( const ManifestFile& file) const { - if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) { + if (file.min_sequence_number == kUnassignedSequenceNumber) { // same sanity check as above if (snapshot_id_ != file.added_snapshot_id) { return InvalidManifestList( diff --git a/src/iceberg/manifest/v3_metadata.cc b/src/iceberg/manifest/v3_metadata.cc index 87d4d77d9..35a1a89dd 100644 --- a/src/iceberg/manifest/v3_metadata.cc +++ b/src/iceberg/manifest/v3_metadata.cc @@ -20,6 +20,7 @@ #include #include +#include "iceberg/constants.h" #include "iceberg/json_internal.h" #include "iceberg/manifest/manifest_entry.h" #include "iceberg/manifest/manifest_list.h" @@ -205,7 +206,7 @@ Status ManifestFileAdapterV3::Append(const ManifestFile& file) { } Result ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& file) const { - if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) { + if (file.sequence_number == kUnassignedSequenceNumber) { // if the sequence number is being assigned here, then the manifest must be created by // the current operation. 
to validate this, check that the snapshot id matches the // current commit @@ -221,7 +222,7 @@ Result ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& fil Result ManifestFileAdapterV3::GetMinSequenceNumber( const ManifestFile& file) const { - if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) { + if (file.min_sequence_number == kUnassignedSequenceNumber) { // same sanity check as above if (snapshot_id_ != file.added_snapshot_id) { return InvalidManifestList( diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 3929e1803..e8a825abf 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -63,6 +63,7 @@ iceberg_sources = files( 'json_internal.cc', 'manifest/manifest_adapter.cc', 'manifest/manifest_entry.cc', + 'manifest/manifest_group.cc', 'manifest/manifest_list.cc', 'manifest/manifest_reader.cc', 'manifest/manifest_writer.cc', diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 6918de825..700cab1f5 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -31,7 +31,6 @@ #include "iceberg/schema_field.h" #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" -#include "iceberg/type.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -136,12 +135,29 @@ Result MakeArrowArrayStream(std::unique_ptr reader) { } // namespace -// implement FileScanTask -FileScanTask::FileScanTask(std::shared_ptr data_file) - : data_file_(std::move(data_file)) {} +// FileScanTask implementation + +FileScanTask::FileScanTask(std::shared_ptr data_file, + std::vector> delete_files, + std::shared_ptr residual_filter) + : data_file_(std::move(data_file)), + delete_files_(std::move(delete_files)), + residual_filter_(std::move(residual_filter)) {} const std::shared_ptr& FileScanTask::data_file() const { return data_file_; } +const std::vector>& FileScanTask::delete_files() const { + return delete_files_; +} + +const std::shared_ptr& FileScanTask::residual_filter() const { + return 
residual_filter_; +} + +bool FileScanTask::has_deletes() const { return !delete_files_.empty(); } + +bool FileScanTask::has_residual_filter() const { return residual_filter_ != nullptr; } + int64_t FileScanTask::size_bytes() const { return data_file_->file_size_in_bytes; } int32_t FileScanTask::files_count() const { return 1; } @@ -151,6 +167,10 @@ int64_t FileScanTask::estimated_row_count() const { return data_file_->record_co Result FileScanTask::ToArrow( const std::shared_ptr& io, const std::shared_ptr& projected_schema, const std::shared_ptr& filter) const { + if (has_deletes()) { + return NotSupported("Reading data files with delete files is not yet supported."); + } + const ReaderOptions options{.path = data_file_->file_path, .length = data_file_->file_size_in_bytes, .io = io, diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index c3b1b28f4..4f2ddfde2 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -46,11 +46,30 @@ class ICEBERG_EXPORT ScanTask { /// \brief Task representing a data file and its corresponding delete files. class ICEBERG_EXPORT FileScanTask : public ScanTask { public: - explicit FileScanTask(std::shared_ptr data_file); + /// \brief Construct with data file, delete files, and residual filter. + /// + /// \param data_file The data file to read. + /// \param delete_files Delete files that apply to this data file. + /// \param residual_filter Optional residual filter to apply after reading. + explicit FileScanTask(std::shared_ptr data_file, + std::vector> delete_files = {}, + std::shared_ptr residual_filter = nullptr); /// \brief The data file that should be read by this scan task. const std::shared_ptr& data_file() const; + /// \brief Delete files that apply to this data file. + const std::vector>& delete_files() const; + + /// \brief Residual filter to apply after reading. + const std::shared_ptr& residual_filter() const; + + /// \brief Check if any deletes need to be applied. 
+ bool has_deletes() const; + + /// \brief Check if a residual filter needs to be applied. + bool has_residual_filter() const; + int64_t size_bytes() const override; int32_t files_count() const override; int64_t estimated_row_count() const override; @@ -70,6 +89,10 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { private: /// \brief Data file metadata. std::shared_ptr data_file_; + /// \brief Delete files that apply to this data file. + std::vector> delete_files_; + /// \brief Residual filter to apply after reading. + std::shared_ptr residual_filter_; }; /// \brief Scan context holding snapshot and scan-specific metadata. diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 731fe0af6..a32bbe4de 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -143,6 +143,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES delete_file_index_test.cc + manifest_group_test.cc manifest_list_versions_test.cc manifest_reader_stats_test.cc manifest_reader_test.cc diff --git a/src/iceberg/test/delete_file_index_test.cc b/src/iceberg/test/delete_file_index_test.cc index e091f5ea2..8c87772ed 100644 --- a/src/iceberg/test/delete_file_index_test.cc +++ b/src/iceberg/test/delete_file_index_test.cc @@ -198,9 +198,9 @@ class DeleteFileIndexTest : public testing::TestWithParam { Result> BuildIndex( std::vector delete_manifests, std::optional after_sequence_number = std::nullopt) { - ICEBERG_ASSIGN_OR_RAISE( - auto builder, DeleteFileIndex::BuilderFor(file_io_, std::move(delete_manifests))); - builder.SpecsById(GetSpecsById()).WithSchema(schema_); + ICEBERG_ASSIGN_OR_RAISE(auto builder, + DeleteFileIndex::BuilderFor(file_io_, schema_, GetSpecsById(), + std::move(delete_manifests))); if (after_sequence_number.has_value()) { builder.AfterSequenceNumber(after_sequence_number.value()); } diff --git a/src/iceberg/test/manifest_group_test.cc b/src/iceberg/test/manifest_group_test.cc new file mode 100644 index 000000000..32466c228 
--- /dev/null +++ b/src/iceberg/test/manifest_group_test.cc @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/manifest_group.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/expression/expressions.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/table_scan.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" + +namespace iceberg { + +class ManifestGroupTest : public testing::TestWithParam { + protected: + void SetUp() override { + avro::RegisterAll(); + + file_io_ = arrow::MakeMockFileIO(); + + // Schema with id and data fields + schema_ = std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/1, "id", int32()), + SchemaField::MakeRequired(/*field_id=*/2, "data", string())}); + + // Partitioned spec: bucket by data + ICEBERG_UNWRAP_OR_FAIL( + 
partitioned_spec_, + PartitionSpec::Make( + /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000, + "data_bucket_16_2", Transform::Bucket(16))})); + + // Unpartitioned spec + unpartitioned_spec_ = PartitionSpec::Unpartitioned(); + } + + std::string MakeManifestPath() { + static int counter = 0; + return std::format("manifest-{}-{}.avro", counter++, + std::chrono::system_clock::now().time_since_epoch().count()); + } + + std::shared_ptr MakeDataFile(const std::string& path, + const PartitionValues& partition, + int32_t spec_id, int64_t record_count = 1) { + return std::make_shared(DataFile{ + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = record_count, + .file_size_in_bytes = 10, + .sort_order_id = 0, + .partition_spec_id = spec_id, + }); + } + + std::shared_ptr MakePositionDeleteFile( + const std::string& path, const PartitionValues& partition, int32_t spec_id, + std::optional referenced_file = std::nullopt) { + return std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = 1, + .file_size_in_bytes = 10, + .partition_spec_id = spec_id, + .referenced_data_file = referenced_file, + }); + } + + std::shared_ptr MakeEqualityDeleteFile(const std::string& path, + const PartitionValues& partition, + int32_t spec_id, + std::vector equality_ids = {1}) { + return std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = 1, + .file_size_in_bytes = 10, + .equality_ids = std::move(equality_ids), + .partition_spec_id = spec_id, + }); + } + + ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id, + int64_t sequence_number, std::shared_ptr file) { + return ManifestEntry{ + .status = status, + .snapshot_id = snapshot_id, + .sequence_number = 
sequence_number, + .file_sequence_number = sequence_number, + .data_file = std::move(file), + }; + } + + ManifestFile WriteDataManifest(int format_version, int64_t snapshot_id, + std::vector entries, + std::shared_ptr spec) { + const std::string manifest_path = MakeManifestPath(); + + Result> writer_result = + NotSupported("Format version: {}", format_version); + + if (format_version == 1) { + writer_result = ManifestWriter::MakeV1Writer(snapshot_id, manifest_path, file_io_, + spec, schema_); + } else if (format_version == 2) { + writer_result = ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, file_io_, + spec, schema_, ManifestContent::kData); + } else if (format_version == 3) { + writer_result = + ManifestWriter::MakeV3Writer(snapshot_id, /*first_row_id=*/0L, manifest_path, + file_io_, spec, schema_, ManifestContent::kData); + } + + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + for (const auto& entry : entries) { + EXPECT_THAT(writer->WriteEntry(entry), IsOk()); + } + + EXPECT_THAT(writer->Close(), IsOk()); + auto manifest_result = writer->ToManifestFile(); + EXPECT_THAT(manifest_result, IsOk()); + return std::move(manifest_result.value()); + } + + ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id, + std::vector entries, + std::shared_ptr spec) { + const std::string manifest_path = MakeManifestPath(); + + Result> writer_result = + NotSupported("Format version: {}", format_version); + + if (format_version == 2) { + writer_result = ManifestWriter::MakeV2Writer( + snapshot_id, manifest_path, file_io_, spec, schema_, ManifestContent::kDeletes); + } else if (format_version == 3) { + writer_result = ManifestWriter::MakeV3Writer( + snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, spec, + schema_, ManifestContent::kDeletes); + } + + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + for (const auto& entry : entries) { + 
EXPECT_THAT(writer->WriteEntry(entry), IsOk()); + } + + EXPECT_THAT(writer->Close(), IsOk()); + auto manifest_result = writer->ToManifestFile(); + EXPECT_THAT(manifest_result, IsOk()); + return std::move(manifest_result.value()); + } + + std::unordered_map> GetSpecsById() { + return {{partitioned_spec_->spec_id(), partitioned_spec_}, + {unpartitioned_spec_->spec_id(), unpartitioned_spec_}}; + } + + std::string MakeManifestListPath() { + static int counter = 0; + return std::format("manifest-list-{}-{}.avro", counter++, + std::chrono::system_clock::now().time_since_epoch().count()); + } + + // Write a ManifestFile to a manifest list and read it back. This is useful for v1 + // to populate all missing fields like sequence_number. + ManifestFile WriteAndReadManifestListEntry(int format_version, int64_t snapshot_id, + int64_t sequence_number, + const ManifestFile& manifest) { + const std::string manifest_list_path = MakeManifestListPath(); + constexpr int64_t kParentSnapshotId = 0L; + constexpr int64_t kSnapshotFirstRowId = 0L; + + Result> writer_result = + NotSupported("Format version: {}", format_version); + + if (format_version == 1) { + writer_result = ManifestListWriter::MakeV1Writer(snapshot_id, kParentSnapshotId, + manifest_list_path, file_io_); + } else if (format_version == 2) { + writer_result = ManifestListWriter::MakeV2Writer( + snapshot_id, kParentSnapshotId, sequence_number, manifest_list_path, file_io_); + } else if (format_version == 3) { + writer_result = ManifestListWriter::MakeV3Writer( + snapshot_id, kParentSnapshotId, sequence_number, kSnapshotFirstRowId, + manifest_list_path, file_io_); + } + + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + EXPECT_THAT(writer->Add(manifest), IsOk()); + EXPECT_THAT(writer->Close(), IsOk()); + + auto reader_result = ManifestListReader::Make(manifest_list_path, file_io_); + EXPECT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + auto 
files_result = reader->Files(); + EXPECT_THAT(files_result, IsOk()); + + auto manifests = files_result.value(); + EXPECT_EQ(manifests.size(), 1); + return manifests[0]; + } + + static std::vector GetPaths( + const std::vector>& tasks) { + return tasks | std::views::transform([](const auto& task) { + return task->data_file()->file_path; + }) | + std::ranges::to>(); + } + + static std::vector GetEntryPaths( + const std::vector& entries) { + return entries | std::views::transform([](const auto& entry) { + return entry.data_file->file_path; + }) | + std::ranges::to>(); + } + + std::shared_ptr file_io_; + std::shared_ptr schema_; + std::shared_ptr partitioned_spec_; + std::shared_ptr unpartitioned_spec_; +}; + +TEST_P(ManifestGroupTest, CreateAndGetEntries) { + int version = GetParam(); + if (version < 2) { + GTEST_SKIP() << "Delete files only supported in V2+"; + } + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + // Create data manifests + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/data1.parquet", part_value, + partitioned_spec_->spec_id())), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/data2.parquet", part_value, + partitioned_spec_->spec_id()))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + // Create delete manifest + std::vector delete_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, + /*sequence_number=*/2, + MakePositionDeleteFile("/path/to/delete.parquet", part_value, + partitioned_spec_->spec_id()))}; + auto delete_manifest = WriteDeleteManifest( + version, kSnapshotId, std::move(delete_entries), partitioned_spec_); + + // Create ManifestGroup with pre-separated manifests + std::vector data_manifests = {data_manifest}; + std::vector delete_manifests = {delete_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + 
auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(data_manifests),
+                          std::move(delete_manifests)));
+
+  // Verify Entries() returns only data file entries
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  ASSERT_EQ(entries.size(), 2);
+  EXPECT_THAT(
+      GetEntryPaths(entries),
+      testing::UnorderedElementsAre("/path/to/data1.parquet", "/path/to/data2.parquet"));
+
+  // Verify PlanFiles returns data files with associated delete files
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet",
+                                                             "/path/to/data2.parquet"));
+  EXPECT_EQ(tasks[0]->delete_files().size(), 1);
+  EXPECT_EQ(tasks[0]->delete_files()[0]->file_path, "/path/to/delete.parquet");
+  EXPECT_EQ(tasks[1]->delete_files().size(), 1);
+  EXPECT_EQ(tasks[1]->delete_files()[0]->file_path, "/path/to/delete.parquet");
+}
+
+TEST_P(ManifestGroupTest, IgnoreDeleted) {
+  int version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  constexpr int64_t kSequenceNumber = 1L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  // Create data manifest with ADDED, DELETED, and EXISTING entries
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/added.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kDeleted, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/deleted.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kExisting, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/existing.parquet", part_value,
+                             partitioned_spec_->spec_id()))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_);
+  auto read_back_manifest =
+      WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber, data_manifest);
+
+  // Create ManifestGroup with IgnoreDeleted
+  
std::vector<ManifestFile> manifests = {read_back_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests)));
+  group->IgnoreDeleted();
+
+  // Plan files - should only return ADDED and EXISTING
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks),
+              testing::UnorderedElementsAre("/path/to/added.parquet",
+                                            "/path/to/existing.parquet"));
+}
+
+TEST_P(ManifestGroupTest, IgnoreExisting) {
+  int version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  constexpr int64_t kSequenceNumber = 1L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  // Create data manifest with ADDED, EXISTING, and DELETED entries
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/added.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kExisting, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/existing.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kDeleted, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/deleted.parquet", part_value,
+                             partitioned_spec_->spec_id()))};
+
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_);
+  auto read_back_manifest =
+      WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber, data_manifest);
+
+  // Create ManifestGroup with IgnoreExisting
+  std::vector<ManifestFile> manifests = {read_back_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests)));
+  group->IgnoreExisting();
+
+  // Plan files - EXISTING entries are dropped; ADDED and DELETED remain
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/added.parquet",
+                                                             "/path/to/deleted.parquet"));
+}
+
+TEST_P(ManifestGroupTest, 
CustomManifestEntriesFilter) { + int version = GetParam(); + + constexpr int64_t kSnapshotId = 1000L; + const auto part_value = PartitionValues({Literal::Int(0)}); + + // Create data manifest with multiple files + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/data1.parquet", part_value, + partitioned_spec_->spec_id())), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/data2.parquet", part_value, + partitioned_spec_->spec_id())), + MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1, + MakeDataFile("/path/to/data3.parquet", part_value, + partitioned_spec_->spec_id()))}; + auto data_manifest = + WriteDataManifest(version, kSnapshotId, std::move(data_entries), partitioned_spec_); + + // Create ManifestGroup with custom entry filter + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + group->FilterManifestEntries([](const ManifestEntry& entry) { + // Only include files with "data1" or "data3" in the path + return entry.data_file->file_path.find("data1") != std::string::npos || + entry.data_file->file_path.find("data3") != std::string::npos; + }); + + // Plan files - should only return filtered entries + ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet", + "/path/to/data3.parquet")); +} + +TEST_P(ManifestGroupTest, EmptyManifestGroup) { + std::vector manifests; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + + ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles()); + EXPECT_TRUE(tasks.empty()); + + ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries()); + EXPECT_TRUE(entries.empty()); +} + +TEST_P(ManifestGroupTest, 
MultipleDataManifests) { + int version = GetParam(); + + const auto partition_a = PartitionValues({Literal::Int(0)}); + const auto partition_b = PartitionValues({Literal::Int(1)}); + + // Create first data manifest + std::vector data_entries_1{MakeEntry( + ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1, + MakeDataFile("/path/to/data1.parquet", partition_a, partitioned_spec_->spec_id()))}; + auto data_manifest_1 = WriteDataManifest(version, /*snapshot_id=*/1000L, + std::move(data_entries_1), partitioned_spec_); + + // Create second data manifest + std::vector data_entries_2{MakeEntry( + ManifestStatus::kAdded, /*snapshot_id=*/1001L, /*sequence_number=*/2, + MakeDataFile("/path/to/data2.parquet", partition_b, partitioned_spec_->spec_id()))}; + auto data_manifest_2 = WriteDataManifest(version, /*snapshot_id=*/1001L, + std::move(data_entries_2), partitioned_spec_); + + // Create ManifestGroup with multiple manifests + std::vector manifests = {data_manifest_1, data_manifest_2}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + + // Plan files - should return files from both manifests + ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles()); + ASSERT_EQ(tasks.size(), 2); + EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre("/path/to/data1.parquet", + "/path/to/data2.parquet")); +} + +TEST_P(ManifestGroupTest, PartitionFilter) { + int version = GetParam(); + + // Create two files with different partition values (bucket 0 and bucket 1) + const auto partition_bucket_0 = PartitionValues({Literal::Int(0)}); + const auto partition_bucket_1 = PartitionValues({Literal::Int(1)}); + + // Create data manifest with two entries in different partitions + std::vector data_entries{ + MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1, + MakeDataFile("/path/to/bucket0.parquet", partition_bucket_0, + partitioned_spec_->spec_id())), + 
MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1, + MakeDataFile("/path/to/bucket1.parquet", partition_bucket_1, + partitioned_spec_->spec_id()))}; + auto data_manifest = WriteDataManifest(version, /*snapshot_id=*/1000L, + std::move(data_entries), partitioned_spec_); + + // Create ManifestGroup with partition filter for bucket 0 + std::vector manifests = {data_manifest}; + ICEBERG_UNWRAP_OR_FAIL( + auto group, + ManifestGroup::Make(file_io_, schema_, GetSpecsById(), std::move(manifests))); + + // Filter on partition field name (data_bucket_16_2 = 0) + auto partition_filter = Expressions::Equal("data_bucket_16_2", Literal::Int(0)); + group->FilterPartitions(std::move(partition_filter)); + + // Plan files - should only return the file in bucket 0 + ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles()); + ASSERT_EQ(tasks.size(), 1); + EXPECT_THAT(GetPaths(tasks), testing::ElementsAre("/path/to/bucket0.parquet")); +} + +INSTANTIATE_TEST_SUITE_P(ManifestGroupVersions, ManifestGroupTest, + testing::Values(1, 2, 3)); + +} // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 6c7e6b777..65afeb87f 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -130,6 +130,13 @@ class Literal; class Term; class UnboundPredicate; +/// \brief Evaluator. +class Evaluator; +class InclusiveMetricsEvaluator; +class ManifestEvaluator; +class ResidualEvaluator; +class StrictMetricsEvaluator; + /// \brief Scan. 
class DataTableScan; class FileScanTask; @@ -144,6 +151,7 @@ struct ManifestEntry; struct ManifestFile; struct ManifestList; struct PartitionFieldSummary; +class ManifestGroup; class ManifestListReader; class ManifestListWriter; class ManifestReader; From 4e25c2cefa602b33e846b73411b18be8aa5f6909 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 30 Dec 2025 23:10:25 +0800 Subject: [PATCH 2/2] address feedback --- src/iceberg/constants.h | 2 +- src/iceberg/manifest/manifest_group.cc | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h index a95144393..89001f09c 100644 --- a/src/iceberg/constants.h +++ b/src/iceberg/constants.h @@ -34,7 +34,7 @@ constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id"; constexpr int64_t kInvalidSnapshotId = -1; /// \brief Stand-in for the current sequence number that will be assigned when the commit /// is successful. This is replaced when writing a manifest list by the ManifestFile -/// wrapper +/// adapter. constexpr int64_t kUnassignedSequenceNumber = -1; // TODO(gangwu): move other commonly used constants here. 
diff --git a/src/iceberg/manifest/manifest_group.cc b/src/iceberg/manifest/manifest_group.cc index 05f3062f5..220b8585c 100644 --- a/src/iceberg/manifest/manifest_group.cc +++ b/src/iceberg/manifest/manifest_group.cc @@ -31,6 +31,7 @@ #include "iceberg/partition_spec.h" #include "iceberg/schema.h" #include "iceberg/table_scan.h" +#include "iceberg/util/checked_cast.h" #include "iceberg/util/content_file_util.h" #include "iceberg/util/macros.h" @@ -109,8 +110,7 @@ ManifestGroup& ManifestGroup::FilterPartitions(std::shared_ptr filte ManifestGroup& ManifestGroup::FilterManifestEntries( std::function predicate) { - auto old_predicate = std::move(manifest_entry_predicate_); - manifest_entry_predicate_ = [old_predicate = std::move(old_predicate), + manifest_entry_predicate_ = [old_predicate = std::move(manifest_entry_predicate_), predicate = std::move(predicate)](const ManifestEntry& entry) { return old_predicate(entry) && predicate(entry); @@ -179,7 +179,7 @@ Result>> ManifestGroup::PlanFiles() { std::vector> file_tasks; file_tasks.reserve(tasks.size()); for (auto& task : tasks) { - file_tasks.push_back(std::static_pointer_cast(task)); + file_tasks.push_back(internal::checked_pointer_cast(task)); } return file_tasks; } @@ -320,7 +320,8 @@ ManifestGroup::ReadEntries() { ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, get_manifest_evaluator(spec_id)); ICEBERG_ASSIGN_OR_RAISE(bool should_match, manifest_evaluator->Evaluate(manifest)); if (!should_match) { - continue; // Skip this manifest because it doesn't match partition filter + // Skip this manifest because it doesn't match partition filter + continue; } if (ignore_deleted_) {