Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

Expand Down Expand Up @@ -88,7 +90,6 @@ public boolean isTsFileEmpty() {
return resource.getDevices().isEmpty();
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
public boolean needDecodeTsFile(
Function<List<Pair<IDeviceID, TTimePartitionSlot>>, List<TRegionReplicaSet>>
partitionFetcher) {
Expand All @@ -100,10 +101,16 @@ public boolean needDecodeTsFile(
new ArrayList<>(resource.getDevices().size() << 1);
for (final IDeviceID device : resource.getDevices()) {
// iterating the index, must present
final TTimePartitionSlot startSlot =
TimePartitionUtils.getTimePartitionSlot(resource.getStartTime(device).get());
final TTimePartitionSlot endSlot =
TimePartitionUtils.getTimePartitionSlot(resource.getEndTime(device).get());
final Optional<Long> startTime = resource.getStartTime(device);
if (!startTime.isPresent()) {
throw new NoSuchElementException("No value present");
}
final Optional<Long> endTime = resource.getEndTime(device);
if (!endTime.isPresent()) {
throw new NoSuchElementException("No value present");
}
final TTimePartitionSlot startSlot = TimePartitionUtils.getTimePartitionSlot(startTime.get());
final TTimePartitionSlot endSlot = TimePartitionUtils.getTimePartitionSlot(endTime.get());
slotList.add(new Pair<>(device, startSlot));
if (!startSlot.equals(endSlot)) {
slotList.add(new Pair<>(device, endSlot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -791,12 +791,13 @@ private void routeChunkData() throws LoadFileException {
final TTimePartitionSlot timePartitionSlot = chunkData.getTimePartitionSlot();
final Map<TTimePartitionSlot, Integer> slotIndexes =
partitionSlotIndexes.computeIfAbsent(device, key -> new HashMap<>());
Integer partitionSlotIndex = slotIndexes.get(timePartitionSlot);
if (partitionSlotIndex == null) {
partitionSlotIndex = partitionSlotList.size();
slotIndexes.put(timePartitionSlot, partitionSlotIndex);
partitionSlotList.add(new Pair<>(device, timePartitionSlot));
}
final int partitionSlotIndex =
slotIndexes.computeIfAbsent(
timePartitionSlot,
slot -> {
partitionSlotList.add(new Pair<>(device, slot));
return partitionSlotList.size() - 1;
});
chunkPartitionIndexes[i] = partitionSlotIndex;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,25 @@ private void tryLoadPendingTsFiles() {
if (!loadEntry.isPresent()) {
return;
}
final ActiveLoadPendingQueue.ActiveLoadEntry activeLoadEntry = loadEntry.get();

try {
final TSStatus result = loadTsFile(loadEntry.get(), session);
final TSStatus result = loadTsFile(activeLoadEntry, session);
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
LOGGER.info(
"Successfully auto load tsfile {} (isGeneratedByPipe = {})",
loadEntry.get().getFile(),
loadEntry.get().isGeneratedByPipe());
activeLoadEntry.getFile(),
activeLoadEntry.isGeneratedByPipe());
} else {
handleLoadFailure(loadEntry.get(), result);
handleLoadFailure(activeLoadEntry, result);
}
} catch (final FileNotFoundException e) {
handleFileNotFoundException(loadEntry.get());
handleFileNotFoundException(activeLoadEntry);
} catch (final Exception e) {
handleOtherException(loadEntry.get(), e);
handleOtherException(activeLoadEntry, e);
} finally {
pendingQueue.removeFromLoading(loadEntry.get().getFile());
pendingQueue.removeFromLoading(activeLoadEntry.getFile());
}
}
} finally {
Expand Down
Loading