Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,26 +220,34 @@ private void parseAndCollectEvent(final PipeDeleteDataNodeEvent deleteDataEvent)

private void collectEvent(final Event event) {
if (event instanceof EnrichedEvent) {
if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) {
final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
if (!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) {
LOGGER.warn("PipeEventCollector: The event {} is already released, skipping it.", event);
isFailedToIncreaseReferenceCount = true;
return;
}

// Assign a commit id for this event in order to report progress in order.
PipeEventCommitManager.getInstance()
.enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, creationTime, regionId);
.enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime, regionId);

// Assign a rebootTime for iotConsensusV2
((EnrichedEvent) event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());

if (enrichedEvent.getPipeName() != null
&& pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
return;
}
}

if (event instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
}

pendingQueue.offer(event);
collectInvocationCount.incrementAndGet();
if (pendingQueue.offer(event)) {
collectInvocationCount.incrementAndGet();
}
}

public void resetFlags() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@

@Override
public boolean offer(final Event event) {
checkBeforeOffer(event);
if (!checkBeforeOffer(event)) {
return false;
}

if (event instanceof TsFileInsertionEvent) {
tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
Expand Down Expand Up @@ -196,7 +198,7 @@
return tsfileInsertEventDeque.peek();
}

public synchronized void replace(

Check warning on line 201 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 118 to 64, Complexity from 15 to 14, Nesting Level from 5 to 2, Number of Variables from 24 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ3OmOWonELRW8fhpDCT&open=AZ3OmOWonELRW8fhpDCT&pullRequest=17560
String dataRegionId, Set<TsFileResource> sourceFiles, List<TsFileResource> targetFiles) {

final int regionId = Integer.parseInt(dataRegionId);
Expand Down Expand Up @@ -356,13 +358,14 @@
}

@Override
public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
super.discardEventsOfPipe(pipeNameToDrop, regionId);
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
tsfileInsertEventDeque.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
&& regionId == ((EnrichedEvent) event).getRegionId()) {
&& isEventFromPipe(
((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) {
if (((EnrichedEvent) event)
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
Expand Down Expand Up @@ -198,9 +199,10 @@ public void close() {
* When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
* its queued events in the output pipe connector.
*/
public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
// Try to remove the events as much as possible
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);

try {
increaseHighPriorityTaskCount();
Expand All @@ -214,6 +216,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
// will.
if (lastEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
&& creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime()
&& regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
// Do not clear the last event's reference counts because it may be on transferring
lastEvent = null;
Expand All @@ -237,6 +240,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())
&& creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime()
&& regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
clearReferenceCountAndReleaseLastExceptionEvent();
}
Expand All @@ -245,8 +249,9 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
decreaseHighPriorityTaskCount();
}

if (outputPipeSink instanceof IoTDBSink) {
((IoTDBSink) outputPipeSink).discardEventsOfPipe(pipeNameToDrop, regionId);
if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
((PipeConnectorWithEventDiscard) outputPipeSink)
.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ public synchronized void register() {
* {@link PipeSinkSubtask} should never be used again
* @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
*/
public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) {
public synchronized boolean deregister(
final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}

subtask.discardEventsOfPipe(pipeNameToDeregister, regionId);
subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId);

try {
if (registeredTaskCount > 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public synchronized void deregister(
// Shall not be empty
final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;

lifeCycles.removeIf(o -> o.deregister(pipeName, regionId));
lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));

if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,13 @@ public synchronized void close() {
* Discard all events of the given pipe. This method only clears the reference count of the events
* and discard them, but do not modify other objects (such as buffers) for simplicity.
*/
public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
events.removeIf(
event -> {
if (pipeNameToDrop.equals(event.getPipeName()) && regionId == event.getRegionId()) {
if (pipeNameToDrop.equals(event.getPipeName())
&& creationTimeToDrop == event.getCreationTime()
&& regionId == event.getRegionId()) {
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ public boolean isEmpty() {
&& endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
}

public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId);
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId));
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
endPointToBatch
.values()
.forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId));
}

public int size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
Expand Down Expand Up @@ -77,6 +78,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -127,6 +129,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();

// Pipe name, creation time, region id
private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
Comment on lines +133 to +134
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment to explain what the three are.


private boolean enableSendTsFileLimit;
private volatile boolean isConnectionException;

Expand Down Expand Up @@ -681,8 +687,15 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent)
public void addFailureEventToRetryQueue(final Event event, final Exception e) {
isConnectionException =
e instanceof PipeConnectionException || ThriftClient.isConnectionBroken(e);
if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) {
return;
if (event instanceof EnrichedEvent) {
final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
if (enrichedEvent.isReleased()) {
return;
}
if (isDroppedPipe(enrichedEvent)) {
enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
return;
}
}

if (isClosed.get()) {
Expand Down Expand Up @@ -728,15 +741,18 @@ public boolean isEnableSendTsFileLimit() {
//////////////////////////// Operations for close ////////////////////////////

@Override
public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
if (isTabletBatchModeEnabled) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId));

if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
}
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
&& regionId == ((EnrichedEvent) event).getRegionId()) {
&& isDroppedPipe(
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
Expand All @@ -747,8 +763,8 @@ public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final
retryTsFileQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
&& regionId == ((EnrichedEvent) event).getRegionId()) {
&& isDroppedPipe(
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
Expand Down Expand Up @@ -792,6 +808,7 @@ public synchronized void close() {

// clear reference count of events in retry queue after closing async client
clearRetryEventsReferenceCount();
droppedPipeTaskKeys.clear();

super.close();
}
Expand Down Expand Up @@ -848,6 +865,21 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
this.transferTsFileCounter = transferTsFileCounter;
}

private boolean isDroppedPipe(final EnrichedEvent event) {
return droppedPipeTaskKeys.contains(
new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId()));
}

private static boolean isDroppedPipe(
final EnrichedEvent event,
final String pipeNameToDrop,
final long creationTimeToDrop,
final int regionId) {
return pipeNameToDrop.equals(event.getPipeName())
&& creationTimeToDrop == event.getCreationTime()
&& regionId == event.getRegionId();
}

@Override
public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
if (tabletBatchBuilder != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,9 +599,10 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx
}

@Override
public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
if (Objects.nonNull(tabletBatchBuilder)) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
}
}

Expand Down
Loading
Loading