From a0c0fb5f88c2b068efac52ad452dc7d1b2f866fc Mon Sep 17 00:00:00 2001
From: Caideyipi <87789683+Caideyipi@users.noreply.github.com>
Date: Tue, 9 Jun 2026 14:26:23 +0800
Subject: [PATCH 1/2] Fix overflow edge cases in query utilities
---
iotdb-core/calc-commons/pom.xml | 5 +
.../fill/filter/FixedIntervalFillFilter.java | 10 +-
.../aggregation/ExtremeAccumulator.java | 32 ++++--
.../grouped/GroupedExtremeAccumulator.java | 70 ++++++++----
.../filter/FixedIntervalFillFilterTest.java | 34 ++++++
.../aggregation/ExtremeAccumulatorTest.java | 101 ++++++++++++++++++
.../router/leader/HashLeaderBalancer.java | 2 +-
.../router/leader/HashLeaderBalancerTest.java | 65 +++++++++++
.../AsyncIoTConsensusServiceClient.java | 2 +-
.../changing/ChangingValueFilter.java | 20 +++-
.../sdt/SwingingDoorTrendingFilter.java | 19 +++-
.../TumblingTimeSamplingProcessor.java | 7 +-
.../sender/TwoStageAggregateSender.java | 2 +-
.../tsfile/PipeTsFileResourceSegmentLock.java | 10 +-
.../aggregation/ExtremeAccumulator.java | 32 ++++--
.../SlidingWindowAggregatorFactory.java | 36 ++++---
.../operator/window/SessionWindow.java | 24 ++++-
.../operator/window/SessionWindowManager.java | 2 +-
.../plan/AbstractFragmentParallelPlanner.java | 3 +-
...SubscriptionPipeEventBatchSegmentLock.java | 4 +-
.../sdt/SwingingDoorTrendingFilterTest.java | 56 ++++++++++
.../PipeTsFileResourceSegmentLockTest.java | 71 ++++++++++++
.../aggregation/AccumulatorTest.java | 46 ++++++++
.../SlidingWindowAggregatorFactoryTest.java | 41 +++++++
.../operator/window/SessionWindowTest.java | 52 +++++++++
.../AsyncAINodeInternalServiceClient.java | 2 +-
.../AsyncConfigNodeInternalServiceClient.java | 2 +-
.../AsyncDataNodeExternalServiceClient.java | 2 +-
.../AsyncDataNodeInternalServiceClient.java | 2 +-
...cDataNodeMPPDataExchangeServiceClient.java | 2 +-
.../AsyncIoTConsensusV2ServiceClient.java | 2 +-
.../AsyncPipeDataTransferServiceClient.java | 2 +-
32 files changed, 676 insertions(+), 84 deletions(-)
create mode 100644 iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
create mode 100644 iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java
create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java
create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java
create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java
create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java
create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
diff --git a/iotdb-core/calc-commons/pom.xml b/iotdb-core/calc-commons/pom.xml
index d854f29cce77e..fbe69d074c839 100644
--- a/iotdb-core/calc-commons/pom.xml
+++ b/iotdb-core/calc-commons/pom.xml
@@ -99,6 +99,11 @@
at.yawk.lz4
lz4-java
+
+ junit
+ junit
+ test
+
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java
index 8fbd4ce379d78..e7dd783b38722 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java
@@ -34,6 +34,14 @@ public FixedIntervalFillFilter(long timeInterval) {
public boolean needFill(long time, long previousTime) {
// the reason that we use Math.abs is that we may use order by time desc which will cause
// previousTime is larger than time
- return Math.abs(time - previousTime) <= timeInterval;
+ return isTimeDistanceLessThanOrEqual(time, previousTime, timeInterval);
+ }
+
+ private boolean isTimeDistanceLessThanOrEqual(long left, long right, long maxDistance) {
+ if (maxDistance < 0) {
+ return false;
+ }
+ long distance = left >= right ? left - right : right - left;
+ return Long.compareUnsigned(distance, maxDistance) <= 0;
}
}
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
index 85fbee4bd5ea9..f546282d31b6b 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
@@ -247,13 +247,9 @@ private void addIntInput(Column column, AggregationMask mask) {
}
private void updateIntResult(int val) {
- int absExtVal = Math.abs(val);
int candidateResult = extremeResult.getInt();
- int absCandidateResult = Math.abs(extremeResult.getInt());
- if (!initResult
- || (absExtVal > absCandidateResult)
- || (absExtVal == absCandidateResult) && val > candidateResult) {
+ if (!initResult || compareExtreme(val, candidateResult) > 0) {
initResult = true;
extremeResult.setInt(val);
}
@@ -281,13 +277,9 @@ private void addLongInput(Column column, AggregationMask mask) {
}
private void updateLongResult(long val) {
- long absExtVal = Math.abs(val);
long candidateResult = extremeResult.getLong();
- long absCandidateResult = Math.abs(extremeResult.getLong());
- if (!initResult
- || (absExtVal > absCandidateResult)
- || (absExtVal == absCandidateResult) && val > candidateResult) {
+ if (!initResult || compareExtreme(val, candidateResult) > 0) {
initResult = true;
extremeResult.setLong(val);
}
@@ -360,4 +352,24 @@ private void updateDoubleResult(double val) {
extremeResult.setDouble(val);
}
}
+
+ private int compareExtreme(int left, int right) {
+ int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
+ return absComparison == 0 ? Integer.compare(left, right) : absComparison;
+ }
+
+ private int compareExtreme(long left, long right) {
+ int absComparison = compareAbs(left, right);
+ return absComparison == 0 ? Long.compare(left, right) : absComparison;
+ }
+
+ private int compareAbs(long left, long right) {
+ if (left == Long.MIN_VALUE) {
+ return right == Long.MIN_VALUE ? 0 : 1;
+ }
+ if (right == Long.MIN_VALUE) {
+ return -1;
+ }
+ return Long.compare(Math.abs(left), Math.abs(right));
+ }
}
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
index bd5aca876b7af..b4102de7aa041 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
@@ -173,16 +173,16 @@ public void addIntermediate(int[] groupIds, Column argument) {
switch (seriesDataType) {
case INT32:
- updateIntValue(groupIds[i], Math.abs(argument.getInt(i)));
+ updateIntValue(groupIds[i], argument.getInt(i));
break;
case INT64:
- updateLongValue(groupIds[i], Math.abs(argument.getLong(i)));
+ updateLongValue(groupIds[i], argument.getLong(i));
break;
case FLOAT:
- updateFloatValue(groupIds[i], Math.abs(argument.getFloat(i)));
+ updateFloatValue(groupIds[i], argument.getFloat(i));
break;
case DOUBLE:
- updateDoubleValue(groupIds[i], Math.abs(argument.getDouble(i)));
+ updateDoubleValue(groupIds[i], argument.getDouble(i));
break;
case TEXT:
case STRING:
@@ -300,7 +300,7 @@ private void addIntInput(int[] groupIds, Column valueColumn, AggregationMask mas
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
- updateIntValue(groupIds[i], Math.abs(valueColumn.getInt(i)));
+ updateIntValue(groupIds[i], valueColumn.getInt(i));
}
}
} else {
@@ -309,15 +309,15 @@ private void addIntInput(int[] groupIds, Column valueColumn, AggregationMask mas
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
- updateIntValue(groupIds[position], Math.abs(valueColumn.getInt(position)));
+ updateIntValue(groupIds[position], valueColumn.getInt(position));
}
}
}
}
protected void updateIntValue(int groupId, int value) {
- int max = intValues.get(groupId);
- if (value >= max) {
+ int candidate = intValues.get(groupId);
+ if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) {
inits.set(groupId, true);
intValues.set(groupId, value);
}
@@ -329,7 +329,7 @@ private void addLongInput(int[] groupIds, Column valueColumn, AggregationMask ma
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
- updateLongValue(groupIds[i], Math.abs(valueColumn.getLong(i)));
+ updateLongValue(groupIds[i], valueColumn.getLong(i));
}
}
} else {
@@ -338,15 +338,15 @@ private void addLongInput(int[] groupIds, Column valueColumn, AggregationMask ma
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
- updateLongValue(groupIds[position], Math.abs(valueColumn.getLong(position)));
+ updateLongValue(groupIds[position], valueColumn.getLong(position));
}
}
}
}
protected void updateLongValue(int groupId, long value) {
- long max = longValues.get(groupId);
- if (value >= max) {
+ long candidate = longValues.get(groupId);
+ if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) {
inits.set(groupId, true);
longValues.set(groupId, value);
}
@@ -358,7 +358,7 @@ private void addFloatInput(int[] groupIds, Column valueColumn, AggregationMask m
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
- updateFloatValue(groupIds[i], Math.abs(valueColumn.getFloat(i)));
+ updateFloatValue(groupIds[i], valueColumn.getFloat(i));
}
}
} else {
@@ -367,15 +367,15 @@ private void addFloatInput(int[] groupIds, Column valueColumn, AggregationMask m
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
- updateFloatValue(groupIds[position], Math.abs(valueColumn.getFloat(position)));
+ updateFloatValue(groupIds[position], valueColumn.getFloat(position));
}
}
}
}
protected void updateFloatValue(int groupId, float value) {
- float max = floatValues.get(groupId);
- if (value >= max) {
+ float candidate = floatValues.get(groupId);
+ if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) {
inits.set(groupId, true);
floatValues.set(groupId, value);
}
@@ -387,7 +387,7 @@ private void addDoubleInput(int[] groupIds, Column valueColumn, AggregationMask
if (mask.isSelectAll()) {
for (int i = 0; i < positionCount; i++) {
if (!valueColumn.isNull(i)) {
- updateDoubleValue(groupIds[i], Math.abs(valueColumn.getDouble(i)));
+ updateDoubleValue(groupIds[i], valueColumn.getDouble(i));
}
}
} else {
@@ -396,17 +396,47 @@ private void addDoubleInput(int[] groupIds, Column valueColumn, AggregationMask
for (int i = 0; i < positionCount; i++) {
position = selectedPositions[i];
if (!valueColumn.isNull(position)) {
- updateDoubleValue(groupIds[position], Math.abs(valueColumn.getDouble(position)));
+ updateDoubleValue(groupIds[position], valueColumn.getDouble(position));
}
}
}
}
protected void updateDoubleValue(int groupId, double value) {
- double max = doubleValues.get(groupId);
- if (value >= max) {
+ double candidate = doubleValues.get(groupId);
+ if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) {
inits.set(groupId, true);
doubleValues.set(groupId, value);
}
}
+
+ private int compareExtreme(int left, int right) {
+ int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
+ return absComparison == 0 ? Integer.compare(left, right) : absComparison;
+ }
+
+ private int compareExtreme(long left, long right) {
+ int absComparison = compareAbs(left, right);
+ return absComparison == 0 ? Long.compare(left, right) : absComparison;
+ }
+
+ private int compareAbs(long left, long right) {
+ if (left == Long.MIN_VALUE) {
+ return right == Long.MIN_VALUE ? 0 : 1;
+ }
+ if (right == Long.MIN_VALUE) {
+ return -1;
+ }
+ return Long.compare(Math.abs(left), Math.abs(right));
+ }
+
+ private int compareExtreme(float left, float right) {
+ int absComparison = Float.compare(Math.abs(left), Math.abs(right));
+ return absComparison == 0 ? Float.compare(left, right) : absComparison;
+ }
+
+ private int compareExtreme(double left, double right) {
+ int absComparison = Double.compare(Math.abs(left), Math.abs(right));
+ return absComparison == 0 ? Double.compare(left, right) : absComparison;
+ }
}
diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
new file mode 100644
index 0000000000000..544c95b8cadf9
--- /dev/null
+++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.calc.execution.operator.process.fill.filter;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FixedIntervalFillFilterTest {
+
+ @Test
+ public void needFillHandlesOverflowedTimeDistance() {
+ FixedIntervalFillFilter filter = new FixedIntervalFillFilter(1);
+
+ Assert.assertTrue(filter.needFill(Long.MIN_VALUE, Long.MIN_VALUE + 1));
+ Assert.assertFalse(filter.needFill(Long.MIN_VALUE, Long.MAX_VALUE));
+ }
+}
diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java
new file mode 100644
index 0000000000000..c796f499a96e7
--- /dev/null
+++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.calc.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedExtremeAccumulator;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.tsfile.read.common.block.column.LongColumnBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class ExtremeAccumulatorTest {
+
+ @Test
+ public void tableExtremeAccumulatorHandlesMinValues() {
+ ExtremeAccumulator intAccumulator = new ExtremeAccumulator(TSDataType.INT32);
+ TsBlock intBlock = buildIntBlock(Integer.MAX_VALUE, Integer.MIN_VALUE);
+ intAccumulator.addInput(
+ new org.apache.tsfile.block.column.Column[] {intBlock.getColumn(0)},
+ AggregationMask.createSelectAll(intBlock.getPositionCount()));
+ ColumnBuilder intResult = new IntColumnBuilder(null, 1);
+ intAccumulator.evaluateFinal(intResult);
+ Assert.assertEquals(Integer.MIN_VALUE, intResult.build().getInt(0));
+
+ ExtremeAccumulator longAccumulator = new ExtremeAccumulator(TSDataType.INT64);
+ TsBlock longBlock = buildLongBlock(Long.MAX_VALUE, Long.MIN_VALUE);
+ longAccumulator.addInput(
+ new org.apache.tsfile.block.column.Column[] {longBlock.getColumn(0)},
+ AggregationMask.createSelectAll(longBlock.getPositionCount()));
+ ColumnBuilder longResult = new LongColumnBuilder(null, 1);
+ longAccumulator.evaluateFinal(longResult);
+ Assert.assertEquals(Long.MIN_VALUE, longResult.build().getLong(0));
+ }
+
+ @Test
+ public void groupedExtremeAccumulatorKeepsOriginalValueAndHandlesMinValues() {
+ GroupedExtremeAccumulator intAccumulator = new GroupedExtremeAccumulator(TSDataType.INT32);
+ intAccumulator.setGroupCount(1);
+ TsBlock intBlock = buildIntBlock(-5, 4);
+ intAccumulator.addInput(
+ new int[] {0, 0},
+ new org.apache.tsfile.block.column.Column[] {intBlock.getColumn(0)},
+ AggregationMask.createSelectAll(intBlock.getPositionCount()));
+ ColumnBuilder intResult = new IntColumnBuilder(null, 1);
+ intAccumulator.evaluateFinal(0, intResult);
+ Assert.assertEquals(-5, intResult.build().getInt(0));
+
+ GroupedExtremeAccumulator longAccumulator = new GroupedExtremeAccumulator(TSDataType.INT64);
+ longAccumulator.setGroupCount(1);
+ TsBlock longBlock = buildLongBlock(Long.MAX_VALUE, Long.MIN_VALUE);
+ longAccumulator.addIntermediate(new int[] {0, 0}, longBlock.getColumn(0));
+ ColumnBuilder longResult = new LongColumnBuilder(null, 1);
+ longAccumulator.evaluateFinal(0, longResult);
+ Assert.assertEquals(Long.MIN_VALUE, longResult.build().getLong(0));
+ }
+
+ private TsBlock buildIntBlock(int... values) {
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
+ for (int i = 0; i < values.length; i++) {
+ builder.getTimeColumnBuilder().writeLong(i);
+ valueBuilder.writeInt(values[i]);
+ builder.declarePosition();
+ }
+ return builder.build();
+ }
+
+ private TsBlock buildLongBlock(long... values) {
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT64));
+ ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
+ for (int i = 0; i < values.length; i++) {
+ builder.getTimeColumnBuilder().writeLong(i);
+ valueBuilder.writeLong(values[i]);
+ builder.declarePosition();
+ }
+ return builder.build();
+ }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
index aff023e82da8a..06ad4a76eb513 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java
@@ -43,7 +43,7 @@ public Map generateOptimalLeaderDistribution(
(gid, nodeSet) -> {
List nodeList = new ArrayList<>(nodeSet);
nodeList.sort(null);
- int startNodeIndex = Math.abs(gid.hashCode()) % nodeList.size();
+ int startNodeIndex = (int) (Math.abs((long) gid.hashCode()) % nodeList.size());
int finalNodeId = nodeList.get(startNodeIndex);
for (int i = 0; i < nodeList.size(); i++) {
int currentNodeIndex = (startNodeIndex + i) % nodeList.size();
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java
new file mode 100644
index 0000000000000..f2ccccd857d2f
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.router.leader;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class HashLeaderBalancerTest {
+
+ private static final HashLeaderBalancer BALANCER = new HashLeaderBalancer();
+ private static final int MIN_VALUE_HASH_REGION_ID = 268255235;
+
+ @Test
+ public void minValueHashCodeTest() {
+ TConsensusGroupId regionGroupId =
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, MIN_VALUE_HASH_REGION_ID);
+ Assert.assertEquals(Integer.MIN_VALUE, regionGroupId.hashCode());
+
+ Map> regionReplicaSetMap = new TreeMap<>();
+ regionReplicaSetMap.put(regionGroupId, new HashSet<>(Arrays.asList(1, 2, 3)));
+
+ Map dataNodeStatisticsMap = new TreeMap<>();
+ dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running));
+ dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running));
+ dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running));
+
+ Map leaderDistribution =
+ BALANCER.generateOptimalLeaderDistribution(
+ new TreeMap<>(),
+ regionReplicaSetMap,
+ new TreeMap<>(),
+ dataNodeStatisticsMap,
+ new TreeMap<>());
+
+ Assert.assertEquals(Integer.valueOf(3), leaderDistribution.get(regionGroupId));
+ }
+}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index ad0bcc0d9a25a..99e6cbcf71eaf 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -160,7 +160,7 @@ public PooledObject makeObject(TEndPoint endPoin
new AsyncIoTConsensusServiceClient(
thriftClientProperty,
endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
index 7dc8c87c09b2d..9bb5dfcb9a761 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java
@@ -59,13 +59,13 @@ public boolean filter(final long timestamp, final T value) {
}
private boolean tryFilter(final long timestamp, final T value) {
- final long timeDiff = Math.abs(timestamp - lastStoredTimestamp);
-
- if (timeDiff <= processor.getCompressionMinTimeInterval()) {
+ if (isTimeDistanceLessThanOrEqual(
+ timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) {
return false;
}
- if (timeDiff >= processor.getCompressionMaxTimeInterval()) {
+ if (isTimeDistanceGreaterThanOrEqual(
+ timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) {
reset(timestamp, value);
return true;
}
@@ -94,6 +94,18 @@ private boolean tryFilter(final long timestamp, final T value) {
return false;
}
+ private boolean isTimeDistanceLessThanOrEqual(
+ final long left, final long right, final long maxDistance) {
+ final long distance = left >= right ? left - right : right - left;
+ return Long.compareUnsigned(distance, maxDistance) <= 0;
+ }
+
+ private boolean isTimeDistanceGreaterThanOrEqual(
+ final long left, final long right, final long minDistance) {
+ final long distance = left >= right ? left - right : right - left;
+ return Long.compareUnsigned(distance, minDistance) >= 0;
+ }
+
private void reset(final long timestamp, final T value) {
lastStoredTimestamp = timestamp;
lastStoredValue = value;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
index 850cbd3ed0729..bfc0bc5d3f791 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java
@@ -86,13 +86,14 @@ public boolean filter(final long timestamp, final T value) {
private boolean tryFilter(final long timestamp, final T value) {
final long timeDiff = timestamp - lastStoredTimestamp;
- final long absTimeDiff = Math.abs(timeDiff);
- if (absTimeDiff <= processor.getCompressionMinTimeInterval()) {
+ if (isTimeDistanceLessThanOrEqual(
+ timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) {
return false;
}
- if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) {
+ if (isTimeDistanceGreaterThanOrEqual(
+ timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) {
reset(timestamp, value);
return true;
}
@@ -144,6 +145,18 @@ private boolean tryFilter(final long timestamp, final T value) {
return false;
}
+ private boolean isTimeDistanceLessThanOrEqual(
+ final long left, final long right, final long maxDistance) {
+ final long distance = left >= right ? left - right : right - left;
+ return Long.compareUnsigned(distance, maxDistance) <= 0;
+ }
+
+ private boolean isTimeDistanceGreaterThanOrEqual(
+ final long left, final long right, final long minDistance) {
+ final long distance = left >= right ? left - right : right - left;
+ return Long.compareUnsigned(distance, minDistance) >= 0;
+ }
+
private void reset(final long timestamp, final T value) {
upperDoor = Double.MIN_VALUE;
lowerDoor = Double.MAX_VALUE;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
index 36fa0b8382f7b..8beab1343888c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java
@@ -116,7 +116,7 @@ protected void processRow(
final Long lastSampleTime = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);
if (lastSampleTime == null
- || Math.abs(currentRowTime - lastSampleTime) >= intervalInCurrentPrecision) {
+ || isTimeDistanceGreaterThanOrEqual(currentRowTime, lastSampleTime)) {
try {
rowCollector.collectRow(row);
@@ -135,4 +135,9 @@ protected void processRow(
}
}
}
+
+ private boolean isTimeDistanceGreaterThanOrEqual(long left, long right) {
+ long distance = left >= right ? left - right : right - left;
+ return Long.compareUnsigned(distance, intervalInCurrentPrecision) >= 0;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index 9b6e17e6e4a76..e221b4a19c779 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -76,7 +76,7 @@ public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq r
final boolean endPointsChanged = tryFetchEndPointsIfNecessary();
tryConstructClients(endPointsChanged);
- final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length];
+ final TEndPoint endPoint = endPoints[(int) Math.floorMod(watermark, endPoints.length)];
IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint);
if (client == null) {
client = reconstructIoTDBSyncClient(endPoint);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
index aaf5ce4db5a5f..a73198c2713ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java
@@ -78,17 +78,21 @@ private void initIfNecessary() {
public void lock(final File file) {
initIfNecessary();
- locks[Math.abs(file.hashCode()) % locks.length].lock();
+ locks[getLockIndex(file)].lock();
}
public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit)
throws InterruptedException {
initIfNecessary();
- return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, timeUnit);
+ return locks[getLockIndex(file)].tryLock(timeout, timeUnit);
}
public void unlock(final File file) {
initIfNecessary();
- locks[Math.abs(file.hashCode()) % locks.length].unlock();
+ locks[getLockIndex(file)].unlock();
+ }
+
+ private int getLockIndex(final File file) {
+ return (int) (Math.abs((long) file.hashCode()) % locks.length);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
index f99e01e8d5a59..af9812e0a17bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java
@@ -270,13 +270,9 @@ private void addIntInput(Column[] column, BitMap bitMap) {
}
private void updateIntResult(int extVal) {
- int absExtVal = Math.abs(extVal);
int candidateResult = extremeResult.getInt();
- int absCandidateResult = Math.abs(extremeResult.getInt());
- if (!initResult
- || (absExtVal > absCandidateResult)
- || (absExtVal == absCandidateResult) && extVal > candidateResult) {
+ if (!initResult || compareExtreme(extVal, candidateResult) > 0) {
initResult = true;
extremeResult.setInt(extVal);
}
@@ -295,13 +291,9 @@ private void addLongInput(Column[] column, BitMap bitMap) {
}
private void updateLongResult(long extVal) {
- long absExtVal = Math.abs(extVal);
long candidateResult = extremeResult.getLong();
- long absCandidateResult = Math.abs(extremeResult.getLong());
- if (!initResult
- || (absExtVal > absCandidateResult)
- || (absExtVal == absCandidateResult) && extVal > candidateResult) {
+ if (!initResult || compareExtreme(extVal, candidateResult) > 0) {
initResult = true;
extremeResult.setLong(extVal);
}
@@ -356,4 +348,24 @@ private void updateDoubleResult(double extVal) {
extremeResult.setDouble(extVal);
}
}
+
+ private int compareExtreme(int left, int right) {
+ int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
+ return absComparison == 0 ? Integer.compare(left, right) : absComparison;
+ }
+
+ private int compareExtreme(long left, long right) {
+ int absComparison = compareAbs(left, right);
+ return absComparison == 0 ? Long.compare(left, right) : absComparison;
+ }
+
+ private int compareAbs(long left, long right) {
+ if (left == Long.MIN_VALUE) {
+ return right == Long.MIN_VALUE ? 0 : 1;
+ }
+ if (right == Long.MIN_VALUE) {
+ return -1;
+ }
+ return Long.compare(Math.abs(left), Math.abs(right));
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
index e46a2597b220a..8a11ffc773961 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -77,26 +77,14 @@ private SlidingWindowAggregatorFactory() {}
(o1, o2) -> {
int value1 = o1.getInt(0);
int value2 = o2.getInt(0);
- if (Math.abs(value1) > Math.abs(value2)
- || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
- return 1;
- } else if (value1 == value2) {
- return 0;
- }
- return -1;
+ return compareExtreme(value1, value2);
});
extremeComparators.put(
TSDataType.INT64,
(o1, o2) -> {
long value1 = o1.getLong(0);
long value2 = o2.getLong(0);
- if (Math.abs(value1) > Math.abs(value2)
- || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) {
- return 1;
- } else if (value1 == value2) {
- return 0;
- }
- return -1;
+ return compareExtreme(value1, value2);
});
extremeComparators.put(
TSDataType.FLOAT,
@@ -249,4 +237,24 @@ public static SlidingWindowAggregator createSlidingWindowAggregator(
DataNodeQueryMessages.INVALID_AGGREGATION_TYPE + aggregationType);
}
}
+
+ static int compareExtreme(int left, int right) {
+ int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right));
+ return absComparison == 0 ? Integer.compare(left, right) : absComparison;
+ }
+
+ static int compareExtreme(long left, long right) {
+ int absComparison = compareAbs(left, right);
+ return absComparison == 0 ? Long.compare(left, right) : absComparison;
+ }
+
+ private static int compareAbs(long left, long right) {
+ if (left == Long.MIN_VALUE) {
+ return right == Long.MIN_VALUE ? 0 : 1;
+ }
+ if (right == Long.MIN_VALUE) {
+ return -1;
+ }
+ return Long.compare(Math.abs(left), Math.abs(right));
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
index 4c5e004c60d46..59f955580b918 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
@@ -54,9 +54,9 @@ public boolean satisfy(Column column, int index) {
return true;
}
if (index == 0) {
- return Math.abs(column.getLong(index) - lastTsBlockTime) <= timeInterval;
+ return isTimeDistanceLessThanOrEqual(column.getLong(index), lastTsBlockTime);
}
- return Math.abs(column.getLong(index) - column.getLong(index - 1)) <= timeInterval;
+ return isTimeDistanceLessThanOrEqual(column.getLong(index), column.getLong(index - 1));
}
@Override
@@ -92,8 +92,8 @@ public boolean contains(Column column) {
long maxTime = Math.max(columnStartTime, columnEndTime);
boolean contains =
- Math.abs(columnStartTime - lastTsBlockTime) < timeInterval
- && maxTime - minTime <= timeInterval;
+ isTimeDistanceLessThan(columnStartTime, lastTsBlockTime)
+ && isTimeDistanceLessThanOrEqual(maxTime, minTime);
if (contains) {
if (!initializedTimeValue) {
startTime = Long.MAX_VALUE;
@@ -113,6 +113,22 @@ public long getTimeInterval() {
return timeInterval;
}
+ boolean isTimeDistanceLessThanOrEqual(long left, long right) {
+ return compareTimeDistance(left, right) <= 0;
+ }
+
+ private boolean isTimeDistanceLessThan(long left, long right) {
+ return compareTimeDistance(left, right) < 0;
+ }
+
+ private int compareTimeDistance(long left, long right) {
+ if (timeInterval < 0) {
+ return 1;
+ }
+ long distance = left >= right ? left - right : right - left;
+ return Long.compareUnsigned(distance, timeInterval);
+ }
+
public long getTimeValue() {
return timeValue;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java
index 42ca3becedd3a..bef50233b2263 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java
@@ -93,7 +93,7 @@ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
for (; i < size; i++) {
long currentTime = timeColumn.getLong(i);
- if (Math.abs(currentTime - previousTimeValue) > sessionWindow.getTimeInterval()) {
+ if (!sessionWindow.isTimeDistanceLessThanOrEqual(currentTime, previousTimeValue)) {
sessionWindow.setTimeValue(previousTimeValue);
break;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
index e5bd6ca38c200..0fddb41d6e689 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java
@@ -138,7 +138,8 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica
if (!selectRandomDataNode || queryContext.getSession() == null) {
targetIndex = 0;
} else {
- targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size());
+ targetIndex =
+ (int) Math.floorMod(queryContext.getSession().getSessionId(), availableDataNodes.size());
}
return availableDataNodes.get(targetIndex);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
index 194eb5a8fa53b..0c91492e4b3ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java
@@ -65,11 +65,11 @@ private void initIfNecessary() {
public void lock(final int regionId) {
initIfNecessary();
- locks[regionId % locks.length].lock();
+ locks[Math.floorMod(regionId, locks.length)].lock();
}
public void unlock(final int regionId) {
initIfNecessary();
- locks[regionId % locks.length].unlock();
+ locks[Math.floorMod(regionId, locks.length)].unlock();
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java
new file mode 100644
index 0000000000000..cd04df0419498
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.sdt;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+public class SwingingDoorTrendingFilterTest {
+
+ @Test
+ public void testExtremeTimestampDistanceReachesMaxInterval() throws Exception {
+ final SwingingDoorTrendingFilter filter =
+ new SwingingDoorTrendingFilter<>(createProcessor(0, Long.MAX_VALUE, 0), Long.MIN_VALUE, 0);
+
+ Assert.assertTrue(filter.filter(Long.MAX_VALUE, 0));
+ }
+
+ private SwingingDoorTrendingSamplingProcessor createProcessor(
+ final long compressionMinTimeInterval,
+ final long compressionMaxTimeInterval,
+ final double compressionDeviation)
+ throws Exception {
+ final SwingingDoorTrendingSamplingProcessor processor =
+ new SwingingDoorTrendingSamplingProcessor();
+ setField(processor, "compressionMinTimeInterval", compressionMinTimeInterval);
+ setField(processor, "compressionMaxTimeInterval", compressionMaxTimeInterval);
+ setField(processor, "compressionDeviation", compressionDeviation);
+ return processor;
+ }
+
+ private void setField(final Object target, final String fieldName, final Object value)
+ throws Exception {
+ final Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java
new file mode 100644
index 0000000000000..e1b53e0dc6ca8
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.tsfile;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+public class PipeTsFileResourceSegmentLockTest {
+
+ @Test
+ public void minValueHashCodeTest() throws InterruptedException {
+ int originalSegmentLockNum =
+ CommonDescriptor.getInstance().getConfig().getPipeTsFileResourceSegmentLockNum();
+ CommonDescriptor.getInstance().getConfig().setPipeTsFileResourceSegmentLockNum(32);
+
+ try {
+ PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock();
+ File file = new MinValueHashCodeFile("target/min-value-hash-code.tsfile");
+
+ Assert.assertEquals(Integer.MIN_VALUE, file.hashCode());
+
+ segmentLock.lock(file);
+ try {
+ Assert.assertTrue(segmentLock.tryLock(file, 1, TimeUnit.MILLISECONDS));
+ segmentLock.unlock(file);
+ } finally {
+ segmentLock.unlock(file);
+ }
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeTsFileResourceSegmentLockNum(originalSegmentLockNum);
+ }
+ }
+
+ private static class MinValueHashCodeFile extends File {
+
+ private static final long serialVersionUID = 1L;
+
+ private MinValueHashCodeFile(String pathname) {
+ super(pathname);
+ }
+
+ @Override
+ public int hashCode() {
+ return Integer.MIN_VALUE;
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
index c81981eb1618d..108f58224484b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
@@ -271,6 +271,52 @@ public void extremeAccumulatorTest() {
Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001);
}
+ @Test
+ public void extremeAccumulatorMinValueTest() {
+ Accumulator intAccumulator =
+ AccumulatorFactory.createBuiltinAccumulator(
+ TAggregationType.EXTREME,
+ Collections.singletonList(TSDataType.INT32),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true);
+ TsBlockBuilder intBlockBuilder =
+ new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ intBlockBuilder.getTimeColumnBuilder().writeLong(0);
+ intBlockBuilder.getValueColumnBuilders()[0].writeInt(Integer.MAX_VALUE);
+ intBlockBuilder.declarePosition();
+ intBlockBuilder.getTimeColumnBuilder().writeLong(1);
+ intBlockBuilder.getValueColumnBuilders()[0].writeInt(Integer.MIN_VALUE);
+ intBlockBuilder.declarePosition();
+ TsBlock intBlock = intBlockBuilder.build();
+ intAccumulator.addInput(new Column[] {intBlock.getTimeColumn(), intBlock.getColumn(0)}, null);
+ ColumnBuilder intFinalResult = new IntColumnBuilder(null, 1);
+ intAccumulator.outputFinal(intFinalResult);
+ Assert.assertEquals(Integer.MIN_VALUE, intFinalResult.build().getInt(0));
+
+ Accumulator longAccumulator =
+ AccumulatorFactory.createBuiltinAccumulator(
+ TAggregationType.EXTREME,
+ Collections.singletonList(TSDataType.INT64),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true);
+ TsBlockBuilder longBlockBuilder =
+ new TsBlockBuilder(Collections.singletonList(TSDataType.INT64));
+ longBlockBuilder.getTimeColumnBuilder().writeLong(0);
+ longBlockBuilder.getValueColumnBuilders()[0].writeLong(Long.MAX_VALUE);
+ longBlockBuilder.declarePosition();
+ longBlockBuilder.getTimeColumnBuilder().writeLong(1);
+ longBlockBuilder.getValueColumnBuilders()[0].writeLong(Long.MIN_VALUE);
+ longBlockBuilder.declarePosition();
+ TsBlock longBlock = longBlockBuilder.build();
+ longAccumulator.addInput(
+ new Column[] {longBlock.getTimeColumn(), longBlock.getColumn(0)}, null);
+ ColumnBuilder longFinalResult = new LongColumnBuilder(null, 1);
+ longAccumulator.outputFinal(longFinalResult);
+ Assert.assertEquals(Long.MIN_VALUE, longFinalResult.build().getLong(0));
+ }
+
@Test
public void firstValueAccumulatorTest() {
Accumulator firstValueAccumulator =
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java
new file mode 100644
index 0000000000000..a029441b4dc80
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SlidingWindowAggregatorFactoryTest {
+
+ @Test
+ public void compareExtremeMinValueTest() {
+ Assert.assertTrue(
+ SlidingWindowAggregatorFactory.compareExtreme(Integer.MIN_VALUE, Integer.MAX_VALUE) > 0);
+ Assert.assertTrue(
+ SlidingWindowAggregatorFactory.compareExtreme(Integer.MAX_VALUE, Integer.MIN_VALUE) < 0);
+ Assert.assertTrue(SlidingWindowAggregatorFactory.compareExtreme(1, -1) > 0);
+
+ Assert.assertTrue(
+ SlidingWindowAggregatorFactory.compareExtreme(Long.MIN_VALUE, Long.MAX_VALUE) > 0);
+ Assert.assertTrue(
+ SlidingWindowAggregatorFactory.compareExtreme(Long.MAX_VALUE, Long.MIN_VALUE) < 0);
+ Assert.assertTrue(SlidingWindowAggregatorFactory.compareExtreme(1L, -1L) > 0);
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
new file mode 100644
index 0000000000000..4f42ea9117d71
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.window;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class SessionWindowTest {
+
+ @Test
+ public void satisfyHandlesOverflowedTimeDistance() {
+ SessionWindow window = new SessionWindow(1, true);
+ Column timeColumn = buildTimeColumn(Long.MIN_VALUE, Long.MAX_VALUE);
+ window.mergeOnePoint(new Column[] {timeColumn}, 0);
+
+ Assert.assertFalse(window.satisfy(timeColumn, 1));
+ }
+
+ private Column buildTimeColumn(long... timestamps) {
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+ for (long timestamp : timestamps) {
+ builder.getTimeColumnBuilder().writeLong(timestamp);
+ builder.getColumnBuilder(0).appendNull();
+ builder.declarePosition();
+ }
+ TsBlock tsBlock = builder.build();
+ return tsBlock.getTimeColumn();
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java
index 7977b43653625..b031198be4f39 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java
@@ -145,7 +145,7 @@ public PooledObject makeObject(TEndPoint endPo
new AsyncAINodeInternalServiceClient(
thriftClientProperty,
endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
index 26aa3c80b0b16..ac7772be3e1c3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
@@ -190,7 +190,7 @@ public PooledObject makeObject(TEndPoint e
new AsyncConfigNodeInternalServiceClient(
thriftClientProperty,
endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
index f6096eb44cd7f..06127464fb7b1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
@@ -174,7 +174,7 @@ public PooledObject makeObject(TEndPoint end
new AsyncDataNodeExternalServiceClient(
thriftClientProperty,
endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 5f6aa0a6bb4e0..3d1566d435071 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -202,7 +202,7 @@ public PooledObject makeObject(TEndPoint end
new AsyncDataNodeInternalServiceClient(
thriftClientProperty,
endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 2329281062da6..072a5f89a6221 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -162,7 +162,7 @@ public PooledObject makeObject(TEndPo
new AsyncDataNodeMPPDataExchangeServiceClient(
thriftClientProperty,
endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java
index f4b02eda0f69b..cc8b6f6e1bfbf 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java
@@ -171,7 +171,7 @@ public PooledObject makeObject(TEndPoint endPo
new AsyncIoTConsensusV2ServiceClient(
thriftClientProperty,
endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 42bba91f1fb32..7e01b08ef2120 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -234,7 +234,7 @@ public PooledObject makeObject(final TEndPoi
new AsyncPipeDataTransferServiceClient(
thriftClientProperty,
endPoint,
- tManagers[clientCnt.incrementAndGet() % tManagers.length],
+ tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
clientManager));
}
From 2a5ef1c0f34e4879ee5023b76d43c41259292c97 Mon Sep 17 00:00:00 2001
From: Caideyipi <87789683+Caideyipi@users.noreply.github.com>
Date: Wed, 10 Jun 2026 11:22:25 +0800
Subject: [PATCH 2/2] Add overflow edge case unit tests
---
.../filter/FixedIntervalFillFilterTest.java | 7 +
.../aggregation/ExtremeAccumulatorTest.java | 49 +++++
.../changing/ChangingValueFilterTest.java | 55 +++++
.../TumblingTimeSamplingProcessorTest.java | 192 ++++++++++++++++++
.../operator/window/SessionWindowTest.java | 22 +-
...criptionPipeEventBatchSegmentLockTest.java | 50 +++++
6 files changed, 373 insertions(+), 2 deletions(-)
create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java
create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java
create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java
diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
index 544c95b8cadf9..a40f6ec48f694 100644
--- a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
+++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java
@@ -31,4 +31,11 @@ public void needFillHandlesOverflowedTimeDistance() {
Assert.assertTrue(filter.needFill(Long.MIN_VALUE, Long.MIN_VALUE + 1));
Assert.assertFalse(filter.needFill(Long.MIN_VALUE, Long.MAX_VALUE));
}
+
+ @Test
+ public void needFillRejectsNegativeInterval() {
+ FixedIntervalFillFilter filter = new FixedIntervalFillFilter(-1);
+
+ Assert.assertFalse(filter.needFill(0, 0));
+ }
}
diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java
index c796f499a96e7..1edb86100138a 100644
--- a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java
+++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java
@@ -25,6 +25,8 @@
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.tsfile.read.common.block.column.FloatColumnBuilder;
import org.apache.tsfile.read.common.block.column.IntColumnBuilder;
import org.apache.tsfile.read.common.block.column.LongColumnBuilder;
import org.junit.Assert;
@@ -77,6 +79,31 @@ public void groupedExtremeAccumulatorKeepsOriginalValueAndHandlesMinValues() {
Assert.assertEquals(Long.MIN_VALUE, longResult.build().getLong(0));
}
+ @Test
+ public void groupedExtremeAccumulatorKeepsOriginalFloatingValues() {
+ GroupedExtremeAccumulator floatAccumulator = new GroupedExtremeAccumulator(TSDataType.FLOAT);
+ floatAccumulator.setGroupCount(1);
+ TsBlock floatBlock = buildFloatBlock(-5.5f, 4.5f);
+ floatAccumulator.addInput(
+ new int[] {0, 0},
+ new org.apache.tsfile.block.column.Column[] {floatBlock.getColumn(0)},
+ AggregationMask.createSelectAll(floatBlock.getPositionCount()));
+ ColumnBuilder floatResult = new FloatColumnBuilder(null, 1);
+ floatAccumulator.evaluateFinal(0, floatResult);
+ Assert.assertEquals(-5.5f, floatResult.build().getFloat(0), 0.001);
+
+ GroupedExtremeAccumulator doubleAccumulator = new GroupedExtremeAccumulator(TSDataType.DOUBLE);
+ doubleAccumulator.setGroupCount(1);
+ TsBlock doubleBlock = buildDoubleBlock(-10.25, 9.25);
+ doubleAccumulator.addInput(
+ new int[] {0, 0},
+ new org.apache.tsfile.block.column.Column[] {doubleBlock.getColumn(0)},
+ AggregationMask.createSelectAll(doubleBlock.getPositionCount()));
+ ColumnBuilder doubleResult = new DoubleColumnBuilder(null, 1);
+ doubleAccumulator.evaluateFinal(0, doubleResult);
+ Assert.assertEquals(-10.25, doubleResult.build().getDouble(0), 0.001);
+ }
+
private TsBlock buildIntBlock(int... values) {
TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
@@ -98,4 +125,26 @@ private TsBlock buildLongBlock(long... values) {
}
return builder.build();
}
+
+ private TsBlock buildFloatBlock(float... values) {
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.FLOAT));
+ ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
+ for (int i = 0; i < values.length; i++) {
+ builder.getTimeColumnBuilder().writeLong(i);
+ valueBuilder.writeFloat(values[i]);
+ builder.declarePosition();
+ }
+ return builder.build();
+ }
+
+ private TsBlock buildDoubleBlock(double... values) {
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.DOUBLE));
+ ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0];
+ for (int i = 0; i < values.length; i++) {
+ builder.getTimeColumnBuilder().writeLong(i);
+ valueBuilder.writeDouble(values[i]);
+ builder.declarePosition();
+ }
+ return builder.build();
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java
new file mode 100644
index 0000000000000..2bf4da9957c18
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.changing;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+public class ChangingValueFilterTest {
+
+ @Test
+ public void testExtremeTimestampDistanceReachesMaxInterval() throws Exception {
+ final ChangingValueFilter filter =
+ new ChangingValueFilter<>(createProcessor(0, Long.MAX_VALUE, 0), Long.MIN_VALUE, 0);
+
+ Assert.assertTrue(filter.filter(Long.MAX_VALUE, 0));
+ }
+
+ private ChangingValueSamplingProcessor createProcessor(
+ final long compressionMinTimeInterval,
+ final long compressionMaxTimeInterval,
+ final double compressionDeviation)
+ throws Exception {
+ final ChangingValueSamplingProcessor processor = new ChangingValueSamplingProcessor();
+ setField(processor, "compressionMinTimeInterval", compressionMinTimeInterval);
+ setField(processor, "compressionMaxTimeInterval", compressionMaxTimeInterval);
+ setField(processor, "compressionDeviation", compressionDeviation);
+ return processor;
+ }
+
+ private void setField(final Object target, final String fieldName, final Object value)
+ throws Exception {
+ final Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java
new file mode 100644
index 0000000000000..df18dba434e7d
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.tumbling;
+
+import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.iotdb.pipe.api.type.Type;
+
+import org.apache.tsfile.read.common.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.LocalDate;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TumblingTimeSamplingProcessorTest {
+
+ @Test
+ public void testExtremeTimestampDistanceReachesInterval() throws Exception {
+ final TumblingTimeSamplingProcessor processor = new TumblingTimeSamplingProcessor();
+ final TestLastObjectCache cache = new TestLastObjectCache();
+ setField(processor, "intervalInCurrentPrecision", Long.MAX_VALUE);
+ setField(processor, "pathLastObjectCache", cache);
+
+ try {
+ final CountingRowCollector rowCollector = new CountingRowCollector();
+
+ processor.processRow(
+ new TestRow(Long.MIN_VALUE), rowCollector, "root.db.d1", new AtomicReference<>());
+ processor.processRow(
+ new TestRow(Long.MAX_VALUE), rowCollector, "root.db.d1", new AtomicReference<>());
+ processor.processRow(
+ new TestRow(Long.MAX_VALUE - 1), rowCollector, "root.db.d1", new AtomicReference<>());
+
+ Assert.assertEquals(2, rowCollector.getCollectedRowCount());
+ } finally {
+ cache.close();
+ }
+ }
+
+ private void setField(final Object target, final String fieldName, final Object value)
+ throws Exception {
+ final Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ private static class TestLastObjectCache extends PartialPathLastObjectCache {
+
+ private TestLastObjectCache() {
+ super(1024);
+ }
+
+ @Override
+ protected long calculateMemoryUsage(final Long object) {
+ return Long.BYTES;
+ }
+ }
+
+ private static class CountingRowCollector implements RowCollector {
+
+ private final AtomicInteger collectedRowCount = new AtomicInteger();
+
+ @Override
+ public void collectRow(final Row row) throws IOException {
+ collectedRowCount.incrementAndGet();
+ }
+
+ private int getCollectedRowCount() {
+ return collectedRowCount.get();
+ }
+ }
+
+ private static class TestRow implements Row {
+
+ private final long timestamp;
+
+ private TestRow(final long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public long getTime() {
+ return timestamp;
+ }
+
+ @Override
+ public int getInt(final int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LocalDate getDate(final int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getLong(final int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float getFloat(final int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double getDouble(final int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean getBoolean(final int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Binary getBinary(final int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getString(final int columnIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object getObject(final int columnIndex) {
+ return 1;
+ }
+
+ @Override
+ public Type getDataType(final int columnIndex) {
+ return Type.INT32;
+ }
+
+ @Override
+ public boolean isNull(final int columnIndex) {
+ return false;
+ }
+
+ @Override
+ public int size() {
+ return 1;
+ }
+
+ @Override
+ public int getColumnIndex(final Path columnName) throws PipeParameterNotValidException {
+ return 0;
+ }
+
+ @Override
+ public String getColumnName(final int columnIndex) {
+ return "s1";
+ }
+
+ @Override
+ public List getColumnTypes() {
+ return Collections.singletonList(Type.INT32);
+ }
+
+ @Override
+ public String getDeviceId() {
+ return "root.db.d1";
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
index 4f42ea9117d71..ea534a76ebc82 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java
@@ -39,14 +39,32 @@ public void satisfyHandlesOverflowedTimeDistance() {
Assert.assertFalse(window.satisfy(timeColumn, 1));
}
+ @Test
+ public void skipPointsOutOfCurWindowHandlesOverflowedTimeDistance() {
+ SessionWindowManager manager = new SessionWindowManager(false, 1, true);
+ manager.initCurWindow();
+ Column previousTimeColumn = buildTimeColumn(Long.MIN_VALUE);
+ manager.getCurWindow().mergeOnePoint(new Column[] {previousTimeColumn}, 0);
+ manager.next();
+
+ TsBlock nextBlock = buildTsBlock(Long.MAX_VALUE);
+ TsBlock skippedBlock = manager.skipPointsOutOfCurWindow(nextBlock);
+
+ Assert.assertEquals(1, skippedBlock.getPositionCount());
+ Assert.assertEquals(Long.MAX_VALUE, skippedBlock.getTimeColumn().getLong(0));
+ }
+
private Column buildTimeColumn(long... timestamps) {
+ return buildTsBlock(timestamps).getTimeColumn();
+ }
+
+ private TsBlock buildTsBlock(long... timestamps) {
TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
for (long timestamp : timestamps) {
builder.getTimeColumnBuilder().writeLong(timestamp);
builder.getColumnBuilder(0).appendNull();
builder.declarePosition();
}
- TsBlock tsBlock = builder.build();
- return tsBlock.getTimeColumn();
+ return builder.build();
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java
new file mode 100644
index 0000000000000..7488f5ffa26e4
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.subscription.event.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class SubscriptionPipeEventBatchSegmentLockTest {
+
+ @Test
+ public void negativeRegionIdTest() throws Exception {
+ final SubscriptionPipeEventBatchSegmentLock segmentLock =
+ new SubscriptionPipeEventBatchSegmentLock();
+
+ segmentLock.lock(-1);
+ try {
+ final ReentrantLock[] locks = getLocks(segmentLock);
+ Assert.assertTrue(locks[locks.length - 1].isHeldByCurrentThread());
+ } finally {
+ segmentLock.unlock(-1);
+ }
+ }
+
+ private ReentrantLock[] getLocks(final SubscriptionPipeEventBatchSegmentLock segmentLock)
+ throws Exception {
+ final Field locksField = SubscriptionPipeEventBatchSegmentLock.class.getDeclaredField("locks");
+ locksField.setAccessible(true);
+ return (ReentrantLock[]) locksField.get(segmentLock);
+ }
+}