Skip to content

Commit 3ced20a

Browse files
authored
feat: Migrate bucket module (#71)
1 parent 4a77916 commit 3ced20a

17 files changed

Lines changed: 2830 additions & 0 deletions
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#pragma once
20+
21+
#include <cstdint>
22+
23+
#include "paimon/defs.h"
24+
#include "paimon/visibility.h"
25+
26+
namespace paimon {
27+
28+
/// Specifies the bucket function type for paimon bucket.
29+
/// This determines how rows are assigned to buckets during data writing.
30+
enum class BucketFunctionType {
31+
/// The default bucket function which will use arithmetic:
32+
/// bucket_id = abs(hash_bucket_binary_row % numBuckets) to get bucket.
33+
DEFAULT = 1,
34+
/// The modulus bucket function which will use modulus arithmetic:
35+
/// bucket_id = floorMod(bucket_key_value, numBuckets) to get bucket.
36+
/// Note: the bucket key must be a single field of INT or BIGINT datatype.
37+
MOD = 2,
38+
/// The hive bucket function which will use hive-compatible hash arithmetic to get bucket.
39+
HIVE = 3
40+
};
41+
42+
/// Describes a field's type information needed for Hive hashing.
43+
struct PAIMON_EXPORT HiveFieldInfo {
44+
FieldType type;
45+
int32_t precision = 0; // Used for DECIMAL type
46+
int32_t scale = 0; // Used for DECIMAL type
47+
48+
explicit HiveFieldInfo(FieldType t) : type(t) {}
49+
HiveFieldInfo(FieldType t, int32_t p, int32_t s) : type(t), precision(p), scale(s) {}
50+
};
51+
52+
} // namespace paimon
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#pragma once
20+
#include <cstdint>
21+
#include <memory>
22+
#include <vector>
23+
24+
#include "paimon/bucket/bucket_function_type.h"
25+
#include "paimon/memory/memory_pool.h"
26+
#include "paimon/result.h"
27+
#include "paimon/status.h"
28+
#include "paimon/visibility.h"
29+
30+
struct ArrowSchema;
31+
struct ArrowArray;
32+
33+
namespace paimon {
34+
class BucketFunction;
35+
class MemoryPool;
36+
37+
/// Calculator for determining bucket ids based on the given bucket keys.
38+
///
39+
/// @note `BucketIdCalculator` is compatible with the Java implementation and uses
40+
/// hash-based distribution to ensure even data distribution across buckets.
41+
class PAIMON_EXPORT BucketIdCalculator {
42+
public:
43+
/// Create `BucketIdCalculator` with default bucket function.
44+
/// @param is_pk_table Whether this is for a primary key table.
45+
/// @param num_buckets Number of buckets.
46+
/// @param pool Memory pool for memory allocation.
47+
static Result<std::unique_ptr<BucketIdCalculator>> Create(
48+
bool is_pk_table, int32_t num_buckets, const std::shared_ptr<MemoryPool>& pool);
49+
50+
/// Create `BucketIdCalculator` with a custom bucket function.
51+
/// @param is_pk_table Whether this is for a primary key table.
52+
/// @param num_buckets Number of buckets.
53+
/// @param bucket_function The bucket function to use for bucket assignment.
54+
/// @param pool Memory pool for memory allocation.
55+
static Result<std::unique_ptr<BucketIdCalculator>> Create(
56+
bool is_pk_table, int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function,
57+
const std::shared_ptr<MemoryPool>& pool);
58+
59+
/// Create `BucketIdCalculator` with MOD bucket function.
60+
/// @param is_pk_table Whether this is for a primary key table.
61+
/// @param num_buckets Number of buckets.
62+
/// @param bucket_key_type The type of the single bucket key field. Must be INT or BIGINT.
63+
/// @param pool Memory pool for memory allocation.
64+
static Result<std::unique_ptr<BucketIdCalculator>> CreateMod(
65+
bool is_pk_table, int32_t num_buckets, FieldType bucket_key_type,
66+
const std::shared_ptr<MemoryPool>& pool);
67+
68+
/// Create `BucketIdCalculator` with HIVE bucket function.
69+
/// @param is_pk_table Whether this is for a primary key table.
70+
/// @param num_buckets Number of buckets.
71+
/// @param field_infos The detailed type info of all fields in the bucket key row.
72+
/// @param pool Memory pool for memory allocation.
73+
static Result<std::unique_ptr<BucketIdCalculator>> CreateHive(
74+
bool is_pk_table, int32_t num_buckets, const std::vector<HiveFieldInfo>& field_infos,
75+
const std::shared_ptr<MemoryPool>& pool);
76+
77+
/// Calculate bucket ids for the given bucket keys.
78+
/// @param bucket_keys Arrow struct array containing the bucket key values.
79+
/// @param bucket_schema Arrow schema describing the structure of bucket_keys.
80+
/// @param bucket_ids Output array to store calculated bucket ids.
81+
/// @note 1. bucket_keys is a struct array, the order of fields needs to be consistent with
82+
/// "bucket-key" options in table schema. 2. bucket_keys and bucket_schema match each other. 3.
83+
/// bucket_ids is allocated enough space, at least >= bucket_keys->length
84+
Status CalculateBucketIds(ArrowArray* bucket_keys, ArrowSchema* bucket_schema,
85+
int32_t* bucket_ids) const;
86+
87+
/// Destructor
88+
~BucketIdCalculator();
89+
90+
private:
91+
BucketIdCalculator(int32_t num_buckets, std::unique_ptr<BucketFunction> bucket_function,
92+
const std::shared_ptr<MemoryPool>& pool);
93+
94+
private:
95+
int32_t num_buckets_;
96+
std::unique_ptr<BucketFunction> bucket_function_;
97+
std::shared_ptr<MemoryPool> pool_;
98+
};
99+
} // namespace paimon
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
#pragma once
20+
21+
#include <cstdint>
22+
23+
namespace paimon {
24+
25+
class BinaryRow;
26+
27+
/// Abstract interface for bucket functions.
28+
/// A bucket function determines which bucket a row should be assigned to.
29+
class BucketFunction {
30+
public:
31+
virtual ~BucketFunction() = default;
32+
33+
/// Compute the bucket for the given row.
34+
/// @param row The binary row to compute the bucket for.
35+
/// @param num_buckets The total number of buckets.
36+
/// @return The bucket index (0-based).
37+
virtual int32_t Bucket(const BinaryRow& row, int32_t num_buckets) const = 0;
38+
};
39+
40+
} // namespace paimon

0 commit comments

Comments
 (0)