Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpRequestBase}
import org.apache.http.entity.BufferedHttpEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.spark.injections.UDFUtils
Expand Down Expand Up @@ -104,17 +105,37 @@ object HandlingUtils extends SparkLogging {
case 201 => true
case 202 => true
case 429 =>
Option(response.getFirstHeader("Retry-After"))
.foreach { h =>
logInfo(s"waiting ${h.getValue} on ${
request match {
case p: HttpPost => p.getURI + " " +
Try(IOUtils.toString(p.getEntity.getContent, "UTF-8")).getOrElse("")
case _ => request.getURI
}
}")
// Inspect body to distinguish capacity errors from transient rate limits.
// Guard with Content-Length cap to avoid buffering unexpectedly large payloads.
val MaxInspectionBytes = 1024 * 1024L
val bodyStr = Option(response.getEntity).flatMap { entity =>
val contentLength = entity.getContentLength
if (contentLength > MaxInspectionBytes) {
None
} else {
response.setEntity(new BufferedHttpEntity(entity))
Option(response.getEntity)
.flatMap(e => Try(IOUtils.toString(e.getContent, "UTF-8")).toOption)
}
false
}.getOrElse("")
if (bodyStr.contains("CapacityLimitExceeded")) {
// Fabric capacity-exceeded 429s are NOT transient rate limits —
// retrying will not help and causes hangs
logWarning(s"Capacity limit exceeded (non-retryable 429) on ${request.getURI}")
true
} else {
Comment on lines +121 to +126
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bodyStr.contains("CapacityLimitExceeded") is a brittle detector and could produce false positives/negatives (e.g., the substring appearing in a message, different JSON shapes, casing). Since the codebase already uses JSON parsing (e.g., spray-json in core), consider parsing the response JSON and checking error.code == "CapacityLimitExceeded" explicitly before deciding not to retry.

Copilot uses AI. Check for mistakes.
Option(response.getFirstHeader("Retry-After"))
.foreach { h =>
logInfo(s"waiting ${h.getValue} on ${
request match {
case p: HttpPost => p.getURI + " " +
Try(IOUtils.toString(p.getEntity.getContent, "UTF-8")).getOrElse("")
case _ => request.getURI
}
}")
}
false
}
case code =>
logWarning(s"got error $code: ${response.getStatusLine.getReasonPhrase} on ${
request match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,130 @@ class VerifySendWithRetries extends TestBase {
}
}

// A 429 whose body carries the CapacityLimitExceeded error code must be handed back to the
// caller after a single attempt. The stub server would answer 200 on any later request, so
// both the returned status code and the request counter reveal an unwanted retry.
test("429 with CapacityLimitExceeded body is not retried") {
  val port = getFreePort
  val requestCount = new AtomicInteger(0)
  val capacityBody =
    """{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}"""
  val server = startServer(port) { exchange =>
    // Only the very first request sees the capacity error.
    val n = requestCount.incrementAndGet()
    if (n == 1) {
      respond(exchange, 429, capacityBody)
    } else {
      respond(exchange, 200, """{"ok":true}""")
    }
  }
  try {
    val client = HttpClients.createDefault()
    try {
      val request = new HttpGet(s"http://localhost:$port/test")
      val response = HandlingUtils.sendWithRetries(
        client, request, Array(100, 100, 100))
      // Release the response even if reading the status line fails.
      val code = try response.getStatusLine.getStatusCode finally response.close()

      assert(code === 429, "Capacity-exceeded 429 should be returned immediately, not retried")
      assert(requestCount.get() === 1, "Should not retry on CapacityLimitExceeded")
    } finally {
      // Close the client on every path, including when sendWithRetries throws.
      client.close()
    }
  } finally {
    server.stop(0)
  }
}

// A capacity-exceeded 429 must be returned after one attempt even when the server also sends
// a Retry-After header. The header asks for 5 seconds, so the test additionally checks that
// the call returns in under 4 seconds, i.e. that the advertised wait was not honoured.
test("429 with CapacityLimitExceeded ignores Retry-After header") {
  val port = getFreePort
  val requestCount = new AtomicInteger(0)
  val capacityBody =
    """{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}"""
  val server = startServer(port) { exchange =>
    // Only the very first request sees the capacity error (with Retry-After attached).
    val n = requestCount.incrementAndGet()
    if (n == 1) {
      respond(exchange, 429, capacityBody, headers = Map("Retry-After" -> "5"))
    } else {
      respond(exchange, 200, """{"ok":true}""")
    }
  }
  try {
    val client = HttpClients.createDefault()
    try {
      val request = new HttpGet(s"http://localhost:$port/test")
      // nanoTime is monotonic, so the measurement cannot be skewed by wall-clock adjustments.
      val startNanos = System.nanoTime()
      val response = HandlingUtils.sendWithRetries(
        client, request, Array(100, 100, 100))
      val elapsed = (System.nanoTime() - startNanos) / 1000000L
      // Release the response even if reading the status line fails.
      val code = try response.getStatusLine.getStatusCode finally response.close()

      assert(code === 429, "Capacity-exceeded should not retry even with Retry-After")
      assert(requestCount.get() === 1, "Should not retry on CapacityLimitExceeded")
      // Verify we didn't sleep for the 5s Retry-After
      assert(elapsed < 4000, s"Should ignore Retry-After and return quickly, took ${elapsed}ms")
    } finally {
      // Close the client on every path, including when sendWithRetries throws.
      client.close()
    }
  } finally {
    server.stop(0)
  }
}

// Control case: a 429 whose body names a different error code (RateLimitExceeded) must keep
// the normal retry behaviour. The stub server answers 429 twice and then 200, so a successful
// run ends with status 200 after exactly three requests.
test("429 with non-capacity error body still retries normally") {
  val port = getFreePort
  val requestCount = new AtomicInteger(0)
  val rateLimitBody = """{"error":{"code":"RateLimitExceeded","message":"Too many requests"}}"""
  val server = startServer(port) { exchange =>
    // The first two requests are rate-limited; the third one succeeds.
    val n = requestCount.incrementAndGet()
    if (n <= 2) {
      respond(exchange, 429, rateLimitBody)
    } else {
      respond(exchange, 200, """{"ok":true}""")
    }
  }
  try {
    val client = HttpClients.createDefault()
    try {
      val request = new HttpGet(s"http://localhost:$port/test")
      val response = HandlingUtils.sendWithRetries(
        client, request, Array(100, 100, 100))
      // Release the response even if reading the status line fails.
      val code = try response.getStatusLine.getStatusCode finally response.close()

      assert(code === 200, "Non-capacity 429 should still retry and eventually succeed")
      assert(requestCount.get() === 3, "Should have retried past the rate-limit 429s")
    } finally {
      // Close the client on every path, including when sendWithRetries throws.
      client.close()
    }
  } finally {
    server.stop(0)
  }
}

// Same expectation as the plain capacity-exceeded case, but the 429 body is streamed without
// a declared length, so the client cannot rely on a Content-Length value when deciding whether
// to inspect the body. The request must still be attempted exactly once.
test("429 with CapacityLimitExceeded and chunked encoding (no Content-Length) is not retried") {
  val port = getFreePort
  val requestCount = new AtomicInteger(0)
  val capacityBody =
    """{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}"""
  val server = startServer(port) { exchange =>
    val n = requestCount.incrementAndGet()
    if (n == 1) {
      // A response length of 0 is not "Content-Length: 0": assuming `exchange` is the JDK's
      // com.sun.net.httpserver.HttpExchange, it selects chunked transfer encoding, so the
      // body below is sent with no Content-Length header at all.
      exchange.sendResponseHeaders(429, 0)
      val os = exchange.getResponseBody
      // Close the stream even if the write fails, so the exchange is always terminated.
      try {
        os.write(capacityBody.getBytes("UTF-8"))
      } finally {
        os.close()
      }
      exchange.close()
    } else {
      respond(exchange, 200, """{"ok":true}""")
    }
  }
  try {
    val client = HttpClients.createDefault()
    try {
      val request = new HttpGet(s"http://localhost:$port/test")
      val response = HandlingUtils.sendWithRetries(
        client, request, Array(100, 100, 100))
      // Release the response even if reading the status line fails.
      val code = try response.getStatusLine.getStatusCode finally response.close()

      assert(code === 429, "Chunked capacity-exceeded 429 should be returned immediately")
      assert(requestCount.get() === 1, "Should not retry on chunked CapacityLimitExceeded")
    } finally {
      // Close the client on every path, including when sendWithRetries throws.
      client.close()
    }
  } finally {
    server.stop(0)
  }
}

test("429 with Retry-After 0 means retry immediately") {
val port = getFreePort
val requestCount = new AtomicInteger(0)
Expand Down
Loading