From 4ec94cd7bf7668a7bbc57ffb14ae68317f8832a6 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 11:18:40 +0800 Subject: [PATCH 01/13] drop-1 --- .../PipeRealtimePriorityBlockingQueue.java | 9 +-- .../task/subtask/sink/PipeSinkSubtask.java | 10 +++- .../sink/PipeSinkSubtaskLifeCycle.java | 5 +- .../subtask/sink/PipeSinkSubtaskManager.java | 2 +- .../evolvable/batch/PipeTabletEventBatch.java | 7 ++- .../batch/PipeTransferBatchReqBuilder.java | 8 ++- .../async/IoTDBDataRegionAsyncSink.java | 51 ++++++++++++++--- .../thrift/sync/IoTDBDataRegionSyncSink.java | 5 +- .../SubscriptionSinkSubtaskLifeCycle.java | 5 +- .../SubscriptionSinkSubtaskManager.java | 2 +- .../iotdb/db/pipe/sink/PipeSinkTest.java | 53 ++++++++++++++++++ .../opc_security/8443_12686/iotdb-server.pfx | Bin 0 -> 2942 bytes .../task/connection/BlockingPendingQueue.java | 17 +++++- .../commons/pipe/sink/protocol/IoTDBSink.java | 3 +- 14 files changed, 145 insertions(+), 32 deletions(-) create mode 100644 iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index 6d227ac31fdce..3d553f73595f4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -356,13 +356,14 @@ public void discardAllEvents() { } @Override - public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - super.discardEventsOfPipe(pipeNameToDrop, regionId); + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); tsfileInsertEventDeque.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isEventFromPipe( + ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 6e26a76d77481..f6008822e6194 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -198,9 +198,10 @@ public void close() { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { // Try to remove the events as much as possible - inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId); + inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); try { increaseHighPriorityTaskCount(); @@ -214,6 +215,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { // will. if (lastEvent instanceof EnrichedEvent && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()) + && creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime() && regionId == ((EnrichedEvent) lastEvent).getRegionId()) { // Do not clear the last event's reference counts because it may be on transferring lastEvent = null; @@ -237,6 +239,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { // "nonnull" detection. if (lastExceptionEvent instanceof EnrichedEvent && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName()) + && creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime() && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) { clearReferenceCountAndReleaseLastExceptionEvent(); } @@ -246,7 +249,8 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { } if (outputPipeSink instanceof IoTDBSink) { - ((IoTDBSink) outputPipeSink).discardEventsOfPipe(pipeNameToDrop, regionId); + ((IoTDBSink) outputPipeSink) + .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 35f7983075dfe..1780f5a87efa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -92,12 +92,13 @@ public synchronized void register() { * {@link PipeSinkSubtask} should never be used again * @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { + public synchronized boolean deregister( + final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } - subtask.discardEventsOfPipe(pipeNameToDeregister, regionId); + subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId); try { if (registeredTaskCount > 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 9138a07591875..4faa42db004a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -209,7 +209,7 @@ public synchronized void deregister( // Shall not be empty final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor; - lifeCycles.removeIf(o -> o.deregister(pipeName, regionId)); + lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId)); if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 96bddd0d672f0..c44e12a4bbf20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -154,10 +154,13 @@ public synchronized void close() { * Discard all events of the given pipe. This method only clears the reference count of the events * and discard them, but do not modify other objects (such as buffers) for simplicity. */ - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { events.removeIf( event -> { - if (pipeNameToDrop.equals(event.getPipeName()) && regionId == event.getRegionId()) { + if (pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId()) { event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index dd4d4fe1ce618..5bb76ae40e582 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -196,9 +196,11 @@ public boolean isEmpty() { && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty); } - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId); - endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId)); + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + endPointToBatch.values().forEach( + batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index d19639310cf4a..c00e36e0c7c40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -77,6 +77,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -126,6 +127,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Map pendingHandlers = new ConcurrentHashMap<>(); + private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -681,8 +683,15 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent) public void addFailureEventToRetryQueue(final Event event, final Exception e) { isConnectionException = e instanceof PipeConnectionException || ThriftClient.isConnectionBroken(e); - if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) { - return; + if (event instanceof EnrichedEvent) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (enrichedEvent.isReleased()) { + return; + } + if (isDroppedPipe(enrichedEvent)) { + enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); + return; + } } if (isClosed.get()) { @@ -728,15 +737,18 @@ public boolean isEnableSendTsFileLimit() { //////////////////////////// Operations for close //////////////////////////// @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - if (isTabletBatchModeEnabled) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); + + if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isDroppedPipe( + (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -747,8 +759,8 @@ public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final retryTsFileQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isDroppedPipe( + (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -792,6 +804,7 @@ public synchronized void close() { // clear reference count of events in retry queue after closing async client clearRetryEventsReferenceCount(); + droppedPipeTaskKeys.clear(); super.close(); } @@ -848,6 +861,26 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) { this.transferTsFileCounter = transferTsFileCounter; } + private boolean isDroppedPipe(final EnrichedEvent event) { + return droppedPipeTaskKeys.contains( + generatePipeTaskKey(event.getPipeName(), event.getCreationTime(), event.getRegionId())); + } + + private static boolean isDroppedPipe( + final EnrichedEvent event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + return pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId(); + } + + private static String generatePipeTaskKey( + final String pipeName, final long creationTime, final int regionId) { + return pipeName + "_" + creationTime + "_" + regionId; + } + @Override public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { if (tabletBatchBuilder != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 9357a8c6a6d7d..e8c4420861c59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -599,9 +599,10 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx } @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java index 98163697374da..390a6d58018ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java @@ -63,7 +63,10 @@ public synchronized void register() { } @Override - public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { + public synchronized boolean deregister( + final String pipeNameToDeregister, + final long creationTimeToDeregister, + final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java index 6d5f27d8172a5..16a9ee1a03d92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java @@ -168,7 +168,7 @@ public synchronized void deregister( final PipeSinkSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - if (lifeCycle.deregister(pipeName, regionId)) { + if (lifeCycle.deregister(pipeName, creationTime, regionId)) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index ec2122b917574..7dfd0446038c0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; @@ -30,6 +31,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.write.record.Tablet; @@ -104,6 +106,46 @@ public void testIoTDBThriftAsyncConnectorToOthers() { } } + @Test + public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception { + try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + } + }); + connector.validate(new PipeParameterValidator(parameters)); + connector.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent droppedEvent = + createPipeRawTabletInsertionEvent("pipe", 1L, 1); + droppedEvent.increaseReferenceCount("test"); + droppedEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 1L, 1, -1), 1L); + + connector.discardEventsOfPipe("pipe", 1L, 1); + connector.addFailureEventToRetryQueue(droppedEvent, new PipeException("test")); + + Assert.assertEquals(0, connector.getRetryEventQueueSize()); + Assert.assertTrue(droppedEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 2L, 1); + recreatedPipeEvent.increaseReferenceCount("test"); + recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 2L, 1, -1), 1L); + + connector.addFailureEventToRetryQueue(recreatedPipeEvent, new PipeException("test")); + + Assert.assertEquals(1, connector.getRetryEventQueueSize()); + } + } + @Test public void testOpcUaSink() { final List schemaList = @@ -194,4 +236,15 @@ public void testOpcUaSink() { Assert.fail(); } } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime, final int regionId) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d" + regionId, schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false); + } } diff --git a/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx b/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx new file mode 100644 index 0000000000000000000000000000000000000000..b36d23da368bde99bee29e7bad02abd68127d7ed GIT binary patch literal 2942 zcma);X*d*&7RP6%Suo?ZWG5NBLB_r|Mr5*GyI%7WS<05(*q0eXr5B+H(b#v{X|kv6 zYX}*k?4m3w%ypNi_ufzUemLhm=YP)seEdBJO@fDjfalO8xGZ25yUk?@AQOCXRKGYBYvMnLKQbrD1l0iZ8H>Fr~$0iDl*fhaI@ zX_Eh8D3ij5`)$0IyvYFuOahP4Rmw?UFX6uPnUiAI-1bvv$4EWb7Qv4+S5~AEw&c!L zUvcJ>HEHQI=rdfaw<;*vV=K;pn~9ot!BcKtQ5dQ1?!ix1aeWYH_px!zSDcgG1Yn&1 zY{=zjJ)QXyp)fC7IWc6dd6iG-y)$C{<;j5g>5;#79LWv>c{rg@MjP0#mHFqLDlS$z zgr}KWwbd}9d?f6KUhA*5b~&@Ih+FN5i%tuJRVzD`8hK{(_H1g7FE>i}R#<&Y$Ybdc z#tK*DnV`A?F1y*=+BQ-B9~i*MZzGzJeZom}Dl65I&URHtA6;A^CDGM*h`DWV*BcGm zHfIioVV2HrD8hp_)$d_Pvz-inI4frdNN+59KJ74OnIlnJlGbhtFQyFp6>vJu-)3&Z z^I~D*v1kqpgm4%w9o7c>j$7zJ3mWwN%v;o{AyGgsIykQK8&m8$?_7rHB0jXRz5|66 zk7{=FdiN|Eb1)L>p_Ye|ThkEzEENW-ql1zoO+(e|4X?^=4#BRW`pRVyJ;rY@`EPx> zo*{85k!<*?NS@RZ*(xb-A@5m+=!_yi8}{vdSppFVg2!V8TMe)NDu4h&SSS zYI)4H4yM4e0gr40ZFw-4Apfn)JBRmr#E@P;>{Nef_C8WHdQ_eu7hasl8XCh-zOIy^ ztapNOG{9`S9i)XWXGu9yj^rk^>%zby+YF@Wz*#{9K_i$-VQ7xf|fhp3@BlQ(GJmkypv?z6(n{oNREz!5cvPw6qW(~PPh)*zj zC)x6F)x$XHgQ;05vgcJJa>J)y_8zY?Ziq3lg1h>1WtS%YaZN_`N!V^Q|b;UV=GM+ z3u!Y9u|n=OhC7DZUHVgtn3%|QY@BJHiid*D;t@gHNf&iIU-?ZG#94rwN_dPY^(v zoo7)q!0k=2jmA1|V{-6isf?LLc6}|wu~lqhc-gDc_7fRrnG-+lxl=rAr>7!Grh-xo zbINkE?CuLWhzxN*F?202YN+bIuXa+9RW36|Bvnih-r0XnfX>Jp zyDH~c91jWG-`RPiNZ|IyAckt3%`re~(&vI9*Vg!`3uU)m1(YC1%OEe*$hqkQ`a_hJ ziO2d&sjp=uFmyDgyuij#;8$0w>K^k~sieWX&@jnQaC{QLIBLPi(hmxY&^qu|T?FEj zZFJPE`=(NIigI$8W(3Ahqka;;1{v>a^WSUvFM*8IX>O z>W#I-59|LtPi;Ed*nTRSsdrwj@V!-I$YXf6(}I&Bh`ppr^odA2-)Is>ok`s)JM5|2 z>TUQaHjuhGgUnL!fMs`@3Rq&}EE3`w%_};&IOxq9x4L!aeMt4aS2M42>vOxIt~XCL zIK-v7zm?EZH)DN9clgyr=&}6mC!N{{8NIP-4*fx^j*=ht4(8gfVBwkxmu;k?28g@( zozD}BmbCmgPY!)z-Ik5h$Uqnew4+~r`@al;!pmVuF>#55~Hl7D13tT_up;( zcH?cdZ_LgQ(IB4LpMeP)7m_3IFT~=h!Se4*zCk*k&Jrn?Lc~^4Evgzjr#?QJ?j~`% z^*Iv`Ti>PYA1>Z*IV$8G^I}-1%OciX8l8LK7B{UCCH(AWQeJCc5QH7?G^)k!Tg7>* z?KbPxGribcy2X@~Pq=ci9fk1f2*1;D&80ullmZz!@3T_RPndB=69kmt?*R*hhLA`y zR)AbUH7y+nP(jPLpeq&)NEL6;;*HsnOM&obN1j76gDVux7^QW93TT+QwQK&pIdB*n zrqk*1n|$?A03`@sy8b2#v--sLr!-c9twdYkMD)BL^!ww!15{DU%oR`g8^=U4k_@ex!FyI3yM*Xi69V}$j3Q_b zratg3j2wh;hniun_8N>CvAMHHLnl2245`)J5I;Zd2~2;Gv2USe^NVcrbHPb9OJ3h!}z! z@_*bfE6=kww~v)n>~Y^cBs73~%>u6|v3m^xe0;g)+N^wvkwWXF_|gikv0$EE54=#reu!;3y$n<65r zch!i>(HA7?uP&v|9aaJy^A}ocRr78<|I=!Z9*jihD=*o12QS#4INnrYF%*fX3DzAv z^v~bg32aV>X->Gq9nu<{H3eBMcy?SnU+67p@&L`t(MD%rW&HJ%f`AYJSYTOA+~Id- zu_0h*VIe`XvyxYX!}v2@o;Q|LQN_jNcVRGF>8HWh&tEbb*U}?Q5A;QSM^4BL|3K*9 E0Y_L$3jhEB literal 0 HcmV?d00001 diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 8773b03f9f308..b3b796ab6d84c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -122,12 +122,13 @@ public void discardAllEvents() { eventCounter.reset(); } - public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isEventFromPipe( + ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); } @@ -162,4 +163,14 @@ protected void checkBeforeOffer(final E event) { ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName()); } } + + protected static boolean isEventFromPipe( + final EnrichedEvent event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + return pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId(); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 88a8b71775f45..7de06376b6d2b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -641,7 +641,8 @@ public void rateLimitIfNeeded( * When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard * its batched or queued events in the output pipe connector. */ - public synchronized void discardEventsOfPipe(final String pipeName, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeName, final long creationTime, final int regionId) { // Do nothing by default } From 543efacc830755c006836575e65fb2e1ab7ed09d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 12:22:13 +0800 Subject: [PATCH 02/13] wd --- .../task/subtask/sink/PipeSinkSubtask.java | 5 +- .../websocket/WebSocketConnectorServer.java | 143 ++++++++++++++---- .../protocol/websocket/WebSocketSink.java | 11 +- .../subtask/sink/PipeSinkSubtaskTest.java | 61 ++++++++ .../iotdb/db/pipe/sink/PipeSinkTest.java | 42 +++++ .../commons/pipe/sink/protocol/IoTDBSink.java | 2 +- .../PipeConnectorWithEventDiscard.java | 25 +++ 7 files changed, 259 insertions(+), 30 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index f6008822e6194..dd31d5b5fd89b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; @@ -248,8 +249,8 @@ public void discardEventsOfPipe( decreaseHighPriorityTaskCount(); } - if (outputPipeSink instanceof IoTDBSink) { - ((IoTDBSink) outputPipeSink) + if (outputPipeSink instanceof PipeConnectorWithEventDiscard) { + ((PipeConnectorWithEventDiscard) outputPipeSink) .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 491d40a1e4537..6b402b2702618 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -39,6 +39,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -56,6 +57,7 @@ public class WebSocketConnectorServer extends WebSocketServer { // Map>> private final ConcurrentHashMap> eventsWaitingForAck = new ConcurrentHashMap<>(); + private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private final BidiMap router = new DualTreeBidiMap(null, Comparator.comparing(Object::hashCode)) {}; @@ -97,13 +99,8 @@ public synchronized void unregister(WebSocketSink connector) { eventWrappers = new ArrayList<>(eventTransferQueue); eventTransferQueue.clear(); } - eventWrappers.forEach( - (eventWrapper) -> { - if (eventWrapper.event instanceof EnrichedEvent) { - ((EnrichedEvent) eventWrapper.event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } - }); + eventWrappers.forEach(eventWrapper -> discardEvent(eventWrapper.event)); + eventWrappers.clear(); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } @@ -113,13 +110,35 @@ public synchronized void unregister(WebSocketSink connector) { if (eventsWaitingForAck.containsKey(pipeName)) { eventsWaitingForAck .remove(pipeName) - .forEach( - (eventId, eventWrapper) -> { - if (eventWrapper.event instanceof EnrichedEvent) { - ((EnrichedEvent) eventWrapper.event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } - }); + .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); + } + + droppedPipeTaskKeys.removeIf(key -> key.startsWith(pipeName + "_")); + } + + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); + + final PriorityBlockingQueue eventTransferQueue = + eventsWaitingForTransfer.get(pipeNameToDrop); + if (eventTransferQueue != null) { + eventTransferQueue.removeIf( + eventWrapper -> + discardIfMatches( + eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + synchronized (eventTransferQueue) { + eventTransferQueue.notifyAll(); + } + } + + final ConcurrentHashMap eventId2EventMap = + eventsWaitingForAck.get(pipeNameToDrop); + if (eventId2EventMap != null) { + eventId2EventMap.entrySet().removeIf( + entry -> + discardIfMatches( + entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); } } @@ -300,21 +319,24 @@ public void onError(WebSocket webSocket, Exception e) { } public void addEvent(Event event, WebSocketSink connector) { + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + + final String pipeName = connector.getPipeName(); final PriorityBlockingQueue queue = - eventsWaitingForTransfer.get(connector.getPipeName()); + eventsWaitingForTransfer.get(pipeName); if (queue == null) { LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", connector, event); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } + discardEvent(event); return; } if (queue.size() >= 5) { synchronized (queue) { - while (queue.size() >= 5) { + while (queue.size() >= 5 && isQueueAvailable(pipeName, queue) && !isDroppedPipe(event)) { try { queue.wait(); } catch (InterruptedException e) { @@ -323,15 +345,27 @@ public void addEvent(Event event, WebSocketSink connector) { } } + if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) { + discardEvent(event); + return; + } + queue.put( new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); return; } } + if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) { + discardEvent(event); + return; + } + synchronized (queue) { queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); } + + queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); } private class TransferThread extends Thread { @@ -377,6 +411,11 @@ private void transfer(String pipeName, EventWaitingForTransfer element) { final WebSocketSink connector = element.connector; try { + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + ByteBuffer tabletBuffer; if (event instanceof PipeRawTabletInsertionEvent) { tabletBuffer = ((PipeRawTabletInsertionEvent) event).convertToTablet().serialize(); @@ -387,7 +426,11 @@ private void transfer(String pipeName, EventWaitingForTransfer element) { } if (tabletBuffer == null) { - connector.commit((EnrichedEvent) event); + if (isDroppedPipe(event)) { + discardEvent(event); + } else { + connector.commit((EnrichedEvent) event); + } return; } @@ -398,11 +441,17 @@ private void transfer(String pipeName, EventWaitingForTransfer element) { server.broadcast(payload, Collections.singletonList(router.get(pipeName))); + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + final ConcurrentHashMap eventId2EventMap = eventsWaitingForAck.get(pipeName); if (eventId2EventMap == null) { LOGGER.warn( "The pipe {} was dropped so the event ack {} will be ignored.", pipeName, eventId); + discardEvent(event); return; } eventId2EventMap.put(eventId, new EventWaitingForAck(connector, event)); @@ -410,13 +459,10 @@ private void transfer(String pipeName, EventWaitingForTransfer element) { synchronized (server) { final PriorityBlockingQueue queue = eventsWaitingForTransfer.get(pipeName); - if (queue == null) { + if (queue == null || isDroppedPipe(event)) { LOGGER.warn( "The pipe {} was dropped so the event {} will be dropped.", pipeName, eventId); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } + discardEvent(event); return; } @@ -465,4 +511,49 @@ public EventWaitingForAck(WebSocketSink connector, Event event) { this.event = event; } } + + private boolean discardIfMatches( + final Event event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + if (!(event instanceof EnrichedEvent)) { + return false; + } + + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (!pipeNameToDrop.equals(enrichedEvent.getPipeName()) + || creationTimeToDrop != enrichedEvent.getCreationTime() + || regionId != enrichedEvent.getRegionId()) { + return false; + } + + discardEvent(enrichedEvent); + return true; + } + + private boolean isDroppedPipe(final Event event) { + return event instanceof EnrichedEvent + && droppedPipeTaskKeys.contains( + generatePipeTaskKey( + ((EnrichedEvent) event).getPipeName(), + ((EnrichedEvent) event).getCreationTime(), + ((EnrichedEvent) event).getRegionId())); + } + + private boolean isQueueAvailable( + final String pipeName, final PriorityBlockingQueue queue) { + return eventsWaitingForTransfer.get(pipeName) == queue; + } + + private static String generatePipeTaskKey( + final String pipeName, final long creationTime, final int regionId) { + return pipeName + "_" + creationTime + "_" + regionId; + } + + private void discardEvent(final Event event) { + if (event instanceof EnrichedEvent) { + ((EnrichedEvent) event).clearReferenceCount(WebSocketSink.class.getName()); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index 4e593e673fd56..dadeee8053d04 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -41,7 +42,7 @@ import java.util.Optional; @TreeModel -public class WebSocketSink implements PipeConnector { +public class WebSocketSink implements PipeConnector, PipeConnectorWithEventDiscard { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketSink.class); @@ -166,6 +167,14 @@ public void close() throws Exception { } } + @Override + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (server != null) { + server.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + } + } + public void commit(EnrichedEvent enrichedEvent) { Optional.ofNullable(enrichedEvent) .ifPresent(event -> event.decreaseReferenceCount(WebSocketSink.class.getName(), true)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java new file mode 100644 index 0000000000000..ddfc699721b92 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.subtask.sink; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; +import org.apache.iotdb.pipe.api.PipeConnector; + +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.withSettings; + +public class PipeSinkSubtaskTest { + + @Test + public void testDiscardEventsOfPipeDelegatesToConnector() { + final PipeConnector connector = + mock( + PipeConnector.class, + withSettings().extraInterfaces(PipeConnectorWithEventDiscard.class)); + final UnboundedBlockingPendingQueue pendingQueue = mock(UnboundedBlockingPendingQueue.class); + + final PipeSinkSubtask subtask = + Mockito.spy( + new PipeSinkSubtask( + "PipeSinkSubtaskTest", + System.currentTimeMillis(), + "data_test", + 0, + (UnboundedBlockingPendingQueue) pendingQueue, + connector)); + + try { + subtask.discardEventsOfPipe("pipe", 1L, 1); + + verify((PipeConnectorWithEventDiscard) connector).discardEventsOfPipe("pipe", 1L, 1); + } finally { + subtask.close(); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index 7dfd0446038c0..3ad262130ff9e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -29,6 +29,8 @@ import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; +import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer; +import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -39,6 +41,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.security.SecureRandom; import java.util.Arrays; @@ -146,6 +149,45 @@ public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception } } + @Test + public void testWebSocketSinkDropDoesNotRequeueDroppedPipeEvents() { + final String pipeName = "pipe_" + System.nanoTime(); + final WebSocketConnectorServer server = WebSocketConnectorServer.getOrCreateInstance(0); + final WebSocketSink connector = Mockito.mock(WebSocketSink.class); + Mockito.when(connector.getPipeName()).thenReturn(pipeName); + + server.register(connector); + try { + final PipeRawTabletInsertionEvent droppedEvent = + createPipeRawTabletInsertionEvent(pipeName, 1L, 1); + droppedEvent.increaseReferenceCount(WebSocketSink.class.getName()); + droppedEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 1L, 1, -1), 1L); + server.addEvent(droppedEvent, connector); + + server.discardEventsOfPipe(pipeName, 1L, 1); + Assert.assertTrue(droppedEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedDroppedPipeEvent = + createPipeRawTabletInsertionEvent(pipeName, 1L, 1); + recreatedDroppedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName()); + recreatedDroppedPipeEvent.setCommitterKeyAndCommitId( + new CommitterKey(pipeName, 1L, 1, -1), 2L); + server.addEvent(recreatedDroppedPipeEvent, connector); + + Assert.assertTrue(recreatedDroppedPipeEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent(pipeName, 2L, 1); + recreatedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName()); + recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 2L, 1, -1), 3L); + server.addEvent(recreatedPipeEvent, connector); + + Assert.assertFalse(recreatedPipeEvent.isReleased()); + } finally { + server.unregister(connector); + } + } + @Test public void testOpcUaSink() { final List schemaList = diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 7de06376b6d2b..ebb3d90bc8cab 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -148,7 +148,7 @@ @TreeModel @TableModel -public abstract class IoTDBSink implements PipeConnector { +public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEventDiscard { private static final String PARSE_URL_ERROR_FORMATTER = "Exception occurred while parsing node urls from target servers: {}"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java new file mode 100644 index 0000000000000..ab4dbcf90750f --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.protocol; + +public interface PipeConnectorWithEventDiscard { + + void discardEventsOfPipe(String pipeName, long creationTime, int regionId); +} From a3b7ea0bcd4173a562ce1f9d50c5728f79171db6 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 14:28:36 +0800 Subject: [PATCH 03/13] drop --- .../task/connection/PipeEventCollector.java | 18 ++-- .../PipeRealtimePriorityBlockingQueue.java | 4 +- .../connection/PipeEventCollectorTest.java | 87 +++++++++++++++++++ .../task/connection/BlockingPendingQueue.java | 40 ++++++++- 4 files changed, 139 insertions(+), 10 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index a22848ee3baed..b000c5d2366fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -220,7 +220,8 @@ private void parseAndCollectEvent(final PipeDeleteDataNodeEvent deleteDataEvent) private void collectEvent(final Event event) { if (event instanceof EnrichedEvent) { - if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) { LOGGER.warn("PipeEventCollector: The event {} is already released, skipping it.", event); isFailedToIncreaseReferenceCount = true; return; @@ -228,18 +229,25 @@ private void collectEvent(final Event event) { // Assign a commit id for this event in order to report progress in order. PipeEventCommitManager.getInstance() - .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, creationTime, regionId); + .enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime, regionId); // Assign a rebootTime for iotConsensusV2 - ((EnrichedEvent) event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); + enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); + + if (enrichedEvent.getPipeName() != null + && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) { + enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName()); + return; + } } if (event instanceof PipeHeartbeatEvent) { ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue); } - pendingQueue.offer(event); - collectInvocationCount.incrementAndGet(); + if (pendingQueue.offer(event)) { + collectInvocationCount.incrementAndGet(); + } } public void resetFlags() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index 3d553f73595f4..f972bba0e6ede 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -73,7 +73,9 @@ public PipeRealtimePriorityBlockingQueue() { @Override public boolean offer(final Event event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } if (event instanceof TsFileInsertionEvent) { tsfileInsertEventDeque.add((TsFileInsertionEvent) event); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java new file mode 100644 index 0000000000000..bbd5e0b5e3d1a --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.connection; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeRealtimePriorityBlockingQueue; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; +import org.apache.iotdb.pipe.api.event.Event; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class PipeEventCollectorTest { + + @Test + public void testCollectorDoesNotOfferEventsOfDroppedPipeToUnboundedPendingQueue() { + verifyCollectorDoesNotOfferEventsOfDroppedPipe( + new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter())); + } + + @Test + public void testCollectorDoesNotOfferEventsOfDroppedPipeToRealtimePendingQueue() { + verifyCollectorDoesNotOfferEventsOfDroppedPipe(new PipeRealtimePriorityBlockingQueue()); + } + + private void verifyCollectorDoesNotOfferEventsOfDroppedPipe( + final UnboundedBlockingPendingQueue pendingQueue) { + pendingQueue.discardEventsOfPipe("pipe", 1L, 1); + + final PipeEventCollector droppedPipeCollector = + new PipeEventCollector(pendingQueue, 1L, 1, false, false, false); + final PipeRawTabletInsertionEvent droppedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 1L); + droppedPipeCollector.collect(droppedPipeEvent); + + Assert.assertTrue(droppedPipeEvent.isReleased()); + Assert.assertEquals(0, pendingQueue.size()); + + final PipeEventCollector recreatedPipeCollector = + new PipeEventCollector(pendingQueue, 2L, 1, false, false, false); + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 2L); + recreatedPipeCollector.collect(recreatedPipeEvent); + + Assert.assertFalse(recreatedPipeEvent.isReleased()); + Assert.assertEquals(1, pendingQueue.size()); + + pendingQueue.discardAllEvents(); + Assert.assertTrue(recreatedPipeEvent.isReleased()); + } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d1", schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index b3b796ab6d84c..7080a2fe6f9c4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -28,8 +28,10 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Set; import java.util.function.Consumer; public abstract class BlockingPendingQueue { @@ -43,6 +45,7 @@ public abstract class BlockingPendingQueue { protected final PipeEventCounter eventCounter; protected final AtomicBoolean isClosed = new AtomicBoolean(false); + protected final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); protected BlockingPendingQueue( final BlockingQueue pendingQueue, final PipeEventCounter eventCounter) { @@ -51,7 +54,10 @@ protected BlockingPendingQueue( } public boolean offer(final E event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } + final boolean offered = pendingQueue.offer(event); if (offered) { eventCounter.increaseEventCount(event); @@ -60,7 +66,9 @@ public boolean offer(final E event) { } public boolean put(final E event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } try { pendingQueue.put(event); eventCounter.increaseEventCount(event); @@ -101,6 +109,7 @@ public void clear() { isClosed.set(true); pendingQueue.clear(); eventCounter.reset(); + droppedPipeTaskKeys.clear(); } /** DO NOT FORGET to set eventCounter to new value after invoking this method. */ @@ -120,10 +129,12 @@ public void discardAllEvents() { return true; }); eventCounter.reset(); + droppedPipeTaskKeys.clear(); } public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent @@ -158,10 +169,12 @@ public int getPipeHeartbeatEventCount() { return eventCounter.getPipeHeartbeatEventCount(); } - protected void checkBeforeOffer(final E event) { - if (isClosed.get() && event instanceof EnrichedEvent) { + protected boolean checkBeforeOffer(final E event) { + final boolean shouldReject = isClosed.get() || isEventFromDroppedPipe(event); + if (shouldReject && event instanceof EnrichedEvent) { ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName()); } + return !shouldReject; } protected static boolean isEventFromPipe( @@ -173,4 +186,23 @@ protected static boolean isEventFromPipe( && creationTimeToDrop == event.getCreationTime() && regionId == event.getRegionId(); } + + protected boolean isEventFromDroppedPipe(final E event) { + return event instanceof EnrichedEvent + && ((EnrichedEvent) event).getPipeName() != null + && isPipeDropped( + ((EnrichedEvent) event).getPipeName(), + ((EnrichedEvent) event).getCreationTime(), + ((EnrichedEvent) event).getRegionId()); + } + + public boolean isPipeDropped( + final String pipeName, final long creationTime, final int regionId) { + return droppedPipeTaskKeys.contains(generatePipeTaskKey(pipeName, creationTime, regionId)); + } + + private static String generatePipeTaskKey( + final String pipeName, final long creationTime, final int regionId) { + return pipeName + "_" + creationTime + "_" + regionId; + } } From d8114969d0a7601c3697f09cfedc74523c8fcfe0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:50:05 +0800 Subject: [PATCH 04/13] fix --- .../agent/task/subtask/sink/PipeSinkSubtask.java | 2 +- .../batch/PipeTransferBatchReqBuilder.java | 5 +++-- .../websocket/WebSocketConnectorServer.java | 13 +++++++------ .../pipe/sink/protocol/websocket/WebSocketSink.java | 2 +- .../subtask/SubscriptionSinkSubtaskLifeCycle.java | 4 +--- .../task/connection/PipeEventCollectorTest.java | 12 +++++++++++- .../org/apache/iotdb/db/pipe/sink/PipeSinkTest.java | 12 +++++++++++- .../agent/task/connection/BlockingPendingQueue.java | 5 ++--- 8 files changed, 37 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index dd31d5b5fd89b..a11a1a68f0ce4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -25,8 +25,8 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; -import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index 5bb76ae40e582..49d9d8cea09be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -199,8 +199,9 @@ public boolean isEmpty() { public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); - endPointToBatch.values().forEach( - batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); + endPointToBatch + .values() + .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 6b402b2702618..4706eb0275a5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -125,8 +125,7 @@ public synchronized void discardEventsOfPipe( if (eventTransferQueue != null) { eventTransferQueue.removeIf( eventWrapper -> - discardIfMatches( - eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + discardIfMatches(eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } @@ -135,10 +134,12 @@ public synchronized void discardEventsOfPipe( final ConcurrentHashMap eventId2EventMap = eventsWaitingForAck.get(pipeNameToDrop); if (eventId2EventMap != null) { - eventId2EventMap.entrySet().removeIf( - entry -> - discardIfMatches( - entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); + eventId2EventMap + .entrySet() + .removeIf( + entry -> + discardIfMatches( + entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index dadeee8053d04..e8a38d63e6e42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; -import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java index 390a6d58018ea..af871feaa7e0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java @@ -64,9 +64,7 @@ public synchronized void register() { @Override public synchronized boolean deregister( - final String pipeNameToDeregister, - final long creationTimeToDeregister, - final int regionId) { + final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java index bbd5e0b5e3d1a..029a722c8a95c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java @@ -82,6 +82,16 @@ private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( tablet.addTimestamp(0, 1L); tablet.addValue("s1", 0, 1L); return new PipeRawTabletInsertionEvent( - false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false); + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index 3ad262130ff9e..cf311639ee9a2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -287,6 +287,16 @@ private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( tablet.addTimestamp(0, 1L); tablet.addValue("s1", 0, 1L); return new PipeRawTabletInsertionEvent( - false, "root.db", "db", "root.db", tablet, false, pipeName, creationTime, null, null, false); + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 7080a2fe6f9c4..adbc79d500476 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -27,11 +27,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.Set; import java.util.function.Consumer; public abstract class BlockingPendingQueue { @@ -196,8 +196,7 @@ && isPipeDropped( ((EnrichedEvent) event).getRegionId()); } - public boolean isPipeDropped( - final String pipeName, final long creationTime, final int regionId) { + public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) { return droppedPipeTaskKeys.contains(generatePipeTaskKey(pipeName, creationTime, regionId)); } From fb861bddca3d77be5c92b38414868c2e210bf2f8 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:56:35 +0800 Subject: [PATCH 05/13] local --- .../websocket/WebSocketConnectorServer.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 4706eb0275a5d..8b1251639082a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -57,7 +57,7 @@ public class WebSocketConnectorServer extends WebSocketServer { // Map>> private final ConcurrentHashMap> eventsWaitingForAck = new ConcurrentHashMap<>(); - private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); + private final Set> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private final BidiMap router = new DualTreeBidiMap(null, Comparator.comparing(Object::hashCode)) {}; @@ -113,12 +113,12 @@ public synchronized void unregister(WebSocketSink connector) { .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); } - droppedPipeTaskKeys.removeIf(key -> key.startsWith(pipeName + "_")); + droppedPipeTaskKeys.removeIf(key -> key.getLeft().equals(pipeName)); } public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); final PriorityBlockingQueue eventTransferQueue = eventsWaitingForTransfer.get(pipeNameToDrop); @@ -536,7 +536,7 @@ private boolean discardIfMatches( private boolean isDroppedPipe(final Event event) { return event instanceof EnrichedEvent && droppedPipeTaskKeys.contains( - generatePipeTaskKey( + new Triple<>( ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent) event).getCreationTime(), ((EnrichedEvent) event).getRegionId())); @@ -547,11 +547,6 @@ private boolean isQueueAvailable( return eventsWaitingForTransfer.get(pipeName) == queue; } - private static String generatePipeTaskKey( - final String pipeName, final long creationTime, final int regionId) { - return pipeName + "_" + creationTime + "_" + regionId; - } - private void discardEvent(final Event event) { if (event instanceof EnrichedEvent) { ((EnrichedEvent) event).clearReferenceCount(WebSocketSink.class.getName()); From 23f978273e82663f0dc1a9b421ed39127884072d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 16:52:49 +0800 Subject: [PATCH 06/13] triple --- .../task/connection/BlockingPendingQueue.java | 13 +++--- .../commons/pipe/datastructure/Triple.java | 44 +++++++++++++++++++ 2 files changed, 49 insertions(+), 8 deletions(-) create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index adbc79d500476..15cd64941e0ac 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.task.connection; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; import org.apache.iotdb.pipe.api.event.Event; @@ -45,7 +46,8 @@ public abstract class BlockingPendingQueue { protected final PipeEventCounter eventCounter; protected final AtomicBoolean isClosed = new AtomicBoolean(false); - protected final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); + protected final Set> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); protected BlockingPendingQueue( final BlockingQueue pendingQueue, final PipeEventCounter eventCounter) { @@ -134,7 +136,7 @@ public void discardAllEvents() { public void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent @@ -197,11 +199,6 @@ && isPipeDropped( } public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) { - return droppedPipeTaskKeys.contains(generatePipeTaskKey(pipeName, creationTime, regionId)); - } - - private static String generatePipeTaskKey( - final String pipeName, final long creationTime, final int regionId) { - return pipeName + "_" + creationTime + "_" + regionId; + return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, regionId)); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java new file mode 100644 index 0000000000000..43db3cb896bcd --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.datastructure; + +public class Triple { + public final L first; + public final M second; + public final R third; + + public Triple(L first, M second, R third) { + this.first = first; + this.second = second; + this.third = third; + } + + public L getFirst() { + return first; + } + + public M getSecond() { + return second; + } + + public R getThird() { + return third; + } +} From abc546b699aeee3f686a698e1ba0179a2c988016 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:58:59 +0800 Subject: [PATCH 07/13] by --- .../pipe/sink/protocol/websocket/WebSocketConnectorServer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 8b1251639082a..ef210b7ed4ed5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.external.collections4.BidiMap; import org.apache.iotdb.commons.external.collections4.bidimap.DualTreeBidiMap; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -113,7 +114,7 @@ public synchronized void unregister(WebSocketSink connector) { .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); } - droppedPipeTaskKeys.removeIf(key -> key.getLeft().equals(pipeName)); + droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName)); } public synchronized void discardEventsOfPipe( From 0cf1afff42a6a2f4b7e411538ee01ed3aabafcd2 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 18:59:13 +0800 Subject: [PATCH 08/13] spt --- .../pipe/sink/protocol/websocket/WebSocketConnectorServer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index ef210b7ed4ed5..9fe4d7f944ad4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -58,7 +58,8 @@ public class WebSocketConnectorServer extends WebSocketServer { // Map>> private final ConcurrentHashMap> eventsWaitingForAck = new ConcurrentHashMap<>(); - private final Set> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); + private final Set> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); private final BidiMap router = new DualTreeBidiMap(null, Comparator.comparing(Object::hashCode)) {}; From 28f54c9266aafcd9eaa01ccdd3857f3982ea5e1f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 17:16:48 +0800 Subject: [PATCH 09/13] bug-fix --- .../commons/pipe/datastructure/Triple.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java index 43db3cb896bcd..275ccb20ea3ac 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java @@ -19,12 +19,14 @@ package org.apache.iotdb.commons.pipe.datastructure; +import java.util.Objects; + public class Triple { public final L first; public final M second; public final R third; - public Triple(L first, M second, R third) { + public Triple(final L first, final M second, final R third) { this.first = first; this.second = second; this.third = third; @@ -41,4 +43,21 @@ public M getSecond() { public R getThird() { return third; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Triple triple = (Triple) o; + return first.equals(triple.first) && second.equals(triple.second) && third.equals(triple.third); + } + + @Override + public int hashCode() { + return Objects.hash(first, second, third); + } } From 967995cb7244421f847f3e51c5f20caabfdf503e Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:05:24 +0800 Subject: [PATCH 10/13] no-pipe-task-key --- .../thrift/async/IoTDBDataRegionAsyncSink.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index c00e36e0c7c40..1aded6cd84733 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; @@ -127,7 +128,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Map pendingHandlers = new ConcurrentHashMap<>(); - private final Set droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); + private final Set> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -739,7 +740,7 @@ public boolean isEnableSendTsFileLimit() { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { - droppedPipeTaskKeys.add(generatePipeTaskKey(pipeNameToDrop, creationTimeToDrop, regionId)); + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); @@ -863,7 +864,7 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) { private boolean isDroppedPipe(final EnrichedEvent event) { return droppedPipeTaskKeys.contains( - generatePipeTaskKey(event.getPipeName(), event.getCreationTime(), event.getRegionId())); + new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId())); } private static boolean isDroppedPipe( @@ -876,11 +877,6 @@ private static boolean isDroppedPipe( && regionId == event.getRegionId(); } - private static String generatePipeTaskKey( - final String pipeName, final long creationTime, final int regionId) { - return pipeName + "_" + creationTime + "_" + regionId; - } - @Override public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { if (tabletBatchBuilder != null) { From f230054dffa493df1233a74e9200699ff4e9fbb4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:09:03 +0800 Subject: [PATCH 11/13] Update IoTDBDataRegionAsyncSink.java --- .../sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 1aded6cd84733..14cb42f7a0795 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -128,7 +128,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Map pendingHandlers = new ConcurrentHashMap<>(); - private final Set> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); + private final Set> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; From f0f416b54f8a92008a89d13a09f8c93b49bf2f6f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 29 Apr 2026 11:50:42 +0800 Subject: [PATCH 12/13] triple --- .../sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java | 2 ++ .../pipe/sink/protocol/websocket/WebSocketConnectorServer.java | 2 ++ .../pipe/agent/task/connection/BlockingPendingQueue.java | 2 ++ 3 files changed, 6 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 14cb42f7a0795..554c6c43dfbb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -128,6 +128,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Map pendingHandlers = new ConcurrentHashMap<>(); + + // Pipe name, creation time, region id private final Set> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 9fe4d7f944ad4..735e68a7a14a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -58,6 +58,8 @@ public class WebSocketConnectorServer extends WebSocketServer { // Map>> private final ConcurrentHashMap> eventsWaitingForAck = new ConcurrentHashMap<>(); + + // Pipe name, creation time, region id private final Set> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 15cd64941e0ac..8d920121363a3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -46,6 +46,8 @@ public abstract class BlockingPendingQueue { protected final PipeEventCounter eventCounter; protected final AtomicBoolean isClosed = new AtomicBoolean(false); + + // Pipe name, creation time, region id protected final Set> droppedPipeTaskKeys = ConcurrentHashMap.newKeySet(); From 236bb1beb10196b22a389ce905535aec4e7dc8e9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 11:01:46 +0800 Subject: [PATCH 13/13] Fix --- .../websocket/WebSocketConnectorServer.java | 2 -- .../opc_security/8443_12686/iotdb-server.pfx | Bin 2942 -> 0 bytes 2 files changed, 2 deletions(-) delete mode 100644 iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 735e68a7a14a9..516f9bbc437dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -369,8 +369,6 @@ public void addEvent(Event event, WebSocketSink connector) { synchronized (queue) { queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); } - - queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); } private class TransferThread extends Thread { diff --git a/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx b/iotdb-core/datanode/src/test/resources/opc_security/8443_12686/iotdb-server.pfx deleted file mode 100644 index b36d23da368bde99bee29e7bad02abd68127d7ed..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2942 zcma);X*d*&7RP6%Suo?ZWG5NBLB_r|Mr5*GyI%7WS<05(*q0eXr5B+H(b#v{X|kv6 zYX}*k?4m3w%ypNi_ufzUemLhm=YP)seEdBJO@fDjfalO8xGZ25yUk?@AQOCXRKGYBYvMnLKQbrD1l0iZ8H>Fr~$0iDl*fhaI@ zX_Eh8D3ij5`)$0IyvYFuOahP4Rmw?UFX6uPnUiAI-1bvv$4EWb7Qv4+S5~AEw&c!L zUvcJ>HEHQI=rdfaw<;*vV=K;pn~9ot!BcKtQ5dQ1?!ix1aeWYH_px!zSDcgG1Yn&1 zY{=zjJ)QXyp)fC7IWc6dd6iG-y)$C{<;j5g>5;#79LWv>c{rg@MjP0#mHFqLDlS$z zgr}KWwbd}9d?f6KUhA*5b~&@Ih+FN5i%tuJRVzD`8hK{(_H1g7FE>i}R#<&Y$Ybdc z#tK*DnV`A?F1y*=+BQ-B9~i*MZzGzJeZom}Dl65I&URHtA6;A^CDGM*h`DWV*BcGm zHfIioVV2HrD8hp_)$d_Pvz-inI4frdNN+59KJ74OnIlnJlGbhtFQyFp6>vJu-)3&Z z^I~D*v1kqpgm4%w9o7c>j$7zJ3mWwN%v;o{AyGgsIykQK8&m8$?_7rHB0jXRz5|66 zk7{=FdiN|Eb1)L>p_Ye|ThkEzEENW-ql1zoO+(e|4X?^=4#BRW`pRVyJ;rY@`EPx> zo*{85k!<*?NS@RZ*(xb-A@5m+=!_yi8}{vdSppFVg2!V8TMe)NDu4h&SSS zYI)4H4yM4e0gr40ZFw-4Apfn)JBRmr#E@P;>{Nef_C8WHdQ_eu7hasl8XCh-zOIy^ ztapNOG{9`S9i)XWXGu9yj^rk^>%zby+YF@Wz*#{9K_i$-VQ7xf|fhp3@BlQ(GJmkypv?z6(n{oNREz!5cvPw6qW(~PPh)*zj zC)x6F)x$XHgQ;05vgcJJa>J)y_8zY?Ziq3lg1h>1WtS%YaZN_`N!V^Q|b;UV=GM+ z3u!Y9u|n=OhC7DZUHVgtn3%|QY@BJHiid*D;t@gHNf&iIU-?ZG#94rwN_dPY^(v zoo7)q!0k=2jmA1|V{-6isf?LLc6}|wu~lqhc-gDc_7fRrnG-+lxl=rAr>7!Grh-xo zbINkE?CuLWhzxN*F?202YN+bIuXa+9RW36|Bvnih-r0XnfX>Jp zyDH~c91jWG-`RPiNZ|IyAckt3%`re~(&vI9*Vg!`3uU)m1(YC1%OEe*$hqkQ`a_hJ ziO2d&sjp=uFmyDgyuij#;8$0w>K^k~sieWX&@jnQaC{QLIBLPi(hmxY&^qu|T?FEj zZFJPE`=(NIigI$8W(3Ahqka;;1{v>a^WSUvFM*8IX>O z>W#I-59|LtPi;Ed*nTRSsdrwj@V!-I$YXf6(}I&Bh`ppr^odA2-)Is>ok`s)JM5|2 z>TUQaHjuhGgUnL!fMs`@3Rq&}EE3`w%_};&IOxq9x4L!aeMt4aS2M42>vOxIt~XCL zIK-v7zm?EZH)DN9clgyr=&}6mC!N{{8NIP-4*fx^j*=ht4(8gfVBwkxmu;k?28g@( zozD}BmbCmgPY!)z-Ik5h$Uqnew4+~r`@al;!pmVuF>#55~Hl7D13tT_up;( zcH?cdZ_LgQ(IB4LpMeP)7m_3IFT~=h!Se4*zCk*k&Jrn?Lc~^4Evgzjr#?QJ?j~`% z^*Iv`Ti>PYA1>Z*IV$8G^I}-1%OciX8l8LK7B{UCCH(AWQeJCc5QH7?G^)k!Tg7>* z?KbPxGribcy2X@~Pq=ci9fk1f2*1;D&80ullmZz!@3T_RPndB=69kmt?*R*hhLA`y zR)AbUH7y+nP(jPLpeq&)NEL6;;*HsnOM&obN1j76gDVux7^QW93TT+QwQK&pIdB*n zrqk*1n|$?A03`@sy8b2#v--sLr!-c9twdYkMD)BL^!ww!15{DU%oR`g8^=U4k_@ex!FyI3yM*Xi69V}$j3Q_b zratg3j2wh;hniun_8N>CvAMHHLnl2245`)J5I;Zd2~2;Gv2USe^NVcrbHPb9OJ3h!}z! z@_*bfE6=kww~v)n>~Y^cBs73~%>u6|v3m^xe0;g)+N^wvkwWXF_|gikv0$EE54=#reu!;3y$n<65r zch!i>(HA7?uP&v|9aaJy^A}ocRr78<|I=!Z9*jihD=*o12QS#4INnrYF%*fX3DzAv z^v~bg32aV>X->Gq9nu<{H3eBMcy?SnU+67p@&L`t(MD%rW&HJ%f`AYJSYTOA+~Id- zu_0h*VIe`XvyxYX!}v2@o;Q|LQN_jNcVRGF>8HWh&tEbb*U}?Q5A;QSM^4BL|3K*9 E0Y_L$3jhEB