Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ minor_behavior_changes:

bug_fixes:
# *Changes expected to improve the state of the world and are unlikely to have negative effects*
- area: redis_proxy
change: |
Fixed the SCAN command to return spec-compliant responses in sharded clusters. Previously, SCAN
fanned out to all shards and returned concatenated cursor/key pairs. The fix scans one shard at
a time using encoded cursors (``shard_index:upstream_cursor``). This behavior can be reverted by
setting ``envoy.reloadable_features.redis_scan_single_shard`` to ``false``.
- area: hot_restart
change: |
Fixed hot restart for listeners with a network namespace in the address. Previously, socket
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ RUNTIME_GUARD(envoy_reloadable_features_reject_empty_trusted_ca_file);
RUNTIME_GUARD(envoy_reloadable_features_report_load_when_rq_active_is_non_zero);
RUNTIME_GUARD(envoy_reloadable_features_reset_ignore_upstream_reason);
RUNTIME_GUARD(envoy_reloadable_features_reset_with_error);
RUNTIME_GUARD(envoy_reloadable_features_redis_scan_single_shard);
RUNTIME_GUARD(envoy_reloadable_features_safe_http2_options);
RUNTIME_GUARD(envoy_reloadable_features_skip_dns_lookup_for_proxied_requests);
RUNTIME_GUARD(envoy_reloadable_features_tcp_proxy_odcds_over_ads_fix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
#include <cstdint>

#include "source/common/common/logger.h"
#include "source/common/runtime/runtime_features.h"
#include "source/extensions/filters/network/common/redis/supported_commands.h"
#include "source/extensions/filters/network/redis_proxy/cluster_response_handler.h"

#include "absl/strings/numbers.h"
#include "absl/strings/str_split.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
Expand Down Expand Up @@ -464,15 +468,68 @@ SplitRequestPtr ScanRequest::create(Router& router, Common::Redis::RespValuePtr&
return nullptr;
}

Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);

if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.redis_scan_single_shard")) {
std::string& cursor_string = base_request->asArray()[1].asString();

// Cursor format: "shard_index:upstream_cursor" or plain "0" for initial request.
uint32_t target_shard = 0;
std::string upstream_cursor = "0";
std::vector<absl::string_view> parts = absl::StrSplit(cursor_string, ':');
if (parts.size() == 2) {
if (!absl::SimpleAtoi(parts[0], &target_shard) || target_shard >= shard_size) {
command_stats.error_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError("ERR invalid cursor"));
return nullptr;
}
upstream_cursor = std::string(parts[1]);
} else if (parts.size() == 1) {
target_shard = 0;
upstream_cursor = std::string(parts[0]);
} else {
command_stats.error_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError("ERR invalid cursor"));
return nullptr;
}

cursor_string = upstream_cursor;

std::unique_ptr<ScanRequest> request_ptr{new ScanRequest(
callbacks, command_stats, time_source, delay_command_latency, shard_size, target_shard)};
request_ptr->num_pending_responses_ = 1;
request_ptr->pending_requests_.reserve(1);

request_ptr->pending_response_ = std::make_unique<Common::Redis::RespValue>();
request_ptr->pending_response_->type(Common::Redis::RespType::Array);

request_ptr->pending_requests_.emplace_back(*request_ptr, 0);
PendingRequest& pending_request = request_ptr->pending_requests_.back();

ENVOY_LOG(debug, "scan request shard {}, cursor {}: {}", target_shard, upstream_cursor,
base_request->toString());
pending_request.handle_ =
makeFragmentedRequestToShard(route, base_request->asArray()[0].asString(), target_shard,
*base_request, pending_request, callbacks.transaction());

if (!pending_request.handle_) {
pending_request.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
}

if (request_ptr->num_pending_responses_ > 0) {
return request_ptr;
}
return nullptr;
}

std::unique_ptr<ScanRequest> request_ptr{
new ScanRequest(callbacks, command_stats, time_source, delay_command_latency)};
new ScanRequest(callbacks, command_stats, time_source, delay_command_latency, 0, 0)};
request_ptr->num_pending_responses_ = shard_size;
request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_);

request_ptr->pending_response_ = std::make_unique<Common::Redis::RespValue>();
request_ptr->pending_response_->type(Common::Redis::RespType::Array);

Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);
for (uint32_t shard_index = 0; shard_index < shard_size; shard_index++) {
request_ptr->pending_requests_.emplace_back(*request_ptr, shard_index);
PendingRequest& pending_request = request_ptr->pending_requests_.back();
Expand All @@ -498,6 +555,18 @@ void ScanRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t
pending_requests_[index].handle_ = nullptr;
switch (value->type()) {
case Common::Redis::RespType::Array: {
// Encode shard index into cursor for single-shard mode.
if (shard_size_ > 0 && value->asArray().size() == 2 &&
value->asArray()[0].type() == Common::Redis::RespType::BulkString) {
std::string& upstream_cursor = value->asArray()[0].asString();
if (upstream_cursor == "0") {
if (target_shard_ + 1 < shard_size_) {
upstream_cursor = fmt::format("{}:0", target_shard_ + 1);
}
} else {
upstream_cursor = fmt::format("{}:{}", target_shard_, upstream_cursor);
}
}
pending_response_->asArray().insert(pending_response_->asArray().end(),
value->asArray().begin(), value->asArray().end());
break;
Expand Down Expand Up @@ -971,8 +1040,9 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s
eval_command_handler_(*router_), object_command_handler_(*router_), mget_handler_(*router_),
mset_handler_(*router_), scan_handler_(*router_), shard_info_handler_(*router_),
random_shard_handler_(*router_), split_keys_sum_result_handler_(*router_),
transaction_handler_(*router_), cluster_scope_handler_(*router_),
stats_{ALL_COMMAND_SPLITTER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))},
transaction_handler_(*router_),
cluster_scope_handler_(*router_), stats_{ALL_COMMAND_SPLITTER_STATS(
POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))},
time_source_(time_source), fault_manager_(std::move(fault_manager)),
custom_commands_(std::move(custom_commands)) {
for (const std::string& command : Common::Redis::SupportedCommands::simpleCommands()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,9 @@ class MGETRequest : public FragmentedRequest {
};

/**
* ScanRequest is a specialized request for the SCAN command. It sends the command to all Redis
* servers and merges the results. The SCAN command is used to incrementally iterate over keys in
* the database, and it may return multiple pages of results. This request handles the pagination
* by sending multiple requests to the Redis servers until all keys are retrieved.
* ScanRequest handles the SCAN command across a sharded Redis cluster. It routes each SCAN to a
* single shard using an encoded cursor format (shard_index:upstream_cursor) and advances through
* shards sequentially so the client sees a spec-compliant single-cursor response.
*/
class ScanRequest : public FragmentedRequest {
public:
Expand All @@ -318,10 +317,15 @@ class ScanRequest : public FragmentedRequest {

private:
ScanRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source,
bool delay_command_latency)
: FragmentedRequest(callbacks, command_stats, time_source, delay_command_latency) {}
bool delay_command_latency, uint32_t shard_size, uint32_t target_shard)
: FragmentedRequest(callbacks, command_stats, time_source, delay_command_latency),
shard_size_(shard_size), target_shard_(target_shard) {}

// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;

uint32_t shard_size_;
uint32_t target_shard_;
};

/**
Expand Down
Loading