diff --git a/changelogs/current.yaml b/changelogs/current.yaml index aadaf66a5a289..4308ef2743836 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -100,6 +100,12 @@ minor_behavior_changes: bug_fixes: # *Changes expected to improve the state of the world and are unlikely to have negative effects* +- area: redis_proxy + change: | + Fixed the SCAN command to return spec-compliant responses in sharded clusters. Previously, SCAN + fanned out to all shards and returned concatenated cursor/key pairs. The fix scans one shard at + a time using encoded cursors (``shard_index:upstream_cursor``). This behavior can be reverted by + setting ``envoy.reloadable_features.redis_scan_single_shard`` to ``false``. - area: hot_restart change: | Fixed hot restart for listeners with a network namespace in the address. Previously, socket diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index 6b452a1d5df9c..602e06615ec3a 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -88,6 +88,7 @@ RUNTIME_GUARD(envoy_reloadable_features_reject_empty_trusted_ca_file); RUNTIME_GUARD(envoy_reloadable_features_report_load_when_rq_active_is_non_zero); RUNTIME_GUARD(envoy_reloadable_features_reset_ignore_upstream_reason); RUNTIME_GUARD(envoy_reloadable_features_reset_with_error); +RUNTIME_GUARD(envoy_reloadable_features_redis_scan_single_shard); RUNTIME_GUARD(envoy_reloadable_features_safe_http2_options); RUNTIME_GUARD(envoy_reloadable_features_skip_dns_lookup_for_proxied_requests); RUNTIME_GUARD(envoy_reloadable_features_tcp_proxy_odcds_over_ads_fix); diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc index fcef0f21d617b..22e91d37e6046 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc +++ 
b/source/extensions/filters/network/redis_proxy/command_splitter_impl.cc @@ -4,9 +4,13 @@ #include #include "source/common/common/logger.h" +#include "source/common/runtime/runtime_features.h" #include "source/extensions/filters/network/common/redis/supported_commands.h" #include "source/extensions/filters/network/redis_proxy/cluster_response_handler.h" +#include "absl/strings/numbers.h" +#include "absl/strings/str_split.h" + namespace Envoy { namespace Extensions { namespace NetworkFilters { @@ -464,15 +468,68 @@ SplitRequestPtr ScanRequest::create(Router& router, Common::Redis::RespValuePtr& return nullptr; } + Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); + + if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.redis_scan_single_shard")) { + std::string& cursor_string = base_request->asArray()[1].asString(); + + // Cursor is "shard_index:upstream_cursor"; a plain cursor (e.g. initial "0") goes to shard 0. + uint32_t target_shard = 0; + std::string upstream_cursor = "0"; + std::vector parts = absl::StrSplit(cursor_string, ':'); + if (parts.size() == 2) { + if (!absl::SimpleAtoi(parts[0], &target_shard) || target_shard >= shard_size) { + command_stats.error_.inc(); + callbacks.onResponse(Common::Redis::Utility::makeError("ERR invalid cursor")); + return nullptr; + } + upstream_cursor = std::string(parts[1]); + } else if (parts.size() == 1) { + target_shard = 0; + upstream_cursor = std::string(parts[0]); + } else { + command_stats.error_.inc(); + callbacks.onResponse(Common::Redis::Utility::makeError("ERR invalid cursor")); + return nullptr; + } + + cursor_string = upstream_cursor; + + std::unique_ptr request_ptr{new ScanRequest( + callbacks, command_stats, time_source, delay_command_latency, shard_size, target_shard)}; + request_ptr->num_pending_responses_ = 1; + request_ptr->pending_requests_.reserve(1); + + request_ptr->pending_response_ = std::make_unique(); + request_ptr->pending_response_->type(Common::Redis::RespType::Array); + 
request_ptr->pending_requests_.emplace_back(*request_ptr, 0); + PendingRequest& pending_request = request_ptr->pending_requests_.back(); + + ENVOY_LOG(debug, "scan request shard {}, cursor {}: {}", target_shard, upstream_cursor, + base_request->toString()); + pending_request.handle_ = + makeFragmentedRequestToShard(route, base_request->asArray()[0].asString(), target_shard, + *base_request, pending_request, callbacks.transaction()); + + if (!pending_request.handle_) { + pending_request.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost)); + } + + if (request_ptr->num_pending_responses_ > 0) { + return request_ptr; + } + return nullptr; + } + std::unique_ptr request_ptr{ - new ScanRequest(callbacks, command_stats, time_source, delay_command_latency)}; + new ScanRequest(callbacks, command_stats, time_source, delay_command_latency, 0, 0)}; request_ptr->num_pending_responses_ = shard_size; request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_); request_ptr->pending_response_ = std::make_unique(); request_ptr->pending_response_->type(Common::Redis::RespType::Array); - Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request); for (uint32_t shard_index = 0; shard_index < shard_size; shard_index++) { request_ptr->pending_requests_.emplace_back(*request_ptr, shard_index); PendingRequest& pending_request = request_ptr->pending_requests_.back(); @@ -498,6 +555,18 @@ void ScanRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t pending_requests_[index].handle_ = nullptr; switch (value->type()) { case Common::Redis::RespType::Array: { + // Encode shard index into cursor for single-shard mode. 
+ if (shard_size_ > 0 && value->asArray().size() == 2 && + value->asArray()[0].type() == Common::Redis::RespType::BulkString) { + std::string& upstream_cursor = value->asArray()[0].asString(); + if (upstream_cursor == "0") { + if (target_shard_ + 1 < shard_size_) { + upstream_cursor = fmt::format("{}:0", target_shard_ + 1); + } + } else { + upstream_cursor = fmt::format("{}:{}", target_shard_, upstream_cursor); + } + } pending_response_->asArray().insert(pending_response_->asArray().end(), value->asArray().begin(), value->asArray().end()); break; @@ -971,8 +1040,9 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s eval_command_handler_(*router_), object_command_handler_(*router_), mget_handler_(*router_), mset_handler_(*router_), scan_handler_(*router_), shard_info_handler_(*router_), random_shard_handler_(*router_), split_keys_sum_result_handler_(*router_), - transaction_handler_(*router_), cluster_scope_handler_(*router_), - stats_{ALL_COMMAND_SPLITTER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))}, + transaction_handler_(*router_), + cluster_scope_handler_(*router_), stats_{ALL_COMMAND_SPLITTER_STATS( + POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))}, time_source_(time_source), fault_manager_(std::move(fault_manager)), custom_commands_(std::move(custom_commands)) { for (const std::string& command : Common::Redis::SupportedCommands::simpleCommands()) { diff --git a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h index 3fe2d3f224dcf..3dc512dc53530 100644 --- a/source/extensions/filters/network/redis_proxy/command_splitter_impl.h +++ b/source/extensions/filters/network/redis_proxy/command_splitter_impl.h @@ -304,10 +304,9 @@ class MGETRequest : public FragmentedRequest { }; /** - * ScanRequest is a specialized request for the SCAN command. It sends the command to all Redis - * servers and merges the results. 
The SCAN command is used to incrementally iterate over keys in - * the database, and it may return multiple pages of results. This request handles the pagination - * by sending multiple requests to the Redis servers until all keys are retrieved. + * ScanRequest handles the SCAN command across a sharded Redis cluster. Each SCAN is routed to one + * shard using an encoded cursor (shard_index:upstream_cursor), advancing shard by shard. If the + * redis_scan_single_shard runtime guard is off, it fans out to all shards and merges the replies. */ class ScanRequest : public FragmentedRequest { public: @@ -318,10 +317,15 @@ class ScanRequest : public FragmentedRequest { private: ScanRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source, - bool delay_command_latency) - : FragmentedRequest(callbacks, command_stats, time_source, delay_command_latency) {} + bool delay_command_latency, uint32_t shard_size, uint32_t target_shard) + : FragmentedRequest(callbacks, command_stats, time_source, delay_command_latency), + shard_size_(shard_size), target_shard_(target_shard) {} + // RedisProxy::CommandSplitter::FragmentedRequest void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override; + + uint32_t shard_size_; + uint32_t target_shard_; }; /** diff --git a/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc b/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc index 4a18578ffca07..9c7e4b1acad7f 100644 --- a/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/command_splitter_impl_test.cc @@ -17,6 +17,7 @@ #include "test/mocks/stats/mocks.h" #include "test/mocks/stream_info/mocks.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/test_runtime.h" using testing::_; using testing::DoAll; @@ -1479,112 +1480,188 @@ INSTANTIATE_TEST_SUITE_P(RedisSingleServerRequestWithDelayFaultTest, class ScanHandlerTest 
: public FragmentedRequestCommandHandlerTest, public testing::WithParamInterface { public: - void setup(uint16_t shard_size, const std::list& null_handle_indexes, - bool mirrored = false) { - std::vector request_strings = {"scan", "0"}; - makeRequestToShard(shard_size, request_strings, null_handle_indexes, mirrored); + void setupSingleShard(uint16_t shard_size, uint32_t target_shard, const std::string& cursor_str, + bool null_handle = false) { + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + makeBulkStringArray(*request, {"scan", cursor_str}); + + pool_callbacks_.resize(1); + std::vector tmp_pool_requests(1); + pool_requests_.swap(tmp_pool_requests); + + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); + EXPECT_CALL(*conn_pool_, shardSize_()).WillRepeatedly(Return(shard_size)); + + Common::Redis::Client::PoolRequest* request_to_use = null_handle ? nullptr : &pool_requests_[0]; + EXPECT_CALL(*conn_pool_, makeRequestToShard_(target_shard, _, _)) + .WillOnce(DoAll(WithArg<2>(SaveArgAddress(&pool_callbacks_[0])), Return(request_to_use))); + + handle_ = splitter_.makeRequest(std::move(request), callbacks_, dispatcher_, stream_info_); } - Common::Redis::RespValuePtr response() { - Common::Redis::RespValuePtr response = std::make_unique(); - response->type(Common::Redis::RespType::Array); - return response; + Common::Redis::RespValuePtr scanResponse(const std::string& cursor, + const std::vector& keys) { + auto resp = std::make_unique(); + resp->type(Common::Redis::RespType::Array); + std::vector elements(2); + + elements[0].type(Common::Redis::RespType::BulkString); + elements[0].asString() = cursor; + + elements[1].type(Common::Redis::RespType::Array); + std::vector key_values(keys.size()); + for (uint64_t i = 0; i < keys.size(); i++) { + key_values[i].type(Common::Redis::RespType::BulkString); + key_values[i].asString() = keys[i]; + } + elements[1].asArray().swap(key_values); + resp->asArray().swap(elements); + return resp; + 
} + + Common::Redis::RespValue expectedScanResponse(const std::string& cursor, + const std::vector& keys) { + Common::Redis::RespValue expected; + expected.type(Common::Redis::RespType::Array); + std::vector elements(2); + elements[0].type(Common::Redis::RespType::BulkString); + elements[0].asString() = cursor; + elements[1].type(Common::Redis::RespType::Array); + std::vector key_values(keys.size()); + for (uint64_t i = 0; i < keys.size(); i++) { + key_values[i].type(Common::Redis::RespType::BulkString); + key_values[i].asString() = keys[i]; + } + elements[1].asArray().swap(key_values); + expected.asArray().swap(elements); + return expected; } }; -TEST_P(ScanHandlerTest, Normal) { +TEST_P(ScanHandlerTest, InitialScanRoutesToShardZero) { InSequence s; - setup(2, {}); + setupSingleShard(3, 0, "0"); EXPECT_NE(nullptr, handle_); - Common::Redis::RespValue expected_response; - expected_response.type(Common::Redis::RespType::Array); - pool_callbacks_[1]->onResponse(response()); + + Common::Redis::RespValue expected_response = expectedScanResponse("0:42", {"key1"}); + time_system_.setMonotonicTime(std::chrono::milliseconds(10)); EXPECT_CALL( store_, deliverHistogramToSinks( Property(&Stats::Metric::name, "redis.foo.command." + GetParam() + ".latency"), 10)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); - pool_callbacks_[0]->onResponse(response()); + pool_callbacks_[0]->onResponse(scanResponse("42", {"key1"})); + EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value()); EXPECT_EQ(1UL, store_.counter("redis.foo.command." 
+ GetParam() + ".success").value()); }; -TEST_P(ScanHandlerTest, Mirrored) { +TEST_P(ScanHandlerTest, EncodedCursorRoutesToCorrectShard) { InSequence s; - setupMirrorPolicy(); - setup(2, {}, true); + setupSingleShard(3, 1, "1:55"); EXPECT_NE(nullptr, handle_); - Common::Redis::RespValue expected_response; - expected_response.type(Common::Redis::RespType::Array); + Common::Redis::RespValue expected_response = expectedScanResponse("1:99", {}); - pool_callbacks_[1]->onResponse(response()); - mirror_pool_callbacks_[1]->onResponse(response()); - - time_system_.setMonotonicTime(std::chrono::milliseconds(10)); - EXPECT_CALL( - store_, - deliverHistogramToSinks( - Property(&Stats::Metric::name, "redis.foo.command." + GetParam() + ".latency"), 10)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); - pool_callbacks_[0]->onResponse(response()); - mirror_pool_callbacks_[0]->onResponse(response()); - - EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value()); - EXPECT_EQ(1UL, store_.counter("redis.foo.command." 
+ GetParam() + ".success").value()); + pool_callbacks_[0]->onResponse(scanResponse("99", {})); }; -TEST_F(ScanHandlerTest, Cancel) { +TEST_P(ScanHandlerTest, ShardDoneAdvancesToNextShard) { InSequence s; - setup(2, {}); + setupSingleShard(3, 0, "0"); EXPECT_NE(nullptr, handle_); - EXPECT_CALL(pool_requests_[0], cancel()); - EXPECT_CALL(pool_requests_[1], cancel()); - handle_->cancel(); + Common::Redis::RespValue expected_response = expectedScanResponse("1:0", {}); + + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); + pool_callbacks_[0]->onResponse(scanResponse("0", {})); }; -TEST_P(ScanHandlerTest, NormalOneZero) { +TEST_P(ScanHandlerTest, LastShardDoneReturnsZeroCursor) { InSequence s; - setup(2, {}); + setupSingleShard(3, 2, "2:10"); EXPECT_NE(nullptr, handle_); + Common::Redis::RespValue expected_response = expectedScanResponse("0", {}); + + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); + pool_callbacks_[0]->onResponse(scanResponse("0", {})); +}; + +TEST_F(ScanHandlerTest, InvalidCursorFormat) { + InSequence s; + + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + makeBulkStringArray(*request, {"scan", "a:b:c"}); + Common::Redis::RespValue expected_response; - expected_response.type(Common::Redis::RespType::Array); + expected_response.type(Common::Redis::RespType::Error); + expected_response.asString() = "ERR invalid cursor"; - pool_callbacks_[1]->onResponse(response()); + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); + EXPECT_CALL(*conn_pool_, shardSize_()).WillRepeatedly(Return(3)); + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); + EXPECT_EQ(nullptr, + splitter_.makeRequest(std::move(request), callbacks_, dispatcher_, stream_info_)); +}; + +TEST_F(ScanHandlerTest, ShardIndexOutOfBounds) { + InSequence s; + + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + makeBulkStringArray(*request, {"scan", "5:0"}); + + 
Common::Redis::RespValue expected_response; + expected_response.type(Common::Redis::RespType::Error); + expected_response.asString() = "ERR invalid cursor"; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); + EXPECT_CALL(*conn_pool_, shardSize_()).WillRepeatedly(Return(3)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); - pool_callbacks_[0]->onResponse(response()); + EXPECT_EQ(nullptr, + splitter_.makeRequest(std::move(request), callbacks_, dispatcher_, stream_info_)); +}; - EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value()); - EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".success").value()); +TEST_F(ScanHandlerTest, Cancel) { + InSequence s; + + setupSingleShard(2, 0, "0"); + EXPECT_NE(nullptr, handle_); + + EXPECT_CALL(pool_requests_[0], cancel()); + handle_->cancel(); }; -TEST_P(ScanHandlerTest, UpstreamError) { +TEST_P(ScanHandlerTest, NoUpstreamHostForAll) { Common::Redis::RespValue expected_response; expected_response.type(Common::Redis::RespType::Error); - expected_response.asString() = "finished with 2 error(s)"; + expected_response.asString() = "no upstream host"; + EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true)); + EXPECT_CALL(*conn_pool_, shardSize_()).WillRepeatedly(Return(0)); EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); - setup(2, {0, 1}); - EXPECT_EQ(nullptr, handle_); + + Common::Redis::RespValuePtr request{new Common::Redis::RespValue()}; + makeBulkStringArray(*request, {"scan", "0"}); + EXPECT_EQ(nullptr, + splitter_.makeRequest(std::move(request), callbacks_, dispatcher_, stream_info_)); EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value()); EXPECT_EQ(1UL, store_.counter("redis.foo.command." 
+ GetParam() + ".error").value()); }; -TEST_P(ScanHandlerTest, NoUpstreamHostForAll) { +TEST_P(ScanHandlerTest, UpstreamFailure) { Common::Redis::RespValue expected_response; expected_response.type(Common::Redis::RespType::Error); - expected_response.asString() = "no upstream host"; + expected_response.asString() = "finished with 1 error(s)"; EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); - setup(0, {}); + setupSingleShard(2, 0, "0", true); EXPECT_EQ(nullptr, handle_); EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value()); EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".error").value()); @@ -1607,6 +1684,71 @@ TEST_F(ScanHandlerTest, ScanWrongNumberOfArgs) { INSTANTIATE_TEST_SUITE_P(ScanHandlerTest, ScanHandlerTest, testing::Values("scan")); +class ScanHandlerLegacyTest : public FragmentedRequestCommandHandlerTest { +public: + ScanHandlerLegacyTest() { + scoped_runtime_.mergeValues({{"envoy.reloadable_features.redis_scan_single_shard", "false"}}); + } + + void setup(uint16_t shard_size, const std::list& null_handle_indexes) { + std::vector request_strings = {"scan", "0"}; + makeRequestToShard(shard_size, request_strings, null_handle_indexes, false); + } + + Common::Redis::RespValuePtr response() { + Common::Redis::RespValuePtr response = std::make_unique(); + response->type(Common::Redis::RespType::Array); + return response; + } + + TestScopedRuntime scoped_runtime_; +}; + +TEST_F(ScanHandlerLegacyTest, FanOutToAllShards) { + InSequence s; + + setup(2, {}); + EXPECT_NE(nullptr, handle_); + + Common::Redis::RespValue expected_response; + expected_response.type(Common::Redis::RespType::Array); + + pool_callbacks_[1]->onResponse(response()); + + time_system_.setMonotonicTime(std::chrono::milliseconds(10)); + EXPECT_CALL(store_, deliverHistogramToSinks( + Property(&Stats::Metric::name, "redis.foo.command.scan.latency"), 10)); + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); 
+ pool_callbacks_[0]->onResponse(response()); + + EXPECT_EQ(1UL, store_.counter("redis.foo.command.scan.total").value()); + EXPECT_EQ(1UL, store_.counter("redis.foo.command.scan.success").value()); +}; + +TEST_F(ScanHandlerLegacyTest, AllShardsUpstreamError) { + Common::Redis::RespValue expected_response; + expected_response.type(Common::Redis::RespType::Error); + expected_response.asString() = "finished with 2 error(s)"; + + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); + setup(2, {0, 1}); + EXPECT_EQ(nullptr, handle_); + EXPECT_EQ(1UL, store_.counter("redis.foo.command.scan.total").value()); + EXPECT_EQ(1UL, store_.counter("redis.foo.command.scan.error").value()); +}; + +TEST_F(ScanHandlerLegacyTest, NoUpstreamHost) { + Common::Redis::RespValue expected_response; + expected_response.type(Common::Redis::RespType::Error); + expected_response.asString() = "no upstream host"; + + EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response))); + setup(0, {}); + EXPECT_EQ(nullptr, handle_); + EXPECT_EQ(1UL, store_.counter("redis.foo.command.scan.total").value()); + EXPECT_EQ(1UL, store_.counter("redis.foo.command.scan.error").value()); +}; + // INFO.SHARD command handler tests - queries a single specific shard class InfoShardHandlerTest : public FragmentedRequestCommandHandlerTest, public testing::WithParamInterface {