From a94df3d7f7fda7bf4a81eb59fd8a4249510abb9e Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 2 Jun 2026 09:14:21 +0800 Subject: [PATCH 1/5] feat: add BloomFilter, CRC32C, MurmurHash, varint utilities, and DeltaVarintCompressor --- src/paimon/common/data/data_define_test.cpp | 320 ++++++++++++ src/paimon/common/utils/bloom_filter.cpp | 100 ++++ src/paimon/common/utils/bloom_filter.h | 72 +++ src/paimon/common/utils/bloom_filter64.cpp | 101 ++++ src/paimon/common/utils/bloom_filter64.h | 75 +++ .../common/utils/bloom_filter64_test.cpp | 83 ++++ src/paimon/common/utils/bloom_filter_test.cpp | 157 ++++++ src/paimon/common/utils/crc32c.cpp | 66 +++ src/paimon/common/utils/crc32c.h | 46 ++ src/paimon/common/utils/crc32c_test.cpp | 43 ++ .../common/utils/delta_varint_compressor.cpp | 110 +++++ .../common/utils/delta_varint_compressor.h | 52 ++ .../utils/delta_varint_compressor_test.cpp | 459 ++++++++++++++++++ src/paimon/common/utils/murmurhash_utils.h | 13 +- .../common/utils/murmurhash_utils_test.cpp | 13 +- .../common/utils/var_length_int_utils.h | 139 ++++++ .../utils/var_length_int_utils_test.cpp | 144 ++++++ src/paimon/common/utils/xxhash_test.cpp | 49 ++ 18 files changed, 2028 insertions(+), 14 deletions(-) create mode 100644 src/paimon/common/data/data_define_test.cpp create mode 100644 src/paimon/common/utils/bloom_filter.cpp create mode 100644 src/paimon/common/utils/bloom_filter.h create mode 100644 src/paimon/common/utils/bloom_filter64.cpp create mode 100644 src/paimon/common/utils/bloom_filter64.h create mode 100644 src/paimon/common/utils/bloom_filter64_test.cpp create mode 100644 src/paimon/common/utils/bloom_filter_test.cpp create mode 100644 src/paimon/common/utils/crc32c.cpp create mode 100644 src/paimon/common/utils/crc32c.h create mode 100644 src/paimon/common/utils/crc32c_test.cpp create mode 100644 src/paimon/common/utils/delta_varint_compressor.cpp create mode 100644 src/paimon/common/utils/delta_varint_compressor.h create mode 100644 
src/paimon/common/utils/delta_varint_compressor_test.cpp create mode 100644 src/paimon/common/utils/var_length_int_utils.h create mode 100644 src/paimon/common/utils/var_length_int_utils_test.cpp create mode 100644 src/paimon/common/utils/xxhash_test.cpp diff --git a/src/paimon/common/data/data_define_test.cpp b/src/paimon/common/data/data_define_test.cpp new file mode 100644 index 0000000..2a76837 --- /dev/null +++ b/src/paimon/common/data/data_define_test.cpp @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "paimon/common/data/data_define.h"
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_binary.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/binary_array.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_string.h"
+#include "paimon/common/utils/decimal_utils.h"
+#include "paimon/data/decimal.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+// NOTE(review): explicit template arguments appear to be missing throughout this copy; confirm upstream.
+namespace paimon::test {
+
+// IsVariantNull must report true when the variant holds NullType.
+TEST(DataDefineTest, IsVariantNullReturnsTrueForNull) {
+    VariantType null_variant = NullType{};
+    ASSERT_TRUE(DataDefine::IsVariantNull(null_variant));
+}
+
+// IsVariantNull must report false when the variant holds any other alternative.
+TEST(DataDefineTest, IsVariantNullReturnsFalseForNonNullTypes) {
+    VariantType non_null_variant = 42;  // Example with int
+    ASSERT_FALSE(DataDefine::IsVariantNull(non_null_variant));
+
+    non_null_variant = true;  // Example with bool
+    ASSERT_FALSE(DataDefine::IsVariantNull(non_null_variant));
+}
+
+// GetVariantValue must hand back the stored value for int, bool and string-view variants.
+TEST(DataDefineTest, GetVariantValue) {
+    {
+        VariantType int_variant = 42;  // Variant holding an int
+        const auto int_value = DataDefine::GetVariantValue(int_variant);
+        ASSERT_EQ(int_value, 42);
+    }
+    {
+        VariantType bool_variant = true;  // Variant holding a bool
+        const bool bool_value = DataDefine::GetVariantValue(bool_variant);
+        ASSERT_EQ(bool_value, true);
+    }
+    {
+        auto array = std::dynamic_pointer_cast(
+            arrow::ipc::internal::json::ArrayFromJSON(arrow::utf8(), R"(["abc", "def", "hello"])")
+                .ValueOrDie());
+        VariantType view_variant = array->GetView(2);  // Variant holding the view of element 2
+        const auto view_value = DataDefine::GetVariantValue(view_variant);
+        ASSERT_EQ(std::string(view_value), "hello");
+        ASSERT_EQ(DataDefine::VariantValueToString(view_variant), "hello");
+    }
+}
+
+// NOTE(review): empty placeholder — this test asserts nothing; implement it or remove it.
+TEST(DataDefineTest, GetVariantValueReturnsPointerForMatchingType) {}
+
+// VariantValueToString must render NullType as "null".
+TEST(DataDefineTest, VariantValueToStringReturnsStringForNullType) {
+    VariantType null_variant = NullType{};
+    ASSERT_EQ(DataDefine::VariantValueToString(null_variant), "null");
+}
+
+// VariantValueToString must render bool values as "true" / "false".
+TEST(DataDefineTest, VariantValueToStringReturnsStringForBool) {
+    VariantType bool_variant = true;
+    ASSERT_EQ(DataDefine::VariantValueToString(bool_variant), "true");
+
+    bool_variant = false;
+    ASSERT_EQ(DataDefine::VariantValueToString(bool_variant), "false");
+}
+
+// VariantValueToString must render an integer as its decimal text.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInt) {
+    VariantType int_variant = 42;
+    ASSERT_EQ(DataDefine::VariantValueToString(int_variant), "42");
+}
+
+// VariantValueToString must return the text held by a BinaryString.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForBinaryString) {
+    auto pool = GetDefaultPool();
+    auto binary_str = BinaryString::FromString("Hello, world!", pool.get());
+    VariantType binary_variant = binary_str;
+    ASSERT_EQ(DataDefine::VariantValueToString(binary_variant), "Hello, world!");
+}
+
+// VariantValueToString must return the content of a shared pointer to Bytes.
+TEST(DataDefineTest, VariantValueToStringReturnsStringForSharedPtrBytes) {
+    auto pool = GetDefaultPool();
+    std::shared_ptr bytes_ptr = Bytes::AllocateBytes("abc", pool.get());
+    VariantType bytes_variant = bytes_ptr;
+    ASSERT_EQ(DataDefine::VariantValueToString(bytes_variant), "abc");
+}
+
+// VariantValueToString must agree with Timestamp::ToString().
+TEST(DataDefineTest, VariantValueToStringReturnsStringForTimestamp) {
+    auto timestamp =
+        Timestamp(/*millisecond=*/1622520000000l, /*nano_of_millisecond=*/0);  // A timestamp value
+    VariantType timestamp_variant = timestamp;
+    ASSERT_EQ(DataDefine::VariantValueToString(timestamp_variant), timestamp.ToString());
+}
+
+// VariantValueToString must agree with Decimal::ToString().
+TEST(DataDefineTest, VariantValueToStringReturnsStringForDecimal) {
+    Decimal decimal(38, 38, DecimalUtils::StrToInt128("12345678998765432145678").value());
+    VariantType decimal_variant = decimal;
+    ASSERT_EQ(DataDefine::VariantValueToString(decimal_variant), decimal.ToString());
+}
+
+// VariantValueToString must render a row pointer as the fixed text "row".
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInternalRow) {
+    std::shared_ptr row_ptr = std::make_shared(0);
+    VariantType row_variant = row_ptr;
+    ASSERT_EQ(DataDefine::VariantValueToString(row_variant), "row");
+}
+
+// VariantValueToString must render an array pointer as the fixed text "array".
+TEST(DataDefineTest, VariantValueToStringReturnsStringForInternalArray) {
+    auto pool = GetDefaultPool();  // NOTE(review): pool is not used in this test
+    std::shared_ptr array_ptr = std::make_shared();
+    VariantType array_variant = array_ptr;
+    ASSERT_EQ(DataDefine::VariantValueToString(array_variant), "array");
+}
+
+// GetStringView must work for string_view, Bytes and BinaryString variants, including empty ones.
+TEST(DataDefineTest, GetStringView) {
+    auto pool = GetDefaultPool();
+
+    {
+        // from string_view: the result must alias the original buffer (same data pointer)
+        std::string original = "hello world";
+        VariantType view_variant = std::string_view(original.data(), original.size());
+        auto result = DataDefine::GetStringView(view_variant);
+        ASSERT_EQ(result, "hello world");
+        ASSERT_EQ(result.data(), original.data());
+    }
+    {
+        // from a shared pointer to Bytes
+        std::shared_ptr bytes = Bytes::AllocateBytes("test bytes", pool.get());
+        VariantType bytes_variant = bytes;
+        auto result = DataDefine::GetStringView(bytes_variant);
+        ASSERT_EQ(std::string(result), "test bytes");
+        ASSERT_EQ(result.size(), 10);
+    }
+    {
+        // from BinaryString
+        auto binary_str = BinaryString::FromString("binary string content", pool.get());
+        VariantType binary_variant = binary_str;
+        auto result = DataDefine::GetStringView(binary_variant);
+        ASSERT_EQ(std::string(result), "binary string content");
+    }
+    {
+        // empty string_view
+        VariantType view_variant = std::string_view();
+        auto result = DataDefine::GetStringView(view_variant);
+        ASSERT_TRUE(result.empty());
+        ASSERT_EQ(result.size(), 0);
+    }
+    {
+        // empty Bytes
+        std::shared_ptr bytes = Bytes::AllocateBytes("", pool.get());
+        VariantType bytes_variant = bytes;
+        auto result = DataDefine::GetStringView(bytes_variant);
+        ASSERT_TRUE(result.empty());
+        ASSERT_EQ(result.size(), 0);
+    }
+    {
+        // empty BinaryString
+        VariantType binary_variant = BinaryString::EmptyUtf8();
+        auto result = DataDefine::GetStringView(binary_variant);
+        ASSERT_TRUE(result.empty());
+        ASSERT_EQ(result.size(), 0);
+    }
+}
+// VariantValueToLiteral must convert each supported arrow type id and reject an unsupported one.
+TEST(DataDefineTest, VariantValueToLiteral) {
+    auto pool = GetDefaultPool();
+
+    {
+        // BOOL
+        VariantType value = true;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::BOOL));
+        ASSERT_EQ(literal.GetValue(), true);
+    }
+    {
+        // INT8
+        VariantType value = static_cast(42);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::INT8));
+        ASSERT_EQ(literal.GetValue(), 42);
+    }
+    {
+        // INT16
+        VariantType value = static_cast(1000);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::INT16));
+        ASSERT_EQ(literal.GetValue(), 1000);
+    }
+    {
+        // INT32
+        VariantType value = static_cast(100000);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::INT32));
+        ASSERT_EQ(literal.GetValue(), 100000);
+    }
+    {
+        // INT64
+        VariantType value = static_cast(123456789L);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::INT64));
+        ASSERT_EQ(literal.GetValue(), 123456789L);
+    }
+    {
+        // FLOAT
+        VariantType value = 3.14f;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::FLOAT));
+        ASSERT_FLOAT_EQ(literal.GetValue(), 3.14f);
+    }
+    {
+        // DOUBLE
+        VariantType value = 2.718;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::DOUBLE));
+        ASSERT_DOUBLE_EQ(literal.GetValue(), 2.718);
+    }
+    {
+        // STRING from BinaryString
+        auto binary_str = BinaryString::FromString("hello", pool.get());
+        VariantType value = binary_str;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::STRING));
+        ASSERT_EQ(literal.GetValue(), "hello");
+    }
+    {
+        // STRING from a shared pointer to Bytes
+        std::shared_ptr bytes = Bytes::AllocateBytes("world", pool.get());
+        VariantType value = bytes;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::STRING));
+        ASSERT_EQ(literal.GetValue(), "world");
+    }
+    {
+        // STRING from string_view
+        std::string original = "view_str";
+        VariantType value = std::string_view(original.data(), original.size());
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::STRING));
+        ASSERT_EQ(literal.GetValue(), "view_str");
+    }
+    {
+        // BINARY from a shared pointer to Bytes
+        std::shared_ptr bytes = Bytes::AllocateBytes("binary_data", pool.get());
+        VariantType value = bytes;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::BINARY));
+        ASSERT_EQ(literal.GetValue(), "binary_data");
+    }
+    {
+        // BINARY from BinaryString
+        auto binary_str = BinaryString::FromString("bin_str", pool.get());
+        VariantType value = binary_str;
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::BINARY));
+        ASSERT_EQ(literal.GetValue(), "bin_str");
+    }
+    {
+        // TIMESTAMP
+        Timestamp ts(12345, 1);
+        VariantType value = ts;
+        ASSERT_OK_AND_ASSIGN(
+            auto literal, DataDefine::VariantValueToLiteral(value, arrow::Type::type::TIMESTAMP));
+        ASSERT_EQ(literal.GetValue(), ts);
+    }
+    {
+        // DECIMAL128
+        Decimal decimal(20, 3, 123456);
+        VariantType value = decimal;
+        ASSERT_OK_AND_ASSIGN(
+            auto literal, DataDefine::VariantValueToLiteral(value, arrow::Type::type::DECIMAL128));
+        ASSERT_EQ(literal.GetValue(), decimal);
+    }
+    {
+        // DATE32
+        VariantType value = static_cast(19000);
+        ASSERT_OK_AND_ASSIGN(auto literal,
+                             DataDefine::VariantValueToLiteral(value, arrow::Type::type::DATE32));
+        ASSERT_EQ(literal.GetValue(), 19000);
+    }
+    {
+        // unsupported type: LIST must be rejected with the message checked below
+        VariantType value = static_cast(0);
+        ASSERT_NOK_WITH_MSG(DataDefine::VariantValueToLiteral(value, arrow::Type::type::LIST),
+                            "Not support arrow type");
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/bloom_filter.cpp b/src/paimon/common/utils/bloom_filter.cpp
new file mode 100644
index 0000000..ccdbc7d
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter.cpp
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter.h"
+// NOTE(review): the four standard header names below are missing in this copy; restore from upstream.
+#include
+#include
+#include
+#include
+
+namespace paimon {
+// Bit count for `expect_entries` items at rate `fpp`: 0 for invalid input, capped at INT32_MAX.
+int32_t BloomFilter::OptimalNumOfBits(int64_t expect_entries, double fpp) {
+    // Written as !(a && b) so that a NaN fpp is rejected too instead of reaching the cast below.
+    if (expect_entries <= 0 || !(fpp > 0.0 && fpp < 1.0)) return 0;
+    const double ln2 = std::log(2.0);
+    const double bits = -static_cast<double>(expect_entries) * std::log(fpp) / (ln2 * ln2);
+    // Saturate rather than overflow the int32_t conversion; `bits` is always positive here.
+    if (bits > INT32_MAX) return INT32_MAX;
+    return static_cast<int32_t>(bits);
+}
+// Rounds (bit_size / expect_entries) * ln 2 to the nearest integer; never returns less than 1.
+int32_t BloomFilter::OptimalNumOfHashFunctions(int64_t expect_entries, int64_t bit_size) {
+    if (expect_entries <= 0) {
+        return 1;
+    }
+    const double ratio = static_cast<double>(bit_size) / static_cast<double>(expect_entries);
+    const double result = ratio * std::log(2.0);
+    return std::max(1, static_cast<int32_t>(std::round(result)));
+}
+// NOTE(review): invalid arguments give byte_length 0 here; confirm BitSize() can then never be 0.
+std::shared_ptr<BloomFilter> BloomFilter::Create(int64_t expect_entries, double fpp) {
+    auto bytes =
+        static_cast<int32_t>(std::ceil(BloomFilter::OptimalNumOfBits(expect_entries, fpp) / 8.0));
+    return std::make_shared<BloomFilter>(expect_entries, bytes);
+}
+// Derives the hash-function count from byte_length * BYTE_SIZE bits and allocates the bit set.
+BloomFilter::BloomFilter(int64_t expected_entries, int32_t byte_length)
+    : expected_entries_(expected_entries) {
+    num_hash_functions_ = OptimalNumOfHashFunctions(
+        expected_entries, static_cast<int64_t>(byte_length) * BYTE_SIZE);
+    bit_set_ = std::make_shared<BitSet>(byte_length);
+}
+// Sets num_hash_functions_ bit positions derived from hash1; a failing Set() is propagated.
+Status BloomFilter::AddHash(int32_t hash1) {
+    const auto h1 = static_cast<uint32_t>(hash1);
+    const uint32_t h2 = h1 >> 16;  // logical (zero-filling) shift of the upper half
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // uint32_t math wraps modulo 2^32; the signed sum could overflow, which is undefined.
+        auto combined_hash = static_cast<int32_t>(h1 + static_cast<uint32_t>(i) * h2);
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;  // flip all the bits so the value is non-negative
+        }
+        int32_t pos = combined_hash % bit_set_->BitSize();
+        PAIMON_RETURN_NOT_OK(bit_set_->Set(pos));
+    }
+    return Status::OK();
+}
+// Returns false as soon as one derived bit position is unset; uses the same positions as AddHash.
+bool BloomFilter::TestHash(int32_t hash1) const {
+    const auto h1 = static_cast<uint32_t>(hash1);
+    const uint32_t h2 = h1 >> 16;  // logical (zero-filling) shift of the upper half
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        // uint32_t math wraps modulo 2^32; the signed sum could overflow, which is undefined.
+        auto combined_hash = static_cast<int32_t>(h1 + static_cast<uint32_t>(i) * h2);
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;  // flip all the bits so the value is non-negative
+        }
+        int32_t pos = combined_hash % bit_set_->BitSize();
+        if (!bit_set_->Get(pos)) {
+            return false;
+        }
+    }
+    return true;
+}
+// Forwards to the bit set, which from then on operates on `segment` starting at `offset`.
+Status BloomFilter::SetMemorySegment(MemorySegment segment, int32_t offset) {
+    return bit_set_->SetMemorySegment(segment, offset);
+}
+// Clears the bit set via BitSet::Clear().
+void BloomFilter::Reset() {
+    bit_set_->Clear();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter.h b/src/paimon/common/utils/bloom_filter.h
new file mode 100644
index 0000000..8f1b327
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include
+#include
+#include
+
+#include "paimon/common/utils/bit_set.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Bloom filter based on MemorySegment.
+class PAIMON_EXPORT BloomFilter {
+ public:
+    static int32_t OptimalNumOfBits(int64_t expect_entries, double fpp);
+    static int32_t OptimalNumOfHashFunctions(int64_t expect_entries, int64_t bit_size);
+    static std::shared_ptr<BloomFilter> Create(int64_t expect_entries, double fpp);
+    /// Create() sizes the filter from OptimalNumOfBits(), rounded up to whole bytes.
+ public:
+    BloomFilter(int64_t expected_entries, int32_t byte_length);
+    /// Number of bit positions derived from each hash.
+    int32_t GetNumHashFunctions() const {
+        return num_hash_functions_;
+    }
+    /// Entry count this filter was constructed for.
+    int64_t ExpectedEntries() const {
+        return expected_entries_;
+    }
+    /// Byte length as reported by the underlying bit set.
+    int64_t ByteLength() const {
+        return bit_set_->ByteLength();
+    }
+    /// Shared handle to the underlying bit set.
+    std::shared_ptr<BitSet> GetBitSet() const {
+        return bit_set_;
+    }
+    /// Points the underlying bit set at `segment`, starting at `offset`.
+    Status SetMemorySegment(MemorySegment segment, int32_t offset = 0);
+    /// Marks the bit positions derived from `hash1`.
+    Status AddHash(int32_t hash1);
+    /// True when every bit position derived from `hash1` is set.
+    bool TestHash(int32_t hash1) const;
+    /// Clears the underlying bit set.
+    void Reset();
+
+ private:
+    static constexpr int32_t BYTE_SIZE = 8;
+
+ private:
+    int64_t expected_entries_;
+    int32_t num_hash_functions_ = -1;
+    std::shared_ptr<BitSet> bit_set_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64.cpp b/src/paimon/common/utils/bloom_filter64.cpp
new file mode 100644
index 0000000..02be3e2
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter64.h"
+// NOTE(review): the four standard header names below are missing in this copy; restore from upstream.
+#include
+#include
+#include
+#include
+
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+// Views `bytes` as a bit array whose bit 0 lives in the byte at index `offset`.
+BloomFilter64::BitSet::BitSet(const std::shared_ptr<Bytes>& bytes, int32_t offset)
+    : offset_(offset), bytes_(bytes) {
+    assert(bytes_->size() > 0);
+    assert(offset_ >= 0);
+}
+// Sets bit `index`: byte (index >> 3) + offset_, bit (index & MASK) within that byte.
+void BloomFilter64::BitSet::Set(int32_t index) {
+    char* data = bytes_->data();
+    data[(static_cast<uint32_t>(index) >> 3) + offset_] |=
+        static_cast<char>(1u << (index & BloomFilter64::BitSet::MASK));
+}
+// Reads bit `index` using the same byte/bit addressing as Set().
+bool BloomFilter64::BitSet::Get(int32_t index) const {
+    const char* data = bytes_->data();
+    return (data[(static_cast<uint32_t>(index) >> 3) + offset_] &
+            static_cast<char>(1u << (index & BloomFilter64::BitSet::MASK))) != 0;
+}
+// Number of addressable bits: the bytes after `offset_`, times BYTE_SIZE.
+int32_t BloomFilter64::BitSet::BitSize() const {
+    return (bytes_->size() - offset_) * BloomFilter64::BYTE_SIZE;
+}
+// NOTE(review): items <= 0 or fpp outside (0, 1) is not validated (log(fpp), division by items).
+BloomFilter64::BloomFilter64(int64_t items, double fpp, const std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool) {
+    auto nb = static_cast<int32_t>(-items * std::log(fpp) / (std::log(2) * std::log(2)));
+    num_bits_ = nb + (BloomFilter64::BYTE_SIZE - (nb % BloomFilter64::BYTE_SIZE));
+    num_hash_functions_ = std::max(
+        1, static_cast<int32_t>(std::round(static_cast<double>(num_bits_) / items * std::log(2))));
+    auto bytes = std::make_shared<Bytes>(num_bits_ / BloomFilter64::BYTE_SIZE, pool_.get());
+    bit_set_ = std::make_unique<BitSet>(bytes, /*offset=*/0);
+}
+// Wraps an existing bit set; num_bits_ is taken from bit_set->BitSize().
+BloomFilter64::BloomFilter64(int32_t num_hash_functions, std::unique_ptr<BitSet>&& bit_set)
+    : num_bits_(bit_set->BitSize()),
+      num_hash_functions_(num_hash_functions),
+      bit_set_(std::move(bit_set)) {}
+// Sets num_hash_functions_ bit positions derived from the two 32-bit halves of hash64.
+void BloomFilter64::AddHash(int64_t hash64) {
+    const auto h1 = static_cast<uint32_t>(hash64);  // low 32 bits
+    const auto h2 = static_cast<uint32_t>(static_cast<uint64_t>(hash64) >> 32);  // high 32 bits
+    // uint32_t math wraps modulo 2^32; the signed form could overflow, which is undefined behavior.
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        auto combined_hash = static_cast<int32_t>(h1 + static_cast<uint32_t>(i) * h2);
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % num_bits_;
+        bit_set_->Set(pos);
+    }
+}
+// Returns false as soon as one derived bit position is unset; uses the same positions as AddHash.
+bool BloomFilter64::TestHash(int64_t hash64) const {
+    const auto h1 = static_cast<uint32_t>(hash64);  // low 32 bits
+    const auto h2 = static_cast<uint32_t>(static_cast<uint64_t>(hash64) >> 32);  // high 32 bits
+    // uint32_t math wraps modulo 2^32; the signed form could overflow, which is undefined behavior.
+    for (int32_t i = 1; i <= num_hash_functions_; i++) {
+        auto combined_hash = static_cast<int32_t>(h1 + static_cast<uint32_t>(i) * h2);
+        // hashcode should be positive, flip all the bits if it's negative
+        if (combined_hash < 0) {
+            combined_hash = ~combined_hash;
+        }
+        int32_t pos = combined_hash % num_bits_;
+        if (!bit_set_->Get(pos)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64.h b/src/paimon/common/utils/bloom_filter64.h
new file mode 100644
index 0000000..3ba4e86
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64.h
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include
+#include
+#include
+
+#include "paimon/memory/bytes.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class Bytes;
+class MemoryPool;
+
+/// Bloom filter that handles 64-bit hashes.
+class PAIMON_EXPORT BloomFilter64 {
+ public:
+    BloomFilter64(int64_t items, double fpp, const std::shared_ptr<MemoryPool>& pool);
+    class BitSet;
+    /// Wraps an existing bit set; the bit count is taken from bit_set->BitSize().
+    BloomFilter64(int32_t num_hash_functions, std::unique_ptr<BitSet>&& bit_set);
+    /// Marks the bit positions derived from `hash64`.
+    void AddHash(int64_t hash64);
+    /// True when every bit position derived from `hash64` is set.
+    bool TestHash(int64_t hash64) const;
+    /// Number of bit positions derived from each hash.
+    int32_t GetNumHashFunctions() const {
+        return num_hash_functions_;
+    }
+    /// Read-only access to the underlying bit set.
+    const BitSet& GetBitSet() const {
+        return *bit_set_;
+    }
+    /// Bit array stored in a Bytes buffer, starting `offset` bytes into it.
+    class BitSet {
+     public:
+        BitSet(const std::shared_ptr<Bytes>& bytes, int32_t offset);
+        void Set(int32_t index);
+        bool Get(int32_t index) const;
+        int32_t BitSize() const;
+
+     private:
+        static constexpr int8_t MASK = 0x07;
+
+     private:
+        int32_t offset_;
+        std::shared_ptr<Bytes> bytes_;
+    };
+
+ private:
+    static constexpr int32_t BYTE_SIZE = 8;
+
+ private:
+    int32_t num_bits_ = -1;
+    int32_t num_hash_functions_ = -1;
+    std::shared_ptr<MemoryPool> pool_;
+    std::unique_ptr<BitSet> bit_set_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bloom_filter64_test.cpp b/src/paimon/common/utils/bloom_filter64_test.cpp
new file mode 100644
index 0000000..aa548fb
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter64_test.cpp
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter64.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+// NOTE(review): header names and template arguments appear to be missing in this copy; confirm upstream.
+namespace paimon::test {
+// Adds 10000 random hashes, checks none is reported missing, then measures the false-positive rate.
+TEST(BloomFilter64Test, TestSimple) {
+    int32_t items = 10000;
+    auto pool = GetDefaultPool();
+    BloomFilter64 bloom_filter(items, 0.02, pool);
+    std::mt19937_64 engine(std::random_device{}());  // NOLINT(whitespace/braces)
+    std::uniform_int_distribution distribution(std::numeric_limits::min(),
+                                               std::numeric_limits::max());
+    std::set test_data;
+    for (int32_t i = 0; i < items; i++) {
+        int64_t random = distribution(engine);
+        test_data.insert(random);
+        bloom_filter.AddHash(random);
+    }
+    // every inserted hash must be found (no false negatives); the seed differs on every run
+    for (const auto& value : test_data) {
+        ASSERT_TRUE(bloom_filter.TestHash(value));
+    }
+
+    // test false positive: count hits among fresh random hashes that were never inserted
+    int32_t false_positives = 0;
+    int32_t num = 1000000;
+    for (int32_t i = 0; i < num; i++) {
+        int64_t random = distribution(engine);
+        if (bloom_filter.TestHash(random) && test_data.find(random) == test_data.end()) {
+            false_positives++;
+        }
+    }
+    ASSERT_TRUE(static_cast(false_positives) / num < 0.03);  // filter was built with fpp 0.02
+}
+// Loads a fixed serialized bit set and checks that the listed hashes are all reported present.
+TEST(BloomFilter64Test, TestCompatibleWithJava) {
+    // data: -10, -5, 0, 13, 100, 200, 500 (presumably serialized by the Java side; see test name)
+    std::vector se_bytes = {241, 255, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0};
+    auto pool = GetDefaultPool();
+    auto bytes = std::make_shared(se_bytes.size(), pool.get());
+    memcpy(bytes->data(), reinterpret_cast(se_bytes.data()), bytes->size());
+    auto bit_set = std::make_unique(bytes, /*offset=*/0);
+    BloomFilter64 bloom_filter(/*num_hash_functions=*/7, std::move(bit_set));
+    std::vector expected_data = {-10, -5, 0, 13, 100, 200, 500};
+    for (const auto& value : expected_data) {
+        ASSERT_TRUE(bloom_filter.TestHash(value));
+    }
+    // NOTE(review): BYTE_SIZE, num_bits_ and bytes_ are private members; confirm how tests get access.
+    BloomFilter64 bloom_filter2(10, 0.01, pool);
+    ASSERT_EQ(7, bloom_filter2.GetNumHashFunctions());
+    ASSERT_EQ(se_bytes.size() * BloomFilter64::BYTE_SIZE, bloom_filter2.num_bits_);
+    ASSERT_EQ(se_bytes.size(), bloom_filter2.GetBitSet().bytes_->size());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/bloom_filter_test.cpp b/src/paimon/common/utils/bloom_filter_test.cpp
new file mode 100644
index 0000000..fdd6abb
--- /dev/null
+++ b/src/paimon/common/utils/bloom_filter_test.cpp
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bloom_filter.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+// NOTE(review): header names and template arguments appear to be missing in this copy; confirm upstream.
+namespace paimon::test {
+// A filter built by Create() and backed by one heap segment must report every added hash.
+TEST(BloomFilterTest, TestOneSegmentBuilder) {
+    int32_t items = 100;
+    auto pool = GetDefaultPool();
+    auto bloom_filter = BloomFilter::Create(items, 0.01);
+    auto seg = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg));
+
+    std::mt19937_64 engine(std::random_device{}());  // NOLINT(whitespace/braces)
+    std::uniform_int_distribution distribution(0, items);
+    std::set test_data;
+    for (int32_t i = 0; i < items; i++) {
+        int32_t random = distribution(engine);
+        test_data.insert(random);
+        ASSERT_OK(bloom_filter->AddHash(random));
+    }
+    // every added hash must be found (no false negatives)
+    for (const auto& value : test_data) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+}
+// Pins the hash-function count that Create() derives for several (entries, fpp) pairs.
+TEST(BloomFilterTest, TestEstimatedHashFunctions) {
+    ASSERT_EQ(7, BloomFilter::Create(1000, 0.01)->GetNumHashFunctions());
+    ASSERT_EQ(7, BloomFilter::Create(10000, 0.01)->GetNumHashFunctions());
+    ASSERT_EQ(7, BloomFilter::Create(100000, 0.01)->GetNumHashFunctions());
+    ASSERT_EQ(4, BloomFilter::Create(100000, 0.05)->GetNumHashFunctions());
+    ASSERT_EQ(7, BloomFilter::Create(1000000, 0.01)->GetNumHashFunctions());
+    ASSERT_EQ(4, BloomFilter::Create(1000000, 0.05)->GetNumHashFunctions());
+}
+// Pins OptimalNumOfBits, including the 0 result for zero entries or an fpp of 0 or 1.
+TEST(BloomFilterTest, TestBloomNumBits) {
+    ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(0, 0));
+    ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(0, 1));
+    ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(1, 1));
+    ASSERT_EQ(7, BloomFilter::OptimalNumOfBits(1, 0.03));
+    ASSERT_EQ(72, BloomFilter::OptimalNumOfBits(10, 0.03));
+    ASSERT_EQ(729, BloomFilter::OptimalNumOfBits(100, 0.03));
+    ASSERT_EQ(7298, BloomFilter::OptimalNumOfBits(1000, 0.03));
+    ASSERT_EQ(72984, BloomFilter::OptimalNumOfBits(10000, 0.03));
+    ASSERT_EQ(729844, BloomFilter::OptimalNumOfBits(100000, 0.03));
+    ASSERT_EQ(7298440, BloomFilter::OptimalNumOfBits(1000000, 0.03));
+    ASSERT_EQ(6235224, BloomFilter::OptimalNumOfBits(1000000, 0.05));
+    ASSERT_EQ(1870567268, BloomFilter::OptimalNumOfBits(300000000, 0.05));
+    ASSERT_EQ(1437758756, BloomFilter::OptimalNumOfBits(300000000, 0.1));
+    ASSERT_EQ(432808512, BloomFilter::OptimalNumOfBits(300000000, 0.5));
+    ASSERT_EQ(1393332198, BloomFilter::OptimalNumOfBits(3000000000, 0.8));
+    ASSERT_EQ(657882327, BloomFilter::OptimalNumOfBits(3000000000, 0.9));
+    ASSERT_EQ(0, BloomFilter::OptimalNumOfBits(3000000000, 1));
+}
+// Pins OptimalNumOfHashFunctions, including the floor of 1 for degenerate inputs.
+TEST(BloomFilterTest, TestBloomNumHashFunctions) {
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(-1, -1));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(0, 0));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10, 0));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10, 10));
+    ASSERT_EQ(7, BloomFilter::OptimalNumOfHashFunctions(10, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(100, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(10000, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(100000, 100));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000000, 100));
+    ASSERT_EQ(3634, BloomFilter::OptimalNumOfHashFunctions(100, 64 * 1024 * 8));
+    ASSERT_EQ(363, BloomFilter::OptimalNumOfHashFunctions(1000, 64 * 1024 * 8));
+    ASSERT_EQ(36, BloomFilter::OptimalNumOfHashFunctions(10000, 64 * 1024 * 8));
+    ASSERT_EQ(4, BloomFilter::OptimalNumOfHashFunctions(100000, 64 * 1024 * 8));
+    ASSERT_EQ(1, BloomFilter::OptimalNumOfHashFunctions(1000000, 64 * 1024 * 8));
+}
+// One filter object is switched between two segments; each segment must keep its own contents.
+TEST(BloomFilterTest, TestBloomFilter) {
+    int32_t items = 100;
+    auto pool = GetDefaultPool();
+    auto bloom_filter = std::make_shared(100, 1024);
+
+    std::mt19937_64 engine(std::random_device{}());  // NOLINT(whitespace/braces)
+    std::uniform_int_distribution distribution(0, items);
+
+    // segment 1
+    auto seg1 = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg1));
+
+    std::set test_data1;
+    for (int32_t i = 0; i < items; i++) {
+        int32_t random = distribution(engine);
+        test_data1.insert(random);
+        ASSERT_OK(bloom_filter->AddHash(random));
+    }
+    for (const auto& value : test_data1) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+
+    // segment 2
+    std::set test_data2;
+    auto seg2 = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg2));
+    for (int32_t i = 0; i < items; i++) {
+        int32_t random = distribution(engine);
+        test_data2.insert(random);
+        ASSERT_OK(bloom_filter->AddHash(random));
+    }
+    for (const auto& value : test_data2) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+    // switch back to segment 1: its earlier contents must still be visible
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg1));
+    for (const auto& value : test_data1) {
+        ASSERT_TRUE(bloom_filter->TestHash(value));
+    }
+
+    // clear segment 1: after Reset() none of its hashes may be reported
+    bloom_filter->Reset();
+    for (const auto& value : test_data1) {
+        ASSERT_FALSE(bloom_filter->TestHash(value));
+    }
+
+    // switch to segment 2 and clear
+    ASSERT_OK(bloom_filter->SetMemorySegment(seg2));
+    bloom_filter->Reset();
+    for (const auto& value : test_data2) {
+        ASSERT_FALSE(bloom_filter->TestHash(value));
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/crc32c.cpp b/src/paimon/common/utils/crc32c.cpp
new file mode 100644
index 0000000..1cba744
--- /dev/null
+++ b/src/paimon/common/utils/crc32c.cpp
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/crc32c.h"
+
+#include <cstring>
+
+#include "arrow/util/crc32.h"
+
+namespace paimon {
+// NOTE(review): the two build variants below may not agree. The SSE4.2 crc32 instructions
+// compute CRC-32C (Castagnoli polynomial), whereas the value crc32c_test.cpp expects for
+// "a" (3904355907 == 0xE8B7BE43) is the plain CRC-32 (IEEE) checksum, presumably what
+// arrow::internal::crc32 returns. Confirm which polynomial is intended and make both paths
+// compute it; otherwise checksums differ between builds.
+uint32_t CRC32C::calculate(const char* data, size_t length, uint32_t crc) {
+#if defined(PAIMON_HAVE_SSE4_2)
+    return crc32c_hw(data, length, crc);
+#else
+    return arrow::internal::crc32(crc, data, length);
+#endif
+}
+
+#if defined(PAIMON_HAVE_SSE4_2)
+uint32_t CRC32C::crc32c_hw(const char* data, size_t length, uint32_t crc) {
+    // The running state is kept bit-inverted: flipped here and flipped back on return.
+    crc = ~crc;
+
+    // Consume single bytes until `data` is 8-byte aligned.
+    while (length && (reinterpret_cast<uintptr_t>(data) & 7)) {
+        crc = _mm_crc32_u8(crc, static_cast<unsigned char>(*data++));
+        length--;
+    }
+
+    // Bulk of the input, 8 bytes per instruction. memcpy avoids reading the char buffer
+    // through a uint64_t lvalue (a strict-aliasing violation).
+    for (; length >= 8; data += 8, length -= 8) {
+        uint64_t word;
+        std::memcpy(&word, data, sizeof(word));
+        crc = static_cast<uint32_t>(_mm_crc32_u64(crc, word));
+    }
+
+    // At most 7 bytes remain; folding them one at a time yields the same state as the
+    // wider 4- and 2-byte steps would.
+    while (length--) {
+        crc = _mm_crc32_u8(crc, static_cast<unsigned char>(*data++));
+    }
+    return ~crc;
+}
+#endif
+}  // namespace paimon
diff --git a/src/paimon/common/utils/crc32c.h b/src/paimon/common/utils/crc32c.h
new file mode 100644
index 0000000..7c4db75
--- /dev/null
+++ b/src/paimon/common/utils/crc32c.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#if defined(PAIMON_HAVE_SSE4_2)
+#include <nmmintrin.h>
+#endif
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Checksum helper. Uses the SSE4.2 crc32 instructions when PAIMON_HAVE_SSE4_2 is defined
+/// and arrow::internal::crc32 otherwise.
+class PAIMON_EXPORT CRC32C {
+ public:
+    /// Computes the checksum of `length` bytes starting at `data`.
+    ///
+    /// @param data bytes to checksum
+    /// @param length number of bytes readable at `data`
+    /// @param crc initial crc value; defaults to 0
+    /// @return the checksum of the input
+    static uint32_t calculate(const char* data, size_t length, uint32_t crc = 0);
+
+ private:
+#if defined(PAIMON_HAVE_SSE4_2)
+    /// SIMD implementation for crc32c.
+    ///
+    /// @param data data to be calculated
+    /// @param length length of data
+    /// @param crc initial crc value
+    /// @return crc32c value
+    static uint32_t crc32c_hw(const char* data, size_t length, uint32_t crc);
+#endif
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/crc32c_test.cpp b/src/paimon/common/utils/crc32c_test.cpp
new file mode 100644
index 0000000..10dbbbf
--- /dev/null
+++ b/src/paimon/common/utils/crc32c_test.cpp
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/crc32c.h" + +#include +#include +#include +#include +#include +#include + +#include "arrow/util/crc32.h" +#include "gtest/gtest.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { + +TEST(CRC32CTest, TestSimple) { + char a = 'a'; + ASSERT_EQ(CRC32C::calculate(&a, 1), 3904355907); + + std::string data = "hello paimon c++"; + ASSERT_EQ(CRC32C::calculate(data.c_str(), data.size()), 1311805437); +} + +} // namespace paimon::test diff --git a/src/paimon/common/utils/delta_varint_compressor.cpp b/src/paimon/common/utils/delta_varint_compressor.cpp new file mode 100644 index 0000000..8e5d5ec --- /dev/null +++ b/src/paimon/common/utils/delta_varint_compressor.cpp @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/delta_varint_compressor.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+// Delta-encodes `data` (the first value against an implicit 0) and writes every delta as a
+// ZigZag varint. Deltas wrap modulo 2^64, so any int64_t sequence round-trips.
+// Returns an empty buffer for empty input.
+std::vector<char> DeltaVarintCompressor::Compress(const std::vector<int64_t>& data) {
+    std::vector<char> out;
+    // A 64-bit ZigZag code needs at most 10 varint bytes.
+    out.reserve(data.size() * 10);
+
+    uint64_t prev = 0;
+    for (int64_t value : data) {
+        // Subtract as uint64_t: the difference of two int64_t values may not fit in int64_t
+        // (e.g. max - min), and signed overflow is undefined behaviour. Unsigned arithmetic
+        // wraps modulo 2^64, which Decompress() reverses with a wrapping add.
+        const auto current = static_cast<uint64_t>(value);
+        EncodeVarint(static_cast<int64_t>(current - prev), &out);
+        prev = current;
+    }
+    return out;
+}
+
+// Inverse of Compress(): decodes each varint, undoes ZigZag and accumulates the deltas.
+// Fails with Status::Invalid when a varint is truncated or longer than 10 bytes.
+Result<std::vector<int64_t>> DeltaVarintCompressor::Decompress(const std::vector<char>& bytes) {
+    // Every varint ends with exactly one byte whose MSB is clear, so counting those bytes
+    // gives the number of values in well-formed input.
+    size_t num_values = 0;
+    for (char byte : bytes) {
+        num_values += (byte & 0x80) == 0 ? 1 : 0;
+    }
+    std::vector<int64_t> result;
+    result.reserve(num_values);
+
+    uint64_t prev = 0;
+    size_t pos = 0;
+    while (pos < bytes.size()) {
+        PAIMON_ASSIGN_OR_RAISE(int64_t delta, DecodeVarint(bytes, &pos));
+        // Wrapping add in uint64_t; see the matching subtraction in Compress().
+        prev += static_cast<uint64_t>(delta);
+        result.push_back(static_cast<int64_t>(prev));
+    }
+    return result;
+}
+
+// Appends ZigZag(value) to `out` as a base-128 varint, least significant group first.
+void DeltaVarintCompressor::EncodeVarint(int64_t value, std::vector<char>* out) {
+    uint64_t rest = ZigZag(value);
+    // 7 payload bits per byte; a set MSB means "more bytes follow".
+    while (rest >= 0x80) {
+        out->push_back(static_cast<char>((rest & 0x7F) | 0x80));
+        rest >>= 7;
+    }
+    // Final byte: MSB clear.
+    out->push_back(static_cast<char>(rest));
+}
+
+// Reads one varint from `in` starting at `*pos`, advances `*pos` past it and returns the
+// un-ZigZagged value.
+Result<int64_t> DeltaVarintCompressor::DecodeVarint(const std::vector<char>& in, size_t* pos) {
+    uint64_t result = 0;
+    int32_t shift = 0;
+    while (true) {
+        if (*pos >= in.size()) {
+            return Status::Invalid("Unexpected end of input");
+        }
+        const char byte = in[(*pos)++];
+        // Low 7 bits are payload.
+        result |= (static_cast<uint64_t>(byte & 0x7F) << shift);
+        // MSB clear marks the last byte of this varint.
+        if ((byte & 0x80) == 0) {
+            break;
+        }
+        shift += 7;
+        // A 64-bit value spans at most 10 bytes (shifts 0..63).
+        if (shift > 63) {
+            return Status::Invalid("Varint overflow");
+        }
+    }
+    return UnZigZag(result);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/delta_varint_compressor.h b/src/paimon/common/utils/delta_varint_compressor.h
new file mode 100644
index 0000000..a66cbcf
--- /dev/null
+++ b/src/paimon/common/utils/delta_varint_compressor.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// Lossless codec for int64_t sequences that combines delta encoding, ZigZag mapping and
+/// varint (base-128) encoding. Suitable for sequences that are increasing or whose
+/// neighbouring values do not differ much.
+class PAIMON_EXPORT DeltaVarintCompressor {
+ public:
+    /// Compresses an int64_t array using delta encoding, ZigZag and varint encoding.
+    ///
+    /// @param data values to compress; may be empty
+    /// @return one varint (1-10 bytes) per input value; empty for empty input
+    static std::vector<char> Compress(const std::vector<int64_t>& data);
+
+    /// Decompresses a buffer produced by Compress() back to the original values.
+    ///
+    /// @param bytes back-to-back ZigZag varints
+    /// @return the decoded values, or an invalid status if a varint is truncated or too long
+    static Result<std::vector<int64_t>> Decompress(const std::vector<char>& bytes);
+
+ private:
+    // NOTE(review): delta_varint_compressor_test.cpp calls ZigZag() directly although it is
+    // private here; presumably tests are built with access control disabled - confirm.
+
+    /// Appends ZigZag(value) to `out` as a varint.
+    static void EncodeVarint(int64_t value, std::vector<char>* out);
+    /// Decodes the varint at `*pos` in `in`, advances `*pos` and reverses the ZigZag mapping.
+    static Result<int64_t> DecodeVarint(const std::vector<char>& in, size_t* pos);
+
+    /// Maps signed to unsigned so that small magnitudes get small codes:
+    /// 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
+    inline static uint64_t ZigZag(int64_t value) {
+        const auto bits = static_cast<uint64_t>(value);
+        // (0 - sign bit) is all ones for negative input and zero otherwise.
+        return (bits << 1) ^ (uint64_t{0} - (bits >> 63));
+    }
+    /// Inverse of ZigZag().
+    inline static int64_t UnZigZag(uint64_t value) {
+        // (0 - low bit) is all ones for odd codes, i.e. for originally negative values.
+        return static_cast<int64_t>((value >> 1) ^ (uint64_t{0} - (value & 1)));
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/delta_varint_compressor_test.cpp b/src/paimon/common/utils/delta_varint_compressor_test.cpp
new file mode 100644
index 0000000..ac0017b
--- /dev/null
+++ b/src/paimon/common/utils/delta_varint_compressor_test.cpp
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/delta_varint_compressor.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(DeltaVarintCompressorTest, TestCompatibleWithJava) { + std::shared_ptr fs = std::make_shared(); + + for (int32_t i = 0; i < 5; ++i) { + std::string file_prefix = paimon::test::GetDataDir() + + "/delta_varint_compressor.data/case-000" + std::to_string(i); + + // Read original data from text file + std::string original_file = file_prefix + ".txt"; + std::ifstream in(original_file); + std::vector original; + int64_t value; + while (in >> value) { + original.push_back(value); + } + auto compressed = DeltaVarintCompressor::Compress(original); + + // Read expected compressed bytes from binary file + std::string binary_file = file_prefix + ".bin"; + std::string expected_bytes; + ASSERT_OK(fs->ReadFile(binary_file, &expected_bytes)); + ASSERT_EQ(expected_bytes, std::string(compressed.data(), compressed.size())); + + // Decompress and verify + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + 
ASSERT_EQ(original, decompressed); + } +} + +/// Test cases from Java Paimon +// Test case for normal compression and decompression +TEST(DeltaVarintCompressorTest, TestNormalCase1) { + std::vector original = {80, 50, 90, 80, 70}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + ASSERT_EQ(6u, compressed.size()); +} + +TEST(DeltaVarintCompressorTest, TestNormalCase2) { + std::vector original = {100, 50, 150, 100, 200}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + ASSERT_EQ(8u, compressed.size()); +} + +TEST(DeltaVarintCompressorTest, TestRandomRoundTrip) { + std::mt19937 gen(123456789); + std::uniform_int_distribution dist; + + for (int32_t iter = 0; iter < 10000; ++iter) { + std::vector original; + for (int32_t i = 0; i < 100; ++i) { + original.push_back(static_cast(dist(gen))); + } + + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + } +} + +// Test case for empty array +TEST(DeltaVarintCompressorTest, TestEmptyArray) { + std::vector original; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + ASSERT_EQ(0u, compressed.size()); +} + +// Test case for single-element array +TEST(DeltaVarintCompressorTest, TestSingleElement) { + std::vector original = {42}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // Calculate expected size: Varint encoding for 42 
(0x2A -> 1 byte) + ASSERT_EQ(1u, compressed.size()); +} + +// Test case for extreme values (INT64.MIN_VALUE and MAX_VALUE) +TEST(DeltaVarintCompressorTest, TestExtremeValues) { + std::vector original( + {std::numeric_limits::min(), std::numeric_limits::max()}); + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // Expected size: 10 bytes (MIN_VALUE) + 1 bytes (delta overflow) = 11 bytes + ASSERT_EQ(11u, compressed.size()); +} + +// Test case for negative deltas with ZigZag optimization +TEST(DeltaVarintCompressorTest, TestNegativeDeltas) { + std::vector original = {100, -50, -150, -100}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // Verify ZigZag optimization: -1 → 1 (1 byte) + // Delta sequence: [100, -150, -100, 50] → ZigZag → Each encoded in 1-2 bytes + ASSERT_LE(compressed.size(), 8u); +} + +// Test case for unsorted data (worse compression ratio) +TEST(DeltaVarintCompressorTest, TestUnsortedData) { + std::vector original = {1000, 5, 9999, 12345, 6789}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // Larger deltas → more bytes (e.g., 9994 → 3 bytes) + ASSERT_GT(compressed.size(), 5u); // Worse than sorted case +} + +// Test case for corrupted input (invalid Varint) +TEST(DeltaVarintCompressorTest, TestCorruptedInput) { + std::vector corrupted = {static_cast(0x80), static_cast(0x80), + static_cast(0x80)}; + ASSERT_NOK(DeltaVarintCompressor::Decompress(corrupted)); +} + +/// Test cases from Python Paimon +// Test case for arrays with zero values +TEST(DeltaVarintCompressorTest, TestZeroValues) { + std::vector original = {0, 0, 
0, 0, 0}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // All deltas are 0, so should compress very well + ASSERT_LE(compressed.size(), 5u); +} + +// Test case for ascending sequence (optimal for delta compression) +TEST(DeltaVarintCompressorTest, TestAscendingSequence) { + std::vector original = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // All deltas are 1, so should compress very well + ASSERT_LE(compressed.size(), 15u); // Much smaller than original +} + +// Test case for descending sequence +TEST(DeltaVarintCompressorTest, TestDescendingSequence) { + std::vector original = {10, 9, 8, 7, 6, 5, 4, 3, 2, 1}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // All deltas are -1, should still compress well with ZigZag + ASSERT_LE(compressed.size(), 15u); +} + +// Test case for large positive values +TEST(DeltaVarintCompressorTest, TestLargePositiveValues) { + std::vector original = {1000000, 2000000, 3000000, 4000000}; + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // Large values but consistent deltas should still compress reasonably + ASSERT_GT(compressed.size(), 4u); // Will be larger due to big numbers +} + +// Test case for mixed positive and negative values +TEST(DeltaVarintCompressorTest, TestMixedPositiveNegative) { + std::vector original = {100, -200, 300, -400, 500}; + auto compressed = DeltaVarintCompressor::Compress(original); + 
ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // Mixed signs create larger deltas + ASSERT_GT(compressed.size(), 5u); +} + +// Test that compression actually reduces size for suitable data +TEST(DeltaVarintCompressorTest, TestCompressionEfficiency) { + // Create a sequence with small deltas + std::vector original; + std::mt19937 gen(42); // Fixed seed for reproducibility + std::uniform_int_distribution delta_dist(-10, 10); + + int64_t base = 1000; + for (int32_t i = 0; i < 100; ++i) { + base += delta_dist(gen); // Small deltas + original.push_back(base); + } + + auto compressed = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(original, decompressed); + // For small deltas, compression should be effective + // Original would need 8 bytes per int64_t (800 bytes), compressed should be much smaller + ASSERT_LT(compressed.size(), original.size() * 4); // At least 50% compression +} + +// Test that multiple compress/decompress cycles are consistent +TEST(DeltaVarintCompressorTest, TestRoundTripConsistency) { + std::vector original = {1, 10, 100, 1000, 10000}; + + // First round trip + auto compressed1 = DeltaVarintCompressor::Compress(original); + ASSERT_OK_AND_ASSIGN(auto decompressed1, DeltaVarintCompressor::Decompress(compressed1)); + + // Second round trip + auto compressed2 = DeltaVarintCompressor::Compress(decompressed1); + ASSERT_OK_AND_ASSIGN(auto decompressed2, DeltaVarintCompressor::Decompress(compressed2)); + + // All should be identical + ASSERT_EQ(original, decompressed1); + ASSERT_EQ(original, decompressed2); + ASSERT_EQ(compressed1, compressed2); +} + +// Test boundary values for varint encoding +TEST(DeltaVarintCompressorTest, TestBoundaryValues) { + // Test values around varint boundaries (127, 16383, etc.) 
+ std::vector boundary_values = {0, 1, 127, 128, 255, 256, 16383, + 16384, 32767, 32768, -1, -127, -128, -255, + -256, -16383, -16384, -32767, -32768}; + + auto compressed = DeltaVarintCompressor::Compress(boundary_values); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + ASSERT_EQ(boundary_values, decompressed); +} + +// Test ZigZag encoding compatibility with Java implementation +TEST(DeltaVarintCompressorTest, TestJavaCompatibilityZigZagEncoding) { + // Test cases that verify ZigZag encoding matches Java's implementation + // ZigZag mapping: 0->0, -1->1, 1->2, -2->3, 2->4, -3->5, 3->6, etc. + std::vector> zigzag_test_cases = { + {0, 0}, // 0 -> 0 + {-1, 1}, // -1 -> 1 + {1, 2}, // 1 -> 2 + {-2, 3}, // -2 -> 3 + {2, 4}, // 2 -> 4 + {-3, 5}, // -3 -> 5 + {3, 6}, // 3 -> 6 + {-64, 127}, // -64 -> 127 + {64, 128}, // 64 -> 128 + {-65, 129}, // -65 -> 129 + }; + + for (const auto& [original_value, expected_zigzag] : zigzag_test_cases) { + // Test single value compression to verify ZigZag encoding + std::vector single_value = {original_value}; + auto compressed = DeltaVarintCompressor::Compress(single_value); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + + ASSERT_EQ(single_value, decompressed) + << "ZigZag encoding failed for value " << original_value; + ASSERT_EQ(expected_zigzag, DeltaVarintCompressor::ZigZag(original_value)) + << "ZigZag encoding failed for value " << original_value; + } +} + +// Test with known test vectors that should match Java implementation +TEST(DeltaVarintCompressorTest, TestJavaCompatibilityKnownVectors) { + // Test vectors with expected compressed output (hexadecimal) + std::vector, std::string>> test_vectors = { + // Simple cases + {{0}, "00"}, // 0 -> ZigZag(0) = 0 -> Varint(0) = 0x00 + {{1}, "02"}, // 1 -> ZigZag(1) = 2 -> Varint(2) = 0x02 + {{-1}, "01"}, // -1 -> ZigZag(-1) = 1 -> Varint(1) = 0x01 + {{2}, "04"}, // 2 -> ZigZag(2) = 4 -> Varint(4) = 
0x04 + {{-2}, "03"}, // -2 -> ZigZag(-2) = 3 -> Varint(3) = 0x03 + + // Delta encoding cases + {{0, 1}, "0002"}, // [0, 1] -> [0, delta=1] -> [0x00, 0x02] + {{1, 2}, "0202"}, // [1, 2] -> [1, delta=1] -> [0x02, 0x02] + {{0, -1}, "0001"}, // [0, -1] -> [0, delta=-1] -> [0x00, 0x01] + {{1, 0}, "0201"}, // [1, 0] -> [1, delta=-1] -> [0x02, 0x01] + + // Larger values + {{127}, "fe01"}, // 127 -> ZigZag(127) = 254 -> Varint(254) = 0xfe01 + {{-127}, "fd01"}, // -127 -> ZigZag(-127) = 253 -> Varint(253) = 0xfd01 + {{128}, "8002"}, // 128 -> ZigZag(128) = 256 -> Varint(256) = 0x8002 + {{-128}, "ff01"}, // -128 -> ZigZag(-128) = 255 -> Varint(255) = 0xff01 + }; + + auto bytes_to_hex = [](const std::vector& bytes) -> std::string { + std::string hex; + for (char byte : bytes) { + char buf[3]; + snprintf(buf, sizeof(buf), "%02x", static_cast(byte)); + hex += buf; + } + return hex; + }; + + for (const auto& [original, expected_hex] : test_vectors) { + auto compressed = DeltaVarintCompressor::Compress(original); + std::string actual_hex = bytes_to_hex(compressed); + + ASSERT_EQ(expected_hex, actual_hex) + << "Binary compatibility failed for original data. 
" + << "Expected: " << expected_hex << ", Got: " << actual_hex; + + // Also verify round-trip + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + ASSERT_EQ(original, decompressed) << "Round-trip failed for original data"; + } +} + +// Test compatibility with Java for large numbers (64-bit range) +TEST(DeltaVarintCompressorTest, TestJavaCompatibilityLargeNumbers) { + // Test cases covering the full 64-bit signed integer range + std::vector large_number_cases = { + 2147483647LL, // Integer.MAX_VALUE + -2147483648LL, // Integer.MIN_VALUE + 9223372036854775807LL, // Long.MAX_VALUE + -9223372036854775807LL, // Long.MIN_VALUE + 1 (avoid overflow) + 4294967295LL, // 2^32 - 1 + -4294967296LL, // -2^32 + }; + + for (int64_t value : large_number_cases) { + // Test individual values + std::vector single_value = {value}; + auto compressed = DeltaVarintCompressor::Compress(single_value); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + ASSERT_EQ(single_value, decompressed) << "Large number compatibility failed for " << value; + } + + // Test as a sequence to verify delta encoding with large numbers + auto compressed_seq = DeltaVarintCompressor::Compress(large_number_cases); + ASSERT_OK_AND_ASSIGN(auto decompressed_seq, DeltaVarintCompressor::Decompress(compressed_seq)); + ASSERT_EQ(large_number_cases, decompressed_seq) << "Large number sequence compatibility failed"; +} + +// Test Varint encoding boundaries that match Java implementation +TEST(DeltaVarintCompressorTest, TestJavaCompatibilityVarintBoundaries) { + // Test values at Varint encoding boundaries + std::vector varint_boundary_cases = { + // 1-byte Varint boundary + 63, // ZigZag(63) = 126, fits in 1 byte + 64, // ZigZag(64) = 128, needs 2 bytes + -64, // ZigZag(-64) = 127, fits in 1 byte + -65, // ZigZag(-65) = 129, needs 2 bytes + + // 2-byte Varint boundary + 8191, // ZigZag(8191) = 16382, fits in 2 bytes + 8192, // ZigZag(8192) = 
16384, needs 3 bytes + -8192, // ZigZag(-8192) = 16383, fits in 2 bytes + -8193, // ZigZag(-8193) = 16385, needs 3 bytes + + // 3-byte Varint boundary + 1048575, // ZigZag(1048575) = 2097150, fits in 3 bytes + 1048576, // ZigZag(1048576) = 2097152, needs 4 bytes + }; + + for (int64_t value : varint_boundary_cases) { + std::vector single_value = {value}; + auto compressed = DeltaVarintCompressor::Compress(single_value); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + ASSERT_EQ(single_value, decompressed) + << "Varint boundary compatibility failed for " << value; + } +} + +// Test delta encoding edge cases for Java compatibility +TEST(DeltaVarintCompressorTest, TestJavaCompatibilityDeltaEdgeCases) { + // Edge cases that test delta encoding behavior + std::vector> delta_edge_cases = { + // Maximum positive delta + {0, std::numeric_limits::max()}, + // Maximum negative delta + {std::numeric_limits::max(), 0}, + // Alternating large deltas + {0, 1000000, -1000000, 2000000, -2000000}, + // Sequence with zero deltas + {42, 42, 42, 42}, + // Mixed small and large deltas + {0, 1, 1000000, 1000001, 0}, + }; + + for (const auto& test_case : delta_edge_cases) { + auto compressed = DeltaVarintCompressor::Compress(test_case); + ASSERT_OK_AND_ASSIGN(auto decompressed, DeltaVarintCompressor::Decompress(compressed)); + ASSERT_EQ(test_case, decompressed) << "Delta edge case compatibility failed"; + } +} + +// Test error conditions that should match Java behavior +TEST(DeltaVarintCompressorTest, TestJavaCompatibilityErrorConditions) { + // Test cases for error handling - our implementation gracefully handles + // truncated data by returning errors, which is acceptable behavior + + // Test with various truncated/invalid byte sequences + std::vector> invalid_cases = { + {static_cast(0x80)}, // Single incomplete byte + {static_cast(0x80), static_cast(0x80)}, // Incomplete 3-byte varint + {static_cast(0xFF), static_cast(0xFF), 
static_cast(0xFF), + static_cast(0xFF), static_cast(0xFF), static_cast(0xFF), + static_cast(0xFF), static_cast(0xFF), static_cast(0xFF), + static_cast(0x80)}, // Long sequence + }; + + for (const auto& invalid_data : invalid_cases) { + // Our implementation handles invalid data by returning an error + // This is acceptable behavior for robustness + auto result = DeltaVarintCompressor::Decompress(invalid_data); + ASSERT_NOK(result) << "Should return error for invalid data"; + } + + // Test that valid empty input returns empty list + std::vector empty_input; + ASSERT_OK_AND_ASSIGN(auto empty_result, DeltaVarintCompressor::Decompress(empty_input)); + ASSERT_TRUE(empty_result.empty()) << "Empty input should return empty list"; +} + +} // namespace paimon::test diff --git a/src/paimon/common/utils/murmurhash_utils.h b/src/paimon/common/utils/murmurhash_utils.h index cebf41b..52831c9 100644 --- a/src/paimon/common/utils/murmurhash_utils.h +++ b/src/paimon/common/utils/murmurhash_utils.h @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ /* diff --git a/src/paimon/common/utils/murmurhash_utils_test.cpp b/src/paimon/common/utils/murmurhash_utils_test.cpp index b4a475f..d0c09ea 100644 --- a/src/paimon/common/utils/murmurhash_utils_test.cpp +++ b/src/paimon/common/utils/murmurhash_utils_test.cpp @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include "paimon/common/utils/murmurhash_utils.h" diff --git a/src/paimon/common/utils/var_length_int_utils.h b/src/paimon/common/utils/var_length_int_utils.h new file mode 100644 index 0000000..ec76460 --- /dev/null +++ b/src/paimon/common/utils/var_length_int_utils.h @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "fmt/format.h" +#include "paimon/macros.h" +#include "paimon/result.h" +namespace paimon { + +/// Variable-length integer encoding/decoding utilities. +/// +/// Encoding format (same as protobuf unsigned varint): +/// - Each byte stores 7 payload bits in bits [6:0]. +/// - Bit 7 (0x80) is the continuation flag: 1 = more bytes follow, 0 = last byte. +/// - A varint32 uses at most 5 bytes; a varint64 uses at most 9 bytes. +/// +/// Based on the LongPacker from PalDB (https://github.com/linkedin/PalDB), +/// licensed under Apache 2.0. +class VarLengthIntUtils { + public: + VarLengthIntUtils() = delete; + ~VarLengthIntUtils() = delete; + + static constexpr int32_t kMaxVarIntSize = 5; + static constexpr int32_t kMaxVarLongSize = 9; + + // ==================== Encoding (writes to char*) ==================== + + /// Encodes a non-negative int32 as varint into `dest`. + /// Returns the number of bytes written. + static Result EncodeInt(int32_t value, char* dest) { + if (PAIMON_UNLIKELY(value < 0)) { + return Status::Invalid( + fmt::format("negative value: v={} for VarLengthInt Encoding", value)); + } + int32_t num_bytes = 0; + while ((value & ~0x7F) != 0) { + dest[num_bytes] = static_cast((value & 0x7F) | 0x80); + value >>= 7; + ++num_bytes; + } + dest[num_bytes] = static_cast(value); + return num_bytes + 1; + } + + /// Encodes a non-negative int64 as varint into `dest`. + /// Returns the number of bytes written. 
+ static Result EncodeLong(int64_t value, char* dest) { + if (PAIMON_UNLIKELY(value < 0)) { + return Status::Invalid( + fmt::format("negative value: v={} for VarLengthInt Encoding", value)); + } + int32_t num_bytes = 0; + while ((value & ~0x7FLL) != 0) { + dest[num_bytes] = static_cast(static_cast(value & 0x7F) | 0x80); + value >>= 7; + ++num_bytes; + } + dest[num_bytes] = static_cast(value); + return num_bytes + 1; + } + + // ==================== Decoding (reads from const char*) ==================== + + /// Decodes a varint32 from `data` at `*offset`, advancing `*offset` past the consumed bytes. + /// Inlines a 1-byte fast path (values 0-127), which is the most common case. + static inline Result DecodeInt(const char* data, int32_t* offset) { + auto first_byte = static_cast(data[*offset]); + if (PAIMON_LIKELY((first_byte & 0x80) == 0)) { + ++(*offset); + return static_cast(first_byte); + } + // Multi-byte: fall through to generic loop. + // NOTE: EncodeInt only encodes non-negative values, so a decoded negative result + // indicates malformed data. + uint32_t result = 0; + for (int32_t shift = 0; shift < 32; shift += 7) { + auto byte_val = static_cast(data[*offset]); + ++(*offset); + result |= static_cast(byte_val & 0x7F) << shift; + if ((byte_val & 0x80) == 0) { + auto signed_result = static_cast(result); + if (PAIMON_UNLIKELY(signed_result < 0)) { + return Status::Invalid("Malformed varint32: decoded negative value"); + } + return signed_result; + } + } + return Status::Invalid("Malformed varint32: too many continuation bytes"); + } + + /// Decodes a varint64 from `data` at `*offset`, advancing `*offset` past the consumed bytes. + /// Inlines a 1-byte fast path (values 0-127), which is the most common case. 
+ static inline Result DecodeLong(const char* data, int32_t* offset) { + auto first_byte = static_cast(data[*offset]); + if (PAIMON_LIKELY((first_byte & 0x80) == 0)) { + ++(*offset); + return static_cast(first_byte); + } + // Multi-byte: fall through to generic loop. + // NOTE: EncodeLong only encodes non-negative values, so a decoded negative result + // indicates malformed data. + uint64_t result = 0; + for (int32_t shift = 0; shift < 64; shift += 7) { + auto byte_val = static_cast(data[*offset]); + ++(*offset); + result |= static_cast(byte_val & 0x7F) << shift; + if ((byte_val & 0x80) == 0) { + auto signed_result = static_cast(result); + if (PAIMON_UNLIKELY(signed_result < 0)) { + return Status::Invalid("Malformed varint64: decoded negative value"); + } + return signed_result; + } + } + return Status::Invalid("Malformed varint64: too many continuation bytes"); + } +}; + +} // namespace paimon diff --git a/src/paimon/common/utils/var_length_int_utils_test.cpp b/src/paimon/common/utils/var_length_int_utils_test.cpp new file mode 100644 index 0000000..c2404e2 --- /dev/null +++ b/src/paimon/common/utils/var_length_int_utils_test.cpp @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/common/utils/var_length_int_utils.h" + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(VarLengthIntUtilsTest, TestEncodeAndDecodeInt) { + std::vector test_values = {0, 1, 127, 128, 255, 256, + 1000, 10000, 100000, 1000000, 10000000, 2147483647}; + + for (int32_t value : test_values) { + char buffer[VarLengthIntUtils::kMaxVarIntSize]; + std::memset(buffer, 0, sizeof(buffer)); + + ASSERT_OK_AND_ASSIGN(int32_t encoded_length, VarLengthIntUtils::EncodeInt(value, buffer)); + + int32_t offset = 0; + ASSERT_OK_AND_ASSIGN(int32_t decoded_value, VarLengthIntUtils::DecodeInt(buffer, &offset)); + + ASSERT_EQ(value, decoded_value) << "Encoded and decoded values don't match for: " << value; + ASSERT_EQ(offset, encoded_length) << "Offset doesn't match encoded length for: " << value; + } +} + +TEST(VarLengthIntUtilsTest, TestEncodeAndDecodeLong) { + std::vector test_values = {0ll, + 127ll, + 128ll, + 16383ll, + 16384ll, + 2097151ll, + 2097152ll, + 268435455ll, + 268435456ll, + 1234567890123456789ll, + 202405170000000000ll, + 999999999999999999ll, + std::numeric_limits::max()}; + + for (int64_t value : test_values) { + char buffer[VarLengthIntUtils::kMaxVarLongSize + 1]; + std::memset(buffer, 0, sizeof(buffer)); + + ASSERT_OK_AND_ASSIGN([[maybe_unused]] int32_t encoded_length, + VarLengthIntUtils::EncodeLong(value, buffer)); + + int32_t offset = 0; + ASSERT_OK_AND_ASSIGN(int64_t decoded_value, VarLengthIntUtils::DecodeLong(buffer, &offset)); + + ASSERT_EQ(value, decoded_value) << "Encoded and decoded values don't match for: " << value; + } +} + +TEST(VarLengthIntUtilsTest, TestEncodeNegativeValue) { + char buffer[VarLengthIntUtils::kMaxVarIntSize]; + ASSERT_NOK_WITH_MSG(VarLengthIntUtils::EncodeInt(-1, buffer), + "negative value: v=-1 for VarLengthInt Encoding"); +} + +TEST(VarLengthIntUtilsTest, TestEncodeNegativeLongValue) { + char 
buffer[VarLengthIntUtils::kMaxVarLongSize + 1]; + ASSERT_NOK_WITH_MSG(VarLengthIntUtils::EncodeLong(-1, buffer), + "negative value: v=-1 for VarLengthInt Encoding"); +} + +TEST(VarLengthIntUtilsTest, TestEncodeIntSequential) { + char buffer[VarLengthIntUtils::kMaxVarIntSize * 2]; + std::memset(buffer, 0, sizeof(buffer)); + int32_t value1 = 100; + int32_t value2 = 200; + + ASSERT_OK_AND_ASSIGN(auto length1, VarLengthIntUtils::EncodeInt(value1, buffer)); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] auto length2, + VarLengthIntUtils::EncodeInt(value2, buffer + length1)); + + int32_t offset = 0; + ASSERT_OK_AND_ASSIGN(int32_t decoded_result1, VarLengthIntUtils::DecodeInt(buffer, &offset)); + ASSERT_EQ(value1, decoded_result1); + ASSERT_OK_AND_ASSIGN(int32_t decoded_result2, VarLengthIntUtils::DecodeInt(buffer, &offset)); + ASSERT_EQ(value2, decoded_result2); +} + +TEST(VarLengthIntUtilsTest, TestEncodeBytesNumber) { + std::vector values = { + 0x7F, // 127 - fits in 1 byte + 0x80, // 128 - needs 2 bytes + 0x4000, // 16384 - needs 3 bytes + 0x200000, // 2097152 - needs 4 bytes + 2147483647 // 2147483647 - needs 5 bytes + }; + + for (int32_t i = 0; i < static_cast(values.size()); ++i) { + char buffer[VarLengthIntUtils::kMaxVarIntSize]; + std::memset(buffer, 0, sizeof(buffer)); + ASSERT_OK_AND_ASSIGN(int32_t encoded_length, + VarLengthIntUtils::EncodeInt(values[i], buffer)); + ASSERT_EQ(encoded_length, i + 1); + } +} + +TEST(VarLengthIntUtilsTest, TestEncodeLongBytesNumber) { + std::vector values = { + 0x7F, // 127 - fits in 1 byte + 0x80, // 128 - needs 2 bytes + 0x4000, // 16384 - needs 3 bytes + 0x200000, // 2097152 - needs 4 bytes + 2147483647, // 2147483647 - needs 5 bytes + 34359738368ll, // 0x800000000 - needs 6 bytes + 562949953421311ll, // 0x1FFFFFFFFFFFFF - needs 7 bytes + 72057594037927935ll, // 0xFFFFFFFFFFFFFF - needs 8 bytes + std::numeric_limits::max() // needs 9 bytes + }; + + for (int32_t i = 0; i < static_cast(values.size()); ++i) { + char 
buffer[VarLengthIntUtils::kMaxVarLongSize + 1]; + std::memset(buffer, 0, sizeof(buffer)); + ASSERT_OK_AND_ASSIGN(int32_t encoded_length, + VarLengthIntUtils::EncodeLong(values[i], buffer)); + ASSERT_EQ(encoded_length, i + 1) << values[i]; + } +} +} // namespace paimon::test diff --git a/src/paimon/common/utils/xxhash_test.cpp b/src/paimon/common/utils/xxhash_test.cpp new file mode 100644 index 0000000..5d2daca --- /dev/null +++ b/src/paimon/common/utils/xxhash_test.cpp @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "xxhash.h" // NOLINT(build/include_subdir) + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(XXHashTest, TestCompatibleWithJava) { + auto file_system = std::make_unique(); + auto file_name = paimon::test::GetDataDir() + "/xxhash.data"; + std::string bytes; + ASSERT_OK(file_system->ReadFile(file_name, &bytes)); + auto lines = StringUtils::Split(bytes, "\n"); + // 1000 random str and empty str + ASSERT_EQ(1001, lines.size()); + for (const auto& line : lines) { + auto data_and_hash = StringUtils::Split(line, ",", /*ignore_empty=*/false); + ASSERT_EQ(2, data_and_hash.size()); + const auto& str = data_and_hash[0]; + int64_t expected_hash = std::stoull(data_and_hash[1], nullptr, 16); + ASSERT_EQ(expected_hash, XXH64(str.data(), str.size(), /*seed=*/0)); + } +} + +} // namespace paimon::test From 57495f181277286103242137b4f98ca0197e27ef Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 2 Jun 2026 09:21:26 +0800 Subject: [PATCH 2/5] complete testharness --- LICENSE | 10 ++ NOTICE | 3 + src/paimon/testing/utils/testharness.cpp | 170 ++++++++++++++++++++++- src/paimon/testing/utils/testharness.h | 76 +++++++++- 4 files changed, 245 insertions(+), 14 deletions(-) diff --git a/LICENSE b/LICENSE index 3321b54..103c326 100644 --- a/LICENSE +++ b/LICENSE @@ -256,6 +256,16 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- +This product includes code from LiteRT. + +* UniqueTestDirectory utility in src/paimon/testing/utils/testharness.cpp and src/paimon/testing/utils/testharness.h + +Copyright: 2024 Google LLC. 
+Home page: https://ai.google.dev/edge/litert +License: https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product includes code from Apache Arrow. * Core utilities: diff --git a/NOTICE b/NOTICE index 303de2a..3be42fb 100644 --- a/NOTICE +++ b/NOTICE @@ -17,6 +17,9 @@ This product includes software from RocksDB project (Apache 2.0 and BSD 3-clause Copyright (c) 2011-present, Facebook, Inc. All rights reserved. Copyright (c) 2011 The LevelDB Authors. All rights reserved. +This product includes software from LiteRT project (Apache 2.0) +Copyright 2024 Google LLC. + This product includes software from xxHash project (BSD 2-clause) Copyright (C) 2012-2023 Yann Collet diff --git a/src/paimon/testing/utils/testharness.cpp b/src/paimon/testing/utils/testharness.cpp index fa62690..9c61ac0 100644 --- a/src/paimon/testing/utils/testharness.cpp +++ b/src/paimon/testing/utils/testharness.cpp @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
@@ -29,12 +28,104 @@ // Assert utilities is adapted from RocksDB // https://github.com/facebook/rocksdb/blob/main/test_util/testharness.cc +// Copyright 2024 Google LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// UniqueTestDirectory utility is adapted from LiteRT +// https://github.com/google-ai-edge/LiteRT/blob/main/litert/test/common.cc + #include "paimon/testing/utils/testharness.h" +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "paimon/common/utils/string_utils.h" +#include "paimon/common/utils/uuid.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/file_system_factory.h" #include "paimon/status.h" namespace paimon::test { +std::string GetDataDir() { + const auto result = std::getenv("PAIMON_TEST_DATA"); + if (!result || !result[0]) { + return "test/test_data/"; + } + return std::string(result); +} + +std::map GetJindoTestOptions() { + const char* home = std::getenv("HOME"); + std::string home_str = ""; + if (home) { + home_str = std::string(home); + } + std::string config_file_path = home_str + "/.osscredentials"; + std::ifstream config_file(config_file_path); + std::string access_key_id = ""; + std::string access_key_secret = ""; + if (config_file.is_open()) { + std::string line; + while (std::getline(config_file, line)) { + std::vector key_value = StringUtils::Split(line, "="); + if (key_value.size() != 2) { + continue; + } + if (key_value[0].find("accessid") != 
std::string::npos) { + StringUtils::Trim(&key_value[1]); + access_key_id = key_value[1]; + } + if (key_value[0].find("accesskey") != std::string::npos) { + StringUtils::Trim(&key_value[1]); + access_key_secret = key_value[1]; + } + } + config_file.close(); + } + std::map options = { + {"fs.oss.bucket.paimon-unittest.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com"}, + {"fs.oss.bucket.paimon-unittest.accessKeyId", access_key_id}, + {"fs.oss.bucket.paimon-unittest.accessKeySecret", access_key_secret}, + {"fs.oss.user", "paimon"}, + }; + return options; +} +std::string GetJindoTestDir() { + static const std::string dir = "oss://paimon-unittest/temp/"; + return dir; +} + +int64_t RandomNumber(int64_t min, int64_t max) { + static thread_local std::mt19937 generator( + std::random_device{}()); // NOLINT(whitespace/braces) + std::uniform_int_distribution distribution(min, max); + return distribution(generator); +} + +std::string GetPidStr() { + return std::to_string(getpid()); +} + ::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s) { if (s.ok()) { return ::testing::AssertionSuccess(); @@ -43,4 +134,69 @@ ::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s) { } } +std::unique_ptr UniqueTestDirectory::Create(const std::string& fs_identifier) { + static const size_t kMaxTries = 1000; + std::string tmp_dir = fs_identifier == "jindo" + ? GetJindoTestDir() + : std::filesystem::temp_directory_path().string() + "/"; + std::map fs_options = + fs_identifier == "jindo" ? 
GetJindoTestOptions() : std::map(); + auto fs = FileSystemFactory::Get(fs_identifier, tmp_dir, fs_options); + if (!fs.ok()) { + return nullptr; + } + for (size_t i = 0; i < kMaxTries; ++i) { + std::string uuid; + if (!UUID::Generate(&uuid)) { + continue; + } + std::string test_dir = tmp_dir + "paimon_test_" + uuid; + auto is_exist = fs.value()->Exists(test_dir); + if (!is_exist.ok() || is_exist.value()) { + continue; + } + auto status = fs.value()->Mkdirs(test_dir); + if (status.ok()) { + return std::unique_ptr( + new UniqueTestDirectory(test_dir, std::move(fs).value())); + } + } + return nullptr; +} + +UniqueTestDirectory::~UniqueTestDirectory() { + auto is_exist = fs_->Exists(tmpdir_); + assert(is_exist.ok()); + if (is_exist.value()) { + [[maybe_unused]] auto status = fs_->Delete(tmpdir_, /*recursive=*/true); + assert(status.ok()); + } + fs_.reset(); +} + +bool TestUtil::CopyDirectory(const std::filesystem::path& source, + const std::filesystem::path& destination) { + namespace fs = std::filesystem; + try { + if (!fs::exists(destination)) { + fs::create_directories(destination); + } + + for (const auto& entry : fs::directory_iterator(source)) { + const auto& source_path = entry.path(); + auto destination_path = destination / source_path.filename(); + + if (fs::is_directory(source_path)) { + CopyDirectory(source_path, destination_path); + } else if (fs::is_regular_file(source_path)) { + fs::copy(source_path, destination_path, fs::copy_options::overwrite_existing); + } + } + } catch (const std::exception& e) { + std::cerr << "Error while copying directory: " << e.what() << std::endl; + return false; + } + return true; +} + } // namespace paimon::test diff --git a/src/paimon/testing/utils/testharness.h b/src/paimon/testing/utils/testharness.h index e945b75..5c92fa1 100644 --- a/src/paimon/testing/utils/testharness.h +++ b/src/paimon/testing/utils/testharness.h @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. @@ -29,17 +28,49 @@ // Assert utilities is adapted from RocksDB // https://github.com/facebook/rocksdb/blob/main/test_util/testharness.h +// Copyright 2024 Google LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// UniqueTestDirectory utility is adapted from LiteRT +// https://github.com/google-ai-edge/LiteRT/blob/main/litert/test/common.h + #pragma once +#include +#include +#include #include +#include #include "gtest/gtest.h" #include "paimon/macros.h" #include "paimon/result.h" #include "paimon/status.h" +namespace paimon { +class FileSystem; +class Status; +} // namespace paimon + namespace paimon::test { +std::string GetDataDir(); +std::map GetJindoTestOptions(); +std::string GetJindoTestDir(); + +int64_t RandomNumber(int64_t min, int64_t max); + ::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s); #define ASSERT_OK(expr) \ @@ -75,4 +106,35 @@ ::testing::AssertionResult AssertStatus(const char* s_expr, const Status& s); #define EXPECT_OK(s) EXPECT_PRED_FORMAT1(paimon::test::AssertStatus, s) #define EXPECT_NOK(s) EXPECT_FALSE((s).ok()) +class UniqueTestDirectory { + public: + static std::unique_ptr Create(const std::string& fs_identifier = "local"); + ~UniqueTestDirectory(); + + UniqueTestDirectory(const UniqueTestDirectory&) = delete; + UniqueTestDirectory(UniqueTestDirectory&&) = default; + UniqueTestDirectory& operator=(const UniqueTestDirectory&) = delete; + UniqueTestDirectory& operator=(UniqueTestDirectory&&) = default; + + const std::string& Str() const { + return tmpdir_; + } + + std::shared_ptr GetFileSystem() const { + return fs_; + } + + private: + UniqueTestDirectory(const std::string& tmpdir, std::shared_ptr&& fs) + : tmpdir_(tmpdir), fs_(std::move(fs)) {} + std::string tmpdir_; + std::shared_ptr fs_; +}; + +class TestUtil { + public: + static bool CopyDirectory(const std::filesystem::path& source, + const std::filesystem::path& destination); +}; + } // namespace paimon::test From 4587f021a4ca1a122e1213a8a90f4b00ce3fe885 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 2 Jun 2026 09:57:37 +0800 Subject: [PATCH 3/5] fix ub in DeltaVarintCompressor --- src/paimon/common/utils/delta_varint_compressor.cpp | 12 ++++++++---- 1 file 
changed, 8 insertions(+), 4 deletions(-) diff --git a/src/paimon/common/utils/delta_varint_compressor.cpp b/src/paimon/common/utils/delta_varint_compressor.cpp index 8e5d5ec..89f908d 100644 --- a/src/paimon/common/utils/delta_varint_compressor.cpp +++ b/src/paimon/common/utils/delta_varint_compressor.cpp @@ -32,12 +32,14 @@ std::vector DeltaVarintCompressor::Compress(const std::vector& da return {}; } - // 1. Delta encoding + // 1. Delta encoding (use unsigned subtraction to avoid signed overflow UB) std::vector deltas; deltas.reserve(data.size()); deltas.push_back(data[0]); for (size_t i = 1; i < data.size(); i++) { - deltas.push_back(data[i] - data[i - 1]); + uint64_t unsigned_delta = + static_cast(data[i]) - static_cast(data[i - 1]); + deltas.push_back(static_cast(unsigned_delta)); } // 2. ZigZag + Varint @@ -63,11 +65,13 @@ Result> DeltaVarintCompressor::Decompress(const std::vector deltas.push_back(delta); } - // 2. Delta decoding + // 2. Delta decoding (use unsigned addition to avoid signed overflow UB) std::vector result(deltas.size()); result[0] = deltas[0]; for (size_t i = 1; i < result.size(); i++) { - result[i] = result[i - 1] + deltas[i]; + uint64_t reconstructed = + static_cast(result[i - 1]) + static_cast(deltas[i]); + result[i] = static_cast(reconstructed); } return result; } From 8e4866b41e3f327802bafa9f69a450499d8569fa Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 2 Jun 2026 11:11:44 +0800 Subject: [PATCH 4/5] fix ub in BloomFilter --- src/paimon/common/utils/bloom_filter.cpp | 8 ++++++-- src/paimon/common/utils/bloom_filter64.cpp | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/paimon/common/utils/bloom_filter.cpp b/src/paimon/common/utils/bloom_filter.cpp index ccdbc7d..056a013 100644 --- a/src/paimon/common/utils/bloom_filter.cpp +++ b/src/paimon/common/utils/bloom_filter.cpp @@ -61,7 +61,9 @@ Status BloomFilter::AddHash(int32_t hash1) { auto hash2 = static_cast(static_cast(hash1) >> 16); for (int32_t i = 1; 
i <= num_hash_functions_; i++) { - int32_t combined_hash = hash1 + (i * hash2); + // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics) + int32_t combined_hash = static_cast( + static_cast(hash1) + (static_cast(i) * static_cast(hash2))); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { combined_hash = ~combined_hash; @@ -76,7 +78,9 @@ bool BloomFilter::TestHash(int32_t hash1) const { auto hash2 = static_cast(static_cast(hash1) >> 16); for (int32_t i = 1; i <= num_hash_functions_; i++) { - int32_t combined_hash = hash1 + (i * hash2); + // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics) + int32_t combined_hash = static_cast( + static_cast(hash1) + (static_cast(i) * static_cast(hash2))); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { combined_hash = ~combined_hash; diff --git a/src/paimon/common/utils/bloom_filter64.cpp b/src/paimon/common/utils/bloom_filter64.cpp index 02be3e2..92a76ab 100644 --- a/src/paimon/common/utils/bloom_filter64.cpp +++ b/src/paimon/common/utils/bloom_filter64.cpp @@ -70,7 +70,9 @@ void BloomFilter64::AddHash(int64_t hash64) { auto hash2 = static_cast(static_cast(hash64) >> 32); for (int32_t i = 1; i <= num_hash_functions_; i++) { - int32_t combined_hash = hash1 + (i * hash2); + // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics) + int32_t combined_hash = static_cast( + static_cast(hash1) + (static_cast(i) * static_cast(hash2))); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { combined_hash = ~combined_hash; @@ -85,7 +87,9 @@ bool BloomFilter64::TestHash(int64_t hash64) const { auto hash2 = static_cast(static_cast(hash64) >> 32); for (int32_t i = 1; i <= num_hash_functions_; i++) { - int32_t combined_hash = hash1 + (i * hash2); + // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int 
wrap semantics) + int32_t combined_hash = static_cast( + static_cast(hash1) + (static_cast(i) * static_cast(hash2))); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { combined_hash = ~combined_hash; From 48a78acf5e0fe28ded73915fd4e3bbe0c8285040 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Tue, 2 Jun 2026 21:31:08 +0800 Subject: [PATCH 5/5] fix clang tidy --- src/paimon/common/utils/bloom_filter.cpp | 10 ++++++---- src/paimon/common/utils/bloom_filter64.cpp | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/paimon/common/utils/bloom_filter.cpp b/src/paimon/common/utils/bloom_filter.cpp index 056a013..d66420f 100644 --- a/src/paimon/common/utils/bloom_filter.cpp +++ b/src/paimon/common/utils/bloom_filter.cpp @@ -62,8 +62,9 @@ Status BloomFilter::AddHash(int32_t hash1) { for (int32_t i = 1; i <= num_hash_functions_; i++) { // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics) - int32_t combined_hash = static_cast( - static_cast(hash1) + (static_cast(i) * static_cast(hash2))); + auto combined_hash = + static_cast(static_cast(hash1) + + (static_cast(i) * static_cast(hash2))); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { combined_hash = ~combined_hash; @@ -79,8 +80,9 @@ bool BloomFilter::TestHash(int32_t hash1) const { for (int32_t i = 1; i <= num_hash_functions_; i++) { // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics) - int32_t combined_hash = static_cast( - static_cast(hash1) + (static_cast(i) * static_cast(hash2))); + auto combined_hash = + static_cast(static_cast(hash1) + + (static_cast(i) * static_cast(hash2))); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { combined_hash = ~combined_hash; diff --git a/src/paimon/common/utils/bloom_filter64.cpp b/src/paimon/common/utils/bloom_filter64.cpp index 92a76ab..f2685a1 100644 --- 
a/src/paimon/common/utils/bloom_filter64.cpp +++ b/src/paimon/common/utils/bloom_filter64.cpp @@ -71,8 +71,9 @@ void BloomFilter64::AddHash(int64_t hash64) { for (int32_t i = 1; i <= num_hash_functions_; i++) { // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics) - int32_t combined_hash = static_cast( - static_cast(hash1) + (static_cast(i) * static_cast(hash2))); + auto combined_hash = + static_cast(static_cast(hash1) + + (static_cast(i) * static_cast(hash2))); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { combined_hash = ~combined_hash; @@ -88,8 +89,9 @@ bool BloomFilter64::TestHash(int64_t hash64) const { for (int32_t i = 1; i <= num_hash_functions_; i++) { // Use uint32_t arithmetic to avoid signed overflow UB (matches Java int wrap semantics) - int32_t combined_hash = static_cast( - static_cast(hash1) + (static_cast(i) * static_cast(hash2))); + auto combined_hash = + static_cast(static_cast(hash1) + + (static_cast(i) * static_cast(hash2))); // hashcode should be positive, flip all the bits if it's negative if (combined_hash < 0) { combined_hash = ~combined_hash;