Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 81 additions & 59 deletions src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
namespace iceberg {

Schema::Schema(std::vector<SchemaField> fields, int32_t schema_id)
: StructType(std::move(fields)), schema_id_(schema_id) {}
: StructType(std::move(fields)),
schema_id_(schema_id),
cache_(std::make_unique<SchemaCache>(this)) {}

Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
int32_t schema_id,
Expand Down Expand Up @@ -156,54 +158,24 @@ bool Schema::Equals(const Schema& other) const {
Result<std::optional<std::reference_wrapper<const SchemaField>>> Schema::FindFieldByName(
std::string_view name, bool case_sensitive) const {
if (case_sensitive) {
ICEBERG_ASSIGN_OR_RAISE(auto name_id_map, name_id_map_.Get(*this));
ICEBERG_ASSIGN_OR_RAISE(auto name_id_map, cache_->GetNameIdMap());
auto it = name_id_map.get().name_to_id.find(name);
if (it == name_id_map.get().name_to_id.end()) {
return std::nullopt;
};
return FindFieldById(it->second);
}
ICEBERG_ASSIGN_OR_RAISE(auto lowercase_name_to_id, lowercase_name_to_id_.Get(*this));
ICEBERG_ASSIGN_OR_RAISE(auto lowercase_name_to_id, cache_->GetLowercaseNameToIdMap());
auto it = lowercase_name_to_id.get().find(StringUtils::ToLower(name));
if (it == lowercase_name_to_id.get().end()) {
return std::nullopt;
}
return FindFieldById(it->second);
}

Result<std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>>
Schema::InitIdToFieldMap(const Schema& self) {
std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>> id_to_field;
IdToFieldVisitor visitor(id_to_field);
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(self, &visitor));
return id_to_field;
}

Result<Schema::NameIdMap> Schema::InitNameIdMap(const Schema& self) {
NameIdMap name_id_map;
NameToIdVisitor visitor(name_id_map.name_to_id, &name_id_map.id_to_name,
/*case_sensitive=*/true);
ICEBERG_RETURN_UNEXPECTED(
VisitTypeInline(self, &visitor, /*path=*/"", /*short_path=*/""));
visitor.Finish();
return name_id_map;
}

Result<std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>>
Schema::InitLowerCaseNameToIdMap(const Schema& self) {
std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>
lowercase_name_to_id;
NameToIdVisitor visitor(lowercase_name_to_id, /*id_to_name=*/nullptr,
/*case_sensitive=*/false);
ICEBERG_RETURN_UNEXPECTED(
VisitTypeInline(self, &visitor, /*path=*/"", /*short_path=*/""));
visitor.Finish();
return lowercase_name_to_id;
}

Result<std::optional<std::reference_wrapper<const SchemaField>>> Schema::FindFieldById(
int32_t field_id) const {
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, cache_->GetIdToFieldMap());
auto it = id_to_field.get().find(field_id);
if (it == id_to_field.get().end()) {
return std::nullopt;
Expand All @@ -213,38 +185,17 @@ Result<std::optional<std::reference_wrapper<const SchemaField>>> Schema::FindFie

Result<std::optional<std::string_view>> Schema::FindColumnNameById(
int32_t field_id) const {
ICEBERG_ASSIGN_OR_RAISE(auto name_id_map, name_id_map_.Get(*this));
ICEBERG_ASSIGN_OR_RAISE(auto name_id_map, cache_->GetNameIdMap());
auto it = name_id_map.get().id_to_name.find(field_id);
if (it == name_id_map.get().id_to_name.end()) {
return std::nullopt;
}
return it->second;
}

Result<std::unordered_map<int32_t, std::vector<size_t>>> Schema::InitIdToPositionPath(
const Schema& self) {
PositionPathVisitor visitor;
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(self, &visitor));
return visitor.Finish();
}

Result<int32_t> Schema::InitHighestFieldId(const Schema& self) {
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, self.id_to_field_.Get(self));

if (id_to_field.get().empty()) {
return kInitialColumnId;
}

auto max_it = std::ranges::max_element(
id_to_field.get(),
[](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });

return max_it->first;
}

Result<std::unique_ptr<StructLikeAccessor>> Schema::GetAccessorById(
int32_t field_id) const {
ICEBERG_ASSIGN_OR_RAISE(auto id_to_position_path, id_to_position_path_.Get(*this));
ICEBERG_ASSIGN_OR_RAISE(auto id_to_position_path, cache_->GetIdToPositionPathMap());
if (auto it = id_to_position_path.get().find(field_id);
it != id_to_position_path.get().cend()) {
ICEBERG_ASSIGN_OR_RAISE(auto field, FindFieldById(field_id));
Expand Down Expand Up @@ -322,15 +273,15 @@ Result<std::vector<std::string>> Schema::IdentifierFieldNames() const {
return names;
}

Result<int32_t> Schema::HighestFieldId() const { return highest_field_id_.Get(*this); }
Result<int32_t> Schema::HighestFieldId() const { return cache_->GetHighestFieldId(); }

bool Schema::SameSchema(const Schema& other) const {
return fields_ == other.fields_ && identifier_field_ids_ == other.identifier_field_ids_;
}

Status Schema::Validate(int32_t format_version) const {
// Get all fields including nested ones
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, cache_->GetIdToFieldMap());

// Check each field's type and defaults
for (const auto& [field_id, field_ref] : id_to_field.get()) {
Expand All @@ -351,4 +302,75 @@ Status Schema::Validate(int32_t format_version) const {
return {};
}

Result<SchemaCache::IdToFieldMapRef> SchemaCache::GetIdToFieldMap() const {
return id_to_field_.Get(schema_);
}

Result<SchemaCache::NameIdMapRef> SchemaCache::GetNameIdMap() const {
return name_id_map_.Get(schema_);
}

Result<SchemaCache::LowercaseNameToIdMapRef> SchemaCache::GetLowercaseNameToIdMap()
const {
return lowercase_name_to_id_.Get(schema_);
}

Result<SchemaCache::IdToPositionPathMapRef> SchemaCache::GetIdToPositionPathMap() const {
return id_to_position_path_.Get(schema_);
}

Result<int32_t> SchemaCache::GetHighestFieldId() const {
return highest_field_id_.Get(schema_);
}

Result<SchemaCache::IdToFieldMap> SchemaCache::InitIdToFieldMap(const Schema* schema) {
std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>> id_to_field;
IdToFieldVisitor visitor(id_to_field);
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*schema, &visitor));
return id_to_field;
}

Result<SchemaCache::NameIdMap> SchemaCache::InitNameIdMap(const Schema* schema) {
NameIdMap name_id_map;
NameToIdVisitor visitor(name_id_map.name_to_id, &name_id_map.id_to_name,
/*case_sensitive=*/true);
ICEBERG_RETURN_UNEXPECTED(
VisitTypeInline(*schema, &visitor, /*path=*/"", /*short_path=*/""));
visitor.Finish();
return name_id_map;
}

Result<SchemaCache::LowercaseNameToIdMap> SchemaCache::InitLowerCaseNameToIdMap(
const Schema* schema) {
std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>
lowercase_name_to_id;
NameToIdVisitor visitor(lowercase_name_to_id, /*id_to_name=*/nullptr,
/*case_sensitive=*/false);
ICEBERG_RETURN_UNEXPECTED(
VisitTypeInline(*schema, &visitor, /*path=*/"", /*short_path=*/""));
visitor.Finish();
return lowercase_name_to_id;
}

Result<SchemaCache::IdToPositionPathMap> SchemaCache::InitIdToPositionPath(
const Schema* schema) {
PositionPathVisitor visitor;
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*schema, &visitor));
return visitor.Finish();
}

Result<int32_t> SchemaCache::InitHighestFieldId(const Schema* schema) {
ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, InitIdToFieldMap(schema));

if (id_to_field.empty()) {
return Schema::kInitialColumnId;
}

auto max_it = std::ranges::max_element(
id_to_field,
[](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });

return max_it->first;
}

} // namespace iceberg
61 changes: 45 additions & 16 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

Expand All @@ -38,6 +39,8 @@

namespace iceberg {

class SchemaCache;

/// \brief A schema for a Table.
///
/// A schema is a list of typed columns, along with a unique integer ID. A
Expand Down Expand Up @@ -187,6 +190,22 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Compare two schemas for equality.
bool Equals(const Schema& other) const;

const int32_t schema_id_;
// Field IDs that uniquely identify rows in the table.
std::vector<int32_t> identifier_field_ids_;
// Cache for schema mappings to facilitate fast lookups.
std::unique_ptr<SchemaCache> cache_;
};

// Cache for schema mappings to facilitate fast lookups.
class ICEBERG_EXPORT SchemaCache {
public:
explicit SchemaCache(const Schema* schema) : schema_(schema) {}

using IdToFieldMap =
std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>;
using IdToFieldMapRef = std::reference_wrapper<const IdToFieldMap>;

struct NameIdMap {
/// \brief Mapping from canonical field name to ID
///
Expand All @@ -201,28 +220,38 @@ class ICEBERG_EXPORT Schema : public StructType {
/// 'list.element.field' instead of 'list.field'.
std::unordered_map<int32_t, std::string> id_to_name;
};
using NameIdMapRef = std::reference_wrapper<const NameIdMap>;

static Result<std::unordered_map<int32_t, std::reference_wrapper<const SchemaField>>>
InitIdToFieldMap(const Schema&);
static Result<NameIdMap> InitNameIdMap(const Schema&);
static Result<std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>>
InitLowerCaseNameToIdMap(const Schema&);
static Result<std::unordered_map<int32_t, std::vector<size_t>>> InitIdToPositionPath(
const Schema&);
static Result<int32_t> InitHighestFieldId(const Schema&);
using LowercaseNameToIdMap =
std::unordered_map<std::string, int32_t, StringHash, std::equal_to<>>;
using LowercaseNameToIdMapRef = std::reference_wrapper<const LowercaseNameToIdMap>;

const int32_t schema_id_;
/// Field IDs that uniquely identify rows in the table.
std::vector<int32_t> identifier_field_ids_;
/// Mapping from field id to field.
using IdToPositionPathMap = std::unordered_map<int32_t, std::vector<size_t>>;
using IdToPositionPathMapRef = std::reference_wrapper<const IdToPositionPathMap>;

Result<IdToFieldMapRef> GetIdToFieldMap() const;
Result<NameIdMapRef> GetNameIdMap() const;
Result<LowercaseNameToIdMapRef> GetLowercaseNameToIdMap() const;
Result<IdToPositionPathMapRef> GetIdToPositionPathMap() const;
Result<int32_t> GetHighestFieldId() const;

private:
static Result<IdToFieldMap> InitIdToFieldMap(const Schema* schema);
static Result<NameIdMap> InitNameIdMap(const Schema* schema);
static Result<LowercaseNameToIdMap> InitLowerCaseNameToIdMap(const Schema* schema);
static Result<IdToPositionPathMap> InitIdToPositionPath(const Schema* schema);
static Result<int32_t> InitHighestFieldId(const Schema* schema);

const Schema* schema_;
// Mapping from field id to field.
Lazy<InitIdToFieldMap> id_to_field_;
/// Mapping from field name to field id.
// Mapping from field name to field id.
Lazy<InitNameIdMap> name_id_map_;
/// Mapping from lowercased field name to field id
// Mapping from lowercased field name to field id.
Lazy<InitLowerCaseNameToIdMap> lowercase_name_to_id_;
/// Mapping from field id to (nested) position path to access the field.
// Mapping from field id to (nested) position path to access the field.
Lazy<InitIdToPositionPath> id_to_position_path_;
/// Highest field ID in the schema.
// Highest field ID in the schema.
Lazy<InitHighestFieldId> highest_field_id_;
};

Expand Down
12 changes: 6 additions & 6 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ bool Snapshot::Equals(const Snapshot& other) const {
schema_id == other.schema_id;
}

Result<CachedSnapshot::ManifestsCache> CachedSnapshot::InitManifestsCache(
const Snapshot& snapshot, std::shared_ptr<FileIO> file_io) {
Result<SnapshotCache::ManifestsCache> SnapshotCache::InitManifestsCache(
const Snapshot* snapshot, std::shared_ptr<FileIO> file_io) {
if (file_io == nullptr) {
return InvalidArgument("Cannot cache manifests: FileIO is null");
}

// Read manifest list
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestListReader::Make(snapshot.manifest_list, file_io));
ManifestListReader::Make(snapshot->manifest_list, file_io));
ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, reader->Files());

std::vector<ManifestFile> manifests;
Expand All @@ -118,21 +118,21 @@ Result<CachedSnapshot::ManifestsCache> CachedSnapshot::InitManifestsCache(
return std::make_pair(std::move(manifests), data_manifests_count);
}

Result<std::span<ManifestFile>> CachedSnapshot::Manifests(
Result<std::span<ManifestFile>> SnapshotCache::Manifests(
std::shared_ptr<FileIO> file_io) const {
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io));
auto& cache = cache_ref.get();
return std::span<ManifestFile>(cache.first.data(), cache.first.size());
}

Result<std::span<ManifestFile>> CachedSnapshot::DataManifests(
Result<std::span<ManifestFile>> SnapshotCache::DataManifests(
std::shared_ptr<FileIO> file_io) const {
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io));
auto& cache = cache_ref.get();
return std::span<ManifestFile>(cache.first.data(), cache.second);
}

Result<std::span<ManifestFile>> CachedSnapshot::DeleteManifests(
Result<std::span<ManifestFile>> SnapshotCache::DeleteManifests(
std::shared_ptr<FileIO> file_io) const {
ICEBERG_ASSIGN_OR_RAISE(auto cache_ref, manifests_cache_.Get(snapshot_, file_io));
auto& cache = cache_ref.get();
Expand Down
12 changes: 6 additions & 6 deletions src/iceberg/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,13 @@ struct ICEBERG_EXPORT Snapshot {

/// \brief A snapshot with cached manifest loading capabilities.
///
/// This class wraps a Snapshot reference and provides lazy-loading of manifests.
class ICEBERG_EXPORT CachedSnapshot {
/// This class wraps a Snapshot pointer and provides lazy-loading of manifests.
class ICEBERG_EXPORT SnapshotCache {
public:
explicit CachedSnapshot(const Snapshot& snapshot) : snapshot_(snapshot) {}
explicit SnapshotCache(const Snapshot* snapshot) : snapshot_(snapshot) {}

/// \brief Get the underlying Snapshot reference
const Snapshot& snapshot() const { return snapshot_; }
const Snapshot& snapshot() const { return *snapshot_; }

/// \brief Returns all ManifestFile instances for either data or delete manifests
/// in this snapshot.
Expand Down Expand Up @@ -303,11 +303,11 @@ class ICEBERG_EXPORT CachedSnapshot {
/// \param snapshot The snapshot to initialize the manifests cache for
/// \param file_io The FileIO instance to use for reading the manifest list
/// \return A result containing the manifests cache
static Result<ManifestsCache> InitManifestsCache(const Snapshot& snapshot,
static Result<ManifestsCache> InitManifestsCache(const Snapshot* snapshot,
std::shared_ptr<FileIO> file_io);

/// The underlying snapshot data
const Snapshot& snapshot_;
const Snapshot* snapshot_;

/// Lazy-loaded manifests cache
Lazy<InitManifestsCache> manifests_cache_;
Expand Down
Loading