From f45e50b06ae509f422426ff958b6be1b7c3f8708 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 30 Apr 2026 19:43:52 +0800 Subject: [PATCH] fix --- .../pipe/agent/task/PipeDataNodeTaskAgent.java | 2 +- .../commons/pipe/agent/task/PipeTaskAgent.java | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 031cfd3a62e0e..06d6a512b30a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -377,7 +377,7 @@ public void stopAllPipesWithCriticalExceptionAndTrackException( ///////////////////////// Heartbeat ///////////////////////// public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut( + if (!tryReadLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 73b543592c1e2..99d452987943e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -130,6 +130,10 @@ protected boolean tryReadLockWithTimeOut(final long timeOutInSeconds) { } } + protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) { + return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs)); + } + protected void releaseReadLock() { pipeMetaKeeper.releaseReadLock(); } @@ -148,10 +152,18 @@ protected boolean tryWriteLockWithTimeOut(final long timeOutInSeconds) { } } + protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) { + return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs)); + } + protected void releaseWriteLock() { pipeMetaKeeper.releaseWriteLock(); } + private long convertMsToCeilSeconds(final long timeOutInMs) { + return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L); + } + ////////////////////////// Pipe Task Management Entry ////////////////////////// public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges( @@ -368,7 +380,7 @@ protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(final String public List handlePipeMetaChanges( final List pipeMetaListFromCoordinator) { - if (!tryWriteLockWithTimeOut( + if (!tryWriteLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return null; } @@ -1107,7 +1119,7 @@ private void stopAllPipesWithCriticalExceptionInternal(final int currentNodeId) public void collectPipeMetaList(final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut( + if (!tryReadLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return; }