From a0c0fb5f88c2b068efac52ad452dc7d1b2f866fc Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 9 Jun 2026 14:26:23 +0800 Subject: [PATCH 1/2] Fix overflow edge cases in query utilities --- iotdb-core/calc-commons/pom.xml | 5 + .../fill/filter/FixedIntervalFillFilter.java | 10 +- .../aggregation/ExtremeAccumulator.java | 32 ++++-- .../grouped/GroupedExtremeAccumulator.java | 70 ++++++++---- .../filter/FixedIntervalFillFilterTest.java | 34 ++++++ .../aggregation/ExtremeAccumulatorTest.java | 101 ++++++++++++++++++ .../router/leader/HashLeaderBalancer.java | 2 +- .../router/leader/HashLeaderBalancerTest.java | 65 +++++++++++ .../AsyncIoTConsensusServiceClient.java | 2 +- .../changing/ChangingValueFilter.java | 20 +++- .../sdt/SwingingDoorTrendingFilter.java | 19 +++- .../TumblingTimeSamplingProcessor.java | 7 +- .../sender/TwoStageAggregateSender.java | 2 +- .../tsfile/PipeTsFileResourceSegmentLock.java | 10 +- .../aggregation/ExtremeAccumulator.java | 32 ++++-- .../SlidingWindowAggregatorFactory.java | 36 ++++--- .../operator/window/SessionWindow.java | 24 ++++- .../operator/window/SessionWindowManager.java | 2 +- .../plan/AbstractFragmentParallelPlanner.java | 3 +- ...SubscriptionPipeEventBatchSegmentLock.java | 4 +- .../sdt/SwingingDoorTrendingFilterTest.java | 56 ++++++++++ .../PipeTsFileResourceSegmentLockTest.java | 71 ++++++++++++ .../aggregation/AccumulatorTest.java | 46 ++++++++ .../SlidingWindowAggregatorFactoryTest.java | 41 +++++++ .../operator/window/SessionWindowTest.java | 52 +++++++++ .../AsyncAINodeInternalServiceClient.java | 2 +- .../AsyncConfigNodeInternalServiceClient.java | 2 +- .../AsyncDataNodeExternalServiceClient.java | 2 +- .../AsyncDataNodeInternalServiceClient.java | 2 +- ...cDataNodeMPPDataExchangeServiceClient.java | 2 +- .../AsyncIoTConsensusV2ServiceClient.java | 2 +- .../AsyncPipeDataTransferServiceClient.java | 2 +- 32 files changed, 676 insertions(+), 84 deletions(-) create mode 100644 iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java create mode 100644 iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java diff --git a/iotdb-core/calc-commons/pom.xml b/iotdb-core/calc-commons/pom.xml index d854f29cce77e..fbe69d074c839 100644 --- a/iotdb-core/calc-commons/pom.xml +++ b/iotdb-core/calc-commons/pom.xml @@ -99,6 +99,11 @@ at.yawk.lz4 lz4-java + + junit + junit + test + diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java index 8fbd4ce379d78..e7dd783b38722 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilter.java @@ -34,6 +34,14 @@ public FixedIntervalFillFilter(long timeInterval) { public boolean needFill(long time, long previousTime) { // the reason that we use Math.abs is that we may use order by time desc which will cause // previousTime is larger than time - return Math.abs(time - previousTime) <= timeInterval; + return isTimeDistanceLessThanOrEqual(time, previousTime, timeInterval); + } + + private boolean isTimeDistanceLessThanOrEqual(long left, long right, long maxDistance) { + if (maxDistance < 0) { + return false; + } + long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, maxDistance) <= 0; } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java index 85fbee4bd5ea9..f546282d31b6b 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulator.java @@ -247,13 +247,9 @@ private void addIntInput(Column column, AggregationMask mask) { } private void updateIntResult(int val) { - int absExtVal = Math.abs(val); int candidateResult = extremeResult.getInt(); - int absCandidateResult = Math.abs(extremeResult.getInt()); - if (!initResult - || (absExtVal > absCandidateResult) - || (absExtVal == absCandidateResult) && val > candidateResult) { + if (!initResult || compareExtreme(val, candidateResult) > 0) { initResult = true; extremeResult.setInt(val); } @@ -281,13 +277,9 @@ private void addLongInput(Column column, AggregationMask mask) { } private void updateLongResult(long val) { - long absExtVal = Math.abs(val); long candidateResult = extremeResult.getLong(); - long absCandidateResult = Math.abs(extremeResult.getLong()); - if (!initResult - || (absExtVal > absCandidateResult) - || (absExtVal == absCandidateResult) && val > candidateResult) { + if (!initResult || compareExtreme(val, candidateResult) > 0) { initResult = true; extremeResult.setLong(val); } @@ -360,4 +352,24 @@ private void updateDoubleResult(double val) { extremeResult.setDouble(val); } } + + private int compareExtreme(int left, int right) { + int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right)); + return absComparison == 0 ? Integer.compare(left, right) : absComparison; + } + + private int compareExtreme(long left, long right) { + int absComparison = compareAbs(left, right); + return absComparison == 0 ? Long.compare(left, right) : absComparison; + } + + private int compareAbs(long left, long right) { + if (left == Long.MIN_VALUE) { + return right == Long.MIN_VALUE ? 0 : 1; + } + if (right == Long.MIN_VALUE) { + return -1; + } + return Long.compare(Math.abs(left), Math.abs(right)); + } } diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java index bd5aca876b7af..b4102de7aa041 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java @@ -173,16 +173,16 @@ public void addIntermediate(int[] groupIds, Column argument) { switch (seriesDataType) { case INT32: - updateIntValue(groupIds[i], Math.abs(argument.getInt(i))); + updateIntValue(groupIds[i], argument.getInt(i)); break; case INT64: - updateLongValue(groupIds[i], Math.abs(argument.getLong(i))); + updateLongValue(groupIds[i], argument.getLong(i)); break; case FLOAT: - updateFloatValue(groupIds[i], Math.abs(argument.getFloat(i))); + updateFloatValue(groupIds[i], argument.getFloat(i)); break; case DOUBLE: - updateDoubleValue(groupIds[i], Math.abs(argument.getDouble(i))); + updateDoubleValue(groupIds[i], argument.getDouble(i)); break; case TEXT: case STRING: @@ -300,7 +300,7 @@ private void addIntInput(int[] groupIds, Column valueColumn, AggregationMask mas if (mask.isSelectAll()) { for (int i = 0; i < positionCount; i++) { if (!valueColumn.isNull(i)) { - updateIntValue(groupIds[i], Math.abs(valueColumn.getInt(i))); + updateIntValue(groupIds[i], valueColumn.getInt(i)); } } } else { @@ -309,15 +309,15 @@ private void addIntInput(int[] groupIds, Column valueColumn, AggregationMask mas for (int i = 0; i < positionCount; i++) { position = selectedPositions[i]; if (!valueColumn.isNull(position)) { - updateIntValue(groupIds[position], Math.abs(valueColumn.getInt(position))); + updateIntValue(groupIds[position], valueColumn.getInt(position)); } } } } protected void updateIntValue(int groupId, int value) { - int max = intValues.get(groupId); - if (value >= max) { + int candidate = intValues.get(groupId); + if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) { inits.set(groupId, true); intValues.set(groupId, value); } @@ -329,7 +329,7 @@ private void addLongInput(int[] groupIds, Column valueColumn, AggregationMask ma if (mask.isSelectAll()) { for (int i = 0; i < positionCount; i++) { if (!valueColumn.isNull(i)) { - updateLongValue(groupIds[i], Math.abs(valueColumn.getLong(i))); + updateLongValue(groupIds[i], valueColumn.getLong(i)); } } } else { @@ -338,15 +338,15 @@ private void addLongInput(int[] groupIds, Column valueColumn, AggregationMask ma for (int i = 0; i < positionCount; i++) { position = selectedPositions[i]; if (!valueColumn.isNull(position)) { - updateLongValue(groupIds[position], Math.abs(valueColumn.getLong(position))); + updateLongValue(groupIds[position], valueColumn.getLong(position)); } } } } protected void updateLongValue(int groupId, long value) { - long max = longValues.get(groupId); - if (value >= max) { + long candidate = longValues.get(groupId); + if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) { inits.set(groupId, true); longValues.set(groupId, value); } @@ -358,7 +358,7 @@ private void addFloatInput(int[] groupIds, Column valueColumn, AggregationMask m if (mask.isSelectAll()) { for (int i = 0; i < positionCount; i++) { if (!valueColumn.isNull(i)) { - updateFloatValue(groupIds[i], Math.abs(valueColumn.getFloat(i))); + updateFloatValue(groupIds[i], valueColumn.getFloat(i)); } } } else { @@ -367,15 +367,15 @@ private void addFloatInput(int[] groupIds, Column valueColumn, AggregationMask m for (int i = 0; i < positionCount; i++) { position = selectedPositions[i]; if (!valueColumn.isNull(position)) { - updateFloatValue(groupIds[position], Math.abs(valueColumn.getFloat(position))); + updateFloatValue(groupIds[position], valueColumn.getFloat(position)); } } } } protected void updateFloatValue(int groupId, float value) { - float max = floatValues.get(groupId); - if (value >= max) { + float candidate = floatValues.get(groupId); + if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) { inits.set(groupId, true); floatValues.set(groupId, value); } @@ -387,7 +387,7 @@ private void addDoubleInput(int[] groupIds, Column valueColumn, AggregationMask if (mask.isSelectAll()) { for (int i = 0; i < positionCount; i++) { if (!valueColumn.isNull(i)) { - updateDoubleValue(groupIds[i], Math.abs(valueColumn.getDouble(i))); + updateDoubleValue(groupIds[i], valueColumn.getDouble(i)); } } } else { @@ -396,17 +396,47 @@ private void addDoubleInput(int[] groupIds, Column valueColumn, AggregationMask for (int i = 0; i < positionCount; i++) { position = selectedPositions[i]; if (!valueColumn.isNull(position)) { - updateDoubleValue(groupIds[position], Math.abs(valueColumn.getDouble(position))); + updateDoubleValue(groupIds[position], valueColumn.getDouble(position)); } } } } protected void updateDoubleValue(int groupId, double value) { - double max = doubleValues.get(groupId); - if (value >= max) { + double candidate = doubleValues.get(groupId); + if (!inits.get(groupId) || compareExtreme(value, candidate) >= 0) { inits.set(groupId, true); doubleValues.set(groupId, value); } } + + private int compareExtreme(int left, int right) { + int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right)); + return absComparison == 0 ? Integer.compare(left, right) : absComparison; + } + + private int compareExtreme(long left, long right) { + int absComparison = compareAbs(left, right); + return absComparison == 0 ? Long.compare(left, right) : absComparison; + } + + private int compareAbs(long left, long right) { + if (left == Long.MIN_VALUE) { + return right == Long.MIN_VALUE ? 0 : 1; + } + if (right == Long.MIN_VALUE) { + return -1; + } + return Long.compare(Math.abs(left), Math.abs(right)); + } + + private int compareExtreme(float left, float right) { + int absComparison = Float.compare(Math.abs(left), Math.abs(right)); + return absComparison == 0 ? Float.compare(left, right) : absComparison; + } + + private int compareExtreme(double left, double right) { + int absComparison = Double.compare(Math.abs(left), Math.abs(right)); + return absComparison == 0 ? Double.compare(left, right) : absComparison; + } } diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java new file mode 100644 index 0000000000000..544c95b8cadf9 --- /dev/null +++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.calc.execution.operator.process.fill.filter; + +import org.junit.Assert; +import org.junit.Test; + +public class FixedIntervalFillFilterTest { + + @Test + public void needFillHandlesOverflowedTimeDistance() { + FixedIntervalFillFilter filter = new FixedIntervalFillFilter(1); + + Assert.assertTrue(filter.needFill(Long.MIN_VALUE, Long.MIN_VALUE + 1)); + Assert.assertFalse(filter.needFill(Long.MIN_VALUE, Long.MAX_VALUE)); + } +} diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java new file mode 100644 index 0000000000000..c796f499a96e7 --- /dev/null +++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.calc.execution.operator.source.relational.aggregation; + +import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedExtremeAccumulator; + +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.IntColumnBuilder; +import org.apache.tsfile.read.common.block.column.LongColumnBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class ExtremeAccumulatorTest { + + @Test + public void tableExtremeAccumulatorHandlesMinValues() { + ExtremeAccumulator intAccumulator = new ExtremeAccumulator(TSDataType.INT32); + TsBlock intBlock = buildIntBlock(Integer.MAX_VALUE, Integer.MIN_VALUE); + intAccumulator.addInput( + new org.apache.tsfile.block.column.Column[] {intBlock.getColumn(0)}, + AggregationMask.createSelectAll(intBlock.getPositionCount())); + ColumnBuilder intResult = new IntColumnBuilder(null, 1); + intAccumulator.evaluateFinal(intResult); + Assert.assertEquals(Integer.MIN_VALUE, intResult.build().getInt(0)); + + ExtremeAccumulator longAccumulator = new ExtremeAccumulator(TSDataType.INT64); + TsBlock longBlock = buildLongBlock(Long.MAX_VALUE, Long.MIN_VALUE); + longAccumulator.addInput( + new org.apache.tsfile.block.column.Column[] {longBlock.getColumn(0)}, + AggregationMask.createSelectAll(longBlock.getPositionCount())); + ColumnBuilder longResult = new LongColumnBuilder(null, 1); + longAccumulator.evaluateFinal(longResult); + Assert.assertEquals(Long.MIN_VALUE, longResult.build().getLong(0)); + } + + @Test + public void groupedExtremeAccumulatorKeepsOriginalValueAndHandlesMinValues() { + GroupedExtremeAccumulator intAccumulator = new GroupedExtremeAccumulator(TSDataType.INT32); + intAccumulator.setGroupCount(1); + TsBlock intBlock = buildIntBlock(-5, 4); + intAccumulator.addInput( + new int[] {0, 0}, + new org.apache.tsfile.block.column.Column[] {intBlock.getColumn(0)}, + AggregationMask.createSelectAll(intBlock.getPositionCount())); + ColumnBuilder intResult = new IntColumnBuilder(null, 1); + intAccumulator.evaluateFinal(0, intResult); + Assert.assertEquals(-5, intResult.build().getInt(0)); + + GroupedExtremeAccumulator longAccumulator = new GroupedExtremeAccumulator(TSDataType.INT64); + longAccumulator.setGroupCount(1); + TsBlock longBlock = buildLongBlock(Long.MAX_VALUE, Long.MIN_VALUE); + longAccumulator.addIntermediate(new int[] {0, 0}, longBlock.getColumn(0)); + ColumnBuilder longResult = new LongColumnBuilder(null, 1); + longAccumulator.evaluateFinal(0, longResult); + Assert.assertEquals(Long.MIN_VALUE, longResult.build().getLong(0)); + } + + private TsBlock buildIntBlock(int... values) { + TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); + ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0]; + for (int i = 0; i < values.length; i++) { + builder.getTimeColumnBuilder().writeLong(i); + valueBuilder.writeInt(values[i]); + builder.declarePosition(); + } + return builder.build(); + } + + private TsBlock buildLongBlock(long... values) { + TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT64)); + ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0]; + for (int i = 0; i < values.length; i++) { + builder.getTimeColumnBuilder().writeLong(i); + valueBuilder.writeLong(values[i]); + builder.declarePosition(); + } + return builder.build(); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java index aff023e82da8a..06ad4a76eb513 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancer.java @@ -43,7 +43,7 @@ public Map generateOptimalLeaderDistribution( (gid, nodeSet) -> { List nodeList = new ArrayList<>(nodeSet); nodeList.sort(null); - int startNodeIndex = Math.abs(gid.hashCode()) % nodeList.size(); + int startNodeIndex = (int) (Math.abs((long) gid.hashCode()) % nodeList.size()); int finalNodeId = nodeList.get(startNodeIndex); for (int i = 0; i < nodeList.size(); i++) { int currentNodeIndex = (startNodeIndex + i) % nodeList.size(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java new file mode 100644 index 0000000000000..f2ccccd857d2f --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/HashLeaderBalancerTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.router.leader; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +public class HashLeaderBalancerTest { + + private static final HashLeaderBalancer BALANCER = new HashLeaderBalancer(); + private static final int MIN_VALUE_HASH_REGION_ID = 268255235; + + @Test + public void minValueHashCodeTest() { + TConsensusGroupId regionGroupId = + new TConsensusGroupId(TConsensusGroupType.DataRegion, MIN_VALUE_HASH_REGION_ID); + Assert.assertEquals(Integer.MIN_VALUE, regionGroupId.hashCode()); + + Map> regionReplicaSetMap = new TreeMap<>(); + regionReplicaSetMap.put(regionGroupId, new HashSet<>(Arrays.asList(1, 2, 3))); + + Map dataNodeStatisticsMap = new TreeMap<>(); + dataNodeStatisticsMap.put(1, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(2, new NodeStatistics(NodeStatus.Running)); + dataNodeStatisticsMap.put(3, new NodeStatistics(NodeStatus.Running)); + + Map leaderDistribution = + BALANCER.generateOptimalLeaderDistribution( + new TreeMap<>(), + regionReplicaSetMap, + new TreeMap<>(), + dataNodeStatisticsMap, + new TreeMap<>()); + + Assert.assertEquals(Integer.valueOf(3), leaderDistribution.get(regionGroupId)); + } +} diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java index ad0bcc0d9a25a..99e6cbcf71eaf 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java @@ -160,7 +160,7 @@ public PooledObject makeObject(TEndPoint endPoin new AsyncIoTConsensusServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java index 7dc8c87c09b2d..9bb5dfcb9a761 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilter.java @@ -59,13 +59,13 @@ public boolean filter(final long timestamp, final T value) { } private boolean tryFilter(final long timestamp, final T value) { - final long timeDiff = Math.abs(timestamp - lastStoredTimestamp); - - if (timeDiff <= processor.getCompressionMinTimeInterval()) { + if (isTimeDistanceLessThanOrEqual( + timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) { return false; } - if (timeDiff >= processor.getCompressionMaxTimeInterval()) { + if (isTimeDistanceGreaterThanOrEqual( + timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) { reset(timestamp, value); return true; } @@ -94,6 +94,18 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } + private boolean isTimeDistanceLessThanOrEqual( + final long left, final long right, final long maxDistance) { + final long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, maxDistance) <= 0; + } + + private boolean isTimeDistanceGreaterThanOrEqual( + final long left, final long right, final long minDistance) { + final long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, minDistance) >= 0; + } + private void reset(final long timestamp, final T value) { lastStoredTimestamp = timestamp; lastStoredValue = value; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java index 850cbd3ed0729..bfc0bc5d3f791 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilter.java @@ -86,13 +86,14 @@ public boolean filter(final long timestamp, final T value) { private boolean tryFilter(final long timestamp, final T value) { final long timeDiff = timestamp - lastStoredTimestamp; - final long absTimeDiff = Math.abs(timeDiff); - if (absTimeDiff <= processor.getCompressionMinTimeInterval()) { + if (isTimeDistanceLessThanOrEqual( + timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) { return false; } - if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) { + if (isTimeDistanceGreaterThanOrEqual( + timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) { reset(timestamp, value); return true; } @@ -144,6 +145,18 @@ private boolean tryFilter(final long timestamp, final T value) { return false; } + private boolean isTimeDistanceLessThanOrEqual( + final long left, final long right, final long maxDistance) { + final long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, maxDistance) <= 0; + } + + private boolean isTimeDistanceGreaterThanOrEqual( + final long left, final long right, final long minDistance) { + final long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, minDistance) >= 0; + } + private void reset(final long timestamp, final T value) { upperDoor = Double.MIN_VALUE; lowerDoor = Double.MAX_VALUE; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java index 36fa0b8382f7b..8beab1343888c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessor.java @@ -116,7 +116,7 @@ protected void processRow( final Long lastSampleTime = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix); if (lastSampleTime == null - || Math.abs(currentRowTime - lastSampleTime) >= intervalInCurrentPrecision) { + || isTimeDistanceGreaterThanOrEqual(currentRowTime, lastSampleTime)) { try { rowCollector.collectRow(row); @@ -135,4 +135,9 @@ protected void processRow( } } } + + private boolean isTimeDistanceGreaterThanOrEqual(long left, long right) { + long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, intervalInCurrentPrecision) >= 0; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java index 9b6e17e6e4a76..e221b4a19c779 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java @@ -76,7 +76,7 @@ public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq r final boolean endPointsChanged = tryFetchEndPointsIfNecessary(); tryConstructClients(endPointsChanged); - final TEndPoint endPoint = endPoints[(int) watermark % endPoints.length]; + final TEndPoint endPoint = endPoints[(int) Math.floorMod(watermark, endPoints.length)]; IoTDBSyncClient client = endPointIoTDBSyncClientMap.get(endPoint); if (client == null) { client = reconstructIoTDBSyncClient(endPoint); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java index aaf5ce4db5a5f..a73198c2713ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLock.java @@ -78,17 +78,21 @@ private void initIfNecessary() { public void lock(final File file) { initIfNecessary(); - locks[Math.abs(file.hashCode()) % locks.length].lock(); + locks[getLockIndex(file)].lock(); } public boolean tryLock(final File file, final long timeout, final TimeUnit timeUnit) throws InterruptedException { initIfNecessary(); - return locks[Math.abs(file.hashCode()) % locks.length].tryLock(timeout, timeUnit); + return locks[getLockIndex(file)].tryLock(timeout, timeUnit); } public void unlock(final File file) { initIfNecessary(); - locks[Math.abs(file.hashCode()) % locks.length].unlock(); + locks[getLockIndex(file)].unlock(); + } + + private int getLockIndex(final File file) { + return (int) (Math.abs((long) file.hashCode()) % locks.length); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java index f99e01e8d5a59..af9812e0a17bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/ExtremeAccumulator.java @@ -270,13 +270,9 @@ private void addIntInput(Column[] column, BitMap bitMap) { } private void updateIntResult(int extVal) { - int absExtVal = Math.abs(extVal); int candidateResult = extremeResult.getInt(); - int absCandidateResult = Math.abs(extremeResult.getInt()); - if (!initResult - || (absExtVal > absCandidateResult) - || (absExtVal == absCandidateResult) && extVal > candidateResult) { + if (!initResult || compareExtreme(extVal, candidateResult) > 0) { initResult = true; extremeResult.setInt(extVal); } @@ -295,13 +291,9 @@ private void addLongInput(Column[] column, BitMap bitMap) { } private void updateLongResult(long extVal) { - long absExtVal = Math.abs(extVal); long candidateResult = extremeResult.getLong(); - long absCandidateResult = Math.abs(extremeResult.getLong()); - if (!initResult - || (absExtVal > absCandidateResult) - || (absExtVal == absCandidateResult) && extVal > candidateResult) { + if (!initResult || compareExtreme(extVal, candidateResult) > 0) { initResult = true; extremeResult.setLong(extVal); } @@ -356,4 +348,24 @@ private void updateDoubleResult(double extVal) { extremeResult.setDouble(extVal); } } + + private int compareExtreme(int left, int right) { + int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right)); + return absComparison == 0 ? Integer.compare(left, right) : absComparison; + } + + private int compareExtreme(long left, long right) { + int absComparison = compareAbs(left, right); + return absComparison == 0 ? Long.compare(left, right) : absComparison; + } + + private int compareAbs(long left, long right) { + if (left == Long.MIN_VALUE) { + return right == Long.MIN_VALUE ? 0 : 1; + } + if (right == Long.MIN_VALUE) { + return -1; + } + return Long.compare(Math.abs(left), Math.abs(right)); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java index e46a2597b220a..8a11ffc773961 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java @@ -77,26 +77,14 @@ private SlidingWindowAggregatorFactory() {} (o1, o2) -> { int value1 = o1.getInt(0); int value2 = o2.getInt(0); - if (Math.abs(value1) > Math.abs(value2) - || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) { - return 1; - } else if (value1 == value2) { - return 0; - } - return -1; + return compareExtreme(value1, value2); }); extremeComparators.put( TSDataType.INT64, (o1, o2) -> { long value1 = o1.getLong(0); long value2 = o2.getLong(0); - if (Math.abs(value1) > Math.abs(value2) - || (Math.abs(value1) == Math.abs(value2) && value1 > value2)) { - return 1; - } else if (value1 == value2) { - return 0; - } - return -1; + return compareExtreme(value1, value2); }); extremeComparators.put( TSDataType.FLOAT, @@ -249,4 +237,24 @@ public static SlidingWindowAggregator createSlidingWindowAggregator( DataNodeQueryMessages.INVALID_AGGREGATION_TYPE + aggregationType); } } + + static int compareExtreme(int left, int right) { + int absComparison = Long.compare(Math.abs((long) left), Math.abs((long) right)); + return absComparison == 0 ? Integer.compare(left, right) : absComparison; + } + + static int compareExtreme(long left, long right) { + int absComparison = compareAbs(left, right); + return absComparison == 0 ? Long.compare(left, right) : absComparison; + } + + private static int compareAbs(long left, long right) { + if (left == Long.MIN_VALUE) { + return right == Long.MIN_VALUE ? 0 : 1; + } + if (right == Long.MIN_VALUE) { + return -1; + } + return Long.compare(Math.abs(left), Math.abs(right)); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java index 4c5e004c60d46..59f955580b918 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java @@ -54,9 +54,9 @@ public boolean satisfy(Column column, int index) { return true; } if (index == 0) { - return Math.abs(column.getLong(index) - lastTsBlockTime) <= timeInterval; + return isTimeDistanceLessThanOrEqual(column.getLong(index), lastTsBlockTime); } - return Math.abs(column.getLong(index) - column.getLong(index - 1)) <= timeInterval; + return isTimeDistanceLessThanOrEqual(column.getLong(index), column.getLong(index - 1)); } @Override @@ -92,8 +92,8 @@ public boolean contains(Column column) { long maxTime = Math.max(columnStartTime, columnEndTime); boolean contains = - Math.abs(columnStartTime - lastTsBlockTime) < timeInterval - && maxTime - minTime <= timeInterval; + isTimeDistanceLessThan(columnStartTime, lastTsBlockTime) + && isTimeDistanceLessThanOrEqual(maxTime, minTime); if (contains) { if (!initializedTimeValue) { startTime = Long.MAX_VALUE; @@ -113,6 +113,22 @@ public long getTimeInterval() { return timeInterval; } + boolean isTimeDistanceLessThanOrEqual(long left, long right) { + return compareTimeDistance(left, right) <= 0; + } + + private boolean isTimeDistanceLessThan(long left, long right) { + return compareTimeDistance(left, right) < 0; + } + + private int compareTimeDistance(long left, long right) { + if (timeInterval < 0) { + return 1; + } + long distance = left >= right ? left - right : right - left; + return Long.compareUnsigned(distance, timeInterval); + } + public long getTimeValue() { return timeValue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java index 42ca3becedd3a..bef50233b2263 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowManager.java @@ -93,7 +93,7 @@ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) { for (; i < size; i++) { long currentTime = timeColumn.getLong(i); - if (Math.abs(currentTime - previousTimeValue) > sessionWindow.getTimeInterval()) { + if (!sessionWindow.isTimeDistanceLessThanOrEqual(currentTime, previousTimeValue)) { sessionWindow.setTimeValue(previousTimeValue); break; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java index e5bd6ca38c200..0fddb41d6e689 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/AbstractFragmentParallelPlanner.java @@ -138,7 +138,8 @@ protected TDataNodeLocation selectTargetDataNode(TRegionReplicaSet regionReplica if (!selectRandomDataNode || queryContext.getSession() == null) { targetIndex = 0; } else { - targetIndex = (int) (queryContext.getSession().getSessionId() % availableDataNodes.size()); + targetIndex = + (int) Math.floorMod(queryContext.getSession().getSessionId(), availableDataNodes.size()); } return availableDataNodes.get(targetIndex); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java index 194eb5a8fa53b..0c91492e4b3ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLock.java @@ -65,11 +65,11 @@ private void initIfNecessary() { public void lock(final int regionId) { initIfNecessary(); - locks[regionId % locks.length].lock(); + locks[Math.floorMod(regionId, locks.length)].lock(); } public void unlock(final int regionId) { initIfNecessary(); - locks[regionId % locks.length].unlock(); + locks[Math.floorMod(regionId, locks.length)].unlock(); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java new file mode 100644 index 0000000000000..cd04df0419498 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/sdt/SwingingDoorTrendingFilterTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.sdt; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; + +public class SwingingDoorTrendingFilterTest { + + @Test + public void testExtremeTimestampDistanceReachesMaxInterval() throws Exception { + final SwingingDoorTrendingFilter filter = + new SwingingDoorTrendingFilter<>(createProcessor(0, Long.MAX_VALUE, 0), Long.MIN_VALUE, 0); + + Assert.assertTrue(filter.filter(Long.MAX_VALUE, 0)); + } + + private SwingingDoorTrendingSamplingProcessor createProcessor( + final long compressionMinTimeInterval, + final long compressionMaxTimeInterval, + final double compressionDeviation) + throws Exception { + final SwingingDoorTrendingSamplingProcessor processor = + new SwingingDoorTrendingSamplingProcessor(); + setField(processor, "compressionMinTimeInterval", compressionMinTimeInterval); + setField(processor, "compressionMaxTimeInterval", compressionMaxTimeInterval); + setField(processor, "compressionDeviation", compressionDeviation); + return processor; + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java new file mode 100644 index 0000000000000..e1b53e0dc6ca8 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceSegmentLockTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.tsfile; + +import org.apache.iotdb.commons.conf.CommonDescriptor; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +public class PipeTsFileResourceSegmentLockTest { + + @Test + public void minValueHashCodeTest() throws InterruptedException { + int originalSegmentLockNum = + CommonDescriptor.getInstance().getConfig().getPipeTsFileResourceSegmentLockNum(); + CommonDescriptor.getInstance().getConfig().setPipeTsFileResourceSegmentLockNum(32); + + try { + PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock(); + File file = new MinValueHashCodeFile("target/min-value-hash-code.tsfile"); + + Assert.assertEquals(Integer.MIN_VALUE, file.hashCode()); + + segmentLock.lock(file); + try { + Assert.assertTrue(segmentLock.tryLock(file, 1, TimeUnit.MILLISECONDS)); + segmentLock.unlock(file); + } finally { + segmentLock.unlock(file); + } + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeTsFileResourceSegmentLockNum(originalSegmentLockNum); + } + } + + private static class MinValueHashCodeFile extends File { + + private static final long serialVersionUID = 1L; + + private MinValueHashCodeFile(String pathname) { + super(pathname); + } + + @Override + public int hashCode() { + return Integer.MIN_VALUE; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java index c81981eb1618d..108f58224484b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java @@ -271,6 +271,52 @@ public void extremeAccumulatorTest() { Assert.assertEquals(100d, finalResult.build().getDouble(0), 0.001); } + @Test + public void extremeAccumulatorMinValueTest() { + Accumulator intAccumulator = + AccumulatorFactory.createBuiltinAccumulator( + TAggregationType.EXTREME, + Collections.singletonList(TSDataType.INT32), + Collections.emptyList(), + Collections.emptyMap(), + true); + TsBlockBuilder intBlockBuilder = + new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); + intBlockBuilder.getTimeColumnBuilder().writeLong(0); + intBlockBuilder.getValueColumnBuilders()[0].writeInt(Integer.MAX_VALUE); + intBlockBuilder.declarePosition(); + intBlockBuilder.getTimeColumnBuilder().writeLong(1); + intBlockBuilder.getValueColumnBuilders()[0].writeInt(Integer.MIN_VALUE); + intBlockBuilder.declarePosition(); + TsBlock intBlock = intBlockBuilder.build(); + intAccumulator.addInput(new Column[] {intBlock.getTimeColumn(), intBlock.getColumn(0)}, null); + ColumnBuilder intFinalResult = new IntColumnBuilder(null, 1); + intAccumulator.outputFinal(intFinalResult); + Assert.assertEquals(Integer.MIN_VALUE, intFinalResult.build().getInt(0)); + + Accumulator longAccumulator = + AccumulatorFactory.createBuiltinAccumulator( + TAggregationType.EXTREME, + Collections.singletonList(TSDataType.INT64), + Collections.emptyList(), + Collections.emptyMap(), + true); + TsBlockBuilder longBlockBuilder = + new TsBlockBuilder(Collections.singletonList(TSDataType.INT64)); + longBlockBuilder.getTimeColumnBuilder().writeLong(0); + longBlockBuilder.getValueColumnBuilders()[0].writeLong(Long.MAX_VALUE); + longBlockBuilder.declarePosition(); + longBlockBuilder.getTimeColumnBuilder().writeLong(1); + longBlockBuilder.getValueColumnBuilders()[0].writeLong(Long.MIN_VALUE); + longBlockBuilder.declarePosition(); + TsBlock longBlock = longBlockBuilder.build(); + longAccumulator.addInput( + new Column[] {longBlock.getTimeColumn(), longBlock.getColumn(0)}, null); + ColumnBuilder longFinalResult = new LongColumnBuilder(null, 1); + longAccumulator.outputFinal(longFinalResult); + Assert.assertEquals(Long.MIN_VALUE, longFinalResult.build().getLong(0)); + } + @Test public void firstValueAccumulatorTest() { Accumulator firstValueAccumulator = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java new file mode 100644 index 0000000000000..a029441b4dc80 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactoryTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow; + +import org.junit.Assert; +import org.junit.Test; + +public class SlidingWindowAggregatorFactoryTest { + + @Test + public void compareExtremeMinValueTest() { + Assert.assertTrue( + SlidingWindowAggregatorFactory.compareExtreme(Integer.MIN_VALUE, Integer.MAX_VALUE) > 0); + Assert.assertTrue( + SlidingWindowAggregatorFactory.compareExtreme(Integer.MAX_VALUE, Integer.MIN_VALUE) < 0); + Assert.assertTrue(SlidingWindowAggregatorFactory.compareExtreme(1, -1) > 0); + + Assert.assertTrue( + SlidingWindowAggregatorFactory.compareExtreme(Long.MIN_VALUE, Long.MAX_VALUE) > 0); + Assert.assertTrue( + SlidingWindowAggregatorFactory.compareExtreme(Long.MAX_VALUE, Long.MIN_VALUE) < 0); + Assert.assertTrue(SlidingWindowAggregatorFactory.compareExtreme(1L, -1L) > 0); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java new file mode 100644 index 0000000000000..4f42ea9117d71 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.window; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class SessionWindowTest { + + @Test + public void satisfyHandlesOverflowedTimeDistance() { + SessionWindow window = new SessionWindow(1, true); + Column timeColumn = buildTimeColumn(Long.MIN_VALUE, Long.MAX_VALUE); + window.mergeOnePoint(new Column[] {timeColumn}, 0); + + Assert.assertFalse(window.satisfy(timeColumn, 1)); + } + + private Column buildTimeColumn(long... timestamps) { + TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); + for (long timestamp : timestamps) { + builder.getTimeColumnBuilder().writeLong(timestamp); + builder.getColumnBuilder(0).appendNull(); + builder.declarePosition(); + } + TsBlock tsBlock = builder.build(); + return tsBlock.getTimeColumn(); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java index 7977b43653625..b031198be4f39 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncAINodeInternalServiceClient.java @@ -145,7 +145,7 @@ public PooledObject makeObject(TEndPoint endPo new AsyncAINodeInternalServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java index 26aa3c80b0b16..ac7772be3e1c3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java @@ -190,7 +190,7 @@ public PooledObject makeObject(TEndPoint e new AsyncConfigNodeInternalServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java index f6096eb44cd7f..06127464fb7b1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java @@ -174,7 +174,7 @@ public PooledObject makeObject(TEndPoint end new AsyncDataNodeExternalServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java index 5f6aa0a6bb4e0..3d1566d435071 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java @@ -202,7 +202,7 @@ public PooledObject makeObject(TEndPoint end new AsyncDataNodeInternalServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java index 2329281062da6..072a5f89a6221 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java @@ -162,7 +162,7 @@ public PooledObject makeObject(TEndPo new AsyncDataNodeMPPDataExchangeServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java index f4b02eda0f69b..cc8b6f6e1bfbf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncIoTConsensusV2ServiceClient.java @@ -171,7 +171,7 @@ public PooledObject makeObject(TEndPoint endPo new AsyncIoTConsensusV2ServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 42bba91f1fb32..7e01b08ef2120 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -234,7 +234,7 @@ public PooledObject makeObject(final TEndPoi new AsyncPipeDataTransferServiceClient( thriftClientProperty, endPoint, - tManagers[clientCnt.incrementAndGet() % tManagers.length], + tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)], clientManager)); } From 2a5ef1c0f34e4879ee5023b76d43c41259292c97 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 10 Jun 2026 11:22:25 +0800 Subject: [PATCH 2/2] Add overflow edge case unit tests --- .../filter/FixedIntervalFillFilterTest.java | 7 + .../aggregation/ExtremeAccumulatorTest.java | 49 +++++ .../changing/ChangingValueFilterTest.java | 55 +++++ .../TumblingTimeSamplingProcessorTest.java | 192 ++++++++++++++++++ .../operator/window/SessionWindowTest.java | 22 +- ...criptionPipeEventBatchSegmentLockTest.java | 50 +++++ 6 files changed, 373 insertions(+), 2 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java index 544c95b8cadf9..a40f6ec48f694 100644 --- a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java +++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/process/fill/filter/FixedIntervalFillFilterTest.java @@ -31,4 +31,11 @@ public void needFillHandlesOverflowedTimeDistance() { Assert.assertTrue(filter.needFill(Long.MIN_VALUE, Long.MIN_VALUE + 1)); Assert.assertFalse(filter.needFill(Long.MIN_VALUE, Long.MAX_VALUE)); } + + @Test + public void needFillRejectsNegativeInterval() { + FixedIntervalFillFilter filter = new FixedIntervalFillFilter(-1); + + Assert.assertFalse(filter.needFill(0, 0)); + } } diff --git a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java index c796f499a96e7..1edb86100138a 100644 --- a/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java +++ b/iotdb-core/calc-commons/src/test/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/ExtremeAccumulatorTest.java @@ -25,6 +25,8 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.DoubleColumnBuilder; +import org.apache.tsfile.read.common.block.column.FloatColumnBuilder; import org.apache.tsfile.read.common.block.column.IntColumnBuilder; import org.apache.tsfile.read.common.block.column.LongColumnBuilder; import org.junit.Assert; @@ -77,6 +79,31 @@ public void groupedExtremeAccumulatorKeepsOriginalValueAndHandlesMinValues() { Assert.assertEquals(Long.MIN_VALUE, longResult.build().getLong(0)); } + @Test + public void groupedExtremeAccumulatorKeepsOriginalFloatingValues() { + GroupedExtremeAccumulator floatAccumulator = new GroupedExtremeAccumulator(TSDataType.FLOAT); + floatAccumulator.setGroupCount(1); + TsBlock floatBlock = buildFloatBlock(-5.5f, 4.5f); + floatAccumulator.addInput( + new int[] {0, 0}, + new org.apache.tsfile.block.column.Column[] {floatBlock.getColumn(0)}, + AggregationMask.createSelectAll(floatBlock.getPositionCount())); + ColumnBuilder floatResult = new FloatColumnBuilder(null, 1); + floatAccumulator.evaluateFinal(0, floatResult); + Assert.assertEquals(-5.5f, floatResult.build().getFloat(0), 0.001); + + GroupedExtremeAccumulator doubleAccumulator = new GroupedExtremeAccumulator(TSDataType.DOUBLE); + doubleAccumulator.setGroupCount(1); + TsBlock doubleBlock = buildDoubleBlock(-10.25, 9.25); + doubleAccumulator.addInput( + new int[] {0, 0}, + new org.apache.tsfile.block.column.Column[] {doubleBlock.getColumn(0)}, + AggregationMask.createSelectAll(doubleBlock.getPositionCount())); + ColumnBuilder doubleResult = new DoubleColumnBuilder(null, 1); + doubleAccumulator.evaluateFinal(0, doubleResult); + Assert.assertEquals(-10.25, doubleResult.build().getDouble(0), 0.001); + } + private TsBlock buildIntBlock(int... values) { TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0]; @@ -98,4 +125,26 @@ private TsBlock buildLongBlock(long... values) { } return builder.build(); } + + private TsBlock buildFloatBlock(float... values) { + TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.FLOAT)); + ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0]; + for (int i = 0; i < values.length; i++) { + builder.getTimeColumnBuilder().writeLong(i); + valueBuilder.writeFloat(values[i]); + builder.declarePosition(); + } + return builder.build(); + } + + private TsBlock buildDoubleBlock(double... values) { + TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.DOUBLE)); + ColumnBuilder valueBuilder = builder.getValueColumnBuilders()[0]; + for (int i = 0; i < values.length; i++) { + builder.getTimeColumnBuilder().writeLong(i); + valueBuilder.writeDouble(values[i]); + builder.declarePosition(); + } + return builder.build(); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java new file mode 100644 index 0000000000000..2bf4da9957c18 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingValueFilterTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.changing; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; + +public class ChangingValueFilterTest { + + @Test + public void testExtremeTimestampDistanceReachesMaxInterval() throws Exception { + final ChangingValueFilter filter = + new ChangingValueFilter<>(createProcessor(0, Long.MAX_VALUE, 0), Long.MIN_VALUE, 0); + + Assert.assertTrue(filter.filter(Long.MAX_VALUE, 0)); + } + + private ChangingValueSamplingProcessor createProcessor( + final long compressionMinTimeInterval, + final long compressionMaxTimeInterval, + final double compressionDeviation) + throws Exception { + final ChangingValueSamplingProcessor processor = new ChangingValueSamplingProcessor(); + setField(processor, "compressionMinTimeInterval", compressionMinTimeInterval); + setField(processor, "compressionMaxTimeInterval", compressionMaxTimeInterval); + setField(processor, "compressionDeviation", compressionDeviation); + return processor; + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java new file mode 100644 index 0000000000000..df18dba434e7d --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/processor/downsampling/tumbling/TumblingTimeSamplingProcessorTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.processor.downsampling.tumbling; + +import org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache; +import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.collector.RowCollector; +import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.iotdb.pipe.api.type.Binary; +import org.apache.iotdb.pipe.api.type.Type; + +import org.apache.tsfile.read.common.Path; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.time.LocalDate; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class TumblingTimeSamplingProcessorTest { + + @Test + public void testExtremeTimestampDistanceReachesInterval() throws Exception { + final TumblingTimeSamplingProcessor processor = new TumblingTimeSamplingProcessor(); + final TestLastObjectCache cache = new TestLastObjectCache(); + setField(processor, "intervalInCurrentPrecision", Long.MAX_VALUE); + setField(processor, "pathLastObjectCache", cache); + + try { + final CountingRowCollector rowCollector = new CountingRowCollector(); + + processor.processRow( + new TestRow(Long.MIN_VALUE), rowCollector, "root.db.d1", new AtomicReference<>()); + processor.processRow( + new TestRow(Long.MAX_VALUE), rowCollector, "root.db.d1", new AtomicReference<>()); + processor.processRow( + new TestRow(Long.MAX_VALUE - 1), rowCollector, "root.db.d1", new AtomicReference<>()); + + Assert.assertEquals(2, rowCollector.getCollectedRowCount()); + } finally { + cache.close(); + } + } + + private void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private static class TestLastObjectCache extends PartialPathLastObjectCache { + + private TestLastObjectCache() { + super(1024); + } + + @Override + protected long calculateMemoryUsage(final Long object) { + return Long.BYTES; + } + } + + private static class CountingRowCollector implements RowCollector { + + private final AtomicInteger collectedRowCount = new AtomicInteger(); + + @Override + public void collectRow(final Row row) throws IOException { + collectedRowCount.incrementAndGet(); + } + + private int getCollectedRowCount() { + return collectedRowCount.get(); + } + } + + private static class TestRow implements Row { + + private final long timestamp; + + private TestRow(final long timestamp) { + this.timestamp = timestamp; + } + + @Override + public long getTime() { + return timestamp; + } + + @Override + public int getInt(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public LocalDate getDate(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Binary getBinary(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public String getString(final int columnIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public Object getObject(final int columnIndex) { + return 1; + } + + @Override + public Type getDataType(final int columnIndex) { + return Type.INT32; + } + + @Override + public boolean isNull(final int columnIndex) { + return false; + } + + @Override + public int size() { + return 1; + } + + @Override + public int getColumnIndex(final Path columnName) throws PipeParameterNotValidException { + return 0; + } + + @Override + public String getColumnName(final int columnIndex) { + return "s1"; + } + + @Override + public List getColumnTypes() { + return Collections.singletonList(Type.INT32); + } + + @Override + public String getDeviceId() { + return "root.db.d1"; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java index 4f42ea9117d71..ea534a76ebc82 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindowTest.java @@ -39,14 +39,32 @@ public void satisfyHandlesOverflowedTimeDistance() { Assert.assertFalse(window.satisfy(timeColumn, 1)); } + @Test + public void skipPointsOutOfCurWindowHandlesOverflowedTimeDistance() { + SessionWindowManager manager = new SessionWindowManager(false, 1, true); + manager.initCurWindow(); + Column previousTimeColumn = buildTimeColumn(Long.MIN_VALUE); + manager.getCurWindow().mergeOnePoint(new Column[] {previousTimeColumn}, 0); + manager.next(); + + TsBlock nextBlock = buildTsBlock(Long.MAX_VALUE); + TsBlock skippedBlock = manager.skipPointsOutOfCurWindow(nextBlock); + + Assert.assertEquals(1, skippedBlock.getPositionCount()); + Assert.assertEquals(Long.MAX_VALUE, skippedBlock.getTimeColumn().getLong(0)); + } + private Column buildTimeColumn(long... timestamps) { + return buildTsBlock(timestamps).getTimeColumn(); + } + + private TsBlock buildTsBlock(long... timestamps) { TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32)); for (long timestamp : timestamps) { builder.getTimeColumnBuilder().writeLong(timestamp); builder.getColumnBuilder(0).appendNull(); builder.declarePosition(); } - TsBlock tsBlock = builder.build(); - return tsBlock.getTimeColumn(); + return builder.build(); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java new file mode 100644 index 0000000000000..7488f5ffa26e4 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeEventBatchSegmentLockTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.event.batch; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.locks.ReentrantLock; + +public class SubscriptionPipeEventBatchSegmentLockTest { + + @Test + public void negativeRegionIdTest() throws Exception { + final SubscriptionPipeEventBatchSegmentLock segmentLock = + new SubscriptionPipeEventBatchSegmentLock(); + + segmentLock.lock(-1); + try { + final ReentrantLock[] locks = getLocks(segmentLock); + Assert.assertTrue(locks[locks.length - 1].isHeldByCurrentThread()); + } finally { + segmentLock.unlock(-1); + } + } + + private ReentrantLock[] getLocks(final SubscriptionPipeEventBatchSegmentLock segmentLock) + throws Exception { + final Field locksField = SubscriptionPipeEventBatchSegmentLock.class.getDeclaredField("locks"); + locksField.setAccessible(true); + return (ReentrantLock[]) locksField.get(segmentLock); + } +}