Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 55 additions & 69 deletions cpp/src/parquet/file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ using arrow::internal::AddWithOverflow;

namespace parquet {

using ::arrow::Future;
using ::arrow::Result;
using ::arrow::Status;

namespace {
bool IsColumnChunkFullyDictionaryEncoded(const ColumnChunkMetaData& col) {
// Check the encoding_stats to see if all data pages are dictionary encoded.
Expand Down Expand Up @@ -398,7 +402,7 @@ class SerializedFile : public ParquetFileReader::Contents {
PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
}

::arrow::Result<std::vector<::arrow::io::ReadRange>> GetReadRanges(
Result<std::vector<::arrow::io::ReadRange>> GetReadRanges(
const std::vector<int>& row_groups, const std::vector<int>& column_indices,
int64_t hole_size_limit, int64_t range_size_limit) {
std::vector<::arrow::io::ReadRange> ranges;
Expand All @@ -413,10 +417,10 @@ class SerializedFile : public ParquetFileReader::Contents {
range_size_limit);
}

::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) const {
Future<> WhenBuffered(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) const {
if (!cached_source_) {
return ::arrow::Status::Invalid("Must call PreBuffer before WhenBuffered");
return Status::Invalid("Must call PreBuffer before WhenBuffered");
}
std::vector<::arrow::io::ReadRange> ranges;
for (int row : row_groups) {
Expand Down Expand Up @@ -465,23 +469,8 @@ class SerializedFile : public ParquetFileReader::Contents {
// Fall through
}

const uint32_t read_metadata_len = ParseUnencryptedFileMetadata(
metadata_buffer, metadata_len, std::move(file_decryptor));
auto file_decryption_properties = properties_.file_decryption_properties();
if (is_encrypted_footer) {
// Nothing else to do here.
return;
} else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
if (file_decryption_properties != nullptr) {
if (!file_decryption_properties->plaintext_files_allowed()) {
throw ParquetException("Applying decryption properties on plaintext file");
}
}
} else {
// Encrypted file with plaintext footer mode.
ParseMetaDataOfEncryptedFileWithPlaintextFooter(
file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len);
}
ParseMetaDataFinal(std::move(metadata_buffer), metadata_len, is_encrypted_footer,
std::move(file_decryptor));
}

// Validate the source size and get the initial read size.
Expand Down Expand Up @@ -522,16 +511,15 @@ class SerializedFile : public ParquetFileReader::Contents {
}

// Does not throw.
::arrow::Future<> ParseMetaDataAsync() {
Future<> ParseMetaDataAsync() {
int64_t footer_read_size;
BEGIN_PARQUET_CATCH_EXCEPTIONS
footer_read_size = GetFooterReadSize();
END_PARQUET_CATCH_EXCEPTIONS
// Assumes this is kept alive externally
return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size)
.Then([this,
footer_read_size](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
-> ::arrow::Future<> {
.Then([this, footer_read_size](
const std::shared_ptr<::arrow::Buffer>& footer_buffer) -> Future<> {
uint32_t metadata_len;
BEGIN_PARQUET_CATCH_EXCEPTIONS
metadata_len = ParseFooterLength(footer_buffer, footer_read_size);
Expand All @@ -557,7 +545,7 @@ class SerializedFile : public ParquetFileReader::Contents {
}

// Continuation
::arrow::Future<> ParseMaybeEncryptedMetaDataAsync(
Future<> ParseMaybeEncryptedMetaDataAsync(
std::shared_ptr<::arrow::Buffer> footer_buffer,
std::shared_ptr<::arrow::Buffer> metadata_buffer, int64_t footer_read_size,
uint32_t metadata_len) {
Expand All @@ -580,26 +568,30 @@ class SerializedFile : public ParquetFileReader::Contents {
file_decryptor = std::move(file_decryptor)](
const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
// Continue and read the file footer
return ParseMetaDataFinal(metadata_buffer, metadata_len, is_encrypted_footer,
file_decryptor);
BEGIN_PARQUET_CATCH_EXCEPTIONS
ParseMetaDataFinal(metadata_buffer, metadata_len, is_encrypted_footer,
file_decryptor);
END_PARQUET_CATCH_EXCEPTIONS
return Status::OK();
});
}
return ParseMetaDataFinal(std::move(metadata_buffer), metadata_len,
is_encrypted_footer, std::move(file_decryptor));
BEGIN_PARQUET_CATCH_EXCEPTIONS
ParseMetaDataFinal(std::move(metadata_buffer), metadata_len, is_encrypted_footer,
std::move(file_decryptor));
END_PARQUET_CATCH_EXCEPTIONS
return Status::OK();
}

// Continuation
::arrow::Status ParseMetaDataFinal(
std::shared_ptr<::arrow::Buffer> metadata_buffer, uint32_t metadata_len,
const bool is_encrypted_footer,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
void ParseMetaDataFinal(std::shared_ptr<::arrow::Buffer> metadata_buffer,
uint32_t metadata_len, const bool is_encrypted_footer,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
const uint32_t read_metadata_len = ParseUnencryptedFileMetadata(
metadata_buffer, metadata_len, std::move(file_decryptor));
auto file_decryption_properties = properties_.file_decryption_properties();
if (is_encrypted_footer) {
// Nothing else to do here.
return ::arrow::Status::OK();
return;
} else if (!file_metadata_->is_encryption_algorithm_set()) { // Non encrypted file.
if (file_decryption_properties != nullptr) {
if (!file_decryption_properties->plaintext_files_allowed()) {
Expand All @@ -611,8 +603,6 @@ class SerializedFile : public ParquetFileReader::Contents {
ParseMetaDataOfEncryptedFileWithPlaintextFooter(
file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len);
}
END_PARQUET_CATCH_EXCEPTIONS
return ::arrow::Status::OK();
}

private:
Expand Down Expand Up @@ -707,20 +697,16 @@ void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter(
auto file_decryptor = std::make_shared<InternalFileDecryptor>(
file_decryption_properties, file_aad, algo.algorithm,
file_metadata_->footer_signing_key_metadata(), properties_.memory_pool());
// set the InternalFileDecryptor in the metadata as well, as it's used
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make verify signature as a static function, we can remove set_file_decryptor as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because the decryptor is passed down to row group and column metadata, and is necessary to decrypt encrypted column metadata.

// for signature verification and for ColumnChunkMetaData creation.
file_metadata_->set_file_decryptor(std::move(file_decryptor));
// Set the InternalFileDecryptor in the metadata as well, as it's used
// for ColumnChunkMetaData creation.
file_metadata_->set_file_decryptor(file_decryptor);

if (file_decryption_properties->check_plaintext_footer_integrity()) {
if (metadata_len - read_metadata_len !=
(parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength)) {
throw ParquetInvalidOrCorruptedFileException(
"Failed reading metadata for encryption signature (requested ",
parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength,
" bytes but have ", metadata_len - read_metadata_len, " bytes)");
}

if (!file_metadata_->VerifySignature(metadata_buffer->data() + read_metadata_len)) {
auto serialized_metadata =
metadata_buffer->span_as<uint8_t>().subspan(0, read_metadata_len);
auto signature = metadata_buffer->span_as<uint8_t>().subspan(read_metadata_len);
if (!FileMetaData::VerifySignature(serialized_metadata, signature,
file_decryptor.get())) {
throw ParquetInvalidOrCorruptedFileException(
"Parquet crypto signature verification failed");
}
Expand Down Expand Up @@ -804,7 +790,7 @@ std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
return result;
}

::arrow::Future<std::unique_ptr<ParquetFileReader::Contents>>
Future<std::unique_ptr<ParquetFileReader::Contents>>
ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source,
const ReaderProperties& props,
std::shared_ptr<FileMetaData> metadata) {
Expand All @@ -815,7 +801,7 @@ ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source,
if (metadata == nullptr) {
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
struct {
::arrow::Result<std::unique_ptr<ParquetFileReader::Contents>> operator()() {
Result<std::unique_ptr<ParquetFileReader::Contents>> operator()() {
return std::move(result);
}

Expand All @@ -825,7 +811,7 @@ ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source,
return file->ParseMetaDataAsync().Then(std::move(Continuation));
} else {
file->set_metadata(std::move(metadata));
return ::arrow::Future<std::unique_ptr<ParquetFileReader::Contents>>::MakeFinished(
return Future<std::unique_ptr<ParquetFileReader::Contents>>::MakeFinished(
std::move(result));
}
END_PARQUET_CATCH_EXCEPTIONS
Expand Down Expand Up @@ -855,24 +841,24 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
return Open(std::move(source), props, std::move(metadata));
}

::arrow::Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync(
Future<std::unique_ptr<ParquetFileReader>> ParquetFileReader::OpenAsync(
std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
std::shared_ptr<FileMetaData> metadata) {
BEGIN_PARQUET_CATCH_EXCEPTIONS
auto fut = SerializedFile::OpenAsync(std::move(source), props, std::move(metadata));
// TODO(ARROW-12259): workaround since we have Future<(move-only type)>
auto completed = ::arrow::Future<std::unique_ptr<ParquetFileReader>>::Make();
fut.AddCallback([fut, completed](
const ::arrow::Result<std::unique_ptr<ParquetFileReader::Contents>>&
contents) mutable {
if (!contents.ok()) {
completed.MarkFinished(contents.status());
return;
}
std::unique_ptr<ParquetFileReader> result = std::make_unique<ParquetFileReader>();
result->Open(fut.MoveResult().MoveValueUnsafe());
completed.MarkFinished(std::move(result));
});
auto completed = Future<std::unique_ptr<ParquetFileReader>>::Make();
fut.AddCallback(
[fut, completed](
const Result<std::unique_ptr<ParquetFileReader::Contents>>& contents) mutable {
if (!contents.ok()) {
completed.MarkFinished(contents.status());
return;
}
std::unique_ptr<ParquetFileReader> result = std::make_unique<ParquetFileReader>();
result->Open(fut.MoveResult().MoveValueUnsafe());
completed.MarkFinished(std::move(result));
});
return completed;
END_PARQUET_CATCH_EXCEPTIONS
}
Expand Down Expand Up @@ -919,7 +905,7 @@ void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
file->PreBuffer(row_groups, column_indices, ctx, options);
}

::arrow::Result<std::vector<::arrow::io::ReadRange>> ParquetFileReader::GetReadRanges(
Result<std::vector<::arrow::io::ReadRange>> ParquetFileReader::GetReadRanges(
const std::vector<int>& row_groups, const std::vector<int>& column_indices,
int64_t hole_size_limit, int64_t range_size_limit) {
// Access private methods here
Expand All @@ -929,8 +915,8 @@ ::arrow::Result<std::vector<::arrow::io::ReadRange>> ParquetFileReader::GetReadR
range_size_limit);
}

::arrow::Future<> ParquetFileReader::WhenBuffered(
const std::vector<int>& row_groups, const std::vector<int>& column_indices) const {
Future<> ParquetFileReader::WhenBuffered(const std::vector<int>& row_groups,
const std::vector<int>& column_indices) const {
// Access private methods here
SerializedFile* file =
::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,42 @@ void FileMetaData::WriteTo(::arrow::io::OutputStream* dst,
return impl_->WriteTo(dst, encryptor);
}

bool FileMetaData::VerifySignature(std::span<const uint8_t> serialized_metadata,
std::span<const uint8_t> signature,
InternalFileDecryptor* file_decryptor) {
DCHECK_NE(file_decryptor, nullptr);

// In plaintext footer, the "signature" is the concatenation of the nonce used
// for GCM encryption, and the authentication tag obtained after GCM encryption.
if (signature.size() != encryption::kGcmTagLength + encryption::kNonceLength) {
throw ParquetInvalidOrCorruptedFileException(
"Invalid footer encryption signature (expected ",
encryption::kGcmTagLength + encryption::kNonceLength, " bytes, got ",
signature.size(), ")");
}

// Encrypt plaintext serialized metadata so as to compute its signature
auto nonce = signature.subspan(0, encryption::kNonceLength);
auto tag = signature.subspan(encryption::kNonceLength);
const SecureString& key = file_decryptor->GetFooterKey();
const std::string& aad = encryption::CreateFooterAad(file_decryptor->file_aad());

auto aes_encryptor = encryption::AesEncryptor::Make(
file_decryptor->algorithm(), static_cast<int>(key.size()), /*metadata=*/true,
/*write_length=*/false);

std::shared_ptr<Buffer> encrypted_buffer =
AllocateBuffer(file_decryptor->pool(),
aes_encryptor->CiphertextLength(serialized_metadata.size()));
int32_t encrypted_len = aes_encryptor->SignedFooterEncrypt(
serialized_metadata, key.as_span(), str2span(aad), nonce,
encrypted_buffer->mutable_span_as<uint8_t>());
DCHECK_EQ(encrypted_len, encrypted_buffer->size());
// Check computed signature against expected
return 0 == memcmp(encrypted_buffer->data() + encrypted_len - encryption::kGcmTagLength,
tag.data(), encryption::kGcmTagLength);
}

class FileCryptoMetaData::FileCryptoMetaDataImpl {
public:
FileCryptoMetaDataImpl() = default;
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <map>
#include <memory>
#include <optional>
#include <span>
#include <string>
#include <vector>

Expand Down Expand Up @@ -322,8 +323,8 @@ class PARQUET_EXPORT FileMetaData {
EncryptionAlgorithm encryption_algorithm() const;
const std::string& footer_signing_key_metadata() const;

/// \brief Verify signature of FileMetaData when file is encrypted but footer
/// is not encrypted (plaintext footer).
PARQUET_DEPRECATED(
"Deprecated in 24.0.0. If you need this functionality, please report an issue.")
bool VerifySignature(const void* signature);

void WriteTo(::arrow::io::OutputStream* dst,
Expand Down Expand Up @@ -383,6 +384,11 @@ class PARQUET_EXPORT FileMetaData {
void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor);
const std::shared_ptr<InternalFileDecryptor>& file_decryptor() const;

// Verify the signature of a plaintext footer.
static bool VerifySignature(std::span<const uint8_t> serialized_metadata,
std::span<const uint8_t> signature,
InternalFileDecryptor* file_decryptor);

// PIMPL Idiom
FileMetaData();
class FileMetaDataImpl;
Expand Down
Loading