diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index a22848ee3baed..b000c5d2366fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -220,7 +220,8 @@ private void parseAndCollectEvent(final PipeDeleteDataNodeEvent deleteDataEvent) private void collectEvent(final Event event) { if (event instanceof EnrichedEvent) { - if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) { LOGGER.warn("PipeEventCollector: The event {} is already released, skipping it.", event); isFailedToIncreaseReferenceCount = true; return; @@ -228,18 +229,25 @@ private void collectEvent(final Event event) { // Assign a commit id for this event in order to report progress in order. PipeEventCommitManager.getInstance() - .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, creationTime, regionId); + .enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime, regionId); // Assign a rebootTime for iotConsensusV2 - ((EnrichedEvent) event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); + enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); + + if (enrichedEvent.getPipeName() != null + && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) { + enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName()); + return; + } } if (event instanceof PipeHeartbeatEvent) { ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue); } - pendingQueue.offer(event); - collectInvocationCount.incrementAndGet(); + if (pendingQueue.offer(event)) { + collectInvocationCount.incrementAndGet(); + } } public void resetFlags() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index 6d227ac31fdce..f972bba0e6ede 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -73,7 +73,9 @@ public PipeRealtimePriorityBlockingQueue() { @Override public boolean offer(final Event event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } if (event instanceof TsFileInsertionEvent) { tsfileInsertEventDeque.add((TsFileInsertionEvent) event); @@ -356,13 +358,14 @@ public void discardAllEvents() { } @Override - public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - super.discardEventsOfPipe(pipeNameToDrop, regionId); + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); tsfileInsertEventDeque.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isEventFromPipe( + ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 6e26a76d77481..a11a1a68f0ce4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; @@ -198,9 +199,10 @@ public void close() { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { // Try to remove the events as much as possible - inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId); + inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); try { increaseHighPriorityTaskCount(); @@ -214,6 +216,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { // will. if (lastEvent instanceof EnrichedEvent && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()) + && creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime() && regionId == ((EnrichedEvent) lastEvent).getRegionId()) { // Do not clear the last event's reference counts because it may be on transferring lastEvent = null; @@ -237,6 +240,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { // "nonnull" detection. if (lastExceptionEvent instanceof EnrichedEvent && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName()) + && creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime() && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) { clearReferenceCountAndReleaseLastExceptionEvent(); } @@ -245,8 +249,9 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { decreaseHighPriorityTaskCount(); } - if (outputPipeSink instanceof IoTDBSink) { - ((IoTDBSink) outputPipeSink).discardEventsOfPipe(pipeNameToDrop, regionId); + if (outputPipeSink instanceof PipeConnectorWithEventDiscard) { + ((PipeConnectorWithEventDiscard) outputPipeSink) + .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 35f7983075dfe..1780f5a87efa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -92,12 +92,13 @@ public synchronized void register() { * {@link PipeSinkSubtask} should never be used again * @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { + public synchronized boolean deregister( + final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } - subtask.discardEventsOfPipe(pipeNameToDeregister, regionId); + subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId); try { if (registeredTaskCount > 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index 9138a07591875..4faa42db004a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -209,7 +209,7 @@ public synchronized void deregister( // Shall not be empty final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor; - lifeCycles.removeIf(o -> o.deregister(pipeName, regionId)); + lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId)); if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 96bddd0d672f0..c44e12a4bbf20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -154,10 +154,13 @@ public synchronized void close() { * Discard all events of the given pipe. This method only clears the reference count of the events * and discard them, but do not modify other objects (such as buffers) for simplicity. */ - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { events.removeIf( event -> { - if (pipeNameToDrop.equals(event.getPipeName()) && regionId == event.getRegionId()) { + if (pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId()) { event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index dd4d4fe1ce618..49d9d8cea09be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -196,9 +196,12 @@ public boolean isEmpty() { && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty); } - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId); - endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId)); + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + endPointToBatch + .values() + .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index d19639310cf4a..554c6c43dfbb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; @@ -77,6 +78,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -127,6 +129,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final Map pendingHandlers = new ConcurrentHashMap<>(); + // Pipe name, creation time, region id + private final Set> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); + private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -681,8 +687,15 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent) public void addFailureEventToRetryQueue(final Event event, final Exception e) { isConnectionException = e instanceof PipeConnectionException || ThriftClient.isConnectionBroken(e); - if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) { - return; + if (event instanceof EnrichedEvent) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (enrichedEvent.isReleased()) { + return; + } + if (isDroppedPipe(enrichedEvent)) { + enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); + return; + } } if (isClosed.get()) { @@ -728,15 +741,18 @@ public boolean isEnableSendTsFileLimit() { //////////////////////////// Operations for close //////////////////////////// @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - if (isTabletBatchModeEnabled) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + + if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isDroppedPipe( + (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -747,8 +763,8 @@ public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final retryTsFileQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isDroppedPipe( + (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -792,6 +808,7 @@ public synchronized void close() { // clear reference count of events in retry queue after closing async client clearRetryEventsReferenceCount(); + droppedPipeTaskKeys.clear(); super.close(); } @@ -848,6 +865,21 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) { this.transferTsFileCounter = transferTsFileCounter; } + private boolean isDroppedPipe(final EnrichedEvent event) { + return droppedPipeTaskKeys.contains( + new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId())); + } + + private static boolean isDroppedPipe( + final EnrichedEvent event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + return pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId(); + } + @Override public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { if (tabletBatchBuilder != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index 9357a8c6a6d7d..e8c4420861c59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -599,9 +599,10 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx } @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 491d40a1e4537..516f9bbc437dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.external.collections4.BidiMap; import org.apache.iotdb.commons.external.collections4.bidimap.DualTreeBidiMap; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -39,6 +40,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,6 +59,10 @@ public class WebSocketConnectorServer extends WebSocketServer { private final ConcurrentHashMap> eventsWaitingForAck = new ConcurrentHashMap<>(); + // Pipe name, creation time, region id + private final Set> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); + private final BidiMap router = new DualTreeBidiMap(null, Comparator.comparing(Object::hashCode)) {}; @@ -97,13 +103,8 @@ public synchronized void unregister(WebSocketSink connector) { eventWrappers = new ArrayList<>(eventTransferQueue); eventTransferQueue.clear(); } - eventWrappers.forEach( - (eventWrapper) -> { - if (eventWrapper.event instanceof EnrichedEvent) { - ((EnrichedEvent) eventWrapper.event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } - }); + eventWrappers.forEach(eventWrapper -> discardEvent(eventWrapper.event)); + eventWrappers.clear(); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } @@ -113,13 +114,36 @@ public synchronized void unregister(WebSocketSink connector) { if (eventsWaitingForAck.containsKey(pipeName)) { eventsWaitingForAck .remove(pipeName) - .forEach( - (eventId, eventWrapper) -> { - if (eventWrapper.event instanceof EnrichedEvent) { - ((EnrichedEvent) eventWrapper.event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } - }); + .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); + } + + droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName)); + } + + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + + final PriorityBlockingQueue eventTransferQueue = + eventsWaitingForTransfer.get(pipeNameToDrop); + if (eventTransferQueue != null) { + eventTransferQueue.removeIf( + eventWrapper -> + discardIfMatches(eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + synchronized (eventTransferQueue) { + eventTransferQueue.notifyAll(); + } + } + + final ConcurrentHashMap eventId2EventMap = + eventsWaitingForAck.get(pipeNameToDrop); + if (eventId2EventMap != null) { + eventId2EventMap + .entrySet() + .removeIf( + entry -> + discardIfMatches( + entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); } } @@ -300,21 +324,24 @@ public void onError(WebSocket webSocket, Exception e) { } public void addEvent(Event event, WebSocketSink connector) { + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + + final String pipeName = connector.getPipeName(); final PriorityBlockingQueue queue = - eventsWaitingForTransfer.get(connector.getPipeName()); + eventsWaitingForTransfer.get(pipeName); if (queue == null) { LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", connector, event); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } + discardEvent(event); return; } if (queue.size() >= 5) { synchronized (queue) { - while (queue.size() >= 5) { + while (queue.size() >= 5 && isQueueAvailable(pipeName, queue) && !isDroppedPipe(event)) { try { queue.wait(); } catch (InterruptedException e) { @@ -323,12 +350,22 @@ public void addEvent(Event event, WebSocketSink connector) { } } + if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) { + discardEvent(event); + return; + } + queue.put( new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); return; } } + if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) { + discardEvent(event); + return; + } + synchronized (queue) { queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); } @@ -377,6 +414,11 @@ private void transfer(String pipeName, EventWaitingForTransfer element) { final WebSocketSink connector = element.connector; try { + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + ByteBuffer tabletBuffer; if (event instanceof PipeRawTabletInsertionEvent) { tabletBuffer = ((PipeRawTabletInsertionEvent) event).convertToTablet().serialize(); @@ -387,7 +429,11 @@ private void transfer(String pipeName, EventWaitingForTransfer element) { } if (tabletBuffer == null) { - connector.commit((EnrichedEvent) event); + if (isDroppedPipe(event)) { + discardEvent(event); + } else { + connector.commit((EnrichedEvent) event); + } return; } @@ -398,11 +444,17 @@ private void transfer(String pipeName, EventWaitingForTransfer element) { server.broadcast(payload, Collections.singletonList(router.get(pipeName))); + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + final ConcurrentHashMap eventId2EventMap = eventsWaitingForAck.get(pipeName); if (eventId2EventMap == null) { LOGGER.warn( "The pipe {} was dropped so the event ack {} will be ignored.", pipeName, eventId); + discardEvent(event); return; } eventId2EventMap.put(eventId, new EventWaitingForAck(connector, event)); @@ -410,13 +462,10 @@ private void transfer(String pipeName, EventWaitingForTransfer element) { synchronized (server) { final PriorityBlockingQueue queue = eventsWaitingForTransfer.get(pipeName); - if (queue == null) { + if (queue == null || isDroppedPipe(event)) { LOGGER.warn( "The pipe {} was dropped so the event {} will be dropped.", pipeName, eventId); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } + discardEvent(event); return; } @@ -465,4 +514,44 @@ public EventWaitingForAck(WebSocketSink connector, Event event) { this.event = event; } } + + private boolean discardIfMatches( + final Event event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + if (!(event instanceof EnrichedEvent)) { + return false; + } + + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (!pipeNameToDrop.equals(enrichedEvent.getPipeName()) + || creationTimeToDrop != enrichedEvent.getCreationTime() + || regionId != enrichedEvent.getRegionId()) { + return false; + } + + discardEvent(enrichedEvent); + return true; + } + + private boolean isDroppedPipe(final Event event) { + return event instanceof EnrichedEvent + && droppedPipeTaskKeys.contains( + new Triple<>( + ((EnrichedEvent) event).getPipeName(), + ((EnrichedEvent) event).getCreationTime(), + ((EnrichedEvent) event).getRegionId())); + } + + private boolean isQueueAvailable( + final String pipeName, final PriorityBlockingQueue queue) { + return eventsWaitingForTransfer.get(pipeName) == queue; + } + + private void discardEvent(final Event event) { + if (event instanceof EnrichedEvent) { + ((EnrichedEvent) event).clearReferenceCount(WebSocketSink.class.getName()); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index 4e593e673fd56..e8a38d63e6e42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -41,7 +42,7 @@ import java.util.Optional; @TreeModel -public class WebSocketSink implements PipeConnector { +public class WebSocketSink implements PipeConnector, PipeConnectorWithEventDiscard { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketSink.class); @@ -166,6 +167,14 @@ public void close() throws Exception { } } + @Override + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (server != null) { + server.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + } + } + public void commit(EnrichedEvent enrichedEvent) { Optional.ofNullable(enrichedEvent) .ifPresent(event -> event.decreaseReferenceCount(WebSocketSink.class.getName(), true)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java index 98163697374da..af871feaa7e0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java @@ -63,7 +63,8 @@ public synchronized void register() { } @Override - public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { + public synchronized boolean deregister( + final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java index 6d5f27d8172a5..16a9ee1a03d92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java @@ -168,7 +168,7 @@ public synchronized void deregister( final PipeSinkSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - if (lifeCycle.deregister(pipeName, regionId)) { + if (lifeCycle.deregister(pipeName, creationTime, regionId)) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java new file mode 100644 index 0000000000000..029a722c8a95c --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.connection; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeRealtimePriorityBlockingQueue; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; +import org.apache.iotdb.pipe.api.event.Event; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class PipeEventCollectorTest { + + @Test + public void testCollectorDoesNotOfferEventsOfDroppedPipeToUnboundedPendingQueue() { + verifyCollectorDoesNotOfferEventsOfDroppedPipe( + new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter())); + } + + @Test + public void testCollectorDoesNotOfferEventsOfDroppedPipeToRealtimePendingQueue() { + verifyCollectorDoesNotOfferEventsOfDroppedPipe(new PipeRealtimePriorityBlockingQueue()); + } + + private void verifyCollectorDoesNotOfferEventsOfDroppedPipe( + final UnboundedBlockingPendingQueue pendingQueue) { + pendingQueue.discardEventsOfPipe("pipe", 1L, 1); + + final PipeEventCollector droppedPipeCollector = + new PipeEventCollector(pendingQueue, 1L, 1, false, false, false); + final PipeRawTabletInsertionEvent droppedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 1L); + droppedPipeCollector.collect(droppedPipeEvent); + + Assert.assertTrue(droppedPipeEvent.isReleased()); + Assert.assertEquals(0, pendingQueue.size()); + + final PipeEventCollector recreatedPipeCollector = + new PipeEventCollector(pendingQueue, 2L, 1, false, false, false); + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 2L); + recreatedPipeCollector.collect(recreatedPipeEvent); + + Assert.assertFalse(recreatedPipeEvent.isReleased()); + Assert.assertEquals(1, pendingQueue.size()); + + pendingQueue.discardAllEvents(); + Assert.assertTrue(recreatedPipeEvent.isReleased()); + } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d1", schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java new file mode 100644 index 0000000000000..ddfc699721b92 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.subtask.sink; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; +import org.apache.iotdb.pipe.api.PipeConnector; + +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.withSettings; + +public class PipeSinkSubtaskTest { + + @Test + public void testDiscardEventsOfPipeDelegatesToConnector() { + final PipeConnector connector = + mock( + PipeConnector.class, + withSettings().extraInterfaces(PipeConnectorWithEventDiscard.class)); + final UnboundedBlockingPendingQueue pendingQueue = mock(UnboundedBlockingPendingQueue.class); + + final PipeSinkSubtask subtask = + Mockito.spy( + new PipeSinkSubtask( + "PipeSinkSubtaskTest", + System.currentTimeMillis(), + "data_test", + 0, + (UnboundedBlockingPendingQueue) pendingQueue, + connector)); + + try { + subtask.discardEventsOfPipe("pipe", 1L, 1); + + verify((PipeConnectorWithEventDiscard) connector).discardEventsOfPipe("pipe", 1L, 1); + } finally { + subtask.close(); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index ec2122b917574..cf311639ee9a2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; @@ -28,8 +29,11 @@ import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; +import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer; +import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.write.record.Tablet; @@ -37,6 +41,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.security.SecureRandom; import java.util.Arrays; @@ -104,6 +109,85 @@ public void testIoTDBThriftAsyncConnectorToOthers() { } } + @Test + public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception { + try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + } + }); + connector.validate(new PipeParameterValidator(parameters)); + connector.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent droppedEvent = + createPipeRawTabletInsertionEvent("pipe", 1L, 1); + droppedEvent.increaseReferenceCount("test"); + droppedEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 1L, 1, -1), 1L); + + connector.discardEventsOfPipe("pipe", 1L, 1); + connector.addFailureEventToRetryQueue(droppedEvent, new PipeException("test")); + + Assert.assertEquals(0, connector.getRetryEventQueueSize()); + Assert.assertTrue(droppedEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 2L, 1); + recreatedPipeEvent.increaseReferenceCount("test"); + recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 2L, 1, -1), 1L); + + connector.addFailureEventToRetryQueue(recreatedPipeEvent, new PipeException("test")); + + Assert.assertEquals(1, connector.getRetryEventQueueSize()); + } + } + + @Test + public void testWebSocketSinkDropDoesNotRequeueDroppedPipeEvents() { + final String pipeName = "pipe_" + System.nanoTime(); + final WebSocketConnectorServer server = WebSocketConnectorServer.getOrCreateInstance(0); + final WebSocketSink connector = Mockito.mock(WebSocketSink.class); + Mockito.when(connector.getPipeName()).thenReturn(pipeName); + + server.register(connector); + try { + final PipeRawTabletInsertionEvent droppedEvent = + createPipeRawTabletInsertionEvent(pipeName, 1L, 1); + droppedEvent.increaseReferenceCount(WebSocketSink.class.getName()); + droppedEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 1L, 1, -1), 1L); + server.addEvent(droppedEvent, connector); + + server.discardEventsOfPipe(pipeName, 1L, 1); + Assert.assertTrue(droppedEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedDroppedPipeEvent = + createPipeRawTabletInsertionEvent(pipeName, 1L, 1); + recreatedDroppedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName()); + recreatedDroppedPipeEvent.setCommitterKeyAndCommitId( + new CommitterKey(pipeName, 1L, 1, -1), 2L); + server.addEvent(recreatedDroppedPipeEvent, connector); + + Assert.assertTrue(recreatedDroppedPipeEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent(pipeName, 2L, 1); + recreatedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName()); + recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 2L, 1, -1), 3L); + server.addEvent(recreatedPipeEvent, connector); + + Assert.assertFalse(recreatedPipeEvent.isReleased()); + } finally { + server.unregister(connector); + } + } + @Test public void testOpcUaSink() { final List schemaList = @@ -194,4 +278,25 @@ public void testOpcUaSink() { Assert.fail(); } } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime, final int regionId) { + final List schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d" + regionId, schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 8773b03f9f308..8d920121363a3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.task.connection; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; import org.apache.iotdb.pipe.api.event.Event; @@ -27,7 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -44,6 +47,10 @@ public abstract class BlockingPendingQueue { protected final AtomicBoolean isClosed = new AtomicBoolean(false); + // Pipe name, creation time, region id + protected final Set> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); + protected BlockingPendingQueue( final BlockingQueue pendingQueue, final PipeEventCounter eventCounter) { this.pendingQueue = pendingQueue; @@ -51,7 +58,10 @@ protected BlockingPendingQueue( } public boolean offer(final E event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } + final boolean offered = pendingQueue.offer(event); if (offered) { eventCounter.increaseEventCount(event); @@ -60,7 +70,9 @@ public boolean offer(final E event) { } public boolean put(final E event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } try { pendingQueue.put(event); eventCounter.increaseEventCount(event); @@ -101,6 +113,7 @@ public void clear() { isClosed.set(true); pendingQueue.clear(); eventCounter.reset(); + droppedPipeTaskKeys.clear(); } /** DO NOT FORGET to set eventCounter to new value after invoking this method. */ @@ -120,14 +133,17 @@ public void discardAllEvents() { return true; }); eventCounter.reset(); + droppedPipeTaskKeys.clear(); } - public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isEventFromPipe( + ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); } @@ -157,9 +173,34 @@ public int getPipeHeartbeatEventCount() { return eventCounter.getPipeHeartbeatEventCount(); } - protected void checkBeforeOffer(final E event) { - if (isClosed.get() && event instanceof EnrichedEvent) { + protected boolean checkBeforeOffer(final E event) { + final boolean shouldReject = isClosed.get() || isEventFromDroppedPipe(event); + if (shouldReject && event instanceof EnrichedEvent) { ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName()); } + return !shouldReject; + } + + protected static boolean isEventFromPipe( + final EnrichedEvent event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + return pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId(); + } + + protected boolean isEventFromDroppedPipe(final E event) { + return event instanceof EnrichedEvent + && ((EnrichedEvent) event).getPipeName() != null + && isPipeDropped( + ((EnrichedEvent) event).getPipeName(), + ((EnrichedEvent) event).getCreationTime(), + ((EnrichedEvent) event).getRegionId()); + } + + public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) { + return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, regionId)); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java new file mode 100644 index 0000000000000..275ccb20ea3ac --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.datastructure; + +import java.util.Objects; + +public class Triple { + public final L first; + public final M second; + public final R third; + + public Triple(final L first, final M second, final R third) { + this.first = first; + this.second = second; + this.third = third; + } + + public L getFirst() { + return first; + } + + public M getSecond() { + return second; + } + + public R getThird() { + return third; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Triple triple = (Triple) o; + return first.equals(triple.first) && second.equals(triple.second) && third.equals(triple.third); + } + + @Override + public int hashCode() { + return Objects.hash(first, second, third); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 88a8b71775f45..ebb3d90bc8cab 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -148,7 +148,7 @@ @TreeModel @TableModel -public abstract class IoTDBSink implements PipeConnector { +public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEventDiscard { private static final String PARSE_URL_ERROR_FORMATTER = "Exception occurred while parsing node urls from target servers: {}"; @@ -641,7 +641,8 @@ public void rateLimitIfNeeded( * When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard * its batched or queued events in the output pipe connector. */ - public synchronized void discardEventsOfPipe(final String pipeName, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeName, final long creationTime, final int regionId) { // Do nothing by default } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java new file mode 100644 index 0000000000000..ab4dbcf90750f --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.protocol; + +public interface PipeConnectorWithEventDiscard { + + void discardEventsOfPipe(String pipeName, long creationTime, int regionId); +}