Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ Flink 安全内部架构是建立在安全模块(实现 `org.apache.flink.runt
### Hadoop Security 模块
该模块使用 Hadoop UserGroupInformation(UGI)类来建立进程范围的 *登录用户* 上下文。然后,登录用户用于与 Hadoop 组件的所有交互,包括 HDFS、HBase 和 YARN。

如果启用了 Hadoop security(在 `core-site.xml` 中),登录用户将拥有所有配置的 Kerberos 凭据。否则,登录用户仅继承启动集群的操作系统帐户的用户身份。
如果启用了 Hadoop security(在 `core-site.xml` 中),登录用户将拥有所有配置的 Kerberos 凭据。否则,登录用户仅继承启动集群的操作系统帐户的用户身份。具体来说,登录过程的优先级如下:
* 当 `hadoop.security.authentication` 设置为 `kerberos` 时
* 当 `security.kerberos.login.keytab-login.enabled` 为 `true`(默认值)且配置了 `security.kerberos.login.keytab` 和 `security.kerberos.login.principal` 时,执行 keytab 登录
* 当 `security.kerberos.login.keytab-login.enabled` 设置为 `false` 时,跳过 keytab 登录,进程将依赖通过 `HADOOP_TOKEN_FILE_LOCATION` 分发的 delegation token。这对于 YARN/Kubernetes 部署中的 TaskManager 容器很有用,可以避免在容器启动上下文中已有 delegation token 时发起不必要的 KDC 请求。
* 当配置了 `security.kerberos.login.use-ticket-cache` 时,执行 credential cache 登录
* 其他情况使用启动集群的操作系统账户的用户身份

<a name="jaas-security-module"></a>

Expand Down
3 changes: 2 additions & 1 deletion docs/content/docs/deployment/security/security-kerberos.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ If Hadoop security is enabled (in `core-site.xml`), the login user will have wha
Otherwise, the login user conveys only the user identity of the OS account that launched the cluster.
In order to be specific the login process has the following order of precedence:
* When `hadoop.security.authentication` is set to `kerberos`
* When `security.kerberos.login.keytab` and `security.kerberos.login.principal` configured then keytab login performed
* When `security.kerberos.login.keytab-login.enabled` is set to `true` (default) and `security.kerberos.login.keytab` and `security.kerberos.login.principal` configured then keytab login performed
* When `security.kerberos.login.keytab-login.enabled` is set to `false`, keytab login is skipped and the process relies on delegation tokens distributed via `HADOOP_TOKEN_FILE_LOCATION` instead. This is useful for TaskManager containers in YARN/Kubernetes deployments to avoid unnecessary KDC requests when delegation tokens are already available from the container launch context.
* When `security.kerberos.login.use-ticket-cache` configured then credential cache login performed
* All other cases user identity of the OS account used

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,18 @@ public class SecurityOptions {
+ "possible to disable that behavior if it somehow conflicts "
+ "with the application being run.");

@Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
public static final ConfigOption<Boolean> KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED =
key("security.kerberos.login.keytab-login.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to perform Kerberos keytab login during process startup. "
+ "When set to false, the process will rely on delegation tokens "
+ "distributed via HADOOP_TOKEN_FILE_LOCATION instead of performing "
+ "a direct KDC login. This is useful for TaskManager containers in "
+ "YARN/Kubernetes where delegation tokens are already available.");

/**
* Returns a view over the given configuration via which options can be set/retrieved for the
* given provider.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ public void install() throws SecurityInstallException {

try {
KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig);
if (kerberosLoginProvider.isLoginPossible(true)) {
boolean keytabLoginEnabled =
securityConfig
.getFlinkConfig()
.get(SecurityOptions.KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED);
if (keytabLoginEnabled && kerberosLoginProvider.isLoginPossible(true)) {
kerberosLoginProvider.doLogin(true);
loginUser = UserGroupInformation.getLoginUser();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@

import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -79,4 +82,56 @@ public void hadoopProxyUserSetWithDelegationTokensEnabledShouldThrow() {
assertTrue(exception.getCause() instanceof UnsupportedOperationException);
}
}

@Test
public void keytabLoginDisabledShouldSkipKdcLogin() {
try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
ugi.when(UserGroupInformation::getLoginUser).thenReturn(userGroupInformation);
when(userGroupInformation.getAuthenticationMethod())
.thenReturn(UserGroupInformation.AuthenticationMethod.SIMPLE);

Configuration flinkConf = new Configuration();
flinkConf.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED, false);
SecurityConfiguration securityConf = new SecurityConfiguration(flinkConf);
org.apache.hadoop.conf.Configuration hadoopConf =
new org.apache.hadoop.conf.Configuration();
HadoopModule hadoopModule = new HadoopModule(securityConf, hadoopConf);

assertDoesNotThrow(hadoopModule::install);

// loginUserFromKeytab should never be called when keytab login is disabled
ugi.verify(
() -> UserGroupInformation.loginUserFromKeytab(anyString(), anyString()),
never());
}
}

@Test
public void keytabLoginEnabledByDefaultShouldPerformKdcLogin() {
try (MockedStatic<UserGroupInformation> ugi = mockStatic(UserGroupInformation.class)) {
UserGroupInformation userGroupInformation = mock(UserGroupInformation.class);
ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
ugi.when(UserGroupInformation::getLoginUser).thenReturn(userGroupInformation);
when(userGroupInformation.getAuthenticationMethod())
.thenReturn(UserGroupInformation.AuthenticationMethod.KERBEROS);
when(userGroupInformation.isFromKeytab()).thenReturn(true);
when(userGroupInformation.hasKerberosCredentials()).thenReturn(true);

Configuration flinkConf = new Configuration();
// default is true, not setting KERBEROS_LOGIN_KEYTAB_LOGIN_ENABLED explicitly
SecurityConfiguration securityConf = new SecurityConfiguration(flinkConf);
org.apache.hadoop.conf.Configuration hadoopConf =
new org.apache.hadoop.conf.Configuration();
HadoopModule hadoopModule = new HadoopModule(securityConf, hadoopConf);

// Without keytab/principal configured, isLoginPossible returns false (no principal),
// so it falls through to the else branch regardless.
// This test verifies the default config value is true and doesn't cause errors.
assertDoesNotThrow(hadoopModule::install);
}
}
}