@@ -130,6 +130,10 @@ protected boolean tryReadLockWithTimeOut(final long timeOutInSeconds) {
130130 }
131131 }
132132
133+ protected boolean tryReadLockWithTimeOutInMs (final long timeOutInMs ) {
134+ return tryReadLockWithTimeOut (convertMsToCeilSeconds (timeOutInMs ));
135+ }
136+
133137 protected void releaseReadLock () {
134138 pipeMetaKeeper .releaseReadLock ();
135139 }
@@ -148,10 +152,18 @@ protected boolean tryWriteLockWithTimeOut(final long timeOutInSeconds) {
148152 }
149153 }
150154
155+ protected boolean tryWriteLockWithTimeOutInMs (final long timeOutInMs ) {
156+ return tryWriteLockWithTimeOut (convertMsToCeilSeconds (timeOutInMs ));
157+ }
158+
151159 protected void releaseWriteLock () {
152160 pipeMetaKeeper .releaseWriteLock ();
153161 }
154162
163+ private long convertMsToCeilSeconds (final long timeOutInMs ) {
164+ return Math .max (1L , (Math .max (0L , timeOutInMs ) + 999L ) / 1000L );
165+ }
166+
155167 ////////////////////////// Pipe Task Management Entry //////////////////////////
156168
157169 public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges (
@@ -368,7 +380,7 @@ protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(final String
368380
369381 public List <TPushPipeMetaRespExceptionMessage > handlePipeMetaChanges (
370382 final List <PipeMeta > pipeMetaListFromCoordinator ) {
371- if (!tryWriteLockWithTimeOut (
383+ if (!tryWriteLockWithTimeOutInMs (
372384 CommonDescriptor .getInstance ().getConfig ().getDnConnectionTimeoutInMS () * 2L / 3 )) {
373385 return null ;
374386 }
@@ -1107,7 +1119,7 @@ private void stopAllPipesWithCriticalExceptionInternal(final int currentNodeId)
11071119
11081120 public void collectPipeMetaList (final TPipeHeartbeatReq req , final TPipeHeartbeatResp resp )
11091121 throws TException {
1110- if (!tryReadLockWithTimeOut (
1122+ if (!tryReadLockWithTimeOutInMs (
11111123 CommonDescriptor .getInstance ().getConfig ().getDnConnectionTimeoutInMS () * 2L / 3 )) {
11121124 return ;
11131125 }
0 commit comments