Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/kafka/client/broker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ ss::future<> remote_broker::maybe_authenticate() {
co_await do_authenticate();
_authentication_state = auth_state::authenticated;
} catch (...) {
_authentication_state = auth_state::none;
vlog(
_logger.warn, "Authentication error - {}", std::current_exception());
throw;
Comment thread
AldoFusterTurpin marked this conversation as resolved.
Expand Down
103 changes: 103 additions & 0 deletions src/v/kafka/client/test/reconnect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,106 @@ FIXTURE_TEST(password_change_live_client, kafka_client_fixture) {
info("Stopping kafka client");
kafka_client.stop().get();
}

FIXTURE_TEST(auth_failure_does_not_get_stuck_in_progress, kafka_client_fixture) {
using namespace std::chrono_literals;
ss::sstring username{"admin"};
ss::sstring correct_password{"correct_pass"};
ss::sstring wrong_password{"wrong_pass"};

info("Enable SASL and restart");
enable_sasl_and_restart(username);
auto disable_sasl = ss::defer([this] {
info("Disable SASL and restart");
disable_sasl_and_restart();
});
info("Waiting for leadership");
wait_for_controller_leadership().get();

info("Create superuser");
ss::sstring user_body = fmt::format(
R"({{"username": "{}", "password": "{}","algorithm": "SCRAM-SHA-256"}})",
username,
correct_password);
auto body = iobuf();
body.append(user_body.data(), user_body.size());
auto admin_client = make_admin_client();
auto res = http_request(
admin_client,
"/v1/security/users",
std::move(body),
boost::beast::http::verb::post);
BOOST_REQUIRE_EQUAL(res.headers.result(), boost::beast::http::status::ok);

// Connect successfully with correct credentials first so the cluster is
// started and brokers are discovered.
auto kafka_client = make_client();
kafka_client.set_retry_base_backoff(10ms);
kafka_client.set_max_retries(size_t(0));
Comment thread
AldoFusterTurpin marked this conversation as resolved.
kafka_client.set_credentials(
kc::sasl_configuration{
.mechanism = "SCRAM-SHA-256",
.username = username,
.password = correct_password,
});
kafka_client.connect().get();

// Now swap in wrong credentials and force a reconnect by dispatching a
// request. The broker will reconnect and attempt authentication with the
// wrong password, leaving _authentication_state in a failed state.
// Before the fix, the state was left as in_progress, which caused
// needs_authentication() to return false on the next attempt, silently
// skipping re-authentication and making all subsequent requests fail.
info("Swapping to wrong credentials");
kafka_client.set_credentials(
kc::sasl_configuration{
.mechanism = "SCRAM-SHA-256",
.username = username,
.password = wrong_password,
});

// Force the broker to reconnect and hit the bad auth path. Allow 1 retry
// so transient disconnect/broker_not_available after restart does not make
// the test flaky before it reaches the SASL auth path.
restart(true);
wait_for_controller_leadership().get();
kafka_client.set_max_retries(size_t(1));
BOOST_REQUIRE_EXCEPTION(
kafka_client.dispatch(make_list_topics_req()).get(),
kc::broker_error,
[](const kc::broker_error& e) {
return e.error == kafka::error_code::sasl_authentication_failed;
});
Comment thread
AldoFusterTurpin marked this conversation as resolved.

// Restore correct credentials. With the fix, _authentication_state was
// reset to none on failure, so the next dispatch re-attempts auth and
// succeeds. Without the fix it stays in_progress and the request fails.
info("Restoring correct credentials");
kafka_client.set_credentials(
kc::sasl_configuration{
.mechanism = "SCRAM-SHA-256",
.username = username,
.password = correct_password,
});
kafka_client.set_max_retries(size_t(3));

{
info("Adding a known topic to verify authenticated request succeeds");
auto tp = model::topic_partition(
model::topic("t"), model::partition_id(0));
auto ntp = make_default_ntp(tp.topic, tp.partition);
add_topic(model::topic_namespace_view(ntp)).get();

info("Verifying client recovers after auth failure");
auto result = kafka_client.dispatch(make_list_topics_req()).get();
BOOST_REQUIRE_EQUAL(result.data.topics.size(), 1);
Comment thread
AldoFusterTurpin marked this conversation as resolved.
static_assert(
kc::api_version_for(kafka::metadata_request::api_type::key)
< kafka::api_version(12),
"topic::name is nullable in v12+");
BOOST_REQUIRE_EQUAL((*result.data.topics[0].name)(), "t");
}

info("Stopping kafka client");
kafka_client.stop().get();
}
Loading