diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPClients.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPClients.scala index 92d6eb7b2b..cfb8e53f1c 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPClients.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPClients.scala @@ -7,6 +7,7 @@ import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging import org.apache.commons.io.IOUtils import org.apache.http.client.config.RequestConfig import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpRequestBase} +import org.apache.http.entity.BufferedHttpEntity import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder} import org.apache.http.impl.conn.PoolingHttpClientConnectionManager import org.apache.spark.injections.UDFUtils @@ -104,17 +105,37 @@ object HandlingUtils extends SparkLogging { case 201 => true case 202 => true case 429 => - Option(response.getFirstHeader("Retry-After")) - .foreach { h => - logInfo(s"waiting ${h.getValue} on ${ - request match { - case p: HttpPost => p.getURI + " " + - Try(IOUtils.toString(p.getEntity.getContent, "UTF-8")).getOrElse("") - case _ => request.getURI - } - }") + // Inspect body to distinguish capacity errors from transient rate limits. + // Guard with Content-Length cap to avoid buffering unexpectedly large payloads. + val MaxInspectionBytes = 1024 * 1024L + val bodyStr = Option(response.getEntity).flatMap { entity => + val contentLength = entity.getContentLength + if (contentLength > MaxInspectionBytes) { + None + } else { + response.setEntity(new BufferedHttpEntity(entity)) + Option(response.getEntity) + .flatMap(e => Try(IOUtils.toString(e.getContent, "UTF-8")).toOption) } - false + }.getOrElse("") + if (bodyStr.contains("CapacityLimitExceeded")) { + // Fabric capacity-exceeded 429s are NOT transient rate limits — + // retrying will not help and causes hangs + logWarning(s"Capacity limit exceeded (non-retryable 429) on ${request.getURI}") + true + } else { + Option(response.getFirstHeader("Retry-After")) + .foreach { h => + logInfo(s"waiting ${h.getValue} on ${ + request match { + case p: HttpPost => p.getURI + " " + + Try(IOUtils.toString(p.getEntity.getContent, "UTF-8")).getOrElse("") + case _ => request.getURI + } + }") + } + false + } case code => logWarning(s"got error $code: ${response.getStatusLine.getReasonPhrase} on ${ request match { diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/VerifySendWithRetries.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/VerifySendWithRetries.scala index 777c824ded..af1860b9c3 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/VerifySendWithRetries.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/io/split1/VerifySendWithRetries.scala @@ -273,6 +273,130 @@ class VerifySendWithRetries extends TestBase { } } + test("429 with CapacityLimitExceeded body is not retried") { + val port = getFreePort + val requestCount = new AtomicInteger(0) + val capacityBody = + """{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}""" + val server = startServer(port) { exchange => + val n = requestCount.incrementAndGet() + if (n == 1) { + respond(exchange, 429, capacityBody) + } else { + respond(exchange, 200, """{"ok":true}""") + } + } + try { + val client = HttpClients.createDefault() + val request = new HttpGet(s"http://localhost:$port/test") + val response = HandlingUtils.sendWithRetries( + client, request, Array(100, 100, 100)) + val code = response.getStatusLine.getStatusCode + response.close() + client.close() + + assert(code === 429, "Capacity-exceeded 429 should be returned immediately, not retried") + assert(requestCount.get() === 1, "Should not retry on CapacityLimitExceeded") + } finally { + server.stop(0) + } + } + + test("429 with CapacityLimitExceeded ignores Retry-After header") { + val port = getFreePort + val requestCount = new AtomicInteger(0) + val capacityBody = + """{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}""" + val server = startServer(port) { exchange => + val n = requestCount.incrementAndGet() + if (n == 1) { + respond(exchange, 429, capacityBody, headers = Map("Retry-After" -> "5")) + } else { + respond(exchange, 200, """{"ok":true}""") + } + } + try { + val client = HttpClients.createDefault() + val request = new HttpGet(s"http://localhost:$port/test") + val start = System.currentTimeMillis() + val response = HandlingUtils.sendWithRetries( + client, request, Array(100, 100, 100)) + val elapsed = System.currentTimeMillis() - start + val code = response.getStatusLine.getStatusCode + response.close() + client.close() + + assert(code === 429, "Capacity-exceeded should not retry even with Retry-After") + assert(requestCount.get() === 1, "Should not retry on CapacityLimitExceeded") + // Verify we didn't sleep for the 5s Retry-After + assert(elapsed < 4000, s"Should ignore Retry-After and return quickly, took ${elapsed}ms") + } finally { + server.stop(0) + } + } + + test("429 with non-capacity error body still retries normally") { + val port = getFreePort + val requestCount = new AtomicInteger(0) + val rateLimitBody = """{"error":{"code":"RateLimitExceeded","message":"Too many requests"}}""" + val server = startServer(port) { exchange => + val n = requestCount.incrementAndGet() + if (n <= 2) { + respond(exchange, 429, rateLimitBody) + } else { + respond(exchange, 200, """{"ok":true}""") + } + } + try { + val client = HttpClients.createDefault() + val request = new HttpGet(s"http://localhost:$port/test") + val response = HandlingUtils.sendWithRetries( + client, request, Array(100, 100, 100)) + val code = response.getStatusLine.getStatusCode + response.close() + client.close() + + assert(code === 200, "Non-capacity 429 should still retry and eventually succeed") + assert(requestCount.get() === 3, "Should have retried past the rate-limit 429s") + } finally { + server.stop(0) + } + } + + test("429 with CapacityLimitExceeded and chunked encoding (no Content-Length) is not retried") { + val port = getFreePort + val requestCount = new AtomicInteger(0) + val capacityBody = + """{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}""" + val server = startServer(port) { exchange => + val n = requestCount.incrementAndGet() + if (n == 1) { + // Send with Content-Length = 0 (chunked) to simulate no Content-Length header + exchange.sendResponseHeaders(429, 0) + val os = exchange.getResponseBody + os.write(capacityBody.getBytes("UTF-8")) + os.close() + exchange.close() + } else { + respond(exchange, 200, """{"ok":true}""") + } + } + try { + val client = HttpClients.createDefault() + val request = new HttpGet(s"http://localhost:$port/test") + val response = HandlingUtils.sendWithRetries( + client, request, Array(100, 100, 100)) + val code = response.getStatusLine.getStatusCode + response.close() + client.close() + + assert(code === 429, "Chunked capacity-exceeded 429 should be returned immediately") + assert(requestCount.get() === 1, "Should not retry on chunked CapacityLimitExceeded") + } finally { + server.stop(0) + } + } + test("429 with Retry-After 0 means retry immediately") { val port = getFreePort val requestCount = new AtomicInteger(0)