diff --git a/src/paimon/core/mergetree/external_sort_buffer.cpp b/src/paimon/core/mergetree/external_sort_buffer.cpp new file mode 100644 index 0000000..9bfeec8 --- /dev/null +++ b/src/paimon/core/mergetree/external_sort_buffer.cpp @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/external_sort_buffer.h" + +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "arrow/compute/api.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/async_key_value_producer_and_consumer.h" +#include "paimon/core/io/key_value_in_memory_record_reader.h" +#include "paimon/core/io/key_value_meta_projection_consumer.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/io/row_to_arrow_array_converter.h" +#include "paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h" +#include "paimon/core/mergetree/spill_channel_manager.h" +#include "paimon/core/mergetree/spill_reader.h" +#include "paimon/core/mergetree/spill_writer.h" + +namespace paimon { + +Result> ExternalSortBuffer::Create( + std::unique_ptr&& in_memory_buffer, + const std::shared_ptr& value_schema, + const std::vector& trimmed_primary_keys, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const CoreOptions& options, const std::shared_ptr& io_manager, + bool enable_multi_thread_spill, const std::shared_ptr& pool) { + if (options.GetLocalSortMaxNumFileHandles() < kSpillMinFanIn) { + return Status::Invalid(fmt::format( + "invalid '{}': {}, must be at least {}", Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, + options.GetLocalSortMaxNumFileHandles(), kSpillMinFanIn)); + } + arrow::FieldVector key_fields; + key_fields.reserve(trimmed_primary_keys.size()); + for (const auto& primary_key : trimmed_primary_keys) { + auto key_field = value_schema->GetFieldByName(primary_key); + assert(key_field != nullptr); + key_fields.push_back(key_field); + } + auto key_schema = arrow::schema(key_fields); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr spill_channel_enumerator, + io_manager->CreateChannelEnumerator()); + return std::unique_ptr( + new ExternalSortBuffer(std::move(in_memory_buffer), key_schema, value_schema, + key_comparator, user_defined_seq_comparator, options, + spill_channel_enumerator, enable_multi_thread_spill, pool)); +} + +ExternalSortBuffer::ExternalSortBuffer( + std::unique_ptr&& in_memory_buffer, + const std::shared_ptr& key_schema, + const std::shared_ptr& value_schema, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const CoreOptions& options, + const std::shared_ptr& spill_channel_enumerator, + bool enable_multi_thread_spill, const std::shared_ptr& pool) + : in_memory_buffer_(std::move(in_memory_buffer)), + pool_(pool), + key_schema_(key_schema), + value_schema_(value_schema), + key_comparator_(key_comparator), + user_defined_seq_comparator_(user_defined_seq_comparator), + write_schema_(SpecialFields::CompleteSequenceAndValueKindField(value_schema)), + options_(options), + max_fan_in_(options.GetLocalSortMaxNumFileHandles()), + enable_multi_thread_spill_(enable_multi_thread_spill), + spill_channel_manager_( + std::make_shared(options_.GetFileSystem(), max_fan_in_)), + spill_merger_(std::make_unique(max_fan_in_)), + spill_channel_enumerator_(spill_channel_enumerator), + actual_max_fan_in_(max_fan_in_), + spill_batch_size_(options_.GetWriteBatchSize()) {} + +ExternalSortBuffer::~ExternalSortBuffer() { + DoClear(); +} + +bool ExternalSortBuffer::HasSpilledData() const { + return !spill_channel_manager_->GetChannels().empty(); +} + +void ExternalSortBuffer::DoClear() { + in_memory_buffer_->Clear(); + + spill_channel_manager_->Reset(); + total_spill_disk_bytes_ = 0; + spill_merger_->Clear(); +} + +void ExternalSortBuffer::Clear() { + DoClear(); +} + +uint64_t ExternalSortBuffer::GetMemorySize() const { + return in_memory_buffer_->GetMemorySize(); +} + +void ExternalSortBuffer::UpdateSpillParameters() { + int64_t estimated_row_size = in_memory_buffer_->GetEstimateMemoryUseForEachRow(); + if (estimated_row_size <= 0) { + return; + } + + const int32_t max_batch_size = options_.GetWriteBatchSize(); + const int32_t min_batch_size = std::min(kSpillMinBatchSize, max_batch_size); + const int64_t merge_budget = options_.GetWriteBufferSize(); + const int64_t max_memory_use_per_handle = merge_budget / max_fan_in_; + + spill_batch_size_ = max_memory_use_per_handle / estimated_row_size; + spill_batch_size_ = std::clamp(spill_batch_size_, min_batch_size, max_batch_size); + + actual_max_fan_in_ = merge_budget / (spill_batch_size_ * estimated_row_size); + actual_max_fan_in_ = std::clamp(actual_max_fan_in_, kSpillMinFanIn, max_fan_in_); + + // Re-derive spill_batch_size_ from the clamped actual_max_fan_in_ to stay within merge_budget. + spill_batch_size_ = merge_budget / (actual_max_fan_in_ * estimated_row_size); + spill_batch_size_ = std::clamp(spill_batch_size_, 1, max_batch_size); + + spill_merger_->SetMaxFanIn(actual_max_fan_in_); +} + +Result ExternalSortBuffer::FlushMemory() { + if (!in_memory_buffer_->HasData()) { + return true; + } + + UpdateSpillParameters(); + PAIMON_ASSIGN_OR_RAISE(std::vector> memory_buffer_readers, + in_memory_buffer_->CreateReaders()); + PAIMON_RETURN_NOT_OK(SpillMemoryBuffer(std::move(memory_buffer_readers))); + in_memory_buffer_->Clear(); + return total_spill_disk_bytes_ < options_.GetWriteBufferSpillMaxDiskSize(); +} + +Result ExternalSortBuffer::Write(std::unique_ptr&& batch) { + PAIMON_ASSIGN_OR_RAISE(bool has_remaining_memory, in_memory_buffer_->Write(std::move(batch))); + if (has_remaining_memory) { + return true; + } + return FlushMemory(); +} + +Result>> ExternalSortBuffer::CreateReaders() { + PAIMON_ASSIGN_OR_RAISE(std::vector> memory_readers, + in_memory_buffer_->CreateReaders()); + if (!HasSpilledData()) { + return memory_readers; + } + + int32_t max_spill_files = actual_max_fan_in_ - 1; + PAIMON_RETURN_NOT_OK( + spill_merger_->RunFinalMergeIfNeeded(max_spill_files, CreateSpillFileMergeFn())); + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + CreateSpillReaders(spill_merger_->GetAllFiles())); + readers.insert(readers.end(), std::make_move_iterator(memory_readers.begin()), + std::make_move_iterator(memory_readers.end())); + return readers; +} + +bool ExternalSortBuffer::HasData() const { + return in_memory_buffer_->HasData() || HasSpilledData(); +} + +Result>> ExternalSortBuffer::CreateSpillReaders( + const std::vector& files) const { + std::vector> readers; + readers.reserve(files.size()); + for (const auto& file : files) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr reader, + SpillReader::Create(options_.GetFileSystem(), key_schema_, value_schema_, + enable_multi_thread_spill_, file.channel_id, pool_)); + readers.push_back(std::move(reader)); + } + return readers; +} + +Result ExternalSortBuffer::SpillToDisk( + std::vector>&& readers, int32_t write_batch_size) { + const auto& spill_compress_options = options_.GetSpillCompressOptions(); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr spill_writer, + SpillWriter::Create(options_.GetFileSystem(), write_schema_, spill_channel_enumerator_, + spill_channel_manager_, spill_compress_options.compress, + spill_compress_options.zstd_level, enable_multi_thread_spill_, pool_)); + auto cleanup_guard = ScopeGuard([&]() { + [[maybe_unused]] auto status = + spill_channel_manager_->DeleteChannel(spill_writer->GetChannelId()); + }); + + auto sorted_reader = std::make_unique( + std::move(readers), key_comparator_, user_defined_seq_comparator_, + /*merge_function_wrapper=*/nullptr); + auto create_consumer = [target_schema = write_schema_, pool = pool_]() + -> Result>> { + return KeyValueMetaProjectionConsumer::Create(target_schema, pool); + }; + auto async_key_value_producer_consumer = + std::make_unique>( + std::move(sorted_reader), create_consumer, write_batch_size, + /*projection_thread_num=*/1, pool_); + auto close_guard = ScopeGuard([&]() { async_key_value_producer_consumer->Close(); }); + + while (true) { + PAIMON_ASSIGN_OR_RAISE(KeyValueBatch key_value_batch, + async_key_value_producer_consumer->NextBatch()); + if (key_value_batch.batch == nullptr) { + break; + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr record_batch, + arrow::ImportRecordBatch(key_value_batch.batch.get(), write_schema_)); + PAIMON_RETURN_NOT_OK(spill_writer->WriteBatch(record_batch)); + } + + PAIMON_RETURN_NOT_OK(spill_writer->Close()); + PAIMON_ASSIGN_OR_RAISE(int64_t spilled_file_size, spill_writer->GetFileSize()); + cleanup_guard.Release(); + return FileChannelInfo{spill_writer->GetChannelId(), spilled_file_size}; +} + +Status ExternalSortBuffer::SpillMemoryBuffer( + std::vector>&& readers) { + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo file_info, + SpillToDisk(std::move(readers), spill_batch_size_)); + total_spill_disk_bytes_ += file_info.file_size; + spill_merger_->AddFile(file_info); + return spill_merger_->RunMergeIfNeeded(CreateSpillFileMergeFn()); +} + +SpillFileMerger::MergeFn ExternalSortBuffer::CreateSpillFileMergeFn() { + return [this](const std::vector& files) -> Result { + return MergeAndReplaceFiles(files); + }; +} + +Result ExternalSortBuffer::MergeAndReplaceFiles( + const std::vector& files) { + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + CreateSpillReaders(files)); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, + SpillToDisk(std::move(readers), spill_batch_size_)); + total_spill_disk_bytes_ += output.file_size; + + for (const auto& file : files) { + [[maybe_unused]] auto status = spill_channel_manager_->DeleteChannel(file.channel_id); + total_spill_disk_bytes_ -= file.file_size; + } + return output; +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/external_sort_buffer.h b/src/paimon/core/mergetree/external_sort_buffer.h new file mode 100644 index 0000000..b89e95f --- /dev/null +++ b/src/paimon/core/mergetree/external_sort_buffer.h @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "paimon/core/core_options.h" +#include "paimon/core/disk/file_io_channel.h" +#include "paimon/core/mergetree/in_memory_sort_buffer.h" +#include "paimon/core/mergetree/sort_buffer.h" +#include "paimon/core/mergetree/spill_file_merger.h" +#include "paimon/record_batch.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class FieldsComparator; +class IOManager; +class KeyValueRecordReader; +class MemoryPool; +class SpillChannelManager; + +/// Spillable SortBuffer. Buffers RecordBatches in an underlying in-memory sort buffer; +/// when the in-memory budget is reached, sorted data is spilled to a new on-disk file. +class ExternalSortBuffer : public SortBuffer { + public: + static Result> Create( + std::unique_ptr&& in_memory_buffer, + const std::shared_ptr& value_schema, + const std::vector& trimmed_primary_keys, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const CoreOptions& options, const std::shared_ptr& io_manager, + bool enable_multi_thread_spill, const std::shared_ptr& pool); + ~ExternalSortBuffer() override; + + void Clear() override; + uint64_t GetMemorySize() const override; + Result FlushMemory() override; + Result Write(std::unique_ptr&& batch) override; + Result>> CreateReaders() override; + bool HasData() const override; + + private: + static constexpr int32_t kSpillMinFanIn = 2; + static constexpr int32_t kSpillMinBatchSize = 256; + + void DoClear(); + void UpdateSpillParameters(); + bool HasSpilledData() const; + Result>> CreateSpillReaders( + const std::vector& files) const; + Result SpillToDisk( + std::vector>&& readers, int32_t write_batch_size); + SpillFileMerger::MergeFn CreateSpillFileMergeFn(); + Result MergeAndReplaceFiles(const std::vector& files); + Status SpillMemoryBuffer(std::vector>&& readers); + + ExternalSortBuffer(std::unique_ptr&& in_memory_buffer, + const std::shared_ptr& key_schema, + const std::shared_ptr& value_schema, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const CoreOptions& options, + const std::shared_ptr& spill_channel_enumerator, + bool enable_multi_thread_spill, const std::shared_ptr& pool); + + std::unique_ptr in_memory_buffer_; + + const std::shared_ptr pool_; + const std::shared_ptr key_schema_; + const std::shared_ptr value_schema_; + const std::shared_ptr key_comparator_; + const std::shared_ptr user_defined_seq_comparator_; + const std::shared_ptr write_schema_; + const CoreOptions options_; + const int32_t max_fan_in_; + const bool enable_multi_thread_spill_; + const std::shared_ptr spill_channel_manager_; + + std::unique_ptr spill_merger_; + std::shared_ptr spill_channel_enumerator_; + int64_t total_spill_disk_bytes_ = 0; + int32_t actual_max_fan_in_; + int32_t spill_batch_size_; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/sort_buffer_test.cpp b/src/paimon/core/mergetree/sort_buffer_test.cpp new file mode 100644 index 0000000..648a95b --- /dev/null +++ b/src/paimon/core/mergetree/sort_buffer_test.cpp @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/mergetree/external_sort_buffer.h" +#include "paimon/core/mergetree/in_memory_sort_buffer.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/defs.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/data_generator.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +namespace { + +struct ReaderResult { + std::string key; + int32_t sequence_field; + int32_t value_field; + int64_t sequence_number; + const RowKind* row_kind; +}; + +} // namespace + +class SortBufferTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + test_dir_ = UniqueTestDirectory::Create(); + io_manager_ = std::make_shared(test_dir_->Str(), test_dir_->GetFileSystem()); + + std::vector value_fields = {DataField(0, arrow::field("key", arrow::utf8())), + DataField(1, arrow::field("seq", arrow::int32())), + DataField(2, arrow::field("val", arrow::int32()))}; + value_type_ = DataField::ConvertDataFieldsToArrowStructType(value_fields); + value_schema_ = DataField::ConvertDataFieldsToArrowSchema(value_fields); + primary_keys_ = {"key"}; + sequence_fields_ = {"seq"}; + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, value_schema_, /*partition_keys=*/{}, + primary_keys_, /*options=*/{{Options::BUCKET, "1"}})); + data_generator_ = std::make_shared(table_schema, pool_); + + ASSERT_OK_AND_ASSIGN(key_comparator_, FieldsComparator::Create( + value_fields, {0}, /*is_ascending_order=*/true)); + ASSERT_OK_AND_ASSIGN( + sequence_comparator_, + FieldsComparator::Create(value_fields, {1}, /*is_ascending_order=*/true)); + } + + protected: + void CheckResult(std::vector>&& readers, + const std::vector>& expected, bool need_sort) const { + ASSERT_EQ(readers.size(), expected.size()); + std::vector> actual; + for (size_t i = 0; i < expected.size(); ++i) { + ASSERT_OK_AND_ASSIGN(auto rows, CollectRows(readers[i].get())); + actual.push_back(std::move(rows)); + } + if (need_sort) { + std::sort(actual.begin(), actual.end(), + [](const auto& a, const auto& b) { return a.size() < b.size(); }); + } + for (size_t i = 0; i < expected.size(); ++i) { + AssertRows(actual[i], expected[i]); + } + CloseReaders(readers); + } + + BinaryRow MakeRow(const RowKind* kind, const std::string& key, int32_t seq, int32_t val) { + return BinaryRowGenerator::GenerateRow(kind, {key, seq, val}, pool_.get()); + } + std::unique_ptr MakeBatch(const std::vector& input_rows) { + EXPECT_OK_AND_ASSIGN(auto batches, + data_generator_->SplitArrayByPartitionAndBucket(input_rows)); + EXPECT_EQ(1, batches.size()); + return std::move(batches[0]); + } + + Result> CollectRows(KeyValueRecordReader* reader) const { + std::vector rows; + while (true) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr iterator, + reader->NextBatch()); + if (iterator == nullptr) { + break; + } + while (true) { + PAIMON_ASSIGN_OR_RAISE(bool has_next, iterator->HasNext()); + if (!has_next) { + break; + } + PAIMON_ASSIGN_OR_RAISE(KeyValue key_value, iterator->Next()); + rows.push_back(ReaderResult{std::string(key_value.key->GetStringView(0)), + key_value.value->GetInt(1), key_value.value->GetInt(2), + key_value.sequence_number, key_value.value_kind}); + } + } + return rows; + } + + Result> CreateExternalSortBuffer( + int64_t last_sequence_number, uint64_t write_buffer_size) const { + PAIMON_ASSIGN_OR_RAISE( + CoreOptions options, + CoreOptions::FromMap({{Options::SPILL_COMPRESSION, "uncompressed"}})); + auto in_memory_buffer = std::make_unique( + last_sequence_number, value_type_, primary_keys_, sequence_fields_, + /*sequence_fields_ascending=*/true, key_comparator_, write_buffer_size, pool_); + return ExternalSortBuffer::Create(std::move(in_memory_buffer), value_schema_, primary_keys_, + key_comparator_, sequence_comparator_, options, + io_manager_, /*enable_multi_thread_spill=*/false, pool_); + } + + void AssertRows(const std::vector& actual, + const std::vector& expected) const { + ASSERT_EQ(actual.size(), expected.size()); + for (size_t index = 0; index < expected.size(); ++index) { + ASSERT_EQ(actual[index].key, expected[index].key); + ASSERT_EQ(actual[index].sequence_field, expected[index].sequence_field); + ASSERT_EQ(actual[index].value_field, expected[index].value_field); + ASSERT_EQ(actual[index].sequence_number, expected[index].sequence_number); + ASSERT_EQ(actual[index].row_kind, expected[index].row_kind); + } + } + + void CloseReaders(const std::vector>& readers) const { + for (const auto& reader : readers) { + reader->Close(); + } + } + + std::shared_ptr pool_; + std::shared_ptr data_generator_; + std::unique_ptr test_dir_; + std::shared_ptr io_manager_; + std::shared_ptr value_type_; + std::shared_ptr value_schema_; + std::vector primary_keys_; + std::vector sequence_fields_; + std::shared_ptr key_comparator_; + std::shared_ptr sequence_comparator_; +}; + +TEST_F(SortBufferTest, TestInMemorySortBufferEstimateMemoryUse) { + { + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + ["Lucy", 20, 1, 14.1], + ["Paul", 20, 1, null], + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); + int64_t expected_memory_use = + 1 + (13 + 3 * 4 + 1) + (3 * 4 + 1) + (3 * 4 + 1) + (3 * 8 + 1); + ASSERT_EQ(memory_use, expected_memory_use); + } + { + arrow::FieldVector fields = {arrow::field("v0", arrow::boolean()), + arrow::field("v1", arrow::int8()), + arrow::field("v2", arrow::int16()), + arrow::field("v3", arrow::int32()), + arrow::field("v4", arrow::int64()), + arrow::field("v5", arrow::float32()), + arrow::field("v6", arrow::float64()), + arrow::field("v7", arrow::date32()), + arrow::field("v8", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("v9", arrow::decimal128(30, 20)), + arrow::field("v10", arrow::utf8()), + arrow::field("v11", arrow::binary())}; + + auto array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [true, 10, 200, 65536, 123456789, 0.0, 0.0, 2000, -86399999999500, "2134.48690000000000000009", "difference", "Alice"], + [false, -128, -32768, -2147483648, -9223372036854775808, -3.4028235E38, -1.7976931348623157E308, -719528, -9223372036854775808, "-999999999999999999.99999999999999999999", "Alice", "Two"], + [true, 127, 32767, 2147483647, 9223372036854775807, 3.4028235E38, 1.7976931348623157E308, 2932896, 9223372036854775807, "999999999999999999.99999999999999999999", "Alice", "made"], + [true, 0, 0, 0, 0, 1.4E-45, 4.9E-324, 0, 0, "0.00000000000000000000", "Alice", "wood"] +])") + .ValueOrDie()); + ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); + int64_t expected_memory_use = 1 + (4 + 1) + (4 + 1) + (2 * 4 + 1) + (4 * 4 + 1) + + (8 * 4 + 1) + (4 * 4 + 1) + (8 * 4 + 1) + (4 * 4 + 1) + + (8 * 4 + 1) + (4 * 16 + 1) + (25 + 4 * 4 + 1) + + (16 + 4 * 4 + 1); + ASSERT_EQ(memory_use, expected_memory_use); + } + { + arrow::FieldVector fields = { + arrow::field("f0", arrow::list(arrow::int32())), + arrow::field("f1", arrow::map(arrow::utf8(), arrow::int64())), + arrow::field("f2", arrow::struct_({arrow::field("sub1", arrow::int64()), + arrow::field("sub2", arrow::float64()), + arrow::field("sub3", arrow::boolean())})), + }; + auto array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [[1, 2, 3], [["apple", 3], ["banana", 4]], [10, 10.1, false]], + [[4, 5], [["cat", 5], ["dog", 6], ["mouse", 7]], [20, 20.1, true]], + [[6], [["elephant", 7], ["fox", 8]], [null, 30.1, true]] + ])") + .ValueOrDie()); + ASSERT_OK_AND_ASSIGN(int64_t memory_use, InMemorySortBuffer::EstimateMemoryUse(array)); + int64_t list_mem = 1 + (4 * 6 + 1); + int64_t map_mem = 1 + (33 + 4 * 7 + 1) + (8 * 7 + 1); + int64_t struct_mem = 1 + (8 * 3 + 1) + (8 * 3 + 1) + (1 * 3 + 1); + int64_t expected_memory_use = 1 + list_mem + map_mem + struct_mem; + ASSERT_EQ(memory_use, expected_memory_use); + } +} + +TEST_F(SortBufferTest, TestInMemorySortBufferSimple) { + InMemorySortBuffer buffer(/*last_sequence_number=*/9, value_type_, primary_keys_, + sequence_fields_, /*sequence_fields_ascending=*/true, key_comparator_, + /*write_buffer_size=*/1024 * 1024, pool_); + + std::vector input_rows; + input_rows.push_back(MakeRow(RowKind::Insert(), "b", 2, 200)); + input_rows.push_back(MakeRow(RowKind::Delete(), "a", 3, 300)); + input_rows.push_back(MakeRow(RowKind::UpdateAfter(), "a", 1, 100)); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer.Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + + input_rows.clear(); + input_rows.push_back(MakeRow(RowKind::UpdateBefore(), "c", 1, 400)); + input_rows.push_back(MakeRow(RowKind::Insert(), "b", 1, 150)); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, buffer.Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_TRUE(buffer.HasData()); + ASSERT_GT(buffer.GetMemorySize(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, buffer.CreateReaders()); + ASSERT_TRUE(buffer.HasData()); + ASSERT_GT(buffer.GetMemorySize(), 0); + + CheckResult( + std::move(readers), + {{{"a", 1, 100, 12, RowKind::UpdateAfter()}, + {"a", 3, 300, 11, RowKind::Delete()}, + {"b", 2, 200, 10, RowKind::Insert()}}, + {{"b", 1, 150, 14, RowKind::Insert()}, {"c", 1, 400, 13, RowKind::UpdateBefore()}}}, + /*need_sort=*/false); + + buffer.Clear(); + ASSERT_FALSE(buffer.HasData()); + ASSERT_EQ(buffer.GetMemorySize(), 0); +} + +TEST_F(SortBufferTest, TestInMemorySortBufferEstimateMemoryUseForEachRow) { + InMemorySortBuffer buffer(/*last_sequence_number=*/9, value_type_, primary_keys_, + sequence_fields_, /*sequence_fields_ascending=*/true, key_comparator_, + /*write_buffer_size=*/1024 * 1024, pool_); + + ASSERT_EQ(buffer.GetEstimateMemoryUseForEachRow(), 0); + + uint64_t cached_memory_use_per_row = 0; + for (int32_t index = 0; index < 3; ++index) { + std::vector input_rows; + input_rows.push_back(MakeRow(RowKind::Insert(), std::string(index + 1, 'a'), index, index)); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer.Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + + cached_memory_use_per_row = buffer.GetMemorySize() / (index + 1); + ASSERT_EQ(buffer.GetEstimateMemoryUseForEachRow(), cached_memory_use_per_row); + } + + // Clear does not reset the estimated per-row memory usage. + buffer.Clear(); + ASSERT_EQ(buffer.GetEstimateMemoryUseForEachRow(), cached_memory_use_per_row); + + // Verify behavior when writing an empty batch. + std::shared_ptr empty_array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([])").ValueOrDie(); + ::ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*empty_array, &c_array).ok()); + RecordBatchBuilder batch_builder(&c_array); + batch_builder.SetRowKinds({}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr empty_batch, batch_builder.Finish()); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer.Write(std::move(empty_batch))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_EQ(buffer.GetEstimateMemoryUseForEachRow(), cached_memory_use_per_row); +} + +TEST_F(SortBufferTest, TestExternalSortBufferWithInMemoryDataAndNoSpill) { + ASSERT_OK_AND_ASSIGN(auto buffer, CreateExternalSortBuffer(/*last_sequence_number=*/4, + /*write_buffer_size=*/1024 * 1024)); + std::vector input_rows; + input_rows.push_back(MakeRow(RowKind::Delete(), "b", 2, 200)); + input_rows.push_back(MakeRow(RowKind::UpdateAfter(), "a", 3, 300)); + input_rows.push_back(MakeRow(RowKind::Insert(), "a", 1, 100)); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, buffer->CreateReaders()); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + CheckResult(std::move(readers), + {{{"a", 1, 100, 7, RowKind::Insert()}, + {"a", 3, 300, 6, RowKind::UpdateAfter()}, + {"b", 2, 200, 5, RowKind::Delete()}}}, + /*need_sort=*/false); + + buffer->Clear(); + ASSERT_FALSE(buffer->HasData()); + ASSERT_EQ(buffer->GetMemorySize(), 0); +} + +TEST_F(SortBufferTest, TestExternalSortBufferWithSpilledDataAndInMemoryData) { + // the write buffer size limit 35 bytes is larger than 2 rows but smaller than 3 rows. + ASSERT_OK_AND_ASSIGN(auto buffer, CreateExternalSortBuffer(/*last_sequence_number=*/19, + /*write_buffer_size=*/35)); + + // in memory data + std::vector input_rows; + input_rows.push_back(MakeRow(RowKind::Insert(), "b", 1, 200)); + input_rows.push_back(MakeRow(RowKind::Delete(), "b", 2, 200)); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + // spill file 1 (with above in memory data) + input_rows.clear(); + input_rows.push_back(MakeRow(RowKind::UpdateAfter(), "a", 3, 300)); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_EQ(buffer->GetMemorySize(), 0); + + // spill file 2 + input_rows.clear(); + input_rows.push_back(MakeRow(RowKind::Insert(), "c", 5, 500)); + input_rows.push_back(MakeRow(RowKind::Insert(), "c", 4, 400)); + input_rows.push_back(MakeRow(RowKind::UpdateBefore(), "a", 1, 100)); + input_rows.push_back(MakeRow(RowKind::Insert(), "b", 1, 150)); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_EQ(buffer->GetMemorySize(), 0); + + // in memory data + input_rows.clear(); + input_rows.push_back(MakeRow(RowKind::Insert(), "c", 4, 400)); + input_rows.push_back(MakeRow(RowKind::UpdateBefore(), "a", 1, 100)); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, buffer->Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, buffer->CreateReaders()); + ASSERT_TRUE(buffer->HasData()); + ASSERT_GT(buffer->GetMemorySize(), 0); + + CheckResult(std::move(readers), + {{{"a", 1, 100, 28, RowKind::UpdateBefore()}, {"c", 4, 400, 27, RowKind::Insert()}}, + {{"a", 3, 300, 22, RowKind::UpdateAfter()}, + {"b", 1, 200, 20, RowKind::Insert()}, + {"b", 2, 200, 21, RowKind::Delete()}}, + {{"a", 1, 100, 25, RowKind::UpdateBefore()}, + {"b", 1, 150, 26, RowKind::Insert()}, + {"c", 4, 400, 24, RowKind::Insert()}, + {"c", 5, 500, 23, RowKind::Insert()}}}, + /*need_sort=*/true); + + buffer->Clear(); + ASSERT_FALSE(buffer->HasData()); + ASSERT_EQ(buffer->GetMemorySize(), 0); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/spill_file_merger.cpp b/src/paimon/core/mergetree/spill_file_merger.cpp new file mode 100644 index 0000000..16c5adb --- /dev/null +++ b/src/paimon/core/mergetree/spill_file_merger.cpp @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/spill_file_merger.h" + +#include +#include + +namespace paimon { + +SpillFileMerger::SpillFileMerger(int32_t max_fan_in) : max_fan_in_(max_fan_in) { + assert(max_fan_in >= 2); +} + +void SpillFileMerger::SetMaxFanIn(int32_t max_fan_in) { + assert(max_fan_in >= 2); + max_fan_in_ = max_fan_in; +} + +void SpillFileMerger::Clear() { + levels_.clear(); +} + +void SpillFileMerger::AddFile(const FileChannelInfo& file_info) { + EnsureLevel(0); + levels_[0].push_back(file_info); +} + +Status SpillFileMerger::RunMergeIfNeeded(const MergeFn& merge_fn) { + while (NeedMerge()) { + auto task = PickMergeTask(); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, merge_fn(task.input_files)); + ApplyMergeResult(task, output); + } + return Status::OK(); +} + +Status SpillFileMerger::RunFinalMergeIfNeeded(int32_t target_file_count, const MergeFn& merge_fn) { + while (GetTotalFileCount() > target_file_count) { + auto task = PickFinalMergeBatch(target_file_count); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, merge_fn(task.input_files)); + ApplyMergeResult(task, output); + } + return Status::OK(); +} + +bool SpillFileMerger::NeedMerge() const { + for (const auto& level : levels_) { + if (static_cast(level.size()) >= max_fan_in_) { + return true; + } + } + return false; +} + +void SpillFileMerger::ApplyMergeResult(const MergeTask& task, const FileChannelInfo& output) { + for (const auto& file : task.input_files) { + RemoveFile(file.channel_id); + } + EnsureLevel(task.target_level); + levels_[task.target_level].push_back(output); +} + +SpillFileMerger::MergeTask SpillFileMerger::PickMergeTask() const { + for (int32_t i = 0; i < static_cast(levels_.size()); ++i) { + if (static_cast(levels_[i].size()) >= max_fan_in_) { + MergeTask task; + task.target_level = i + 1; + task.input_files.assign(levels_[i].begin(), levels_[i].begin() + max_fan_in_); + return task; + } + } + assert(false && "PickMergeTask called but no pending merge"); + return {}; +} + +SpillFileMerger::MergeTask SpillFileMerger::PickFinalMergeBatch(int32_t target_file_count) const { + int32_t total = GetTotalFileCount(); + assert(total > target_file_count); + + // Collect all files with their levels, sort by size ascending. + struct LeveledFile { + int32_t level; + FileChannelInfo entry; + }; + std::vector all_files; + for (int32_t level_idx = 0; level_idx < static_cast(levels_.size()); ++level_idx) { + for (const auto& file : levels_[level_idx]) { + all_files.push_back({level_idx, file}); + } + } + std::sort(all_files.begin(), all_files.end(), + [](const LeveledFile& lhs, const LeveledFile& rhs) { + return lhs.entry.file_size < rhs.entry.file_size; + }); + + // Merge `files_to_merge` (alias: n) files into 1 eliminates (n-1) files. + // Need to eliminate (total - target_file_count), so n = total - target_file_count + 1. + // Bounded by max_fan_in_ (max merge width per round). + int32_t files_to_merge = std::min(total - target_file_count + 1, max_fan_in_); + + MergeTask task; + int32_t max_level = 0; + for (int32_t i = 0; i < files_to_merge; ++i) { + max_level = std::max(max_level, all_files[i].level); + task.input_files.push_back(all_files[i].entry); + } + task.target_level = max_level + 1; + return task; +} + +std::vector SpillFileMerger::GetAllFiles() const { + std::vector result; + for (const auto& level : levels_) { + for (const auto& file : level) { + result.push_back(file); + } + } + return result; +} + +int32_t SpillFileMerger::GetTotalFileCount() const { + int32_t total = 0; + for (const auto& level : levels_) { + total += static_cast(level.size()); + } + return total; +} + +void SpillFileMerger::EnsureLevel(int32_t level) { + while (static_cast(levels_.size()) <= level) { + levels_.emplace_back(); + } +} + +void SpillFileMerger::RemoveFile(const FileIOChannel::ID& channel_id) { + for (auto& level : levels_) { + for (auto it = level.begin(); it != level.end(); ++it) { + if (it->channel_id == channel_id) { + level.erase(it); + return; + } + } + } +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/spill_file_merger.h b/src/paimon/core/mergetree/spill_file_merger.h new file mode 100644 index 0000000..cfa2007 --- /dev/null +++ b/src/paimon/core/mergetree/spill_file_merger.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/core/disk/file_io_channel.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +/// Manages spill files in a leveled structure (similar to LSM tree) to minimize read/write +/// amplification during external sort merge operations. +/// +/// Files are organized into levels. Level 0 contains the original spill files. When a level +/// accumulates max_fan_in files, they are merged into a single file at the next level. Before the +/// final read, a greedy merge reduces the total file count to <= max_fan_in. +/// +/// Read/write amplification: O(log_K(N)) vs O(N/K) for naive sequential merge. +class SpillFileMerger { + public: + using MergeFn = std::function(const std::vector&)>; + + explicit SpillFileMerger(int32_t max_fan_in); + + /// Update the maximum fan-in (merge width). + void SetMaxFanIn(int32_t max_fan_in); + + /// Remove all files from all levels. + void Clear(); + + /// Add a new spill file to level 0. + void AddFile(const FileChannelInfo& file_info); + + /// Merge any single level that has accumulated >= max_fan_in files into one file at the next + /// level. Repeats until every level has fewer than max_fan_in files. + Status RunMergeIfNeeded(const MergeFn& merge_fn); + + /// Reduce the total file count across all levels to <= target_file_count by greedily merging + /// the smallest files first. Each round merges at most max_fan_in files. + Status RunFinalMergeIfNeeded(int32_t target_file_count, const MergeFn& merge_fn); + + /// Collect all files across all levels into a flat vector. + std::vector GetAllFiles() const; + + private: + struct MergeTask { + int32_t target_level; + std::vector input_files; + }; + + bool NeedMerge() const; + void ApplyMergeResult(const MergeTask& task, const FileChannelInfo& output); + MergeTask PickMergeTask() const; + MergeTask PickFinalMergeBatch(int32_t target_file_count) const; + int32_t GetTotalFileCount() const; + void EnsureLevel(int32_t level); + void RemoveFile(const FileIOChannel::ID& channel_id); + + int32_t max_fan_in_; + std::vector> levels_; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/spill_file_merger_test.cpp b/src/paimon/core/mergetree/spill_file_merger_test.cpp new file mode 100644 index 0000000..026b657 --- /dev/null +++ b/src/paimon/core/mergetree/spill_file_merger_test.cpp @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/spill_file_merger.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class SpillFileMergerTest : public ::testing::Test { + protected: + FileChannelInfo MakeFile(int32_t id, int64_t size) { + return FileChannelInfo{FileIOChannel::ID(std::to_string(id)), size}; + } + + SpillFileMerger::MergeFn CreateMockMergeFn() { + return [this](const std::vector& inputs) -> Result { + merge_call_count_++; + int64_t total_size = 0; + for (const auto& file : inputs) { + total_size += file.file_size; + } + return MakeFile(next_file_id_++, total_size); + }; + } + + SpillFileMerger::MergeFn CreateFailingMergeFn() { + return [this](const std::vector&) -> Result { + merge_call_count_++; + return Status::IOError("simulated write failure"); + }; + } + + int32_t merge_call_count_ = 0; + int32_t next_file_id_ = 1000; +}; + +TEST_F(SpillFileMergerTest, NoMergeBelowFanIn) { + SpillFileMerger merger(4); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + ASSERT_EQ(merger.GetAllFiles().size(), 3); +} + +TEST_F(SpillFileMergerTest, MergeTriggeredAtFanIn) { + SpillFileMerger merger(3); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files[0].file_size, 600); +} + +TEST_F(SpillFileMergerTest, MinimalFanInTwo) { + SpillFileMerger merger(2); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files[0].file_size, 300); +} + +TEST_F(SpillFileMergerTest, MultiLevelMerge) { + SpillFileMerger merger(2); + + // Adding 4 files with fan_in=2 should trigger multi-level merge: + // Add file 1,2 -> merge to level 1 (1 file at level 1) + // Add file 3,4 -> merge level 0 to level 1 (2 files at level 1) -> merge level 1 + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 100)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + + merger.AddFile(MakeFile(3, 100)); + merger.AddFile(MakeFile(4, 100)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + // level 0 merge + level 1 merge + ASSERT_EQ(merge_call_count_, 3); + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files[0].file_size, 400); +} + +TEST_F(SpillFileMergerTest, ManyFilesWithFanInTwo) { + SpillFileMerger merger(2); + + for (int32_t i = 0; i < 8; ++i) { + merger.AddFile(MakeFile(i, 100)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + } + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files[0].file_size, 800); +} + +TEST_F(SpillFileMergerTest, FinalCleanupReducesFileCount) { + SpillFileMerger merger(4); + + // Add 5 files (just above fan_in). Level 0 gets merged once, leaving: + // level 0: 1 file, level 1: 1 file + for (int32_t i = 0; i < 5; ++i) { + merger.AddFile(MakeFile(i, 100)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + } + + auto files_before = merger.GetAllFiles(); + ASSERT_EQ(files_before.size(), 2); + + ASSERT_OK(merger.RunFinalMergeIfNeeded(1, CreateMockMergeFn())); + + auto files_after = merger.GetAllFiles(); + ASSERT_EQ(files_after.size(), 1); +} + +TEST_F(SpillFileMergerTest, FinalCleanupMergesSmallestFirst) { + SpillFileMerger merger(10); + + merger.AddFile(MakeFile(1, 1000)); + merger.AddFile(MakeFile(2, 10)); + merger.AddFile(MakeFile(3, 20)); + merger.AddFile(MakeFile(4, 500)); + + // target=2, need to eliminate 2 files, so merge 3 smallest into 1 + ASSERT_OK(merger.RunFinalMergeIfNeeded(2, CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 2); + + // The 3 smallest (10, 20, 500) should be merged into one 530-sized file, + // leaving the largest (1000) untouched. + std::vector sizes; + for (const auto& file : files) { + sizes.push_back(file.file_size); + } + std::sort(sizes.begin(), sizes.end()); + ASSERT_EQ(sizes[0], 530); + ASSERT_EQ(sizes[1], 1000); +} + +TEST_F(SpillFileMergerTest, FinalCleanupNoOpWhenAlreadyBelowTarget) { + SpillFileMerger merger(4); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + + ASSERT_OK(merger.RunFinalMergeIfNeeded(3, CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + ASSERT_EQ(merger.GetAllFiles().size(), 2); +} + +TEST_F(SpillFileMergerTest, FinalCleanupConvergesToTarget) { + // Add many files without running merge (fan_in large enough) + SpillFileMerger merger(100); + for (int32_t i = 0; i < 20; ++i) { + merger.AddFile(MakeFile(i, (i + 1) * 10)); + } + ASSERT_EQ(merger.GetAllFiles().size(), 20); + + ASSERT_OK(merger.RunFinalMergeIfNeeded(3, CreateMockMergeFn())); + ASSERT_LE(static_cast(merger.GetAllFiles().size()), 3); +} + +TEST_F(SpillFileMergerTest, MergeFnFailurePreservesState) { + SpillFileMerger merger(2); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + + auto status = merger.RunMergeIfNeeded(CreateFailingMergeFn()); + ASSERT_FALSE(status.ok()); + ASSERT_EQ(merge_call_count_, 1); + + // Files should still be present since merge failed + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 2); +} + +TEST_F(SpillFileMergerTest, ClearRemovesAllFiles) { + SpillFileMerger merger(4); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + merger.Clear(); + ASSERT_EQ(merger.GetAllFiles().size(), 0); +} + +TEST_F(SpillFileMergerTest, SetMaxFanInAffectsMerge) { + SpillFileMerger merger(4); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + // No merge at fan_in=4 + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + + // Lower fan_in to 3, now merge should trigger + merger.SetMaxFanIn(3); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + ASSERT_EQ(merger.GetAllFiles().size(), 1); +} + +TEST_F(SpillFileMergerTest, SetMaxFanInToLargerValueSuppressesMerge) { + SpillFileMerger merger(3); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + // At fan_in=3, merge should trigger + // But first, increase fan_in to 5 before running merge + merger.SetMaxFanIn(5); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + + // All 3 files should still be present + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 3); + + // Add more files up to 5, still no merge + merger.AddFile(MakeFile(4, 400)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + ASSERT_EQ(merger.GetAllFiles().size(), 4); + + // Add 5th file, now merge triggers + merger.AddFile(MakeFile(5, 500)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); +} + +TEST_F(SpillFileMergerTest, MergeOnlyTakesFanInFilesFromLevel) { + SpillFileMerger merger(3); + + // Add 5 files to level 0 (exceeds fan_in=3) + for (int32_t i = 0; i < 5; ++i) { + merger.AddFile(MakeFile(i, 100)); + } + + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + + // First merge takes 3 from level 0 -> 1 at level 1 + // Remaining: 2 at level 0, 1 at level 1 = 3 total + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 3); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/write_buffer.cpp b/src/paimon/core/mergetree/write_buffer.cpp new file mode 100644 index 0000000..549975a --- /dev/null +++ b/src/paimon/core/mergetree/write_buffer.cpp @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/write_buffer.h" + +#include +#include + +#include "arrow/type.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/io/merged_key_value_record_reader.h" +#include "paimon/core/mergetree/external_sort_buffer.h" +#include "paimon/core/mergetree/in_memory_sort_buffer.h" + +namespace paimon { + +Result> WriteBuffer::Create( + int64_t last_sequence_number, const std::shared_ptr& value_schema, + const std::vector& trimmed_primary_keys, + const std::vector& user_defined_sequence_fields, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const std::shared_ptr>& merge_function_wrapper, + const CoreOptions& options, const std::shared_ptr& io_manager, + bool enable_multi_thread_spill, const std::shared_ptr& pool) { + auto value_type = arrow::struct_(value_schema->fields()); + auto in_memory_buffer = std::make_unique( + last_sequence_number, value_type, trimmed_primary_keys, user_defined_sequence_fields, + options.SequenceFieldSortOrderIsAscending(), key_comparator, options.GetWriteBufferSize(), + pool); + std::unique_ptr sort_buffer; + if (!options.GetWriteBufferSpillable() || io_manager == nullptr) { + sort_buffer = std::move(in_memory_buffer); + } else { + PAIMON_ASSIGN_OR_RAISE( + sort_buffer, ExternalSortBuffer::Create(std::move(in_memory_buffer), value_schema, + trimmed_primary_keys, key_comparator, + user_defined_seq_comparator, options, + io_manager, enable_multi_thread_spill, pool)); + } + return std::unique_ptr( + new WriteBuffer(std::move(sort_buffer), key_comparator, merge_function_wrapper)); +} + +WriteBuffer::WriteBuffer( + std::unique_ptr&& sort_buffer, + const std::shared_ptr& key_comparator, + const std::shared_ptr>& merge_function_wrapper) + : sort_buffer_(std::move(sort_buffer)), + key_comparator_(key_comparator), + merge_function_wrapper_(merge_function_wrapper) {} + +Result WriteBuffer::Write(std::unique_ptr&& batch) { + return sort_buffer_->Write(std::move(batch)); +} + +Result>> WriteBuffer::CreateReaders() { + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + sort_buffer_->CreateReaders()); + std::vector> merged_readers; + merged_readers.reserve(readers.size()); + for (auto& reader : readers) { + merged_readers.push_back(std::make_unique( + std::move(reader), key_comparator_, merge_function_wrapper_)); + } + return merged_readers; +} + +Result WriteBuffer::FlushMemory() { + return sort_buffer_->FlushMemory(); +} + +void WriteBuffer::Clear() { + sort_buffer_->Clear(); +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/write_buffer.h b/src/paimon/core/mergetree/write_buffer.h new file mode 100644 index 0000000..f4f2310 --- /dev/null +++ b/src/paimon/core/mergetree/write_buffer.h @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "paimon/core/core_options.h" +#include "paimon/core/mergetree/in_memory_sort_buffer.h" +#include "paimon/core/mergetree/sort_buffer.h" +#include "paimon/record_batch.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class Array; +class DataType; +class Schema; +class StructArray; +} // namespace arrow + +namespace paimon { +class IOManager; +class KeyValueRecordReader; +class FieldsComparator; +class MemoryPool; +struct KeyValue; +template +class MergeFunctionWrapper; + +/// WriteBuffer manages the batch buffer for MergeTreeWriter. +/// It delegates to a SortBuffer implementation (InMemorySortBuffer or ExternalSortBuffer) based on +/// the spillable configuration. +class WriteBuffer { + public: + static Result> Create( + int64_t last_sequence_number, const std::shared_ptr& value_schema, + const std::vector& trimmed_primary_keys, + const std::vector& user_defined_sequence_fields, + const std::shared_ptr& key_comparator, + const std::shared_ptr& user_defined_seq_comparator, + const std::shared_ptr>& merge_function_wrapper, + const CoreOptions& options, const std::shared_ptr& io_manager, + bool enable_multi_thread_spill, const std::shared_ptr& pool); + + /// Import a RecordBatch into the buffer. + /// Return false when the batch was accepted but the caller should fall back to + /// FlushWriteBuffer before buffering more data. + Result Write(std::unique_ptr&& batch); + + /// Create KeyValueRecordReaders from sort buffer without clearing the buffer. + /// The caller should invoke Clear() after consuming the readers. + /// @return list of KeyValueRecordReaders built from buffered data + Result>> CreateReaders(); + + /// Try to spill current buffered data. Return false when the call completed normally but the + /// caller should fall back to FlushWriteBuffer before buffering more data. + Result FlushMemory(); + + /// Return current memory usage in bytes. + uint64_t GetMemoryUsage() const { + return sort_buffer_->GetMemorySize(); + } + + /// Return whether the buffer is empty. + bool IsEmpty() const { + return !sort_buffer_->HasData(); + } + + /// Clear the buffer without building readers (for error paths or Close). + void Clear(); + + private: + WriteBuffer(std::unique_ptr&& sort_buffer, + const std::shared_ptr& key_comparator, + const std::shared_ptr>& merge_function_wrapper); + + std::unique_ptr sort_buffer_; + std::shared_ptr key_comparator_; + std::shared_ptr> merge_function_wrapper_; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/write_buffer_test.cpp b/src/paimon/core/mergetree/write_buffer_test.cpp new file mode 100644 index 0000000..0f08273 --- /dev/null +++ b/src/paimon/core/mergetree/write_buffer_test.cpp @@ -0,0 +1,775 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/write_buffer.h" + +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/core_options.h" +#include "paimon/core/disk/io_manager.h" +#include "paimon/core/io/key_value_record_reader.h" +#include "paimon/core/mergetree/compact/deduplicate_merge_function.h" +#include "paimon/core/mergetree/compact/reducer_merge_function_wrapper.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/test_helper.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon { +template +class MergeFunctionWrapper; +} // namespace paimon + +namespace paimon::test { +struct ReaderResult { + std::vector sequence_numbers; + std::vector row_kind_values; +}; + +class WriteBufferTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + tmp_dir_ = UniqueTestDirectory::Create(); + ASSERT_TRUE(tmp_dir_); + io_manager_ = std::make_shared(tmp_dir_->Str(), tmp_dir_->GetFileSystem()); + value_fields_ = {DataField(0, arrow::field("f0", arrow::utf8())), + DataField(1, arrow::field("f1", arrow::int32())), + DataField(2, arrow::field("f2", arrow::int32())), + DataField(3, arrow::field("f3", arrow::float64()))}; + value_schema_ = DataField::ConvertDataFieldsToArrowSchema(value_fields_); + value_type_ = DataField::ConvertDataFieldsToArrowStructType(value_fields_); + primary_keys_ = {"f0"}; + ASSERT_OK_AND_ASSIGN(key_comparator_, + FieldsComparator::Create({value_fields_[0]}, + /*is_ascending_order=*/true)); + + auto merge_function = std::make_unique(/*ignore_delete=*/false); + merge_function_wrapper_ = + std::make_shared(std::move(merge_function)); + } + + std::unique_ptr CreateWriteBuffer(int64_t last_sequence_number, + const CoreOptions& options) const { + EXPECT_OK_AND_ASSIGN( + auto write_buffer, + WriteBuffer::Create(last_sequence_number, value_schema_, primary_keys_, + /*user_defined_sequence_fields=*/{}, key_comparator_, + /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, + options, io_manager_, /*enable_multi_thread_spill=*/false, pool_)); + return write_buffer; + } + + std::unique_ptr CreateBatch( + const std::shared_ptr& array, + const std::vector& row_kinds) const { + ::ArrowArray c_array; + EXPECT_TRUE(arrow::ExportArray(*array, &c_array).ok()); + RecordBatchBuilder batch_builder(&c_array); + batch_builder.SetRowKinds(row_kinds); + EXPECT_OK_AND_ASSIGN(std::unique_ptr batch, batch_builder.Finish()); + return batch; + } + + Result GetOnlySpillFileSize() const { + PAIMON_ASSIGN_OR_RAISE(std::string spill_dir, io_manager_->GetSpillDir()); + std::vector> spill_files; + PAIMON_RETURN_NOT_OK(tmp_dir_->GetFileSystem()->ListFileStatus(spill_dir, &spill_files)); + if (spill_files.size() != 1 || spill_files[0]->IsDir()) { + return Status::Invalid("expected exactly one spill file"); + } + return static_cast(spill_files[0]->GetLen()); + } + + Result ReadReaderResult(KeyValueRecordReader* reader) const { + PAIMON_ASSIGN_OR_RAISE(auto iterator, reader->NextBatch()); + + ReaderResult result; + while (true) { + PAIMON_ASSIGN_OR_RAISE(bool has_next, iterator->HasNext()); + if (!has_next) { + break; + } + PAIMON_ASSIGN_OR_RAISE(KeyValue key_value, iterator->Next()); + result.sequence_numbers.push_back(key_value.sequence_number); + result.row_kind_values.push_back(key_value.value_kind->ToByteValue()); + } + return result; + } + + protected: + std::shared_ptr pool_; + std::unique_ptr tmp_dir_; + std::shared_ptr io_manager_; + std::vector value_fields_; + std::shared_ptr value_schema_; + std::shared_ptr value_type_; + std::vector primary_keys_; + std::shared_ptr key_comparator_; + std::shared_ptr> merge_function_wrapper_; +}; + +TEST_F(WriteBufferTest, TestFlushResetsStateAndAdvancesSequenceNumber) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap(/*options_map=*/{})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/9, options); + + std::shared_ptr array1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1], + ["Bob", 20, 1, 14.1] + ])") + .ValueOrDie(); + std::shared_ptr array2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Charlie", 30, 2, 15.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_FALSE(write_buffer->IsEmpty()); + ASSERT_GT(write_buffer->GetMemoryUsage(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + + ASSERT_EQ(readers.size(), 2); + write_buffer->Clear(); + ASSERT_TRUE(write_buffer->IsEmpty()); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + + ASSERT_OK_AND_ASSIGN(auto first_result, ReadReaderResult(readers[0].get())); + ASSERT_EQ(first_result.sequence_numbers, (std::vector{10, 11})); + ASSERT_EQ( + first_result.row_kind_values, + (std::vector{RowKind::Insert()->ToByteValue(), RowKind::Insert()->ToByteValue()})); + + ASSERT_OK_AND_ASSIGN(auto second_result, ReadReaderResult(readers[1].get())); + ASSERT_EQ(second_result.sequence_numbers, (std::vector{12})); + ASSERT_EQ(second_result.row_kind_values, + (std::vector{RowKind::Insert()->ToByteValue()})); +} + +TEST_F(WriteBufferTest, TestFlushPreservesRowKinds) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap(/*options_map=*/{})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1], + ["Bob", 20, 1, 14.1], + ["Charlie", 30, 2, 15.1], + ["Diana", 40, 3, 16.1] + ])") + .ValueOrDie(); + std::vector row_kinds = { + RecordBatch::RowKind::INSERT, + RecordBatch::RowKind::UPDATE_BEFORE, + RecordBatch::RowKind::UPDATE_AFTER, + RecordBatch::RowKind::DELETE, + }; + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, row_kinds))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + ASSERT_EQ(readers.size(), 1); + + ASSERT_OK_AND_ASSIGN(auto reader_result, ReadReaderResult(readers[0].get())); + + ASSERT_EQ(reader_result.row_kind_values, + (std::vector{ + RowKind::Insert()->ToByteValue(), RowKind::UpdateBefore()->ToByteValue(), + RowKind::UpdateAfter()->ToByteValue(), RowKind::Delete()->ToByteValue()})); + ASSERT_EQ(reader_result.sequence_numbers, (std::vector{0, 1, 2, 3})); +} + +TEST_F(WriteBufferTest, TestWriteRequestsFlushWriteBufferWhenSpillDisabled) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "false"}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_memory, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_FALSE(has_remaining_memory); + ASSERT_FALSE(write_buffer->IsEmpty()); + ASSERT_GT(write_buffer->GetMemoryUsage(), 0); +} + +TEST_F(WriteBufferTest, TestSpillDiskQuotaEnforcement) { + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(CoreOptions ref_options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}})); + auto ref_write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, ref_options); + ASSERT_OK(ref_write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_OK_AND_ASSIGN(int64_t spill_file_size, GetOnlySpillFileSize()); + ref_write_buffer->Clear(); + + // Case 1: FlushMemory consumes remaining disk quota exactly → returns false. + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SPILLABLE, "true"}, + {Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE, + std::to_string(spill_file_size)}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(bool has_remaining_disk, write_buffer->FlushMemory()); + ASSERT_FALSE(has_remaining_disk); + // write_buffer is not empty because spilled data on disk still belongs to the buffer. + ASSERT_FALSE(write_buffer->IsEmpty()); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + } + + // Case 2: Write auto-spill consumes remaining disk quota exactly → returns false. + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}, + {Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE, + std::to_string(spill_file_size)}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_FALSE(has_remaining_quota); + // write_buffer is not empty because spilled data on disk still belongs to the buffer. + ASSERT_FALSE(write_buffer->IsEmpty()); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + } + + // Case 3: Multiple spills exhaust disk quota (quota = 2 spill files). + { + int64_t quota_for_two_files = spill_file_size * 2; + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}, + {Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE, + std::to_string(quota_for_two_files)}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Bob", 20, 1, 14.1] + ])") + .ValueOrDie(); + + // Write 1: under disk quota → true. + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + // Write 2: WRITE_BUFFER_SIZE=1 causes UpdateSpillParameters() to clamp actual_max_fan_in_ + // to 2, triggering intermediate merge which reduces total_spill_disk_bytes_ below disk + // quota. So quota is NOT exhausted here. + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + // Write 3: spill adds a new file to level 0, but no merge is triggered (each level has + // fewer than fan_in files), so total disk usage exceeds quota → returns false. + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_FALSE(has_remaining_quota); + + ASSERT_FALSE(write_buffer->IsEmpty()); + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + std::vector all_sequence_numbers; + for (auto& reader : readers) { + ASSERT_OK_AND_ASSIGN(auto result, ReadReaderResult(reader.get())); + all_sequence_numbers.insert(all_sequence_numbers.end(), result.sequence_numbers.begin(), + result.sequence_numbers.end()); + } + std::sort(all_sequence_numbers.begin(), all_sequence_numbers.end()); + ASSERT_EQ(all_sequence_numbers, (std::vector{0, 2})); + } +} + +TEST_F(WriteBufferTest, TestCreateReadersMergesSingleInMemoryReaderLocally) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SPILLABLE, "false"}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1], + ["Alice", 20, 1, 14.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_memory, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_memory); + + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + ASSERT_EQ(readers.size(), 1); + + ASSERT_OK_AND_ASSIGN(auto reader_result, ReadReaderResult(readers[0].get())); + ASSERT_EQ(reader_result.sequence_numbers, (std::vector{1})); + ASSERT_EQ(reader_result.row_kind_values, + (std::vector{RowKind::Insert()->ToByteValue()})); +} + +TEST_F(WriteBufferTest, TestCreateReadersReturnsBothSpillAndMemoryReaders) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "4096000"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}})); + + // Case 1: Same key in spill and memory — each reader returns its own data independently. + { + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr spill_array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + std::shared_ptr memory_array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 20, 1, 14.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(spill_array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(memory_array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + ASSERT_EQ(readers.size(), 2); + + ASSERT_OK_AND_ASSIGN(auto first_result, ReadReaderResult(readers[0].get())); + ASSERT_EQ(first_result.sequence_numbers, (std::vector{0})); + + ASSERT_OK_AND_ASSIGN(auto second_result, ReadReaderResult(readers[1].get())); + ASSERT_EQ(second_result.sequence_numbers, (std::vector{1})); + } + + // Case 2: Multiple rows in spill and memory — readers cover all data, Clear cleans up files. + { + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1], + ["Bob", 20, 1, 14.1], + ["Charlie", 30, 2, 15.1] + ])") + .ValueOrDie(); + std::shared_ptr array2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Diana", 40, 3, 16.1], + ["Eve", 50, 4, 17.1], + ["Frank", 60, 5, 18.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + ASSERT_FALSE(write_buffer->IsEmpty()); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_GT(write_buffer->GetMemoryUsage(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + ASSERT_GE(readers.size(), 2); + + std::vector all_sequence_numbers; + for (auto& reader : readers) { + ASSERT_OK_AND_ASSIGN(auto result, ReadReaderResult(reader.get())); + all_sequence_numbers.insert(all_sequence_numbers.end(), result.sequence_numbers.begin(), + result.sequence_numbers.end()); + } + std::sort(all_sequence_numbers.begin(), all_sequence_numbers.end()); + ASSERT_EQ(all_sequence_numbers, (std::vector{0, 1, 2, 3, 4, 5})); + + // Verify Clear deletes spill files and resets state. + write_buffer->Clear(); + ASSERT_TRUE(write_buffer->IsEmpty()); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + ASSERT_EQ(TestHelper::CountChannelFiles(tmp_dir_->GetFileSystem(), tmp_dir_->Str()), 0); + } +} + +TEST_F(WriteBufferTest, TestSpillReaderReturnsDataInSortedOrder) { + // Write rows in reverse key order; each Write triggers a spill (SIZE=1). + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}, + {Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, "128"}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Charlie", 30, 2, 15.1] + ])") + .ValueOrDie(); + std::shared_ptr array2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Bob", 20, 1, 14.1] + ])") + .ValueOrDie(); + std::shared_ptr array3 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array3, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + + std::vector all_sequence_numbers; + for (auto& reader : readers) { + ASSERT_OK_AND_ASSIGN(auto result, ReadReaderResult(reader.get())); + all_sequence_numbers.insert(all_sequence_numbers.end(), result.sequence_numbers.begin(), + result.sequence_numbers.end()); + } + std::sort(all_sequence_numbers.begin(), all_sequence_numbers.end()); + ASSERT_EQ(all_sequence_numbers, (std::vector{0, 1, 2})); + + // Also test sorted order within a single spill file with multiple rows. + ASSERT_OK_AND_ASSIGN(CoreOptions options2, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "4096000"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}})); + auto write_buffer2 = CreateWriteBuffer(/*last_sequence_number=*/-1, options2); + + std::shared_ptr multi_row_array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Charlie", 30, 2, 15.1], + ["Alice", 10, 0, 13.1], + ["Bob", 20, 1, 14.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer2->Write(CreateBatch(multi_row_array, {}))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer2->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(auto readers2, write_buffer2->CreateReaders()); + ASSERT_EQ(readers2.size(), 1); + + ASSERT_OK_AND_ASSIGN(auto result2, ReadReaderResult(readers2[0].get())); + ASSERT_EQ(result2.sequence_numbers.size(), 3); + // Sorted by key: Alice(seq=1), Bob(seq=2), Charlie(seq=0). + ASSERT_EQ(result2.sequence_numbers, (std::vector{1, 2, 0})); +} + +TEST_F(WriteBufferTest, TestEmptyBufferBehavior) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "4096000"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + ASSERT_TRUE(write_buffer->IsEmpty()); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + ASSERT_OK_AND_ASSIGN(bool has_remaining_disk, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_disk); + ASSERT_TRUE(write_buffer->IsEmpty()); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + + ASSERT_EQ(TestHelper::CountChannelFiles(tmp_dir_->GetFileSystem(), tmp_dir_->Str()), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + ASSERT_TRUE(readers.empty()); +} + +TEST_F(WriteBufferTest, TestMergeSpilledFilesSkipsWithSingleFile) { + // HANDLES=2: with only 1 spill file, merge is not triggered. + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}, + {Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, "2"}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_FALSE(write_buffer->IsEmpty()); + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + ASSERT_FALSE(readers.empty()); + + std::vector all_sequence_numbers; + for (auto& reader : readers) { + ASSERT_OK_AND_ASSIGN(auto result, ReadReaderResult(reader.get())); + all_sequence_numbers.insert(all_sequence_numbers.end(), result.sequence_numbers.begin(), + result.sequence_numbers.end()); + } + ASSERT_EQ(all_sequence_numbers, (std::vector{0})); +} + +TEST_F(WriteBufferTest, TestMultipleFlushWriteCyclesWorkCorrectly) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "4096000"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}})); + + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + // Cycle 1: Write → Flush → Read → Clear. + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(auto readers1, write_buffer->CreateReaders()); + ASSERT_FALSE(readers1.empty()); + std::vector seq1; + for (auto& reader : readers1) { + ASSERT_OK_AND_ASSIGN(auto result, ReadReaderResult(reader.get())); + seq1.insert(seq1.end(), result.sequence_numbers.begin(), result.sequence_numbers.end()); + } + ASSERT_EQ(seq1, (std::vector{0})); + + write_buffer->Clear(); + ASSERT_TRUE(write_buffer->IsEmpty()); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + + // Cycle 2: Write after Clear + std::shared_ptr array2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Bob", 20, 1, 14.1], + ["Charlie", 30, 2, 15.1] + ])") + .ValueOrDie(); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(auto readers2, write_buffer->CreateReaders()); + ASSERT_FALSE(readers2.empty()); + std::vector seq2; + for (auto& reader : readers2) { + ASSERT_OK_AND_ASSIGN(auto result, ReadReaderResult(reader.get())); + seq2.insert(seq2.end(), result.sequence_numbers.begin(), result.sequence_numbers.end()); + } + std::sort(seq2.begin(), seq2.end()); + ASSERT_EQ(seq2, (std::vector{1, 2})); + + write_buffer->Clear(); + ASSERT_TRUE(write_buffer->IsEmpty()); +} + +TEST_F(WriteBufferTest, TestMergeSpilledFilesDeduplicationAndRowKinds) { + // SIZE=1: every Write triggers spill. HANDLES=2: merge after every 2 spill files. + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}, + {Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, "2"}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1], + ["Charlie", 30, 2, 15.1] + ])") + .ValueOrDie(); + std::vector row_kinds1 = { + RecordBatch::RowKind::INSERT, + RecordBatch::RowKind::UPDATE_AFTER, + }; + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, row_kinds1))); + ASSERT_TRUE(has_remaining_quota); + + // Bob(DELETE), Alice(UPDATE_AFTER) — cross-file overlap on Alice. + std::shared_ptr array2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Bob", 20, 1, 14.1], + ["Alice", 40, 3, 16.1] + ])") + .ValueOrDie(); + std::vector row_kinds2 = { + RecordBatch::RowKind::DELETE, + RecordBatch::RowKind::UPDATE_AFTER, + }; + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->Write(CreateBatch(array2, row_kinds2))); + ASSERT_TRUE(has_remaining_quota); + + // Diana(INSERT), Bob(INSERT) — overlap on Bob. + std::shared_ptr array3 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Diana", 50, 4, 17.1], + ["Bob", 60, 5, 18.1] + ])") + .ValueOrDie(); + std::vector row_kinds3 = { + RecordBatch::RowKind::INSERT, + RecordBatch::RowKind::INSERT, + }; + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->Write(CreateBatch(array3, row_kinds3))); + ASSERT_TRUE(has_remaining_quota); + + // After dedup: Alice keeps seq=3, Bob keeps seq=5, Charlie seq=1, Diana seq=4. + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + + ASSERT_EQ(readers.size(), 1); + ASSERT_OK_AND_ASSIGN(auto result, ReadReaderResult(readers[0].get())); + + ASSERT_EQ(result.sequence_numbers.size(), 4); + ASSERT_EQ(result.sequence_numbers, (std::vector{3, 5, 1, 4})); + ASSERT_EQ(result.row_kind_values, + (std::vector{ + RowKind::UpdateAfter()->ToByteValue(), RowKind::Insert()->ToByteValue(), + RowKind::UpdateAfter()->ToByteValue(), RowKind::Insert()->ToByteValue()})); +} + +TEST_F(WriteBufferTest, TestSpillPreservesNullValues) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "4096000"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, null], + ["Bob", 20, 1, 14.1], + ["Charlie", 30, 2, null] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); + ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); + + ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); + ASSERT_EQ(readers.size(), 1); + auto& reader = readers[0]; + ASSERT_OK_AND_ASSIGN(auto iterator, reader->NextBatch()); + + std::vector sequence_numbers; + std::vector f3_is_null; + std::vector f3_values; + + while (true) { + ASSERT_OK_AND_ASSIGN(bool has_next, iterator->HasNext()); + if (!has_next) break; + ASSERT_OK_AND_ASSIGN(KeyValue key_value, iterator->Next()); + sequence_numbers.push_back(key_value.sequence_number); + bool is_null = key_value.value->IsNullAt(3); + f3_is_null.push_back(is_null); + if (!is_null) { + f3_values.push_back(key_value.value->GetDouble(3)); + } + } + + ASSERT_EQ(sequence_numbers, (std::vector{0, 1, 2})); + ASSERT_EQ(f3_is_null, (std::vector{true, false, true})); + ASSERT_EQ(f3_values.size(), 1); + ASSERT_DOUBLE_EQ(f3_values[0], 14.1); +} + +TEST_F(WriteBufferTest, TestDestructorCleansUpSpillFiles) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "4096000"}, + {Options::WRITE_BUFFER_SPILLABLE, "true"}})); + auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); + + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 13.1], + ["Bob", 20, 1, 14.1] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); + + ASSERT_EQ(TestHelper::CountChannelFiles(tmp_dir_->GetFileSystem(), tmp_dir_->Str()), 1); + + // Destroy without calling Clear() — destructor should clean up spill files. + write_buffer.reset(); + + ASSERT_EQ(TestHelper::CountChannelFiles(tmp_dir_->GetFileSystem(), tmp_dir_->Str()), 0); +} + +} // namespace paimon::test