diff --git a/include/paimon/orphan_files_cleaner.h b/include/paimon/orphan_files_cleaner.h new file mode 100644 index 0000000..c2349a2 --- /dev/null +++ b/include/paimon/orphan_files_cleaner.h @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/result.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { +class Executor; +class MemoryPool; + +/// `CleanContext` is some configuration for orphan files clean operations. +/// +/// Please do not use this class directly, use `CleanContextBuilder` to build a `CleanContext` which +/// has input validation. 
+/// @see CleanContextBuilder +class PAIMON_EXPORT CleanContext { + public: + CleanContext(const std::string& root_path, const std::map& options, + int64_t older_than_ms, const std::shared_ptr& pool, + const std::shared_ptr& executor, + const std::shared_ptr& specific_file_system, + std::function should_be_retained); + ~CleanContext(); + + const std::string& GetRootPath() const { + return root_path_; + } + + const std::map& GetOptions() const { + return options_; + } + + int64_t GetOlderThanMs() const { + return older_than_ms_; + } + + std::shared_ptr GetMemoryPool() const { + return memory_pool_; + } + + std::shared_ptr GetExecutor() const { + return executor_; + } + + std::shared_ptr GetSpecificFileSystem() const { + return specific_file_system_; + } + + std::function GetFileRetainCondition() const { + return should_be_retained_; + } + + private: + std::string root_path_; + std::map options_; + int64_t older_than_ms_; + std::shared_ptr memory_pool_; + std::shared_ptr executor_; + std::shared_ptr specific_file_system_; + std::function should_be_retained_; +}; + +/// `CleanContextBuilder` used to build a `CleanContext`, has input validation. +class PAIMON_EXPORT CleanContextBuilder { + public: + /// Constructs a `CleanContextBuilder` with required parameters. + /// @param root_path The root path of the table. + explicit CleanContextBuilder(const std::string& root_path); + + ~CleanContextBuilder(); + + /// Set a configuration options map to set some option entries which are not defined in the + /// table schema or whose values you want to overwrite. + /// @note The options map will clear the options added by `AddOption()` before. + /// @param options The configuration options map. + /// @return Reference to this builder for method chaining. + CleanContextBuilder& SetOptions(const std::map& options); + + /// Add a single configuration option which is not defined in the table schema or whose value + /// you want to overwrite. 
+ /// + /// If you want to add multiple options, call `AddOption()` multiple times or use `SetOptions()` + /// instead. + /// @param key The option key. + /// @param value The option value. + /// @return Reference to this builder for method chaining. + CleanContextBuilder& AddOption(const std::string& key, const std::string& value); + + /// An optional time threshold in milliseconds for filtering. If not provided, defaults to the + /// current time minus one day. + CleanContextBuilder& WithOlderThanMs(int64_t older_than_ms); + + /// Specifies a custom condition to determine which files should be retained. + /// @param should_be_retained A callable object that takes a filename and returns `true` if the + /// file should be kept, or `false` if it can be deleted. + /// @return Reference to this builder for method chaining. + CleanContextBuilder& WithFileRetainCondition( + std::function should_be_retained); + + /// Set custom memory pool for memory management. + /// @param pool The memory pool to use. + /// @return Reference to this builder for method chaining. + CleanContextBuilder& WithMemoryPool(const std::shared_ptr& pool); + + /// Set custom executor for task execution. + /// @param executor The executor to use. + /// @return Reference to this builder for method chaining. + CleanContextBuilder& WithExecutor(const std::shared_ptr& executor); + + /// Sets a custom file system instance to be used for all file operations in this clean context. + /// This bypasses the global file system registry and uses the provided implementation directly. + /// + /// @param file_system The file system to use. + /// @return Reference to this builder for method chaining. + /// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`) + CleanContextBuilder& WithFileSystem(const std::shared_ptr& file_system); + + /// Build and return a `CleanContext` instance with input validation. + /// @return Result containing the constructed `CleanContext` or an error status. 
+ Result> Finish(); + + private: + class Impl; + + std::unique_ptr impl_; +}; + +/// To remove the data files and metadata files that are not used by table (so-called "orphan +/// files"). +/// +/// It will ignore exception when listing all files because it's OK to not delete unread files. +/// +/// To avoid deleting newly written files, it only deletes orphan files older than `olderThanMillis` +/// (1 day by default). +/// +/// To avoid deleting files that are used but not read by mistaken, it will stop removing process +/// when failed to read used files. +/// +/// To avoid deleting files that were newly added to the Paimon Java protocol but are unrecognized +/// by Paimon C++, we implemented a strong pattern-matching validation, deleting only files in +/// patterns we recognize. +/// +/// @note `OrphanFilesCleaner` in Paimon C++ only support cleaning append table, do not support +/// cleaning table with tag, table with external paths, table with branch, table with index, table +/// with changelog, and primary key table. +class PAIMON_EXPORT OrphanFilesCleaner { + public: + virtual ~OrphanFilesCleaner() = default; + + /// Create an instance of `OrphanFilesCleaner`. + /// + /// @param context A unique pointer to the `CleanContext` used for cleanup tasks. + /// + /// @return A Result containing a unique pointer to the `OrphanFilesCleaner` instance. + static Result> Create( + std::unique_ptr&& context); + + /// Cleans orphan files. + /// + /// @return A Result object containing a set of strings representing the paths of the cleaned + /// files. + virtual Result> Clean() = 0; + + /// Retrieve metrics related to orphan files cleaning operations. + /// + /// @return A shared pointer to a `Metrics` object containing cleaning metrics. 
+ virtual std::shared_ptr GetMetrics() const = 0; + + protected: + OrphanFilesCleaner() = default; +}; +} // namespace paimon diff --git a/src/paimon/core/operation/expire_snapshots.cpp b/src/paimon/core/operation/expire_snapshots.cpp new file mode 100644 index 0000000..d6f4b12 --- /dev/null +++ b/src/paimon/core/operation/expire_snapshots.cpp @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/operation/expire_snapshots.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/executor/future.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/manifest/file_kind.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_file_meta.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/fs/file_system.h" + +namespace paimon { + +ExpireSnapshots::ExpireSnapshots(const std::shared_ptr& snapshot_manager, + const std::shared_ptr& path_factory, + const std::shared_ptr& manifest_list, + const std::shared_ptr& manifest_file, + const std::shared_ptr& fs, const ExpireConfig& config, + const std::shared_ptr& executor) + : snapshot_manager_(snapshot_manager), + path_factory_(path_factory), + manifest_list_(manifest_list), + manifest_file_(manifest_file), + fs_(fs), + config_(config), + executor_(executor), + logger_(Logger::GetLogger("ExpireSnapshots")) {} + +Result ExpireSnapshots::Expire() { + int32_t retain_min = config_.GetSnapshotRetainMin(); + if (retain_min < 1) { + return Status::Invalid( + fmt::format("Expire failed: snapshot retain minimum '{}' is less than 1", retain_min)); + } + + int32_t retain_max = config_.GetSnapshotRetainMax(); + if (retain_max < retain_min) { + return Status::Invalid(fmt::format( + "Expire failed: snapshot retain maximum '{}' must be greater or equal than retain " + "minimum '{}'", + retain_max, retain_min)); + } + int32_t max_deletes = config_.GetSnapshotMaxDeletes(); + if (max_deletes < 0) { + return Status::Invalid(fmt::format( + 
"Expire failed: snapshot max delete num '{}' must be greater than 0", max_deletes)); + } + if (snapshot_manager_ == nullptr) { + return Status::Invalid("Expire failed: snapshot manager is nullptr"); + } + PAIMON_ASSIGN_OR_RAISE(std::optional latest_snapshot_id, + snapshot_manager_->LatestSnapshotId()); + if (latest_snapshot_id == std::nullopt) { + // no snapshot, nothing to expire + return 0; + } + PAIMON_ASSIGN_OR_RAISE(std::optional earliest_snapshot_id, + snapshot_manager_->EarliestSnapshotId()); + if (earliest_snapshot_id == std::nullopt) { + // no snapshot, nothing to expire + return 0; + } + + // TODO(jinli.zjw): why not only use earliest snapshot id + int64_t min = + std::max(latest_snapshot_id.value() - retain_max + 1, earliest_snapshot_id.value()); + int64_t max = latest_snapshot_id.value() - retain_min + 1; + max = std::min(max, earliest_snapshot_id.value() + max_deletes); + // TODO(jinli.zjw): support consumer manager + int64_t older_than_ms = + DateTimeUtils::GetCurrentUTCTimeUs() / 1000 - config_.GetSnapshotTimeRetainMs(); + for (int64_t snapshot_id = min; snapshot_id < max; snapshot_id++) { + PAIMON_ASSIGN_OR_RAISE(bool exist, snapshot_manager_->SnapshotExists(snapshot_id)); + if (exist) { + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, snapshot_manager_->LoadSnapshot(snapshot_id)); + if (older_than_ms <= snapshot.TimeMillis()) { + return ExpireUntil(earliest_snapshot_id.value(), snapshot_id); + } + } + } + return ExpireUntil(earliest_snapshot_id.value(), max); +} + +Result ExpireSnapshots::ExpireUntil(int64_t earliest_snapshot_id, + int64_t end_exclusive_id) { + if (end_exclusive_id <= earliest_snapshot_id) { + // TODO(jinli.zjw): write earliest hint + return 0; + } + int64_t begin_inclusive_id = earliest_snapshot_id; + for (int64_t id = end_exclusive_id - 1; id >= earliest_snapshot_id; id--) { + PAIMON_ASSIGN_OR_RAISE(bool exist, snapshot_manager_->SnapshotExists(id)); + if (!exist) { + begin_inclusive_id = id + 1; + break; + } + } + 
PAIMON_LOG_DEBUG(logger_, "Snapshot expire range is [%ld, %ld]", begin_inclusive_id, + end_exclusive_id); + + // Since the data file deletion information for each snapshot is recorded in the delta part of + // the next snapshot, it is necessary to check the next snapshot. Otherwise, its data files will + // not be deleted in this round. + for (int64_t id = begin_inclusive_id + 1; id <= end_exclusive_id; id++) { + PAIMON_ASSIGN_OR_RAISE(bool exist, snapshot_manager_->SnapshotExists(id)); + if (!exist) { + begin_inclusive_id++; + continue; + } + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, snapshot_manager_->LoadSnapshot(id)); + PAIMON_RETURN_NOT_OK(CleanUnusedDataFiles(snapshot.DeltaManifestList())); + } + // TODO(jinli.zjw): support delete changelog files + + // data files in bucket directories has been deleted + // then delete changed bucket directories if they are empty + PAIMON_RETURN_NOT_OK(CleanEmptyDirectories()); + + PAIMON_ASSIGN_OR_RAISE(bool exist, snapshot_manager_->SnapshotExists(end_exclusive_id)); + if (!exist) { + return 0; + } + std::vector retained_snapshots; + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, snapshot_manager_->LoadSnapshot(end_exclusive_id)); + retained_snapshots.push_back(snapshot); + std::set skipping_sets; + PAIMON_RETURN_NOT_OK(GetManifestSkippingSet(retained_snapshots, &skipping_sets)); + for (int64_t id = begin_inclusive_id; id < end_exclusive_id; id++) { + PAIMON_LOG_DEBUG(logger_, "Ready to delete manifests in snapshot #%ld", id); + PAIMON_ASSIGN_OR_RAISE(bool exist, snapshot_manager_->SnapshotExists(id)); + if (!exist) { + begin_inclusive_id++; + continue; + } + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, snapshot_manager_->LoadSnapshot(id)); + PAIMON_RETURN_NOT_OK(CleanUnusedManifests(snapshot.BaseManifestList(), skipping_sets)); + PAIMON_RETURN_NOT_OK(CleanUnusedManifests(snapshot.DeltaManifestList(), skipping_sets)); + auto status = fs_->Delete(snapshot_manager_->SnapshotPath(id)); + // delete quietly will ignore any status error 
+ (void)status; + } + PAIMON_RETURN_NOT_OK(snapshot_manager_->CommitEarliestHint(end_exclusive_id)); + return end_exclusive_id - begin_inclusive_id; +} + +/// Try to delete data directories that may be empty after data file deletion. +Status ExpireSnapshots::CleanEmptyDirectories() { + if (!config_.CleanEmptyDirectories() || deletion_buckets_.empty()) { + return Status::OK(); + } + + // All directory paths are deduplicated and sorted by hierarchy level + std::map> deduplicate; + for (const auto& [partition, buckets] : deletion_buckets_) { + std::vector to_delete_empty_directories; + // try to delete bucket directories + for (const auto& bucket : buckets) { + PAIMON_ASSIGN_OR_RAISE(std::string bucket_path, + path_factory_->BucketPath(partition, bucket)); + to_delete_empty_directories.push_back(bucket_path); + } + std::vector> futures; + for (const auto& empty_directory : to_delete_empty_directories) { + futures.push_back(Via(executor_.get(), [this, &empty_directory] { + auto ret = TryDeleteEmptyDirectory(empty_directory); + (void)ret; + })); + } + Wait(futures); + + PAIMON_ASSIGN_OR_RAISE(std::vector hierarchical_paths, + path_factory_->GetHierarchicalPartitionPath(partition)); + size_t hierarchies = hierarchical_paths.size(); + if (hierarchies == 0) { + continue; + } + + if (TryDeleteEmptyDirectory(hierarchical_paths[hierarchies - 1])) { + // deduplicate high level partition directories + for (size_t hierarchy = 0; hierarchy < hierarchies - 1; hierarchy++) { + deduplicate[hierarchy].insert(hierarchical_paths[hierarchy]); + } + } + } + + for (int32_t hierarchy = deduplicate.size() - 1; hierarchy >= 0; hierarchy--) { + auto iter = deduplicate.find(hierarchy); + if (iter == deduplicate.end()) { + continue; + } + for (const auto& path : iter->second) { + TryDeleteEmptyDirectory(path); + } + } + + deletion_buckets_.clear(); + return Status::OK(); +} + +bool ExpireSnapshots::TryDeleteEmptyDirectory(const std::string& path) const { + auto s = fs_->Delete(path, 
/*recursive=*/false); + if (s.ok()) { + return true; + } + return false; +} + +Status ExpireSnapshots::CleanUnusedManifests(const std::string& manifest_list_name, + const std::set& skipping_sets) { + std::vector manifest_file_metas; + auto status = manifest_list_->Read(manifest_list_name, nullptr, &manifest_file_metas); + if (status.ok()) { + std::vector to_delete_manifests; + // TODO(jinli.zjw): optimize for async + for (const auto& manifest_file_meta : manifest_file_metas) { + if (skipping_sets.count(manifest_file_meta.FileName()) == 0) { + manifest_file_->DeleteQuietly(manifest_file_meta.FileName()); + } + } + if (skipping_sets.count(manifest_list_name) == 0) { + manifest_list_->DeleteQuietly(manifest_list_name); + } + } + return Status::OK(); +} + +Status ExpireSnapshots::CleanUnusedDataFiles(const std::string& manifest_list_name) { + std::vector manifest_file_metas; + auto status = manifest_list_->Read(manifest_list_name, nullptr, &manifest_file_metas); + if (status.ok()) { + std::map data_files_to_delete; + for (const auto& manifest_file_meta : manifest_file_metas) { + std::vector manifest_entries; + auto status = + manifest_file_->Read(manifest_file_meta.FileName(), nullptr, &manifest_entries); + if (!status.ok()) { + // cancel deletion if any exception occurs + PAIMON_LOG_WARN(logger_, "Failed to read some manifest files. Cancel deletion. 
%s", + status.ToString().c_str()); + return Status::OK(); + } else { + PAIMON_RETURN_NOT_OK(GetDataFilesToDelete(manifest_entries, &data_files_to_delete)); + } + } + + std::vector> futures; + ScopeGuard guard([&futures]() { Wait(futures); }); + for (const auto& [data_file_to_delete, entry] : data_files_to_delete) { + auto delete_file_path = data_file_to_delete; + futures.push_back(Via(executor_.get(), [this, delete_file_path]() { + auto status = fs_->Delete(delete_file_path); + // delete quietly will ignore any status error + (void)status; + })); + deletion_buckets_[entry.Partition()].insert(entry.Bucket()); + } + } + return Status::OK(); +} + +Status ExpireSnapshots::GetDataFilesToDelete( + const std::vector& data_file_entries, + std::map* data_files_to_delete) const { + for (const auto& entry : data_file_entries) { + PAIMON_ASSIGN_OR_RAISE(std::string bucket_path, + path_factory_->BucketPath(entry.Partition(), entry.Bucket())); + std::string data_file_path = PathUtil::JoinPath(bucket_path, entry.FileName()); + if (entry.Kind() == FileKind::Add()) { + data_files_to_delete->erase(data_file_path); + } else if (entry.Kind() == FileKind::Delete()) { + // TODO(jinli.zjw): do not support extra files + data_files_to_delete->insert({data_file_path, entry}); + } else { + return Status::Invalid( + fmt::format("Unknown value kind {}", entry.Kind().ToByteValue())); + } + } + return Status::OK(); +} + +Status ExpireSnapshots::GetManifestSkippingSet(const std::vector& retained_snapshots, + std::set* skipping_manifest_set) const { + for (const auto& snapshot : retained_snapshots) { + skipping_manifest_set->insert(snapshot.BaseManifestList()); + skipping_manifest_set->insert(snapshot.DeltaManifestList()); + std::vector manifests; + PAIMON_RETURN_NOT_OK(manifest_list_->ReadDataManifests(snapshot, &manifests)); + for (const auto& manifest : manifests) { + skipping_manifest_set->insert(manifest.FileName()); + } + // TODO(jinli.zjw): skip index manifests + if 
(snapshot.IndexManifest() && snapshot.IndexManifest().value() != "") { + assert(false); + return Status::NotImplemented("do not support expire snapshot with index manifest"); + } + if (snapshot.Statistics()) { + skipping_manifest_set->insert(snapshot.Statistics().value()); + } + } + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/core/operation/expire_snapshots.h b/src/paimon/core/operation/expire_snapshots.h new file mode 100644 index 0000000..75b5fff --- /dev/null +++ b/src/paimon/core/operation/expire_snapshots.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/core/options/expire_config.h" +#include "paimon/logging.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +class Snapshot; +class SnapshotManager; +class FileStorePathFactory; +class FileSystem; +class ManifestEntry; +class ManifestList; +class ManifestFile; +class Executor; +class BinaryRow; + +class ExpireSnapshots { + public: + ExpireSnapshots(const std::shared_ptr& snapshot_manager, + const std::shared_ptr& path_factory, + const std::shared_ptr& manifest_list, + const std::shared_ptr& manifest_file, + const std::shared_ptr& fs, const ExpireConfig& config, + const std::shared_ptr& executor); + + Result Expire(); + + private: + Result ExpireUntil(int64_t earliest_snapshot_id, int64_t end_exclusive_id); + + Status CleanUnusedDataFiles(const std::string& manifest_list_name); + Status CleanUnusedManifests(const std::string& manifest_list_name, + const std::set& skipping_sets); + Status CleanEmptyDirectories(); + Status GetDataFilesToDelete(const std::vector& data_file_entries, + std::map* data_files_to_delete) const; + Status GetManifestSkippingSet(const std::vector& retained_snapshots, + std::set* skipping_manifest_set) const; + bool TryDeleteEmptyDirectory(const std::string& path) const; + + std::shared_ptr snapshot_manager_; + std::shared_ptr path_factory_; + std::shared_ptr manifest_list_; + std::shared_ptr manifest_file_; + std::shared_ptr fs_; + ExpireConfig config_; + std::shared_ptr executor_; + std::unordered_map> deletion_buckets_; + + std::unique_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/expire_snapshots_test.cpp b/src/paimon/core/operation/expire_snapshots_test.cpp new file mode 100644 index 0000000..f802194 --- /dev/null +++ b/src/paimon/core/operation/expire_snapshots_test.cpp @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/expire_snapshots.h" + +#include +#include +#include + +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/file_kind.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/utils/field_mapping.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/data/timestamp.h" +#include "paimon/defs.h" +#include "paimon/executor.h" +#include "paimon/format/file_format.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class ExpireSnapshotsTest : public testing::Test { + public: + void SetUp() override { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + mem_pool_ = GetDefaultPool(); + executor_ = 
CreateDefaultExecutor(); + partition_keys_ = {"f0", "f3"}; + arrow::FieldVector fields = {arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), + arrow::field("f5", arrow::int32()), + arrow::field("f6", arrow::int32()), + arrow::field("f7", arrow::int64()), + arrow::field("f8", arrow::int64()), + arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), + arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), + arrow::field("non-partition-field", arrow::int32())}; + schema_ = arrow::schema(fields); + ASSERT_OK_AND_ASSIGN(partition_schema_, + FieldMapping::GetPartitionSchema(schema_, partition_keys_)); + fs_ = std::make_shared(); + + test_data_path_ = "tmp"; + path_factory_ = CreateFactory(test_data_path_); + + ASSERT_OK_AND_ASSIGN(manifest_list_, ManifestList::Create(fs_, options.GetManifestFormat(), + options.GetManifestCompression(), + path_factory_, mem_pool_)); + + ASSERT_OK_AND_ASSIGN( + manifest_file_, + ManifestFile::Create(fs_, options.GetManifestFormat(), options.GetManifestCompression(), + path_factory_, options.GetManifestTargetFileSize(), mem_pool_, + options, partition_schema_)); + } + void TearDown() override {} + + template + static bool IsEqualMap(const std::map& expected, const std::map& actual) { + if (&expected == &actual) { + return true; + } + if (expected.size() != actual.size()) { + return false; + } + for (const auto& [key, value] : actual) { + auto iter = expected.find(key); + if (iter == expected.end()) { + return false; + } + if (iter->second == value) { + continue; + } else { + return false; + } + } + return true; + } + + std::unique_ptr CreateFactory(const std::string& root) const { + std::map raw_options; + raw_options[Options::FILE_FORMAT] = "orc"; + raw_options[Options::MANIFEST_FORMAT] = "orc"; + raw_options[Options::FILE_SYSTEM] = "local"; + 
EXPECT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap(raw_options)); + EXPECT_OK_AND_ASSIGN(std::vector external_paths, + options.CreateExternalPaths()); + EXPECT_OK_AND_ASSIGN(std::optional global_index_external_path, + options.CreateGlobalIndexExternalPath()); + EXPECT_OK_AND_ASSIGN( + auto path_factory, + FileStorePathFactory::Create( + root, schema_, partition_keys_, options.GetPartitionDefaultName(), + options.GetFileFormat()->Identifier(), options.DataFilePrefix(), + options.LegacyPartitionNameEnabled(), external_paths, global_index_external_path, + options.IndexFileInDataFileDir(), mem_pool_)); + return path_factory; + } + + ManifestEntry CreateManifestEntry(const std::string& file_name, int32_t bucket, + const FileKind& kind) const { + int32_t arity = 2; + BinaryRow row(arity); + BinaryRowWriter writer(&row, 20, mem_pool_.get()); + writer.WriteBoolean(0, true); + writer.WriteShort(1, 3); + writer.Complete(); + + auto data_file_meta = std::make_shared( + file_name, 1024, 8, DataFileMeta::EmptyMinKey(), DataFileMeta::EmptyMaxKey(), + SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), /*min_seq_no=*/16, + /*max_seq_no=*/32, + /*schema_id=*/1, /*level=*/2, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), /*delete_row_count=*/3, + /*embedded_index=*/nullptr, /*file_source=*/std::nullopt, + /*external_path=*/std::nullopt, + /*value_stats_cols=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + return ManifestEntry(kind, row, bucket, /*total_buckets=*/3, data_file_meta); + } + + private: + std::string test_data_path_; + std::vector partition_keys_; + std::shared_ptr schema_; + std::shared_ptr partition_schema_; + std::shared_ptr mem_pool_; + std::shared_ptr executor_; + std::shared_ptr manifest_list_; + std::shared_ptr manifest_file_; + std::shared_ptr fs_; + std::shared_ptr path_factory_; +}; + +TEST_F(ExpireSnapshotsTest, TestInvalidInput) { + auto mgr = std::make_shared(fs_, test_data_path_); + { + 
ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + ExpireSnapshots expire(mgr, path_factory_, manifest_list_, manifest_file_, fs_, + options.GetExpireConfig(), executor_); + ASSERT_OK_AND_ASSIGN(int32_t count, expire.Expire()); + ASSERT_EQ(count, 0); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::SNAPSHOT_NUM_RETAINED_MIN, "0"}})); + ExpireSnapshots expire(mgr, path_factory_, manifest_list_, manifest_file_, fs_, + options.GetExpireConfig(), executor_); + ASSERT_NOK(expire.Expire()); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::SNAPSHOT_NUM_RETAINED_MIN, "10"}, + {Options::SNAPSHOT_NUM_RETAINED_MAX, "9"}})); + ExpireSnapshots expire(mgr, path_factory_, manifest_list_, manifest_file_, fs_, + options.GetExpireConfig(), executor_); + ASSERT_NOK(expire.Expire()); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::SNAPSHOT_NUM_RETAINED_MIN, "10"}, + {Options::SNAPSHOT_NUM_RETAINED_MAX, "10"}})); + ExpireSnapshots expire(mgr, path_factory_, manifest_list_, manifest_file_, fs_, + options.GetExpireConfig(), executor_); + ASSERT_OK_AND_ASSIGN(int32_t count, expire.Expire()); + ASSERT_EQ(count, 0); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::SNAPSHOT_EXPIRE_LIMIT, "-1"}, + {Options::SNAPSHOT_NUM_RETAINED_MIN, "10"}, + {Options::SNAPSHOT_NUM_RETAINED_MAX, "10"}})); + ExpireSnapshots expire(mgr, path_factory_, manifest_list_, manifest_file_, fs_, + options.GetExpireConfig(), executor_); + ASSERT_NOK(expire.Expire()); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::SNAPSHOT_NUM_RETAINED_MIN, "10"}, + {Options::SNAPSHOT_NUM_RETAINED_MAX, "10"}})); + ExpireSnapshots expire(nullptr, path_factory_, manifest_list_, manifest_file_, fs_, + options.GetExpireConfig(), executor_); + ASSERT_NOK(expire.Expire()); + } +} + +TEST_F(ExpireSnapshotsTest, TestGetDataFileToDelete) { + 
auto mgr = std::make_shared(fs_, test_data_path_); + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({})); + { + ExpireSnapshots expire(mgr, path_factory_, manifest_list_, manifest_file_, fs_, + options.GetExpireConfig(), executor_); + std::map data_file_to_delete; + std::vector data_file_entries; + data_file_entries.push_back(CreateManifestEntry("file1", /*bucket=*/0, FileKind::Delete())); + data_file_entries.push_back(CreateManifestEntry("file2", /*bucket=*/1, FileKind::Delete())); + data_file_entries.push_back(CreateManifestEntry("file1", /*bucket=*/0, FileKind::Add())); + data_file_entries.push_back(CreateManifestEntry("file3", /*bucket=*/2, FileKind::Delete())); + ASSERT_OK(expire.GetDataFilesToDelete(data_file_entries, &data_file_to_delete)); + ASSERT_TRUE( + IsEqualMap(data_file_to_delete, + {{test_data_path_ + "/f0=true/f3=3/bucket-1/file2", data_file_entries[1]}, + {test_data_path_ + "/f0=true/f3=3/bucket-2/file3", data_file_entries[3]}})); + } + { + ExpireSnapshots expire(mgr, path_factory_, manifest_list_, manifest_file_, fs_, + options.GetExpireConfig(), executor_); + std::map data_file_to_delete; + std::vector data_file_entries; + data_file_entries.push_back(CreateManifestEntry("file1", /*bucket=*/0, FileKind::Add())); + data_file_entries.push_back(CreateManifestEntry("file2", /*bucket=*/1, FileKind::Delete())); + data_file_entries.push_back(CreateManifestEntry("file1", /*bucket=*/0, FileKind::Delete())); + data_file_entries.push_back(CreateManifestEntry("file3", /*bucket=*/2, FileKind::Delete())); + ASSERT_OK(expire.GetDataFilesToDelete(data_file_entries, &data_file_to_delete)); + ASSERT_TRUE( + IsEqualMap(data_file_to_delete, + {{test_data_path_ + "/f0=true/f3=3/bucket-0/file1", data_file_entries[2]}, + {test_data_path_ + "/f0=true/f3=3/bucket-1/file2", data_file_entries[1]}, + {test_data_path_ + "/f0=true/f3=3/bucket-2/file3", data_file_entries[3]}})); + } +} + +} // namespace paimon::test diff --git 
a/src/paimon/core/operation/metrics/clean_metrics.h b/src/paimon/core/operation/metrics/clean_metrics.h new file mode 100644 index 0000000..11d58af --- /dev/null +++ b/src/paimon/core/operation/metrics/clean_metrics.h @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace paimon { + +/// Metrics to measure clean operation. 
+///
+/// Each constant is the key under which `OrphanFilesCleanerImpl` records a counter through
+/// `SetCounter`. The string values are the identifiers reported at runtime.
+class CleanMetrics {
+ public:
+    // Set at the end of Clean() from a Duration started when Clean() begins.
+    static constexpr char CLEAN_DURATION[] = "cleanDuration";
+    // Set by ListPaimonFileDirs() from a Duration covering the directory discovery.
+    static constexpr char CLEAN_LIST_DIRECTORIES_DURATION[] = "listDirectoriesDuration";
+    // Number of directories returned by ListPaimonFileDirs().
+    static constexpr char CLEAN_LIST_DIRECTORIES[] = "listDirectories";
+    // Duration attributed to listing file statuses inside Clean().
+    static constexpr char CLEAN_LIST_FILE_STATUS_DURATION[] = "listFileStatusDuration";
+    // Number of file-status listing tasks submitted by Clean() (one per directory).
+    static constexpr char CLEAN_LIST_FILE_STATUS_TASKS[] = "listFileStatusTasks";
+    // Set by GetUsedFiles() from a Duration covering the used-file collection.
+    static constexpr char CLEAN_LIST_USED_FILES_DURATION[] = "listUsedFilesDuration";
+    // Number of snapshots returned by the snapshot manager in GetUsedFiles().
+    static constexpr char CLEAN_SNAPSHOT_FILES[] = "snapshotFiles";
+    // Size of the used-file name set built by GetUsedFiles().
+    static constexpr char CLEAN_USED_FILES[] = "usedFiles";
+    // Duration attributed to the orphan scan loop inside Clean().
+    static constexpr char CLEAN_SCAN_ORPHAN_FILES_DURATION[] = "scanOrphanFilesDuration";
+    // Number of paths Clean() selected for deletion.
+    static constexpr char CLEAN_ORPHAN_FILES[] = "orphanFiles";
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/operation/orphan_files_cleaner.cpp b/src/paimon/core/operation/orphan_files_cleaner.cpp
new file mode 100644
index 0000000..0b1df48
--- /dev/null
+++ b/src/paimon/core/operation/orphan_files_cleaner.cpp
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/orphan_files_cleaner.h"
+
+#include
+#include
+#include
+#include
+
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/manifest/manifest_file.h"
+#include "paimon/core/manifest/manifest_list.h"
+#include "paimon/core/operation/orphan_files_cleaner_impl.h"
+#include "paimon/core/schema/schema_manager.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/utils/field_mapping.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/core/utils/snapshot_manager.h"
+#include "paimon/executor.h"
+#include "paimon/format/file_format.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+
+// Default age margin. Unless WithOlderThanMs() overrides it, the builder's older_than_ms is
+// `DateTimeUtils::GetCurrentUTCTimeUs() / 1000` minus this value.
+static constexpr int64_t FILE_MODIFICATION_THRESHOLD_MS = 86400 * 1000L;  // 1 day
+
+// Copies every argument into the matching member. No validation happens here; the checks live
+// in CleanContextBuilder::Finish() and OrphanFilesCleaner::Create().
+CleanContext::CleanContext(const std::string& root_path,
+                           const std::map& options, int64_t older_than_ms,
+                           const std::shared_ptr& pool,
+                           const std::shared_ptr& executor,
+                           const std::shared_ptr& specific_file_system,
+                           std::function should_be_retained)
+    : root_path_(root_path),
+      options_(options),
+      older_than_ms_(older_than_ms),
+      memory_pool_(pool),
+      executor_(executor),
+      specific_file_system_(specific_file_system),
+      should_be_retained_(should_be_retained) {}
+
+CleanContext::~CleanContext() = default;
+
+// Private state of CleanContextBuilder. The default member initializers below and Reset()
+// describe the same defaults and have to be kept in sync.
+class CleanContextBuilder::Impl {
+ public:
+    friend class CleanContextBuilder;
+
+    // Restores every optional setting to its default. root_path_ is intentionally not
+    // touched, so the builder keeps pointing at the same table.
+    void Reset() {
+        options_.clear();
+        older_than_ms_ =
+            DateTimeUtils::GetCurrentUTCTimeUs() / 1000 - FILE_MODIFICATION_THRESHOLD_MS;
+        memory_pool_ = GetDefaultPool();
+        executor_ = CreateDefaultExecutor();
+        specific_file_system_.reset();
+        should_be_retained_ = nullptr;
+    }
+
+ private:
+    std::string root_path_;
+    std::map options_;
+    // Evaluated when the Impl is constructed (and again on Reset()), not when Finish() runs.
+    int64_t older_than_ms_ =
+        DateTimeUtils::GetCurrentUTCTimeUs() / 1000 - FILE_MODIFICATION_THRESHOLD_MS;
+    std::shared_ptr memory_pool_ = GetDefaultPool();
+    std::shared_ptr executor_ = CreateDefaultExecutor();
+    std::shared_ptr specific_file_system_ = nullptr;
+    std::function should_be_retained_ = nullptr;
+};
+
+CleanContextBuilder::CleanContextBuilder(const std::string& root_path)
+    : impl_(std::make_unique()) {
+    impl_->root_path_ = root_path;
+}
+
+CleanContextBuilder::~CleanContextBuilder() = default;
+
+// Inserts the entry, overwriting any value previously stored under `key`.
+CleanContextBuilder& CleanContextBuilder::AddOption(const std::string& key,
+                                                    const std::string& value) {
+    impl_->options_[key] = value;
+    return *this;
+}
+
+// Replaces the whole options map, dropping entries added earlier through AddOption().
+CleanContextBuilder& CleanContextBuilder::SetOptions(
+    const std::map& opts) {
+    impl_->options_ = opts;
+    return *this;
+}
+
+// Stored as-is; a negative value is only rejected later by OrphanFilesCleaner::Create().
+CleanContextBuilder& CleanContextBuilder::WithOlderThanMs(int64_t older_than_ms) {
+    impl_->older_than_ms_ = older_than_ms;
+    return *this;
+}
+
+// Stored as-is; a null pool is only rejected later by OrphanFilesCleaner::Create().
+CleanContextBuilder& CleanContextBuilder::WithMemoryPool(const std::shared_ptr& pool) {
+    impl_->memory_pool_ = pool;
+    return *this;
+}
+
+// Stored as-is; a null executor is only rejected later by OrphanFilesCleaner::Create().
+CleanContextBuilder& CleanContextBuilder::WithExecutor(const std::shared_ptr& executor) {
+    impl_->executor_ = executor;
+    return *this;
+}
+
+CleanContextBuilder& CleanContextBuilder::WithFileSystem(
+    const std::shared_ptr& file_system) {
+    impl_->specific_file_system_ = file_system;
+    return *this;
+}
+
+// The callback may be empty; it is forwarded unchanged into the built context.
+CleanContextBuilder& CleanContextBuilder::WithFileRetainCondition(
+    std::function should_be_retained) {
+    impl_->should_be_retained_ = should_be_retained;
+    return *this;
+}
+
+// Builds the context from the current builder state.
+// Returns Status::Invalid when the normalized root path is empty, and propagates any error
+// from PathUtil::NormalizePath(). On success the optional settings are reset to defaults.
+Result> CleanContextBuilder::Finish() {
+    // The normalized path is written back into the builder, so it is kept for later builds.
+    PAIMON_ASSIGN_OR_RAISE(impl_->root_path_, PathUtil::NormalizePath(impl_->root_path_));
+    if (impl_->root_path_.empty()) {
+        return Status::Invalid("root path is empty");
+    }
+    auto ctx = std::make_unique(
+        impl_->root_path_, impl_->options_, impl_->older_than_ms_, impl_->memory_pool_,
+        impl_->executor_, impl_->specific_file_system_, impl_->should_be_retained_);
+    // Only reached on success; the two error returns above leave the builder state as it was.
+    impl_->Reset();
+    return ctx;
+}
+
+// Creates a cleaner for the table described by `ctx`.
+// Returns Status::Invalid for a null context, null memory pool, null executor, negative
+// older-than time, or a table without a latest schema; Status::NotImplemented for a table
+// that declares primary keys; and propagates errors from the helpers called below.
+Result> OrphanFilesCleaner::Create(
+    std::unique_ptr&& ctx) {
+    if (ctx == nullptr) {
+        return Status::Invalid("clean context is null pointer");
+    }
+    if (ctx->GetMemoryPool() == nullptr) {
+        return Status::Invalid("memory pool is null pointer");
+    }
+    if (ctx->GetExecutor() == nullptr) {
+        return Status::Invalid("executor is null pointer");
+    }
+    if (ctx->GetOlderThanMs() < 0) {
+        return Status::Invalid("older than time needs to be greater than or equal to 0.");
+    }
+    // Options built from the context alone; used only to obtain a file system for reading
+    // the table schema.
+    PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options,
+                           CoreOptions::FromMap(ctx->GetOptions(), ctx->GetSpecificFileSystem()));
+    SchemaManager schema_manager(tmp_options.GetFileSystem(), ctx->GetRootPath());
+    PAIMON_ASSIGN_OR_RAISE(std::optional> table_schema,
+                           schema_manager.Latest());
+    if (table_schema == std::nullopt) {
+        return Status::Invalid("not found latest schema");
+    }
+    if (!table_schema.value()->PrimaryKeys().empty()) {
+        return Status::NotImplemented("orphan files cleaner only support append table");
+    }
+    // merge options
+    // Start from the schema's options and let the context's entries overwrite them.
+    const auto& schema = table_schema.value();
+    auto opts = schema->Options();
+    for (const auto& [key, value] : ctx->GetOptions()) {
+        opts[key] = value;
+    }
+    PAIMON_ASSIGN_OR_RAISE(CoreOptions options,
+                           CoreOptions::FromMap(opts, ctx->GetSpecificFileSystem()));
+    auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(schema->Fields());
+    PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, options.CreateExternalPaths());
+    PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path,
+                           options.CreateGlobalIndexExternalPath());
+
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr path_factory,
+        FileStorePathFactory::Create(
+            ctx->GetRootPath(), arrow_schema, schema->PartitionKeys(),
+            options.GetPartitionDefaultName(), options.GetFileFormat()->Identifier(),
+            options.DataFilePrefix(), options.LegacyPartitionNameEnabled(), external_paths,
+            global_index_external_path, options.IndexFileInDataFileDir(), ctx->GetMemoryPool()));
+    auto snapshot_manager =
+        std::make_shared(options.GetFileSystem(), ctx->GetRootPath());
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr manifest_list,
+        ManifestList::Create(options.GetFileSystem(), options.GetManifestFormat(),
+                             options.GetManifestCompression(), path_factory, ctx->GetMemoryPool()));
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr partition_schema,
+        FieldMapping::GetPartitionSchema(arrow_schema, table_schema.value()->PartitionKeys()));
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr manifest_file,
+        ManifestFile::Create(options.GetFileSystem(), options.GetManifestFormat(),
+                             options.GetManifestCompression(), path_factory,
+                             options.GetManifestTargetFileSize(), ctx->GetMemoryPool(), options,
+                             partition_schema));
+    // The argument list matches the OrphanFilesCleanerImpl constructor.
+    return std::make_unique(
+        ctx->GetMemoryPool(), ctx->GetExecutor(), arrow_schema, ctx->GetRootPath(), options,
+        snapshot_manager, schema->PartitionKeys(), manifest_file, manifest_list,
+        ctx->GetOlderThanMs(), ctx->GetFileRetainCondition());
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/operation/orphan_files_cleaner_impl.cpp b/src/paimon/core/operation/orphan_files_cleaner_impl.cpp
new file mode 100644
index 0000000..9fa4b3b
--- /dev/null
+++ b/src/paimon/core/operation/orphan_files_cleaner_impl.cpp
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/orphan_files_cleaner_impl.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "fmt/format.h"
+#include "paimon/common/executor/future.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/core/manifest/manifest_entry.h"
+#include "paimon/core/manifest/manifest_file.h"
+#include "paimon/core/manifest/manifest_file_meta.h"
+#include "paimon/core/manifest/manifest_list.h"
+#include "paimon/core/operation/metrics/clean_metrics.h"
+#include "paimon/core/snapshot.h"
+#include "paimon/core/utils/duration.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/core/utils/snapshot_manager.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Modification times at or below this value make Clean() fail with "not in millisecond".
+// The first factor is an int64_t so the product (10^10) is evaluated in 64-bit arithmetic
+// even on platforms where `long` is only 32 bits wide.
+const int64_t OrphanFilesCleanerImpl::MIN_VALID_FILE_MODIFICATION_MS =
+    int64_t{10} * 1000 * 1000 * 1000;
+
+// Stores the collaborators by copy. The file system is taken from `options`, and a fresh
+// metrics object is created for this cleaner.
+OrphanFilesCleanerImpl::OrphanFilesCleanerImpl(
+    const std::shared_ptr& memory_pool, const std::shared_ptr& executor,
+    const std::shared_ptr& schema, const std::string& root_path,
+    const CoreOptions& options, const std::shared_ptr& snapshot_manager,
+    const std::vector& partition_keys,
+    const std::shared_ptr& manifest_file,
+    const std::shared_ptr& manifest_list, int64_t older_than_ms,
+    std::function should_be_retained)
+    : memory_pool_(memory_pool),
+      executor_(executor),
+      schema_(schema),
+      root_path_(root_path),
+      options_(options),
+      fs_(options.GetFileSystem()),
+      snapshot_manager_(snapshot_manager),
+      partition_keys_(partition_keys),
+      manifest_file_(manifest_file),
+      manifest_list_(manifest_list),
+      older_than_ms_(older_than_ms),
+      should_be_retained_(should_be_retained),
+      metrics_(std::make_shared()) {}
+
+// Returns true when `file_name` has one of the shapes this cleaner is allowed to delete:
+//   - starts with "manifest-" or "manifest-list-";
+//   - starts with "." and ends with ".tmp";
+//   - starts with "data-" and ends with ".orc", ".parquet", ".avro" or ".lance".
+// Any other name is never deleted, whatever its age.
+bool OrphanFilesCleanerImpl::SupportToClean(const std::string& file_name) {
+    // Both tables are read-only lookup data, so they are const.
+    static const std::vector> supported_pattern = {
+        {"manifest-", ""}, {"manifest-list-", ""}, {".", ".tmp"}};
+    for (const auto& pattern : supported_pattern) {
+        if (StringUtils::StartsWith(file_name, pattern.first) &&
+            StringUtils::EndsWith(file_name, pattern.second)) {
+            return true;
+        }
+    }
+    static const std::vector supported_formats = {".orc", ".parquet", ".avro", ".lance"};
+    for (const auto& format : supported_formats) {
+        if (StringUtils::StartsWith(file_name, "data-") &&
+            StringUtils::EndsWith(file_name, format)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Finds and deletes orphan files of the table.
+//
+// A file is selected when it is not a directory, its name passes SupportToClean(), its
+// modification time is below older_than_ms_, its name is not in the used-file set, and the
+// optional retain callback does not claim it.
+//
+// Returns the set of selected paths. Deletion runs on the executor and individual delete
+// failures are ignored, so a returned path is one that was scheduled for deletion.
+// Returns Status::NotImplemented for tables with tags or branches, Status::Invalid when a
+// selected file has a modification time at or below MIN_VALID_FILE_MODIFICATION_MS, and
+// propagates errors from ListPaimonFileDirs() / GetUsedFiles().
+Result> OrphanFilesCleanerImpl::Clean() {
+    Duration main_duration;
+    if (!MinimalTryBestListingDirs(PathUtil::JoinPath(root_path_, "tag")).empty()) {
+        return Status::NotImplemented("OrphanFilesCleaner do not support cleaning table with tag");
+    }
+    if (!MinimalTryBestListingDirs(PathUtil::JoinPath(root_path_, "branch")).empty()) {
+        return Status::NotImplemented(
+            "OrphanFilesCleaner do not support cleaning table with branch");
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::set all_dirs, ListPaimonFileDirs());
+    // Start listing every directory on the executor; the used-file scan below overlaps
+    // with these tasks.
+    std::vector>>> file_statuses_futures;
+    for (const auto& dir : all_dirs) {
+        file_statuses_futures.push_back(
+            Via(executor_.get(), [this, dir] { return TryBestListingDirs(dir); }));
+    }
+    // NOTE(review): the listing tasks capture `this`, so a GetUsedFiles() failure is only
+    // reported after they have been collected. This mirrors the ScopeGuard used for the
+    // other task batches in this file.
+    auto used_files_result = GetUsedFiles();
+
+    Duration duration;
+    // Collect before Reset() so the listing metric covers the wait for the listing tasks
+    // and the scan metric covers only the loop below.
+    const auto all_file_statuses = CollectAll(file_statuses_futures);
+    uint64_t file_statuses_duration = duration.Reset();
+    PAIMON_RETURN_NOT_OK(used_files_result);
+    const auto& used_file_names = used_files_result.value();
+
+    std::set need_to_deletes;
+    std::vector> futures;
+    // Wait for every scheduled delete before leaving, including on the error return below.
+    ScopeGuard guard([&futures]() { Wait(futures); });
+    for (const auto& file_statuses : all_file_statuses) {
+        for (const auto& file_status : file_statuses) {
+            if (file_status->IsDir()) {
+                continue;
+            }
+            std::string path = file_status->GetPath();
+            std::string file_name = PathUtil::GetName(path);
+            if (!SupportToClean(file_name)) {
+                continue;
+            }
+            if (file_status->GetModificationTime() < older_than_ms_ &&
+                !used_file_names.count(file_name)) {
+                if (should_be_retained_ && should_be_retained_(file_name)) {
+                    continue;
+                }
+                // Guards against a modification time that is not expressed in
+                // milliseconds, which would make every file look old.
+                if (file_status->GetModificationTime() <= MIN_VALID_FILE_MODIFICATION_MS) {
+                    return Status::Invalid(
+                        fmt::format("file '{}' modification '{}' is not in millisecond", path,
+                                    file_status->GetModificationTime()));
+                }
+                need_to_deletes.insert(path);
+                futures.push_back(Via(executor_.get(), [this, path]() {
+                    // Best effort: a failed delete is deliberately ignored.
+                    auto s = fs_->Delete(path, /*recursive=*/false);
+                    (void)s;
+                }));
+            }
+        }
+    }
+    metrics_->SetCounter(CleanMetrics::CLEAN_DURATION, main_duration.Get());
+    metrics_->SetCounter(CleanMetrics::CLEAN_SCAN_ORPHAN_FILES_DURATION, duration.Get());
+    metrics_->SetCounter(CleanMetrics::CLEAN_LIST_FILE_STATUS_DURATION, file_statuses_duration);
+    metrics_->SetCounter(CleanMetrics::CLEAN_LIST_FILE_STATUS_TASKS,
+                         static_cast(file_statuses_futures.size()));
+    metrics_->SetCounter(CleanMetrics::CLEAN_ORPHAN_FILES,
+                         static_cast(need_to_deletes.size()));
+    return need_to_deletes;
+}
+
+// Returns the directories whose files are candidates for cleaning: the snapshot directory,
+// the manifest directory, and the bucket directories found under the partition tree.
+// Returns Status::Invalid when the table is configured with external data paths.
+Result> OrphanFilesCleanerImpl::ListPaimonFileDirs() const {
+    Duration duration;
+    // Reject unsupported tables before walking the partition tree, so no listing work is
+    // spent on a request that is going to fail anyway.
+    PAIMON_ASSIGN_OR_RAISE(std::vector data_file_external_paths,
+                           options_.CreateExternalPaths());
+    if (!data_file_external_paths.empty()) {
+        return Status::Invalid(
+            "OrphanFilesCleaner do not support cleaning table with external paths");
+    }
+    std::set paimon_file_dirs;
+    paimon_file_dirs.insert(snapshot_manager_->SnapshotDirectory());
+    paimon_file_dirs.insert(FileStorePathFactory::ManifestPath(root_path_));
+    // TODO(jinli.zjw): support clean index, stats, changelog in the future
+    // paimon_file_dirs.insert(FileStorePathFactory::IndexPath(root_path_));
+    // paimon_file_dirs.insert(FileStorePathFactory::StatisticsPath(root_path_));
+    std::set file_dirs = ListFileDirs(root_path_, partition_keys_.size());
+    paimon_file_dirs.insert(file_dirs.begin(), file_dirs.end());
+    // TODO(liancheng): support clean external paths in the future
+    // for (const auto& external_path : data_file_external_paths) {
+    //     std::set external_file_dirs =
+    //         ListFileDirs(external_path, partition_keys_.size());
+    //     paimon_file_dirs.insert(external_file_dirs.begin(), external_file_dirs.end());
+    // }
+    metrics_->SetCounter(CleanMetrics::CLEAN_LIST_DIRECTORIES_DURATION, duration.Get());
+    metrics_->SetCounter(CleanMetrics::CLEAN_LIST_DIRECTORIES,
+                         static_cast(paimon_file_dirs.size()));
+    return paimon_file_dirs;
+}
+
+// Walks the directory tree below `path` level by level and returns the bucket directories
+// found at depth `max_level`.
+// Above that depth only directories whose name contains '=' are descended into. At that
+// depth only directories whose name starts with the bucket prefix are returned. Each level
+// is listed in parallel on the executor.
+std::set OrphanFilesCleanerImpl::ListFileDirs(const std::string& path,
+                                              int32_t max_level) const {
+    std::queue queue;
+    queue.push(path);
+    std::set results;
+    for (int32_t current_level = 0; current_level <= max_level; current_level++) {
+        std::vector>>> futures;
+        while (!queue.empty()) {
+            auto current_path = queue.front();
+            futures.push_back(Via(executor_.get(), [this, current_path] {
+                return MinimalTryBestListingDirs(current_path);
+            }));
+            queue.pop();
+        }
+        for (const auto& dirs : CollectAll(futures)) {
+            for (const auto& dir : dirs) {
+                const auto& dir_name = PathUtil::GetName(dir->GetPath());
+                if (current_level == max_level) {
+                    if (StringUtils::StartsWith(
+                            dir_name, std::string(FileStorePathFactory::BUCKET_PATH_PREFIX))) {
+                        results.insert(dir->GetPath());
+                    }
+                } else {
+                    if (dir_name.find('=') != std::string::npos) {
+                        queue.push(dir->GetPath());
+                    }
+                }
+            }
+        }
+    }
+    return results;
+}
+
+// Best-effort listing through ListFileStatus(): any failure yields an empty result
+// instead of an error.
+// NOTE(review): only the status of Exists() is checked, not the boolean it carries.
+// Confirm this is intended before tightening it.
+std::vector> OrphanFilesCleanerImpl::TryBestListingDirs(
+    const std::string& path) const {
+    Result is_exist = fs_->Exists(path);
+    if (!is_exist.ok()) {
+        return {};
+    }
+    std::vector> file_statuses;
+    auto status = fs_->ListFileStatus(path, &file_statuses);
+    if (!status.ok()) {
+        return {};
+    }
+    return file_statuses;
+}
+
+// Same best-effort contract as TryBestListingDirs(), but lists through ListDir().
+// NOTE(review): Clean() uses this to detect tags and branches, so a failed listing is
+// indistinguishable from "no tag / no branch".
+std::vector> OrphanFilesCleanerImpl::MinimalTryBestListingDirs(
+    const std::string& path) const {
+    Result is_exist = fs_->Exists(path);
+    if (!is_exist.ok()) {
+        return {};
+    }
+    std::vector> file_statuses;
+    auto status = fs_->ListDir(path, &file_statuses);
+    if (!status.ok()) {
+        return {};
+    }
+    return file_statuses;
+}
+
+// Collects the names of all files that are still referenced and must not be deleted: the
+// EARLIEST / LATEST hint names plus everything GetUsedFilesBySnapshot() reports for each
+// snapshot.
+// Returns Status::NotImplemented when any snapshot carries a changelog manifest list or
+// an index manifest, and propagates errors from reading the snapshots.
+Result> OrphanFilesCleanerImpl::GetUsedFiles() const {
+    std::set used_files;
+    // TODO(jinli.zjw): consider changelog(add tests), stats
+    used_files.insert(SnapshotManager::EARLIEST);
+    used_files.insert(SnapshotManager::LATEST);
+    Duration duration;
+    PAIMON_ASSIGN_OR_RAISE(std::vector snapshots, snapshot_manager_->GetAllSnapshots());
+    // Reject unsupported snapshots before any manifest read is scheduled, so a late
+    // offending snapshot does not cost the reads of all the earlier ones.
+    for (const auto& snapshot : snapshots) {
+        if (snapshot.ChangelogManifestList()) {
+            // Once changelog is supported, record the list name instead of failing:
+            // used_files.insert(snapshot.ChangelogManifestList().value());
+            return Status::NotImplemented("OrphanFilesCleaner do not support clean changelog");
+        }
+        if (snapshot.IndexManifest()) {
+            // TODO(jinli.zjw): support IndexManifestEntry and add tests
+            // used_files.insert(snapshot.IndexManifest().value());
+            return Status::NotImplemented(
+                "OrphanFilesCleaner do not support clean index manifest");
+        }
+    }
+    std::vector>>> used_files_futures;
+    std::vector>> used_files_results;
+    {
+        // The guard collects every submitted task when this scope is left.
+        ScopeGuard guard([&used_files_futures, &used_files_results]() {
+            used_files_results = CollectAll(used_files_futures);
+        });
+        for (const auto& snapshot : snapshots) {
+            used_files_futures.emplace_back(Via(
+                executor_.get(), [this, snapshot] { return GetUsedFilesBySnapshot(snapshot); }));
+        }
+    }
+
+    for (const auto& used_files_result : used_files_results) {
+        PAIMON_RETURN_NOT_OK(used_files_result);
+        used_files.insert(used_files_result.value().begin(), used_files_result.value().end());
+    }
+
+    metrics_->SetCounter(CleanMetrics::CLEAN_LIST_USED_FILES_DURATION, duration.Get());
+    metrics_->SetCounter(CleanMetrics::CLEAN_USED_FILES, static_cast(used_files.size()));
+    metrics_->SetCounter(CleanMetrics::CLEAN_SNAPSHOT_FILES,
+                         static_cast(snapshots.size()));
+    return used_files;
+}
+
+// Returns the file names referenced by one snapshot: the snapshot file itself, its base
+// and delta manifest lists, every manifest named in those lists, and every file named by
+// an entry of those manifests.
+// Lists and manifests are read with ReadIfFileExist(); read errors are propagated.
+Result> OrphanFilesCleanerImpl::GetUsedFilesBySnapshot(
+    const Snapshot& snapshot) const {
+    std::set used_files;
+
+    used_files.insert(SnapshotManager::SNAPSHOT_PREFIX + std::to_string(snapshot.Id()));
+    used_files.insert(snapshot.BaseManifestList());
+    used_files.insert(snapshot.DeltaManifestList());
+    // Both lists append into the same vector.
+    std::vector manifests;
+    PAIMON_RETURN_NOT_OK(manifest_list_->ReadIfFileExist(snapshot.BaseManifestList(),
+                                                         /*filter=*/nullptr, &manifests));
+    PAIMON_RETURN_NOT_OK(manifest_list_->ReadIfFileExist(snapshot.DeltaManifestList(),
+                                                         /*filter=*/nullptr, &manifests));
+
+    for (const auto& manifest : manifests) {
+        used_files.insert(manifest.FileName());
+        std::vector manifest_entries;
+        PAIMON_RETURN_NOT_OK(manifest_file_->ReadIfFileExist(
+            manifest.FileName(), /*filter=*/nullptr, &manifest_entries));
+        // Every entry counts as a reference; the entry kind is not inspected here.
+        for (const auto& manifest_entry : manifest_entries) {
+            used_files.insert(manifest_entry.FileName());
+        }
+    }
+
+    return used_files;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/operation/orphan_files_cleaner_impl.h b/src/paimon/core/operation/orphan_files_cleaner_impl.h
new file mode 100644
index 0000000..be96b3b
--- /dev/null
+++ b/src/paimon/core/operation/orphan_files_cleaner_impl.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "paimon/core/core_options.h"
+#include "paimon/core/snapshot.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/metrics.h"
+#include "paimon/orphan_files_cleaner.h"
+#include "paimon/result.h"
+
+struct ArrowSchema;
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+
+class SnapshotManager;
+class FileStorePathFactory;
+class FileSystem;
+class ManifestFile;
+class ManifestList;
+class Executor;
+class MemoryPool;
+
+/// Implementation of `OrphanFilesCleaner`.
+///
+/// Lists the table's snapshot, manifest and bucket directories, gathers the file names still
+/// referenced by the snapshots, and deletes old files that are not referenced.
+/// Instances are built by `OrphanFilesCleaner::Create()`.
+class OrphanFilesCleanerImpl : public OrphanFilesCleaner {
+ public:
+    /// Stores the given collaborators; the file system is taken from `options`.
+    /// @param older_than_ms Only files with a modification time below this value are
+    /// considered for deletion.
+    /// @param should_be_retained Optional callback taking a file name; a file for which it
+    /// returns true is kept. May be empty.
+    OrphanFilesCleanerImpl(const std::shared_ptr& memory_pool,
+                           const std::shared_ptr& executor,
+                           const std::shared_ptr& schema,
+                           const std::string& root_path, const CoreOptions& options,
+                           const std::shared_ptr& snapshot_manager,
+                           const std::vector& partition_keys,
+                           const std::shared_ptr& manifest_file,
+                           const std::shared_ptr& manifest_list,
+                           int64_t older_than_ms,
+                           std::function should_be_retained);
+
+    /// Runs the clean operation and returns the paths that were selected for deletion.
+    Result> Clean() override;
+
+    /// Returns the metrics object that Clean() writes its counters to.
+    std::shared_ptr GetMetrics() const override {
+        return metrics_;
+    }
+
+ private:
+    // Directories whose files are candidates: snapshot dir, manifest dir, bucket dirs.
+    Result> ListPaimonFileDirs() const;
+    // Best-effort listings: a failure produces an empty result rather than an error.
+    std::vector> TryBestListingDirs(const std::string& path) const;
+    std::vector> MinimalTryBestListingDirs(
+        const std::string& path) const;
+    // Bucket directories found `max_level` levels below `path`.
+    std::set ListFileDirs(const std::string& path, int32_t max_level) const;
+    // Names of all files still referenced by the table's snapshots.
+    Result> GetUsedFiles() const;
+    // Names of the files referenced by a single snapshot.
+    Result> GetUsedFilesBySnapshot(const Snapshot& snapshot) const;
+    // Whether a file with this name is of a kind the cleaner may delete.
+    static bool SupportToClean(const std::string& file_name);
+
+ private:
+    // Modification times at or below this value are treated as invalid by Clean().
+    static const int64_t MIN_VALID_FILE_MODIFICATION_MS;
+
+    std::shared_ptr memory_pool_;
+    std::shared_ptr executor_;
+    std::shared_ptr schema_;
+    std::string root_path_;
+    CoreOptions options_;
+    // Initialised from options.GetFileSystem() in the constructor.
+    std::shared_ptr fs_;
+    std::shared_ptr snapshot_manager_;
+    std::vector partition_keys_;
+    std::shared_ptr manifest_file_;
+    std::shared_ptr manifest_list_;
+    int64_t older_than_ms_;
+    std::function should_be_retained_;
+
+    std::shared_ptr metrics_;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/operation/orphan_files_cleaner_test.cpp b/src/paimon/core/operation/orphan_files_cleaner_test.cpp
new file mode 100644
index 0000000..4fb94f8
--- /dev/null
+++ b/src/paimon/core/operation/orphan_files_cleaner_test.cpp
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +#include "paimon/orphan_files_cleaner.h" + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/operation/orphan_files_cleaner_impl.h" +#include "paimon/defs.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(OrphanFilesCleanerTest, TestSupportToClean) { + ASSERT_TRUE( + OrphanFilesCleanerImpl::SupportToClean("data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.orc")); + ASSERT_TRUE(OrphanFilesCleanerImpl::SupportToClean( + "data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.parquet")); + ASSERT_TRUE( + OrphanFilesCleanerImpl::SupportToClean("data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.avro")); + ASSERT_TRUE(OrphanFilesCleanerImpl::SupportToClean( + "data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.parquet")); + ASSERT_TRUE(OrphanFilesCleanerImpl::SupportToClean( + "data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.lance")); + ASSERT_TRUE( + OrphanFilesCleanerImpl::SupportToClean("manifest-3ea5ee21-d399-4f1c-a749-2fc63dbf0852-0")); + ASSERT_TRUE(OrphanFilesCleanerImpl::SupportToClean( + "manifest-list-469f3a0f-f6f1-4027-91bf-d1e897e8ea23-1")); + ASSERT_TRUE(OrphanFilesCleanerImpl::SupportToClean( + ".snapshot-2.13c988c3-784d-493d-8884-016ddddb1fc2.tmp")); + ASSERT_FALSE(OrphanFilesCleanerImpl::SupportToClean("tmp")); + ASSERT_FALSE(OrphanFilesCleanerImpl::SupportToClean("snapshot-1")); + ASSERT_FALSE(OrphanFilesCleanerImpl::SupportToClean("schema-0")); + ASSERT_FALSE(OrphanFilesCleanerImpl::SupportToClean("bucket-0")); + ASSERT_FALSE(OrphanFilesCleanerImpl::SupportToClean( + "changelog-ce64d06d-c4cd-456b-a1b3-ae570042620f-0.parquet")); + ASSERT_FALSE(OrphanFilesCleanerImpl::SupportToClean( + "data-5515726b-0f0f-4556-a942-e795e9f94c4a-0.orc.index")); + ASSERT_FALSE( + OrphanFilesCleanerImpl::SupportToClean("index-aa60193d-d7cd-434f-bc1a-c1adb210e1f7-0")); + ASSERT_FALSE( + 
OrphanFilesCleanerImpl::SupportToClean("data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.json")); + ASSERT_FALSE(OrphanFilesCleanerImpl::SupportToClean( + "some_data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.orc")); +} + +TEST(OrphanFilesCleanerTest, TestPkTable) { + std::string table_path = + paimon::test::GetDataDir() + "/orc/pk_table_with_mor.db/pk_table_with_mor/"; + CleanContextBuilder clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local").Finish()); + ASSERT_NOK_WITH_MSG(OrphanFilesCleaner::Create(std::move(clean_context)), + "orphan files cleaner only support append table"); +} + +TEST(OrphanFilesCleanerTest, TestTableWithTag) { + std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/"; + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + auto file_system = std::make_shared(); + ASSERT_OK(file_system->WriteFile(PathUtil::JoinPath(table_path, "tag/tag-1"), " ", true)); + + CleanContextBuilder clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local").Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner, OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_NOK_WITH_MSG(cleaner->Clean(), + "OrphanFilesCleaner do not support cleaning table with tag"); +} + +TEST(OrphanFilesCleanerTest, TestTableWithBranch) { + std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/"; + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + auto file_system = std::make_shared(); + ASSERT_OK(file_system->WriteFile(PathUtil::JoinPath(table_path, "branch/branch-1"), " ", true)); + + CleanContextBuilder clean_context_builder(table_path); + 
// ---------------------------------------------------------------------------------------------
// Reviewer overview of the OrphanFilesCleanerTest cases contained in this span.
//
// NOTE(review): this span is a whitespace-collapsed patch excerpt. The " + " sequences are
// mostly diff line markers, and template argument lists appear to be missing (for example
// `std::unique_ptr clean_context`, `std::make_shared()`, `std::numeric_limits::max()`) --
// presumably lost during extraction; confirm against the original test file.
//
// * Leading statements (tail of a test whose opening is above this span): build a CleanContext
//   with Options::FILE_SYSTEM set to "local", create an OrphanFilesCleaner from it, and expect
//   Clean() to fail with the "do not support cleaning table with branch" message.
//
// * TestTableWithIndex: copies the orc/append_with_bsi.db/append_with_bsi fixture into a unique
//   test directory, runs Clean() with WithOlderThanMs(std::numeric_limits::max()), and asserts
//   that the returned set of cleaned paths is empty.
//   NOTE(review): the local `file_system` looks unused in this test -- confirm and remove.
//
// * TestTableWithBrokenSnapshot: four scenarios, each on a fresh copy of the
//   orc/append_09.db/append_09 fixture with exactly one file deleted before Clean() is run.
//   The `check_result` lambda maps every cleaned path through PathUtil::GetName and compares
//   the resulting set of names with the expected set.
//     1. manifest-list-616d1847-...-1 deleted  -> cleaned set is empty.
//     2. manifest-list-f2d59cb8-...-3 deleted  -> cleaned names are one data file
//        (data-b9e7c41f-...-0.orc) and one manifest (manifest-3ea5ee21-...-1).
//     3. manifest-f8b15cfc-...-0 deleted       -> cleaned names are two data files
//        (data-d41fd7d1-...-0.orc and data-db2b44c0-...-0.orc).
//     4. data-d41fd7d1-...-0.orc deleted from f1=10/bucket-0 -> cleaned set is empty.
//   NOTE(review): scenario 4 re-declares `test_data_path` and `file_system` with the same
//   initializers as the enclosing scope, shadowing the outer variables -- looks redundant.
//
// * TestTableWithChangelog: writes snapshot/snapshot-6 whose JSON has a non-null
//   "changelogManifestList" and expects Clean() to fail with the
//   "do not support clean changelog" message.
//
// * TestTableWithIndexManifest: writes snapshot/snapshot-6 whose JSON has an "indexManifest"
//   entry (and a null "changelogManifestList") and expects Clean() to fail with the
//   "do not support clean index manifest" message.
//   NOTE(review): the trailing `true` passed to WriteFile is presumably an overwrite flag --
//   verify against the file system interface.
// ---------------------------------------------------------------------------------------------
ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local").Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner, OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_NOK_WITH_MSG(cleaner->Clean(), + "OrphanFilesCleaner do not support cleaning table with branch"); +} + +TEST(OrphanFilesCleanerTest, TestTableWithIndex) { + std::string test_data_path = + paimon::test::GetDataDir() + "/orc/append_with_bsi.db/append_with_bsi/"; + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + auto file_system = std::make_shared(); + + CleanContextBuilder clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local") + .WithOlderThanMs(std::numeric_limits::max()) + .Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner, OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_OK_AND_ASSIGN(std::set cleaned_paths, cleaner->Clean()); + ASSERT_TRUE(cleaned_paths.empty()); +} + +TEST(OrphanFilesCleanerTest, TestTableWithBrokenSnapshot) { + std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/"; + auto file_system = std::make_shared(); + auto check_result = [](const std::set& actual, + const std::set& expected) -> bool { + std::set file_names; + for (const auto& file_path : actual) { + file_names.insert(PathUtil::GetName(file_path)); + } + return file_names == expected; + }; + + // test with non-exist manifest list, which manifest has reference by other manifest-list, so it + // will not be cleaned + { + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + ASSERT_OK(file_system->Delete(PathUtil::JoinPath( + table_path, "manifest/manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-1"))); + + CleanContextBuilder
clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local") + .WithOlderThanMs(std::numeric_limits::max()) + .Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner, OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_OK_AND_ASSIGN(std::set cleaned_paths, cleaner->Clean()); + ASSERT_TRUE(cleaned_paths.empty()); + } + // test with non-exist manifest list, which manifest has no other reference, so it will be + // cleaned + { + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + ASSERT_OK(file_system->Delete(PathUtil::JoinPath( + table_path, "manifest/manifest-list-f2d59cb8-3ec6-4860-b34b-050b1a533416-3"))); + + CleanContextBuilder clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local") + .WithOlderThanMs(std::numeric_limits::max()) + .Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner, OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_OK_AND_ASSIGN(std::set cleaned_paths, cleaner->Clean()); + ASSERT_TRUE( + check_result(cleaned_paths, {"data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc", + "manifest-3ea5ee21-d399-4f1c-a749-2fc63dbf0852-1"})); + } + // test with non-exist manifest + { + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + ASSERT_OK(file_system->Delete(PathUtil::JoinPath( + table_path, "manifest/manifest-f8b15cfc-437a-4d21-a6a0-e45b639ae7ed-0"))); + + CleanContextBuilder clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local") + .WithOlderThanMs(std::numeric_limits::max()) + .Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner,
OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_OK_AND_ASSIGN(std::set cleaned_paths, cleaner->Clean()); + ASSERT_TRUE( + check_result(cleaned_paths, {"data-d41fd7d1-b3e4-4905-aad9-b20a780e90a2-0.orc", + "data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc"})); + } + // test with non-exist data file + { + std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/"; + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + auto file_system = std::make_shared(); + ASSERT_OK(file_system->Delete(PathUtil::JoinPath( + table_path, "f1=10/bucket-0/data-d41fd7d1-b3e4-4905-aad9-b20a780e90a2-0.orc"))); + + CleanContextBuilder clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local") + .WithOlderThanMs(std::numeric_limits::max()) + .Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner, OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_OK_AND_ASSIGN(std::set cleaned_paths, cleaner->Clean()); + ASSERT_TRUE(cleaned_paths.empty()); + } +} + +TEST(OrphanFilesCleanerTest, TestTableWithChangelog) { + std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/"; + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + auto file_system = std::make_shared(); + auto snapshot_str = R"({ + "version" : 3, + "id" : 6, + "schemaId" : 0, + "baseManifestList" : "manifest-list-f2d59cb8-3ec6-4860-b34b-050b1a533416-0", + "deltaManifestList" : "manifest-list-f2d59cb8-3ec6-4860-b34b-050b1a533416-1", + "changelogManifestList" : "manifest-list-f2d59cb8-3ec6-4860-b34b-050b1a533416-2", + "commitUser" : "febb1e71-79fc-4abc-9b9d-464ecbc198f7", + "commitIdentifier" : 9223372036854775807, + "commitKind" : "APPEND", + "timeMillis" : 1721615035363, +
"logOffsets" : { }, + "totalRecordCount" : 11, + "deltaRecordCount" : 1, + "changelogRecordCount" : 0 +})"; + ASSERT_OK(file_system->WriteFile(PathUtil::JoinPath(table_path, "snapshot/snapshot-6"), + snapshot_str, true)); + + CleanContextBuilder clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local").Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner, OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_NOK_WITH_MSG(cleaner->Clean(), "OrphanFilesCleaner do not support clean changelog"); +} + +TEST(OrphanFilesCleanerTest, TestTableWithIndexManifest) { + std::string test_data_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09/"; + auto dir = UniqueTestDirectory::Create(); + std::string table_path = dir->Str(); + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, table_path)); + auto file_system = std::make_shared(); + auto snapshot_str = R"({ + "version" : 3, + "id" : 6, + "schemaId" : 0, + "baseManifestList" : "manifest-list-f2d59cb8-3ec6-4860-b34b-050b1a533416-0", + "deltaManifestList" : "manifest-list-f2d59cb8-3ec6-4860-b34b-050b1a533416-1", + "changelogManifestList" : null, + "indexManifest" : "index-manifest-bd43150e-cce1-4231-bfc1-8fdc2b0b5994-0", + "commitUser" : "febb1e71-79fc-4abc-9b9d-464ecbc198f7", + "commitIdentifier" : 9223372036854775807, + "commitKind" : "APPEND", + "timeMillis" : 1721615035363, + "logOffsets" : { }, + "totalRecordCount" : 11, + "deltaRecordCount" : 1, + "changelogRecordCount" : 0 +})"; + ASSERT_OK(file_system->WriteFile(PathUtil::JoinPath(table_path, "snapshot/snapshot-6"), + snapshot_str, true)); + + CleanContextBuilder clean_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(std::unique_ptr clean_context, + clean_context_builder.AddOption(Options::FILE_SYSTEM, "local").Finish()); + ASSERT_OK_AND_ASSIGN(auto cleaner, OrphanFilesCleaner::Create(std::move(clean_context))); + ASSERT_NOK_WITH_MSG(cleaner->Clean(),
"OrphanFilesCleaner do not support clean index manifest"); +} + +} // namespace paimon::test