Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.microsoft.azure.synapse.ml.logging.SynapseMLLogging
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpRequestBase}
import org.apache.http.entity.BufferedHttpEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.spark.injections.UDFUtils
Expand Down Expand Up @@ -104,17 +105,32 @@ object HandlingUtils extends SparkLogging {
case 201 => true
case 202 => true
case 429 =>
Option(response.getFirstHeader("Retry-After"))
.foreach { h =>
logInfo(s"waiting ${h.getValue} on ${
request match {
case p: HttpPost => p.getURI + " " +
Try(IOUtils.toString(p.getEntity.getContent, "UTF-8")).getOrElse("")
case _ => request.getURI
}
}")
}
false
// Buffer the response entity so the body can be inspected
// and still returned to the caller if we don't retry
if (response.getEntity != null) {
response.setEntity(new BufferedHttpEntity(response.getEntity))
}
val bodyStr = Option(response.getEntity)
.flatMap(e => Try(IOUtils.toString(e.getContent, "UTF-8")).toOption)
.getOrElse("")
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BufferedHttpEntity will buffer the entire 429 response payload in memory. If a service returns a large body (or misbehaves), this can create avoidable memory pressure during retries. Consider guarding this with a size cap (e.g., based on Content-Length) or reading only a bounded prefix sufficient to detect the error code, and skip buffering/inspection when the entity is too large.

Suggested change
// Buffer the response entity so the body can be inspected
// and still returned to the caller if we don't retry
if (response.getEntity != null) {
response.setEntity(new BufferedHttpEntity(response.getEntity))
}
val bodyStr = Option(response.getEntity)
.flatMap(e => Try(IOUtils.toString(e.getContent, "UTF-8")).toOption)
.getOrElse("")
val max429InspectionBytes = 1024 * 1024L
val bodyStr = Option(response.getEntity).flatMap { entity =>
val contentLength = entity.getContentLength
if (contentLength >= 0 && contentLength <= max429InspectionBytes) {
// Buffer only small, known-size entities so the body can be
// inspected and still returned to the caller if we don't retry.
response.setEntity(new BufferedHttpEntity(entity))
Option(response.getEntity)
.flatMap(e => Try(IOUtils.toString(e.getContent, "UTF-8")).toOption)
} else {
logWarning(
s"Skipping 429 response body inspection for ${request.getURI} " +
s"because Content-Length=$contentLength exceeds $max429InspectionBytes bytes " +
s"or is unknown")
None
}
}.getOrElse("")

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 429 body inspection reads from e.getContent but does not close the InputStream. Even with a BufferedHttpEntity (likely a ByteArrayInputStream), leaving streams unclosed is a resource-leak pattern. Wrap the read in a construct that always closes the stream (e.g., Using.resource or explicit try/finally).

Copilot uses AI. Check for mistakes.
if (bodyStr.contains("CapacityLimitExceeded")) {
// Fabric capacity-exceeded 429s are NOT transient rate limits —
// retrying will not help and causes hangs
logWarning(s"Capacity limit exceeded (non-retryable 429) on ${request.getURI}: $bodyStr")
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logs the full 429 response body ($bodyStr) at warning level. Error bodies can contain sensitive identifiers or be unexpectedly large, which can lead to data exposure and log bloat. Prefer logging only the extracted error code (and possibly a truncated message/first N chars) rather than the entire payload.

Suggested change
logWarning(s"Capacity limit exceeded (non-retryable 429) on ${request.getURI}: $bodyStr")
logWarning(s"Capacity limit exceeded (non-retryable 429, code=CapacityLimitExceeded) " +
s"on ${request.getURI}")

Copilot uses AI. Check for mistakes.
true
} else {
Comment on lines +121 to +126
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bodyStr.contains("CapacityLimitExceeded") is a brittle detector and could produce false positives/negatives (e.g., the substring appearing in a message, different JSON shapes, casing). Since the codebase already uses JSON parsing (e.g., spray-json in core), consider parsing the response JSON and checking error.code == "CapacityLimitExceeded" explicitly before deciding not to retry.

Copilot uses AI. Check for mistakes.
Option(response.getFirstHeader("Retry-After"))
.foreach { h =>
logInfo(s"waiting ${h.getValue} on ${
request match {
case p: HttpPost => p.getURI + " " +
Try(IOUtils.toString(p.getEntity.getContent, "UTF-8")).getOrElse("")
case _ => request.getURI
}
}")
}
false
}
case code =>
logWarning(s"got error $code: ${response.getStatusLine.getReasonPhrase} on ${
request match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,98 @@ class VerifySendWithRetries extends TestBase {
}
}

test("429 with CapacityLimitExceeded body is not retried") {
val port = getFreePort
val requestCount = new AtomicInteger(0)
val capacityBody =
"""{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}"""
val server = startServer(port) { exchange =>
val n = requestCount.incrementAndGet()
if (n == 1) {
respond(exchange, 429, capacityBody)
} else {
respond(exchange, 200, """{"ok":true}""")
}
}
try {
val client = HttpClients.createDefault()
val request = new HttpGet(s"http://localhost:$port/test")
val start = System.currentTimeMillis()
val response = HandlingUtils.sendWithRetries(
client, request, Array(100, 100, 100))
val elapsed = System.currentTimeMillis() - start
val code = response.getStatusLine.getStatusCode
response.close()
client.close()

assert(code === 429, "Capacity-exceeded 429 should be returned immediately, not retried")
assert(requestCount.get() === 1, "Should not retry on CapacityLimitExceeded")
assert(elapsed < 1000, s"Should return immediately, took ${elapsed}ms")
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The elapsed-time assertions (e.g., < 1000ms) can be flaky under CI contention/GC pauses since they depend on wall-clock timing rather than behavior. Consider removing the timing check (the requestCount assertion already validates no retry) or using a more forgiving bound to reduce intermittent failures.

Copilot uses AI. Check for mistakes.
} finally {
server.stop(0)
}
}

test("429 with CapacityLimitExceeded ignores Retry-After header") {
val port = getFreePort
val requestCount = new AtomicInteger(0)
val capacityBody =
"""{"error":{"code":"CapacityLimitExceeded","message":"Serverless capacity limit exceeded"}}"""
val server = startServer(port) { exchange =>
val n = requestCount.incrementAndGet()
if (n == 1) {
respond(exchange, 429, capacityBody, headers = Map("Retry-After" -> "5"))
} else {
respond(exchange, 200, """{"ok":true}""")
}
}
try {
val client = HttpClients.createDefault()
val request = new HttpGet(s"http://localhost:$port/test")
val start = System.currentTimeMillis()
val response = HandlingUtils.sendWithRetries(
client, request, Array(100, 100, 100))
val elapsed = System.currentTimeMillis() - start
val code = response.getStatusLine.getStatusCode
response.close()
client.close()

assert(code === 429, "Capacity-exceeded should not retry even with Retry-After")
assert(requestCount.get() === 1, "Should not retry on CapacityLimitExceeded")
assert(elapsed < 1000, s"Should ignore Retry-After and return immediately, took ${elapsed}ms")
} finally {
server.stop(0)
}
}

test("429 with non-capacity error body still retries normally") {
val port = getFreePort
val requestCount = new AtomicInteger(0)
val rateLimitBody = """{"error":{"code":"RateLimitExceeded","message":"Too many requests"}}"""
val server = startServer(port) { exchange =>
val n = requestCount.incrementAndGet()
if (n <= 2) {
respond(exchange, 429, rateLimitBody)
} else {
respond(exchange, 200, """{"ok":true}""")
}
}
try {
val client = HttpClients.createDefault()
val request = new HttpGet(s"http://localhost:$port/test")
val response = HandlingUtils.sendWithRetries(
client, request, Array(100, 100, 100))
val code = response.getStatusLine.getStatusCode
response.close()
client.close()

assert(code === 200, "Non-capacity 429 should still retry and eventually succeed")
assert(requestCount.get() === 3, "Should have retried past the rate-limit 429s")
} finally {
server.stop(0)
}
}

test("429 with Retry-After 0 means retry immediately") {
val port = getFreePort
val requestCount = new AtomicInteger(0)
Expand Down
Loading