diff --git a/e2e_test/ddl/role_inherited_privilege.slt b/e2e_test/ddl/role_inherited_privilege.slt new file mode 100644 index 0000000000000..2c121182cba35 --- /dev/null +++ b/e2e_test/ddl/role_inherited_privilege.slt @@ -0,0 +1,119 @@ +# Test that actual query privilege checks honor role inheritance, not only +# pg_catalog.has_table_privilege. + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +DROP TABLE IF EXISTS role_inherited_privilege_t; + +statement ok +DROP ROLE IF EXISTS role_inherited_privilege_user; + +statement ok +DROP ROLE IF EXISTS role_noinherit_privilege_user; + +statement ok +DROP ROLE IF EXISTS role_inherited_privilege_app; + +statement ok +DROP ROLE IF EXISTS role_inherited_privilege_select; + +statement ok +DROP ROLE IF EXISTS role_inherited_privilege_update; + +statement ok +CREATE TABLE role_inherited_privilege_t (id int PRIMARY KEY, v int); + +statement ok +INSERT INTO role_inherited_privilege_t VALUES (1, 10), (2, 20); + +statement ok +CREATE ROLE role_inherited_privilege_select; + +statement ok +CREATE ROLE role_inherited_privilege_update; + +statement ok +CREATE ROLE role_inherited_privilege_app; + +statement ok +CREATE USER role_inherited_privilege_user WITH PASSWORD 'secret'; + +statement ok +CREATE USER role_noinherit_privilege_user WITH PASSWORD 'secret'; + +statement ok +GRANT SELECT ON TABLE role_inherited_privilege_t TO role_inherited_privilege_select; + +statement ok +GRANT UPDATE ON TABLE role_inherited_privilege_t TO role_inherited_privilege_update; + +statement ok +GRANT role_inherited_privilege_select TO role_inherited_privilege_app; + +statement ok +GRANT role_inherited_privilege_update TO role_inherited_privilege_app; + +statement ok +GRANT role_inherited_privilege_app TO role_inherited_privilege_user; + +statement ok +GRANT role_inherited_privilege_app TO role_noinherit_privilege_user WITH INHERIT FALSE; + +# Actual SELECT should also honor role-to-role inheritance after SET ROLE. +statement ok +SET ROLE role_inherited_privilege_app; + +query II +SELECT id, v FROM role_inherited_privilege_t ORDER BY id; +---- +1 10 +2 20 + +statement ok +UPDATE role_inherited_privilege_t SET v = v + 1 WHERE id = 1; + +statement ok +RESET ROLE; + +query II +SELECT id, v FROM role_inherited_privilege_t ORDER BY id; +---- +1 11 +2 20 + +# Actual SELECT/UPDATE should honor role-to-user inheritance for a login user. +system ok +PGPASSWORD=secret psql -h localhost -p 4566 -d dev -U role_inherited_privilege_user -v ON_ERROR_STOP=1 -c "SELECT id, v FROM role_inherited_privilege_t ORDER BY id;" | grep -q "1.*11" + +system ok +PGPASSWORD=secret psql -h localhost -p 4566 -d dev -U role_inherited_privilege_user -v ON_ERROR_STOP=1 -c "SET RW_IMPLICIT_FLUSH TO true; UPDATE role_inherited_privilege_t SET v = v + 1 WHERE id = 2;" + +query II +SELECT id, v FROM role_inherited_privilege_t ORDER BY id; +---- +1 11 +2 21 + +# INHERIT FALSE remains a denial for implicit role-to-user privileges. +system ok +PGPASSWORD=secret psql -h localhost -p 4566 -d dev -U role_noinherit_privilege_user -v ON_ERROR_STOP=1 -c "SELECT id, v FROM role_inherited_privilege_t ORDER BY id;" 2>&1 | grep -q "Permission denied" + +statement ok +DROP TABLE role_inherited_privilege_t; + +statement ok +DROP ROLE role_inherited_privilege_user; + +statement ok +DROP ROLE role_noinherit_privilege_user; + +statement ok +DROP ROLE role_inherited_privilege_app; + +statement ok +DROP ROLE role_inherited_privilege_select; + +statement ok +DROP ROLE role_inherited_privilege_update; diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index a727d4acb5007..a06127d94088d 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -61,6 +61,7 @@ pub use relation::{ // Re-export common types pub use risingwave_common::gap_fill::FillStrategy; use risingwave_common::id::ObjectId; +use risingwave_pb::user::RoleMembership; pub use select::{BoundDistinct, BoundSelect}; pub use set_expr::*; pub use statement::BoundStatement; @@ -73,6 +74,7 @@ use crate::catalog::schema_catalog::SchemaCatalog; use crate::catalog::{CatalogResult, DatabaseId, SecretId, ViewId}; use crate::error::ErrorCode; use crate::session::{AuthContext, SessionImpl, StagingCatalogManager, TemporarySourceManager}; +use crate::user::effective_privilege::role_memberships_snapshot; use crate::user::user_service::UserInfoReadGuard; pub type ShareId = usize; @@ -99,6 +101,7 @@ pub struct Binder { session_id: SessionId, context: BindContext, auth_context: Arc, + role_memberships: Vec, /// A stack holding contexts of outer queries when binding a subquery. /// It also holds all of the lateral contexts for each respective /// subquery. @@ -252,6 +255,9 @@ impl Binder { session_id: session.id(), context: BindContext::new(), auth_context: session.auth_context(), + role_memberships: role_memberships_snapshot( + session.env().role_membership_info_reader(), + ), upper_subquery_contexts: vec![], lateral_contexts: vec![], next_subquery_id: 0, diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index fa5d99ff51d40..0942e693726ac 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -37,6 +37,7 @@ use crate::catalog::{CatalogError, DatabaseId, IndexCatalog, TableId}; use crate::error::ErrorCode::PermissionDenied; use crate::error::{ErrorCode, Result, RwError}; use crate::handler::privilege::ObjectCheckItem; +use crate::user::effective_privilege::catalog_user_has_privilege; #[derive(Debug, Clone)] pub struct BoundBaseTable { @@ -299,23 +300,34 @@ impl Binder { .into()); } if let Some(user) = self.user.get_user_by_name(&self.auth_context.user_name) { - if user.is_super || user.id == item.owner { - return Ok(()); - } - if !user.has_privilege(item.object, item.mode) { + if !catalog_user_has_privilege( + &self.user, + user, + &self.role_memberships, + item.owner, + item.object, + item.mode, + ) { return Err(PermissionDenied(item.error_message()).into()); } // check CONNECT privilege for cross-db access - if self.database_id != database_id - && !user.has_privilege(database_id, AclMode::Connect) - { - let db_name = self.catalog.get_database_by_id(database_id)?.name.clone(); - - return Err(PermissionDenied(format!( - "permission denied for database \"{db_name}\"" - )) - .into()); + if self.database_id != database_id { + let db = self.catalog.get_database_by_id(database_id)?; + if !catalog_user_has_privilege( + &self.user, + user, + &self.role_memberships, + db.owner, + database_id, + AclMode::Connect, + ) { + return Err(PermissionDenied(format!( + "permission denied for database \"{}\"", + db.name + )) + .into()); + } } } else { return Err(PermissionDenied("Session user is invalid".to_owned()).into()); diff --git a/src/frontend/src/handler/alter_user.rs b/src/frontend/src/handler/alter_user.rs index d0b92b31cbf90..97b0449335331 100644 --- a/src/frontend/src/handler/alter_user.rs +++ b/src/frontend/src/handler/alter_user.rs @@ -95,14 +95,22 @@ fn alter_prost_user_info( user_info.can_create_db = false; update_fields.insert(UpdateField::CreateDb); } - UserOption::CreateUser => { + UserOption::CreateRole | UserOption::CreateUser => { user_info.can_create_user = true; update_fields.insert(UpdateField::CreateUser); } - UserOption::NoCreateUser => { + UserOption::NoCreateRole | UserOption::NoCreateUser => { user_info.can_create_user = false; update_fields.insert(UpdateField::CreateUser); } + UserOption::Inherit => { + user_info.can_inherit = true; + update_fields.insert(UpdateField::Inherit); + } + UserOption::NoInherit => { + user_info.can_inherit = false; + update_fields.insert(UpdateField::Inherit); + } UserOption::Login => { user_info.can_login = true; update_fields.insert(UpdateField::Login); @@ -119,15 +127,6 @@ fn alter_prost_user_info( user_info.is_admin = false; update_fields.insert(UpdateField::Admin); } - UserOption::CreateRole - | UserOption::NoCreateRole - | UserOption::Inherit - | UserOption::NoInherit => { - return Err(ErrorCode::InvalidParameterValue( - "role options are not supported yet".to_owned(), - ) - .into()); - } UserOption::EncryptedPassword(p) => { if !p.0.is_empty() { user_info.auth_info = encrypted_password(&user_info.name, &p.0); @@ -253,6 +252,13 @@ pub async fn handle_alter_user( Ok(response_builder.into()) } +pub async fn handle_alter_role( + handler_args: HandlerArgs, + stmt: AlterUserStatement, +) -> Result { + handle_alter_user(handler_args, stmt).await +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/create_user.rs b/src/frontend/src/handler/create_user.rs index 940bd4cef70be..e7e2468de9a2e 100644 --- a/src/frontend/src/handler/create_user.rs +++ b/src/frontend/src/handler/create_user.rs @@ -105,21 +105,16 @@ pub async fn handle_create_user( UserOption::NoSuperUser => user_info.is_super = false, UserOption::CreateDB => user_info.can_create_db = true, UserOption::NoCreateDB => user_info.can_create_db = false, - UserOption::CreateUser => user_info.can_create_user = true, - UserOption::NoCreateUser => user_info.can_create_user = false, + UserOption::CreateRole | UserOption::CreateUser => user_info.can_create_user = true, + UserOption::NoCreateRole | UserOption::NoCreateUser => { + user_info.can_create_user = false + } + UserOption::Inherit => user_info.can_inherit = true, + UserOption::NoInherit => user_info.can_inherit = false, UserOption::Login => user_info.can_login = true, UserOption::NoLogin => user_info.can_login = false, UserOption::Admin => user_info.is_admin = true, UserOption::NoAdmin => user_info.is_admin = false, - UserOption::CreateRole - | UserOption::NoCreateRole - | UserOption::Inherit - | UserOption::NoInherit => { - return Err(ErrorCode::InvalidParameterValue( - "role options are not supported yet".to_owned(), - ) - .into()); - } UserOption::EncryptedPassword(password) => { if !password.0.is_empty() { user_info.auth_info = encrypted_password(&user_info.name, &password.0); @@ -168,6 +163,21 @@ pub async fn handle_create_user( Ok(response_builder.into()) } +pub async fn handle_create_role( + handler_args: HandlerArgs, + mut stmt: CreateUserStatement, +) -> Result { + let has_explicit_login = stmt + .with_options + .0 + .iter() + .any(|option| matches!(option, UserOption::Login | UserOption::NoLogin)); + if !has_explicit_login { + stmt.with_options.0.push(UserOption::NoLogin); + } + handle_create_user(handler_args, stmt).await +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/frontend/src/handler/handle_privilege.rs b/src/frontend/src/handler/handle_privilege.rs index 276bb12aa102f..b2df8f21dc769 100644 --- a/src/frontend/src/handler/handle_privilege.rs +++ b/src/frontend/src/handler/handle_privilege.rs @@ -26,7 +26,8 @@ use risingwave_pb::user::alter_default_privilege_request::{ use risingwave_pb::user::grant_privilege::ActionWithGrantOption; use risingwave_pb::user::{PbAction, PbGrantPrivilege}; use risingwave_sqlparser::ast::{ - DefaultPrivilegeOperation, GrantObjects, Ident, PrivilegeObjectType, Privileges, Statement, + DefaultPrivilegeOperation, GrantObjects, Ident, PrivilegeObjectType, Privileges, + RoleOptionKind, RoleOptionSpec, Statement, }; use super::RwPgResponse; @@ -39,6 +40,7 @@ use crate::error::{ErrorCode, Result}; use crate::handler::HandlerArgs; use crate::session::SessionImpl; use crate::user::UserId; +use crate::user::effective_privilege::{can_inherit_role, role_memberships_snapshot}; use crate::user::user_privilege::{ available_privilege_actions, check_privilege_type, get_prost_action, }; @@ -47,6 +49,7 @@ fn make_prost_privilege( session: &SessionImpl, privileges: Privileges, objects: GrantObjects, + granted_by: UserId, ) -> Result> { check_privilege_type(&privileges, &objects)?; @@ -358,7 +361,7 @@ fn make_prost_privilege( .into_iter() .map(|action| ActionWithGrantOption { action: action as i32, - granted_by: session.user_id(), + granted_by: granted_by as _, ..Default::default() }) .collect::>(); @@ -458,23 +461,11 @@ pub async fn handle_grant_privilege( return Err(ErrorCode::BindError("Invalid grant statement".to_owned()).into()); }; let users = bind_user_from_idents(&session, grantees)?; - if let Some(granted_by) = &granted_by { - let user_reader = session.env().user_info_reader(); - let reader = user_reader.read_guard(); - - // We remark that the user name is always case-sensitive. - if reader.get_user_by_name(&granted_by.real_value()).is_none() { - return Err(ErrorCode::InvalidInputSyntax(format!( - "Grantor \"{granted_by}\" does not exist" - )) - .into()); - } - } - - let privileges = make_prost_privilege(&session, privileges, objects)?; + let granted_by = bind_object_granted_by(&session, granted_by)?; + let privileges = make_prost_privilege(&session, privileges, objects, granted_by)?; let user_info_writer = session.user_info_writer()?; user_info_writer - .grant_privilege(users, privileges, with_grant_option, session.user_id()) + .grant_privilege(users, privileges, with_grant_option, granted_by) .await?; Ok(PgResponse::empty_result(StatementType::GRANT_PRIVILEGE)) } @@ -496,27 +487,14 @@ pub async fn handle_revoke_privilege( return Err(ErrorCode::BindError("Invalid revoke statement".to_owned()).into()); }; let users = bind_user_from_idents(&session, grantees)?; - let mut granted_by_id = None; - if let Some(granted_by) = &granted_by { - let user_reader = session.env().user_info_reader(); - let reader = user_reader.read_guard(); - - if let Some(user) = reader.get_user_by_name(&granted_by.real_value()) { - granted_by_id = Some(user.id); - } else { - return Err(ErrorCode::InvalidInputSyntax(format!( - "Grantor \"{granted_by}\" does not exist" - )) - .into()); - } - } - let privileges = make_prost_privilege(&session, privileges, objects)?; + let granted_by_id = bind_object_granted_by(&session, granted_by)?; + let privileges = make_prost_privilege(&session, privileges, objects, granted_by_id)?; let user_info_writer = session.user_info_writer()?; user_info_writer .revoke_privilege( users, privileges, - granted_by_id.unwrap_or(session.user_id()), + granted_by_id, session.user_id(), revoke_grant_option, cascade, @@ -526,6 +504,163 @@ pub async fn handle_revoke_privilege( Ok(PgResponse::empty_result(StatementType::REVOKE_PRIVILEGE)) } +pub async fn handle_grant_role(handler_args: HandlerArgs, stmt: Statement) -> Result { + let session = handler_args.session; + let Statement::GrantRole { + roles, + grantees, + role_options, + granted_by, + } = stmt + else { + return Err(ErrorCode::BindError("Invalid grant role statement".to_owned()).into()); + }; + + let role_ids = bind_user_from_idents(&session, roles)?; + let member_ids = bind_user_from_idents(&session, grantees)?; + let granted_by = bind_role_granted_by(&session, granted_by)?; + let mut admin_option = None; + let mut inherit_option = None; + let mut set_option = None; + for RoleOptionSpec { kind, value } in role_options { + match kind { + RoleOptionKind::Admin => admin_option = Some(value), + RoleOptionKind::Inherit => inherit_option = Some(value), + RoleOptionKind::Set => set_option = Some(value), + } + } + let user_info_writer = session.user_info_writer()?; + user_info_writer + .grant_role( + role_ids, + member_ids, + granted_by.id, + session.user_id(), + granted_by.specified, + admin_option, + inherit_option, + set_option, + ) + .await?; + + Ok(PgResponse::empty_result(StatementType::GRANT_PRIVILEGE)) +} + +pub async fn handle_revoke_role( + handler_args: HandlerArgs, + stmt: Statement, +) -> Result { + let session = handler_args.session; + let Statement::RevokeRole { + roles, + grantees, + revoke_role_option, + granted_by, + cascade, + } = stmt + else { + return Err(ErrorCode::BindError("Invalid revoke role statement".to_owned()).into()); + }; + + let revoke_admin_option = matches!(revoke_role_option, Some(RoleOptionKind::Admin)); + let revoke_inherit_option = matches!(revoke_role_option, Some(RoleOptionKind::Inherit)); + let revoke_set_option = matches!(revoke_role_option, Some(RoleOptionKind::Set)); + let role_ids = bind_user_from_idents(&session, roles)?; + let member_ids = bind_user_from_idents(&session, grantees)?; + let granted_by = bind_role_granted_by(&session, granted_by)?; + let user_info_writer = session.user_info_writer()?; + user_info_writer + .revoke_role( + role_ids, + member_ids, + granted_by.id, + granted_by.specified, + session.user_id(), + revoke_admin_option, + revoke_inherit_option, + revoke_set_option, + cascade, + ) + .await?; + + Ok(PgResponse::empty_result(StatementType::REVOKE_PRIVILEGE)) +} + +struct ResolvedGrantor { + id: UserId, + specified: bool, +} + +fn resolve_granted_by(session: &SessionImpl, granted_by: Option) -> Result { + let Some(granted_by) = granted_by else { + return Ok(ResolvedGrantor { + id: session.user_id(), + specified: false, + }); + }; + let granted_by_name = granted_by.real_value(); + let resolved_grantor = if granted_by_name.eq_ignore_ascii_case("current_user") + || granted_by_name.eq_ignore_ascii_case("current_role") + { + session.user_name() + } else if granted_by_name.eq_ignore_ascii_case("session_user") { + session.session_user_name() + } else { + granted_by_name + }; + + let user_reader = session.env().user_info_reader(); + let reader = user_reader.read_guard(); + let user = reader.get_user_by_name(&resolved_grantor).ok_or_else(|| { + ErrorCode::InvalidInputSyntax(format!("User \"{}\" does not exist", resolved_grantor)) + })?; + Ok(ResolvedGrantor { + id: user.id, + specified: true, + }) +} + +fn bind_object_granted_by(session: &SessionImpl, granted_by: Option) -> Result { + let granted_by = resolve_granted_by(session, granted_by)?; + if granted_by.id != session.user_id() { + return Err(ErrorCode::InvalidInputSyntax(format!( + "GRANTED BY must specify the current user \"{}\"", + session.user_name() + )) + .into()); + } + Ok(granted_by.id) +} + +fn bind_role_granted_by( + session: &SessionImpl, + granted_by: Option, +) -> Result { + let granted_by = resolve_granted_by(session, granted_by)?; + if granted_by.id == session.user_id() { + return Ok(granted_by); + } + let user_reader = session.env().user_info_reader(); + let reader = user_reader.read_guard(); + if reader + .get_user_by_id(&session.user_id()) + .map(|user| user.is_super) + .unwrap_or(false) + { + return Ok(granted_by); + } + drop(reader); + let memberships = role_memberships_snapshot(session.env().role_membership_info_reader()); + if can_inherit_role(session.user_id(), granted_by.id, &memberships) { + return Ok(granted_by); + } + Err(ErrorCode::InvalidInputSyntax(format!( + "current user \"{}\" does not possess grantor role", + session.user_name() + )) + .into()) +} + pub async fn handle_alter_default_privileges( handler_args: HandlerArgs, stmt: Statement, @@ -675,9 +810,19 @@ mod tests { .await .unwrap(); frontend.run_sql("CREATE DATABASE db1").await.unwrap(); - frontend + let grantor_err = frontend .run_sql("GRANT ALL ON DATABASE db1 TO user1 WITH GRANT OPTION GRANTED BY user") .await + .unwrap_err() + .to_string(); + assert!( + grantor_err.contains("GRANTED BY must specify the current user \"root\""), + "{grantor_err}" + ); + + frontend + .run_sql("GRANT ALL ON DATABASE db1 TO user1 WITH GRANT OPTION GRANTED BY root") + .await .unwrap(); let (session_database_id, database_id) = { @@ -727,7 +872,7 @@ mod tests { } frontend - .run_sql("REVOKE GRANT OPTION FOR ALL ON DATABASE db1 from user1 GRANTED BY user") + .run_sql("REVOKE GRANT OPTION FOR ALL ON DATABASE db1 from user1 GRANTED BY root") .await .unwrap(); { @@ -744,7 +889,7 @@ mod tests { } frontend - .run_sql("REVOKE ALL ON DATABASE db1 from user1 GRANTED BY user") + .run_sql("REVOKE ALL ON DATABASE db1 from user1 GRANTED BY root") .await .unwrap(); { diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index e1630691bd843..7082bbe668178 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -481,6 +481,7 @@ pub async fn handle( .await } Statement::CreateUser(stmt) => create_user::handle_create_user(handler_args, stmt).await, + Statement::CreateRole(stmt) => create_user::handle_create_role(handler_args, stmt).await, Statement::DeclareCursor { stmt } => { declare_cursor::handle_declare_cursor(handler_args, stmt).await } @@ -491,12 +492,19 @@ pub async fn handle( close_cursor::handle_close_cursor(handler_args, stmt).await } Statement::AlterUser(stmt) => alter_user::handle_alter_user(handler_args, stmt).await, + Statement::AlterRole(stmt) => alter_user::handle_alter_role(handler_args, stmt).await, Statement::Grant { .. } => { handle_privilege::handle_grant_privilege(handler_args, stmt).await } + Statement::GrantRole { .. } => { + handle_privilege::handle_grant_role(handler_args, stmt).await + } Statement::Revoke { .. } => { handle_privilege::handle_revoke_privilege(handler_args, stmt).await } + Statement::RevokeRole { .. } => { + handle_privilege::handle_revoke_role(handler_args, stmt).await + } Statement::Describe { name, kind } => match kind { DescribeKind::Fragments => { describe::handle_describe_fragments(handler_args, name).await @@ -535,7 +543,7 @@ pub async fn handle( | ObjectType::Schema | ObjectType::Connection | ObjectType::Secret => true, - ObjectType::Database | ObjectType::User | ObjectType::Role => { + ObjectType::Database | ObjectType::Role | ObjectType::User => { bail_not_implemented!("DROP CASCADE"); } } @@ -577,12 +585,9 @@ pub async fn handle( drop_schema::handle_drop_schema(handler_args, object_name, if_exists, cascade) .await } - ObjectType::User => { + ObjectType::Role | ObjectType::User => { drop_user::handle_drop_user(handler_args, object_name, if_exists).await } - ObjectType::Role => { - bail_not_implemented!("DROP ROLE") - } ObjectType::View => { drop_view::handle_drop_view(handler_args, object_name, if_exists, cascade).await } @@ -691,6 +696,11 @@ pub async fn handle( Statement::SetTimeZone { local: _, value } => { variable::handle_set_time_zone(handler_args, value) } + Statement::SetRole { + context_modifier, + role_name, + } => variable::handle_set_role(handler_args, context_modifier, role_name), + Statement::ResetRole => variable::handle_reset_role(handler_args), Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable), Statement::CreateIndex { name, diff --git a/src/frontend/src/handler/privilege.rs b/src/frontend/src/handler/privilege.rs index 52a1c128c54d7..4d8a47e34935f 100644 --- a/src/frontend/src/handler/privilege.rs +++ b/src/frontend/src/handler/privilege.rs @@ -21,6 +21,7 @@ use crate::error::ErrorCode::PermissionDenied; use crate::error::Result; use crate::session::SessionImpl; use crate::user::UserId; +use crate::user::effective_privilege::{role_memberships_snapshot, session_has_privilege}; #[derive(Debug)] pub struct ObjectCheckItem { @@ -69,22 +70,26 @@ impl SessionImpl { let user_reader = self.env().user_info_reader(); let reader = user_reader.read_guard(); - if let Some(user) = reader.get_user_by_name(&self.user_name()) { - if user.is_super { - return Ok(()); - } - for item in items { - if item.owner == user.id { - continue; - } - let has_privilege = user.has_privilege(item.object, item.mode); - if !has_privilege { - return Err(PermissionDenied(item.error_message()).into()); - } - } - } else { + if reader.get_user_by_name(&self.user_name()).is_none() { return Err(PermissionDenied("Session user is invalid".to_owned()).into()); } + drop(reader); + + let auth_context = self.auth_context(); + let memberships = role_memberships_snapshot(self.env().role_membership_info_reader()); + for item in items { + let has_privilege = session_has_privilege( + user_reader, + &auth_context, + &memberships, + item.owner, + item.object, + item.mode, + ); + if !has_privilege { + return Err(PermissionDenied(item.error_message()).into()); + } + } Ok(()) } @@ -213,4 +218,203 @@ mod tests { .unwrap(); assert!(&session.check_privileges(&check_items).is_ok()); } + + #[tokio::test] + async fn test_check_privileges_respects_active_role() { + let frontend = LocalFrontend::new(Default::default()).await; + let session = frontend.session_ref(); + let catalog_reader = session.env().catalog_reader(); + frontend.run_sql("CREATE SCHEMA role_schema").await.unwrap(); + + let schema = catalog_reader + .read_guard() + .get_schema_by_name(DEFAULT_DATABASE_NAME, "role_schema") + .unwrap() + .clone(); + let check_items = vec![ObjectCheckItem::new( + DEFAULT_SUPER_USER_ID, + AclMode::Create, + "role_schema".to_owned(), + schema.id(), + )]; + + frontend.run_sql("CREATE ROLE role_a").await.unwrap(); + frontend.run_sql("CREATE ROLE role_b").await.unwrap(); + frontend + .run_sql( + "CREATE USER role_member WITH NOSUPERUSER PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'", + ) + .await + .unwrap(); + frontend + .run_sql("GRANT CREATE ON SCHEMA role_schema TO role_a") + .await + .unwrap(); + frontend + .run_sql("GRANT role_a TO role_member") + .await + .unwrap(); + frontend + .run_sql("GRANT role_b TO role_member") + .await + .unwrap(); + + let user_id = { + let user_reader = session.env().user_info_reader(); + user_reader + .read_guard() + .get_user_by_name("role_member") + .unwrap() + .id + }; + let member_session = frontend.session_user_ref( + DEFAULT_DATABASE_NAME.to_owned(), + "role_member".to_owned(), + user_id, + ); + + assert!(member_session.check_privileges(&check_items).is_ok()); + + frontend + .run_sql_with_session(member_session.clone(), "SET ROLE role_b") + .await + .unwrap(); + assert!(member_session.check_privileges(&check_items).is_err()); + + frontend + .run_sql_with_session(member_session.clone(), "RESET ROLE") + .await + .unwrap(); + assert!(member_session.check_privileges(&check_items).is_ok()); + } + + #[tokio::test] + async fn test_check_privileges_does_not_inherit_superuser_attribute() { + let frontend = LocalFrontend::new(Default::default()).await; + let session = frontend.session_ref(); + let catalog_reader = session.env().catalog_reader(); + frontend + .run_sql("CREATE SCHEMA inherited_super_schema") + .await + .unwrap(); + + let schema = catalog_reader + .read_guard() + .get_schema_by_name(DEFAULT_DATABASE_NAME, "inherited_super_schema") + .unwrap() + .clone(); + let check_items = vec![ObjectCheckItem::new( + DEFAULT_SUPER_USER_ID, + AclMode::Create, + "inherited_super_schema".to_owned(), + schema.id(), + )]; + + frontend + .run_sql("CREATE USER inherited_super WITH SUPERUSER PASSWORD ''") + .await + .unwrap(); + frontend + .run_sql( + "CREATE USER inherited_super_member WITH NOSUPERUSER PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'", + ) + .await + .unwrap(); + frontend + .run_sql("GRANT inherited_super TO inherited_super_member") + .await + .unwrap(); + + let user_id = { + let user_reader = session.env().user_info_reader(); + user_reader + .read_guard() + .get_user_by_name("inherited_super_member") + .unwrap() + .id + }; + let member_session = frontend.session_user_ref( + DEFAULT_DATABASE_NAME.to_owned(), + "inherited_super_member".to_owned(), + user_id, + ); + + assert!(member_session.check_privileges(&check_items).is_err()); + + frontend + .run_sql_with_session(member_session.clone(), "SET ROLE inherited_super") + .await + .unwrap(); + assert!(member_session.check_privileges(&check_items).is_ok()); + } + + #[tokio::test] + async fn test_check_privileges_respects_noinherit_role() { + let frontend = LocalFrontend::new(Default::default()).await; + let session = frontend.session_ref(); + let catalog_reader = session.env().catalog_reader(); + frontend + .run_sql("CREATE SCHEMA noinherit_schema") + .await + .unwrap(); + + let schema = catalog_reader + .read_guard() + .get_schema_by_name(DEFAULT_DATABASE_NAME, "noinherit_schema") + .unwrap() + .clone(); + let check_items = vec![ObjectCheckItem::new( + DEFAULT_SUPER_USER_ID, + AclMode::Create, + "noinherit_schema".to_owned(), + schema.id(), + )]; + + frontend + .run_sql("CREATE ROLE noinherit_role WITH NOINHERIT") + .await + .unwrap(); + frontend + .run_sql( + "CREATE USER noinherit_member WITH NOSUPERUSER NOINHERIT PASSWORD 'md5827ccb0eea8a706c4c34a16891f84e7b'", + ) + .await + .unwrap(); + frontend + .run_sql("GRANT CREATE ON SCHEMA noinherit_schema TO noinherit_role") + .await + .unwrap(); + frontend + .run_sql("GRANT noinherit_role TO noinherit_member") + .await + .unwrap(); + + let user_id = { + let user_reader = session.env().user_info_reader(); + user_reader + .read_guard() + .get_user_by_name("noinherit_member") + .unwrap() + .id + }; + let member_session = frontend.session_user_ref( + DEFAULT_DATABASE_NAME.to_owned(), + "noinherit_member".to_owned(), + user_id, + ); + + assert!(member_session.check_privileges(&check_items).is_err()); + + frontend + .run_sql_with_session(member_session.clone(), "SET ROLE noinherit_role") + .await + .unwrap(); + assert!(member_session.check_privileges(&check_items).is_ok()); + + frontend + .run_sql_with_session(member_session.clone(), "RESET ROLE") + .await + .unwrap(); + assert!(member_session.check_privileges(&check_items).is_err()); + } } diff --git a/src/frontend/src/handler/variable.rs b/src/frontend/src/handler/variable.rs index 2612d19340054..2bc42e07806cc 100644 --- a/src/frontend/src/handler/variable.rs +++ b/src/frontend/src/handler/variable.rs @@ -20,11 +20,14 @@ use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::session_config::{ConfigReporter, SESSION_CONFIG_LIST_SEP}; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::types::Fields; -use risingwave_sqlparser::ast::{Ident, SetTimeZoneValue, SetVariableValue, Value}; +use risingwave_sqlparser::ast::{ + Ident, RoleContextModifier, SetRoleSpec, SetTimeZoneValue, SetVariableValue, Value, +}; use super::{RwPgResponse, RwPgResponseBuilderExt, fields_to_descriptors}; -use crate::error::Result; +use crate::error::{ErrorCode, Result}; use crate::handler::HandlerArgs; +use crate::user::effective_privilege::{can_set_role, role_memberships_snapshot}; /// convert `SetVariableValue` to string while remove the quotes on literals. pub(crate) fn set_var_to_param_str(value: &SetVariableValue) -> Option { @@ -109,6 +112,68 @@ pub(super) fn handle_set_time_zone( Ok(PgResponse::empty_result(StatementType::SET_VARIABLE)) } +pub(super) fn handle_set_role( + handler_args: HandlerArgs, + context_modifier: Option, + role_name: SetRoleSpec, +) -> Result { + let session = handler_args.session; + match role_name { + SetRoleSpec::None => { + if matches!(context_modifier, Some(RoleContextModifier::Local)) { + session + .set_local_current_user(session.session_user_name(), session.session_user_id()); + } else { + session.clear_local_current_user(); + session.reset_current_user(); + } + } + SetRoleSpec::Name(role_name) => { + let user_reader = session.env().user_info_reader(); + let reader = user_reader.read_guard(); + let role = reader + .get_user_by_name(&role_name.real_value()) + .cloned() + .ok_or_else(|| { + ErrorCode::InvalidInputSyntax(format!( + "Role \"{}\" does not exist", + role_name.real_value() + )) + })?; + let session_is_super = reader + .get_user_by_name(&session.session_user_name()) + .map(|user| user.is_super) + .unwrap_or(false); + drop(reader); + + let memberships = + role_memberships_snapshot(session.env().role_membership_info_reader()); + if !session_is_super && !can_set_role(session.session_user_id(), role.id, &memberships) + { + return Err(ErrorCode::PermissionDenied(format!( + "permission denied to set role \"{}\"", + role.name + )) + .into()); + } + if matches!(context_modifier, Some(RoleContextModifier::Local)) { + session.set_local_current_user(role.name.clone(), role.id); + } else { + session.clear_local_current_user(); + session.set_current_user(role.name.clone(), role.id); + } + } + } + + Ok(PgResponse::empty_result(StatementType::SET_VARIABLE)) +} + +pub(super) fn handle_reset_role(handler_args: HandlerArgs) -> Result { + handler_args.session.clear_local_current_user(); + handler_args.session.reset_current_user(); + Ok(PgResponse::empty_result(StatementType::SET_VARIABLE)) +} + pub(super) fn handle_show(handler_args: HandlerArgs, variable: Vec) -> Result { // TODO: Verify that the name used in `show` command is indeed always case-insensitive. let name = variable.iter().map(|e| e.real_value()).join(" "); diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 57ad3c518037a..f45375b5ff2a8 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -15,7 +15,7 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use anyhow::Context; -use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, UserId, WorkerId}; +use risingwave_common::id::{ConnectionId, JobId, SourceId, TableId, WorkerId}; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::util::cluster_limit::ClusterLimit; @@ -51,6 +51,7 @@ use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{HummockMetaClient, MetaClient}; use crate::catalog::{DatabaseId, FragmentId, SinkId}; +use crate::user::UserId; /// A wrapper around the `MetaClient` that only provides a minor set of meta rpc. /// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`, diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 20ca2cd235636..2b9e68f18187f 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -14,6 +14,8 @@ use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; use itertools::Itertools; use parking_lot::RwLock; @@ -31,20 +33,70 @@ use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats}; use risingwave_pb::meta::object::{ObjectInfo, PbObjectInfo}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse}; +use risingwave_pb::user::RoleMembership; use risingwave_rpc_client::ComputeClientPoolRef; +use thiserror_ext::AsReport; use tokio::sync::watch::Sender; +use tokio::time::sleep; +use tracing::warn; use crate::catalog::FragmentId; use crate::catalog::root_catalog::Catalog; +use crate::meta_client::FrontendMetaClient; use crate::scheduler::HummockSnapshotManagerRef; use crate::user::user_manager::UserInfoManager; +fn publish_catalog_version_if_newer( + catalog_updated_tx: &Sender, + version: CatalogVersion, +) { + if *catalog_updated_tx.borrow() < version + && let Err(error) = catalog_updated_tx.send(version) + { + warn!(error = %error.as_report(), version, "failed to publish catalog version"); + } +} + +const ROLE_MEMBERSHIP_REFRESH_RETRY_DELAY: Duration = Duration::from_millis(100); +const ROLE_MEMBERSHIP_REFRESH_RETRY_MAX_DELAY: Duration = Duration::from_secs(1); + +fn publish_catalog_version_if_current( + catalog_updated_tx: &Sender, + latest_role_membership_refresh_version: &AtomicU64, + version: CatalogVersion, +) -> bool { + if latest_role_membership_refresh_version.load(Ordering::Acquire) != version { + return false; + } + publish_catalog_version_if_newer(catalog_updated_tx, version); + true +} + +fn invalidate_role_memberships_and_publish_if_current( + role_memberships: &RwLock>, + catalog_updated_tx: &Sender, + latest_role_membership_refresh_version: &AtomicU64, + version: CatalogVersion, +) -> bool { + let mut role_memberships = role_memberships.write(); + if latest_role_membership_refresh_version.load(Ordering::Acquire) != version { + return false; + } + role_memberships.clear(); + drop(role_memberships); + publish_catalog_version_if_newer(catalog_updated_tx, version); + true +} + pub struct FrontendObserverNode { worker_node_manager: WorkerNodeManagerRef, version: CatalogVersion, catalog_updated_tx: Sender, catalog: Arc>, user_info_manager: Arc>, + role_memberships: Arc>>, + latest_role_membership_refresh_version: Arc, + meta_client: Arc, hummock_snapshot_manager: HummockSnapshotManagerRef, system_params_manager: LocalSystemParamsManagerRef, session_params: Arc>, @@ -204,22 +256,78 @@ impl ObserverState for FrontendObserverNode { let snapshot_version = version.unwrap(); self.version = snapshot_version.catalog_version; - self.catalog_updated_tx - .send(snapshot_version.catalog_version) - .unwrap(); *self.session_params.write() = serde_json::from_str(&session_params.unwrap().params).unwrap(); LocalSecretManager::global().init_secrets(secrets); LicenseManager::get().update_cluster_resource(cluster_resource.unwrap()); + self.refresh_role_memberships_cache_and_publish( + snapshot_version.catalog_version, + "failed to refresh cached role memberships after observer initialization", + ); } } impl FrontendObserverNode { + fn refresh_role_memberships_cache_and_publish( + &self, + version: CatalogVersion, + failure_context: &'static str, + ) { + self.latest_role_membership_refresh_version + .fetch_max(version, Ordering::AcqRel); + let role_memberships = self.role_memberships.clone(); + let meta_client = self.meta_client.clone(); + let catalog_updated_tx = self.catalog_updated_tx.clone(); + let latest_role_membership_refresh_version = + self.latest_role_membership_refresh_version.clone(); + tokio::spawn(async move { + let mut retry_delay = ROLE_MEMBERSHIP_REFRESH_RETRY_DELAY; + loop { + if latest_role_membership_refresh_version.load(Ordering::Acquire) != version { + return; + } + match meta_client.list_role_memberships(vec![]).await { + Ok(memberships) => { + let mut role_memberships = role_memberships.write(); + if latest_role_membership_refresh_version.load(Ordering::Acquire) != version + { + return; + } + *role_memberships = memberships; + drop(role_memberships); + publish_catalog_version_if_current( + &catalog_updated_tx, + &latest_role_membership_refresh_version, + version, + ); + return; + } + Err(error) => { + warn!(error = %error.as_report(), failure_context, "failed to refresh cached role memberships"); + if !invalidate_role_memberships_and_publish_if_current( + &role_memberships, + &catalog_updated_tx, + &latest_role_membership_refresh_version, + version, + ) { + return; + } + } + } + sleep(retry_delay).await; + retry_delay = + std::cmp::min(retry_delay * 2, ROLE_MEMBERSHIP_REFRESH_RETRY_MAX_DELAY); + } + }); + } + pub fn new( worker_node_manager: WorkerNodeManagerRef, catalog: Arc>, catalog_updated_tx: Sender, user_info_manager: Arc>, + role_memberships: Arc>>, + meta_client: Arc, hummock_snapshot_manager: HummockSnapshotManagerRef, system_params_manager: LocalSystemParamsManagerRef, session_params: Arc>, @@ -231,6 +339,9 @@ impl FrontendObserverNode { catalog, catalog_updated_tx, user_info_manager, + role_memberships, + latest_role_membership_refresh_version: Arc::new(AtomicU64::new(0)), + meta_client, hummock_snapshot_manager, system_params_manager, session_params, @@ -460,7 +571,10 @@ impl FrontendObserverNode { self.version ); self.version = resp.version; - self.catalog_updated_tx.send(resp.version).unwrap(); + self.refresh_role_memberships_cache_and_publish( + resp.version, + "failed to refresh cached role memberships after user notification", + ); } fn handle_fragment_mapping_notification(&mut self, resp: SubscribeResponse) { @@ -580,3 +694,60 @@ fn convert_worker_slot_mapping( ) .collect() } + +#[cfg(test)] +mod tests { + use tokio::sync::watch; + + use super::*; + + #[test] + fn publish_catalog_version_if_newer_is_monotonic() { + let (tx, rx) = watch::channel(10); + + publish_catalog_version_if_newer(&tx, 9); + assert_eq!(*rx.borrow(), 10); + + publish_catalog_version_if_newer(&tx, 11); + assert_eq!(*rx.borrow(), 11); + + publish_catalog_version_if_newer(&tx, 10); + assert_eq!(*rx.borrow(), 11); + } + + #[test] + fn role_membership_refresh_failure_invalidates_only_current_version() { + let (tx, rx) = watch::channel(1); + let latest_role_membership_refresh_version = AtomicU64::new(7); + let role_memberships = RwLock::new(vec![RoleMembership { + role_id: 1, + member_id: 2, + ..Default::default() + }]); + + assert!(invalidate_role_memberships_and_publish_if_current( + &role_memberships, + &tx, + &latest_role_membership_refresh_version, + 7, + )); + assert!(role_memberships.read().is_empty()); + assert_eq!(*rx.borrow(), 7); + + *role_memberships.write() = vec![RoleMembership { + role_id: 3, + member_id: 4, + ..Default::default() + }]; + latest_role_membership_refresh_version.store(8, Ordering::Release); + + assert!(!invalidate_role_memberships_and_publish_if_current( + &role_memberships, + &tx, + &latest_role_membership_refresh_version, + 7, + )); + assert_eq!(role_memberships.read().len(), 1); + assert_eq!(*rx.borrow(), 7); + } +} diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index aec7e60dcc995..e8b935a80d71b 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -79,7 +79,6 @@ use risingwave_pb::configured_monitor_service_server; use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer; use risingwave_pb::health::health_server::HealthServer; use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer; -use risingwave_pb::user::RoleMembership; use risingwave_pb::user::auth_info::EncryptionType; use risingwave_rpc_client::{ ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient, @@ -134,7 +133,9 @@ use crate::telemetry::FrontendTelemetryCreator; use crate::user::UserId; use crate::user::user_authentication::md5_hash_with_salt; use crate::user::user_manager::UserInfoManager; -use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl}; +use crate::user::user_service::{ + RoleMembershipInfoReader, UserInfoReader, UserInfoWriter, UserInfoWriterImpl, +}; use crate::{FrontendOpts, PgResponseStream, TableCatalog}; pub(crate) mod current; @@ -151,6 +152,7 @@ pub(crate) struct FrontendEnv { catalog_reader: CatalogReader, user_info_writer: Arc, user_info_reader: UserInfoReader, + role_membership_info_reader: RoleMembershipInfoReader, worker_node_manager: WorkerNodeManagerRef, query_manager: QueryManager, hummock_snapshot_manager: HummockSnapshotManagerRef, @@ -222,7 +224,7 @@ impl FrontendEnv { use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter}; let catalog = Arc::new(RwLock::new(Catalog::default())); - let role_memberships = Arc::new(RwLock::new(Vec::::new())); + let role_memberships = Arc::new(RwLock::new(vec![])); let meta_client = Arc::new(MockFrontendMetaClient::new(role_memberships.clone())); let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone())); let catalog_writer = Arc::new(MockCatalogWriter::new( @@ -233,9 +235,10 @@ impl FrontendEnv { let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default())); let user_info_writer = Arc::new(MockUserInfoWriter::new( user_info_manager.clone(), - role_memberships, + role_memberships.clone(), )); let user_info_reader = UserInfoReader::new(user_info_manager); + let role_membership_info_reader = RoleMembershipInfoReader::new(role_memberships); let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![])); let system_params_manager = Arc::new(LocalSystemParamsManager::for_test()); let compute_client_pool = Arc::new(ComputeClientPool::for_test()); @@ -275,6 +278,7 @@ impl FrontendEnv { catalog_reader, user_info_writer, user_info_reader, + role_membership_info_reader, worker_node_manager, query_manager, hummock_snapshot_manager, @@ -396,8 +400,9 @@ impl FrontendEnv { let user_info_writer = Arc::new(UserInfoWriterImpl::new( meta_client.clone(), catalog_updated_rx, - role_memberships, + role_memberships.clone(), )); + let role_membership_info_reader = RoleMembershipInfoReader::new(role_memberships.clone()); let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader.clone())); @@ -418,6 +423,8 @@ impl FrontendEnv { catalog, catalog_updated_tx, user_info_manager, + role_memberships, + frontend_meta_client.clone(), hummock_snapshot_manager.clone(), system_params_manager.clone(), session_params.clone(), @@ -566,6 +573,7 @@ impl FrontendEnv { catalog_writer, user_info_reader, user_info_writer, + role_membership_info_reader, worker_node_manager, meta_client: frontend_meta_client, query_manager, @@ -626,6 +634,10 @@ impl FrontendEnv { &self.user_info_reader } + pub fn role_membership_info_reader(&self) -> &RoleMembershipInfoReader { + &self.role_membership_info_reader + } + pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef { self.worker_node_manager.clone() } @@ -745,18 +757,58 @@ impl FrontendEnv { #[derive(Clone)] pub struct AuthContext { pub database: String, + pub session_user_name: String, + pub session_user_id: UserId, pub user_name: String, pub user_id: UserId, } impl AuthContext { pub fn new(database: String, user_name: String, user_id: UserId) -> Self { + Self::new_with_current(database, user_name.clone(), user_id, user_name, user_id) + } + + pub fn new_with_current( + database: String, + session_user_name: String, + session_user_id: UserId, + current_user_name: String, + current_user_id: UserId, + ) -> Self { Self { database, - user_name, - user_id, + session_user_name, + session_user_id, + user_name: current_user_name, + user_id: current_user_id, } } + + pub fn session_user_name(&self) -> &str { + &self.session_user_name + } + + pub fn session_user_id(&self) -> UserId { + self.session_user_id + } + + pub fn current_user_name(&self) -> &str { + &self.user_name + } + + pub fn current_user_id(&self) -> UserId { + self.user_id + } + + pub fn set_current_user(&mut self, current_user_name: String, current_user_id: UserId) { + self.user_name = current_user_name; + self.user_id = current_user_id; + } + + pub fn reset_current_user(&mut self) { + self.user_name = self.session_user_name.clone(); + self.user_id = self.session_user_id; + } } pub struct SessionImpl { env: FrontendEnv, @@ -971,21 +1023,35 @@ impl SessionImpl { } pub fn user_name(&self) -> String { - self.auth_context.read().user_name.clone() + self.auth_context.read().current_user_name().to_owned() } pub fn user_id(&self) -> UserId { - self.auth_context.read().user_id + self.auth_context.read().current_user_id() + } + + pub fn session_user_name(&self) -> String { + self.auth_context.read().session_user_name().to_owned() } pub fn session_user_id(&self) -> UserId { - self.user_id() + self.auth_context.read().session_user_id() } pub fn update_database(&self, database: String) { self.auth_context.write().database = database; } + pub fn set_current_user(&self, current_user_name: String, current_user_id: UserId) { + self.auth_context + .write() + .set_current_user(current_user_name, current_user_id); + } + + pub fn reset_current_user(&self) { + self.auth_context.write().reset_current_user(); + } + pub fn shared_config(&self) -> Arc> { Arc::clone(&self.config_map) } @@ -2048,3 +2114,24 @@ pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: Sess false } } + +#[cfg(test)] +mod tests { + use super::AuthContext; + + #[test] + fn auth_context_tracks_session_and_current_user_separately() { + let ctx = AuthContext::new_with_current( + "dev".to_owned(), + "login_role".to_owned(), + 1.into(), + "active_role".to_owned(), + 2.into(), + ); + + assert_eq!(ctx.session_user_name(), "login_role"); + assert_eq!(ctx.session_user_id(), 1); + assert_eq!(ctx.current_user_name(), "active_role"); + assert_eq!(ctx.current_user_id(), 2); + } +} diff --git a/src/frontend/src/session/transaction.rs b/src/frontend/src/session/transaction.rs index 49f226e63445c..c4caf2e71ceda 100644 --- a/src/frontend/src/session/transaction.rs +++ b/src/frontend/src/session/transaction.rs @@ -23,6 +23,7 @@ use super::SessionImpl; use crate::catalog::catalog_service::CatalogWriter; use crate::error::{ErrorCode, Result}; use crate::scheduler::ReadSnapshot; +use crate::user::UserId; use crate::user::user_service::UserInfoWriter; /// Globally unique transaction id in this frontend instance. @@ -66,6 +67,10 @@ pub struct Context { /// The snapshot of the transaction, acquired lazily at the first read operation in the /// transaction. snapshot: Option, + + /// Restores the effective current role when a `SET LOCAL ROLE` override expires at the end of + /// the transaction. + local_role_restore: Option<(String, crate::user::UserId)>, } /// Transaction state. @@ -92,13 +97,21 @@ pub enum State { /// A guard that auto commits an implicit transaction when dropped. Do nothing if an explicit /// transaction is in progress. #[must_use] -pub struct ImplicitAutoCommitGuard(Weak>); +pub struct ImplicitAutoCommitGuard { + txn: Weak>, + auth_context: Weak>, +} impl Drop for ImplicitAutoCommitGuard { fn drop(&mut self) { - if let Some(txn) = self.0.upgrade() { + if let Some(txn) = self.txn.upgrade() { let mut txn = txn.lock(); - if let State::Implicit(_) = &*txn { + if let State::Implicit(ctx) = &mut *txn { + if let Some((user_name, user_id)) = ctx.local_role_restore.take() + && let Some(auth_context) = self.auth_context.upgrade() + { + auth_context.write().set_current_user(user_name, user_id); + } *txn = State::Initial; } } @@ -119,6 +132,7 @@ impl SessionImpl { id: Id::new(), access_mode: AccessMode::ReadWrite, snapshot: Default::default(), + local_role_restore: None, }) } State::Implicit(_) => unreachable!("implicit transaction is already in progress"), @@ -126,7 +140,10 @@ impl SessionImpl { * progress */ } - ImplicitAutoCommitGuard(Arc::downgrade(&self.txn)) + ImplicitAutoCommitGuard { + txn: Arc::downgrade(&self.txn), + auth_context: Arc::downgrade(&self.auth_context), + } } /// Starts an explicit transaction with the specified access mode from `START TRANSACTION`. @@ -147,6 +164,7 @@ impl SessionImpl { id: ctx.id, access_mode, snapshot: ctx.snapshot.clone(), + local_role_restore: None, }) } State::Explicit(_) => { @@ -161,16 +179,21 @@ impl SessionImpl { pub fn txn_commit_explicit(&self) { let mut txn = self.txn.lock(); - match &*txn { + match &mut *txn { State::Initial => unreachable!("no transaction in progress"), State::Implicit(_) => { // TODO: should be warning self.notice_to_user("there is no transaction in progress") } - State::Explicit(ctx) => match ctx.access_mode { - AccessMode::ReadWrite => unimplemented!(), - AccessMode::ReadOnly => *txn = State::Initial, - }, + State::Explicit(ctx) => { + if let Some((user_name, user_id)) = ctx.local_role_restore.take() { + self.set_current_user(user_name, user_id); + } + match ctx.access_mode { + AccessMode::ReadWrite => unimplemented!(), + AccessMode::ReadOnly => *txn = State::Initial, + } + } } } @@ -179,16 +202,61 @@ impl SessionImpl { pub fn txn_rollback_explicit(&self) { let mut txn = self.txn.lock(); - match &*txn { + match &mut *txn { State::Initial => unreachable!("no transaction in progress"), State::Implicit(_) => { // TODO: should be warning self.notice_to_user("there is no transaction in progress") } - State::Explicit(ctx) => match ctx.access_mode { - AccessMode::ReadWrite => unimplemented!(), - AccessMode::ReadOnly => *txn = State::Initial, - }, + State::Explicit(ctx) => { + if let Some((user_name, user_id)) = ctx.local_role_restore.take() { + self.set_current_user(user_name, user_id); + } + match ctx.access_mode { + AccessMode::ReadWrite => unimplemented!(), + AccessMode::ReadOnly => *txn = State::Initial, + } + } + } + } + + pub fn set_local_current_user( + &self, + current_user_name: String, + current_user_id: UserId, + ) -> bool { + let previous = { + let auth_context = self.auth_context.read(); + ( + auth_context.current_user_name().to_owned(), + auth_context.current_user_id(), + ) + }; + + let mut txn = self.txn.lock(); + match &mut *txn { + State::Explicit(ctx) => { + ctx.local_role_restore.get_or_insert(previous); + drop(txn); + self.set_current_user(current_user_name, current_user_id); + true + } + State::Initial | State::Implicit(_) => { + self.notice_to_user( + "SET LOCAL ROLE can only be used in transaction blocks; statement has no effect.", + ); + false + } + } + } + + pub fn clear_local_current_user(&self) { + let mut txn = self.txn.lock(); + match &mut *txn { + State::Initial | State::Implicit(_) => {} + State::Explicit(ctx) => { + ctx.local_role_restore = None; + } } } diff --git a/src/frontend/src/user/effective_privilege.rs b/src/frontend/src/user/effective_privilege.rs new file mode 100644 index 0000000000000..4d24a62cac983 --- /dev/null +++ b/src/frontend/src/user/effective_privilege.rs @@ -0,0 +1,152 @@ +// Copyright 2026 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashSet, VecDeque}; + +use risingwave_common::acl::AclMode; +use risingwave_pb::user::RoleMembership; + +use crate::session::AuthContext; +use crate::user::UserId; +use crate::user::user_catalog::UserCatalog; +use crate::user::user_manager::UserInfoManager; +use crate::user::user_service::{RoleMembershipInfoReader, UserInfoReader}; + +fn reachable_role_ids( + start_ids: impl IntoIterator, + memberships: &[RoleMembership], + edge_allowed: impl Fn(&RoleMembership) -> bool, +) -> HashSet { + let mut visited = HashSet::new(); + let mut queue = VecDeque::new(); + for start_id in start_ids { + queue.push_back(start_id); + } + + while let Some(member_id) = queue.pop_front() { + for membership in memberships.iter().filter(|membership| { + membership.member_id == member_id.as_raw_id() && edge_allowed(membership) + }) { + let role_id = UserId::from(membership.role_id); + if visited.insert(role_id) { + queue.push_back(role_id); + } + } + } + + visited +} + +pub fn can_set_role( + session_user_id: UserId, + target_role_id: UserId, + memberships: &[RoleMembership], +) -> bool { + session_user_id == target_role_id + || reachable_role_ids([session_user_id], memberships, |membership| { + membership.set_option + }) + .contains(&target_role_id) +} + +pub fn can_inherit_role( + current_role_id: UserId, + target_role_id: UserId, + memberships: &[RoleMembership], +) -> bool { + current_role_id == target_role_id + || reachable_role_ids([current_role_id], memberships, |membership| { + membership.inherit_option + }) + .contains(&target_role_id) +} + +pub fn session_has_privilege( + user_info_reader: &UserInfoReader, + auth_context: &AuthContext, + memberships: &[RoleMembership], + owner: UserId, + object: impl Copy + Into, + mode: AclMode, +) -> bool { + let current_user_id = auth_context.current_user_id(); + let reader = user_info_reader.read_guard(); + let Some(user) = reader.get_user_by_id(¤t_user_id) else { + return false; + }; + catalog_user_has_privilege(&reader, user, memberships, owner, object, mode) +} + +#[allow(dead_code)] +pub fn principal_has_privilege( + user_info_reader: &UserInfoReader, + memberships: &[RoleMembership], + principal: &UserCatalog, + owner: UserId, + object: impl Copy + Into, + mode: AclMode, +) -> bool { + let reader = user_info_reader.read_guard(); + catalog_user_has_privilege(&reader, principal, memberships, owner, object, mode) +} + +pub fn catalog_user_has_privilege( + user_info: &UserInfoManager, + principal: &UserCatalog, + memberships: &[RoleMembership], + owner: UserId, + object: impl Copy + Into, + mode: AclMode, +) -> bool { + if has_direct_privilege_for_catalog_user(principal, owner, object, mode) { + return true; + } + + for role_id in reachable_role_ids([principal.id], memberships, |membership| { + membership.inherit_option + }) { + let Some(role) = user_info.get_user_by_id(&role_id) else { + continue; + }; + if has_inherited_privilege_for_catalog_user(role, owner, object, mode) { + return true; + } + } + + false +} + +fn has_direct_privilege_for_catalog_user( + user: &UserCatalog, + owner: UserId, + object: impl Copy + Into, + mode: AclMode, +) -> bool { + user.is_super || has_inherited_privilege_for_catalog_user(user, owner, object, mode) +} + +fn has_inherited_privilege_for_catalog_user( + user: &UserCatalog, + owner: UserId, + object: impl Copy + Into, + mode: AclMode, +) -> bool { + user.id == owner || user.has_privilege(object, mode) +} + +pub fn role_memberships_snapshot( + role_membership_reader: &RoleMembershipInfoReader, +) -> Vec { + role_membership_reader.read_guard().clone() +} diff --git a/src/frontend/src/user/mod.rs b/src/frontend/src/user/mod.rs index 50232a8281abf..ba89128f13baa 100644 --- a/src/frontend/src/user/mod.rs +++ b/src/frontend/src/user/mod.rs @@ -15,6 +15,7 @@ use risingwave_common::id::{ObjectId, SchemaId}; use user_catalog::UserCatalog; +pub(crate) mod effective_privilege; pub(crate) mod user_authentication; pub(crate) mod user_catalog; pub(crate) mod user_manager; diff --git a/src/frontend/tests/role_dispatch.rs b/src/frontend/tests/role_dispatch.rs new file mode 100644 index 0000000000000..efa3830477073 --- /dev/null +++ b/src/frontend/tests/role_dispatch.rs @@ -0,0 +1,350 @@ +// Copyright 2026 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::DEFAULT_DATABASE_NAME; +use risingwave_frontend::test_utils::LocalFrontend; + +#[tokio::test] +async fn test_create_alter_drop_role_dispatches_attributes() { + let frontend = LocalFrontend::new(Default::default()).await; + + frontend.run_sql("CREATE ROLE analytics").await.unwrap(); + let analytics = frontend.user_by_name("analytics").unwrap(); + assert!( + !analytics.can_login, + "CREATE ROLE should default to NOLOGIN" + ); + assert!(!analytics.is_super); + assert!(!analytics.can_create_db); + assert!(!analytics.can_create_user); + + frontend + .run_sql("CREATE ROLE login_role WITH LOGIN PASSWORD 'secret'") + .await + .unwrap(); + assert!(frontend.user_by_name("login_role").unwrap().can_login); + + frontend + .run_sql("CREATE ROLE inherited_role WITH NOINHERIT") + .await + .unwrap(); + assert!(!frontend.user_by_name("inherited_role").unwrap().can_inherit); + + frontend + .run_sql("ALTER ROLE inherited_role WITH INHERIT") + .await + .unwrap(); + assert!(frontend.user_by_name("inherited_role").unwrap().can_inherit); + + frontend + .run_sql("ALTER ROLE analytics RENAME TO reporting") + .await + .unwrap(); + assert!(frontend.user_by_name("analytics").is_none()); + assert!(!frontend.user_by_name("reporting").unwrap().can_login); + + frontend.run_sql("CREATE ROLE doomed_role").await.unwrap(); + assert!(frontend.user_by_name("doomed_role").is_some()); + + frontend.run_sql("DROP ROLE doomed_role").await.unwrap(); + assert!(frontend.user_by_name("doomed_role").is_none()); +} + +#[tokio::test] +async fn test_grant_revoke_role_dispatches() { + let frontend = LocalFrontend::new(Default::default()).await; + + frontend.run_sql("CREATE ROLE role_a").await.unwrap(); + frontend + .run_sql("CREATE USER user_b WITH PASSWORD 'secret'") + .await + .unwrap(); + + frontend + .run_sql("GRANT role_a TO user_b WITH ADMIN OPTION") + .await + .unwrap(); + + let role_a = frontend.user_by_name("role_a").unwrap(); + let user_b = frontend.user_by_name("user_b").unwrap(); + let memberships = frontend.role_memberships().await; + assert_eq!(memberships.len(), 1); + assert_eq!(memberships[0].role_id, role_a.id.as_raw_id()); + assert_eq!(memberships[0].member_id, user_b.id.as_raw_id()); + assert!(memberships[0].admin_option); + + frontend + .run_sql("REVOKE ADMIN OPTION FOR role_a FROM user_b") + .await + .unwrap(); + + let memberships = frontend.role_memberships().await; + assert_eq!(memberships.len(), 1); + assert!(!memberships[0].admin_option); + + frontend.run_sql("REVOKE role_a FROM user_b").await.unwrap(); + + let memberships = frontend.role_memberships().await; + assert!(memberships.is_empty()); +} + +#[tokio::test] +async fn test_grant_revoke_role_membership_options_dispatch() { + let frontend = LocalFrontend::new(Default::default()).await; + + frontend.run_sql("CREATE ROLE role_a").await.unwrap(); + frontend + .run_sql("CREATE USER user_b WITH PASSWORD 'secret'") + .await + .unwrap(); + + frontend + .run_sql("GRANT role_a TO user_b WITH INHERIT FALSE, SET FALSE") + .await + .unwrap(); + + let memberships = frontend.role_memberships().await; + assert_eq!(memberships.len(), 1); + assert!(!memberships[0].admin_option); + assert!(!memberships[0].inherit_option); + assert!(!memberships[0].set_option); + + let user_id = frontend.user_by_name("user_b").unwrap().id; + let session = frontend.session_user_ref( + DEFAULT_DATABASE_NAME.to_owned(), + "user_b".to_owned(), + user_id, + ); + let set_role_err = frontend + .run_sql_with_session(session.clone(), "SET ROLE role_a") + .await + .unwrap_err() + .to_string(); + assert!( + set_role_err.contains("permission denied to set role"), + "unexpected error: {set_role_err}" + ); +} + +#[tokio::test] +async fn test_grant_revoke_role_granted_by_current_user_only() { + let frontend = LocalFrontend::new(Default::default()).await; + + frontend.run_sql("CREATE ROLE role_a").await.unwrap(); + frontend + .run_sql("CREATE USER user_b WITH PASSWORD 'secret'") + .await + .unwrap(); + + frontend + .run_sql("GRANT role_a TO user_b GRANTED BY root") + .await + .unwrap(); + assert_eq!(frontend.role_memberships().await.len(), 1); + + let err = frontend + .run_sql("GRANT role_a TO user_b GRANTED BY postgres") + .await + .unwrap_err() + .to_string(); + assert!( + err.contains("User \"postgres\" does not exist"), + "unexpected error: {err}" + ); + + frontend + .run_sql("REVOKE role_a FROM user_b GRANTED BY root") + .await + .unwrap(); + assert!(frontend.role_memberships().await.is_empty()); +} + +#[tokio::test] +async fn test_granted_by_current_role_uses_current_role_after_set_role() { + let frontend = LocalFrontend::new(Default::default()).await; + let session = frontend.session_ref(); + + frontend.run_sql("CREATE ROLE parent_role").await.unwrap(); + frontend.run_sql("CREATE ROLE grantor_role").await.unwrap(); + frontend.run_sql("CREATE ROLE grantee_role").await.unwrap(); + frontend + .run_sql("GRANT parent_role TO grantor_role WITH ADMIN OPTION") + .await + .unwrap(); + + frontend + .run_sql_with_session(session.clone(), "SET ROLE grantor_role") + .await + .unwrap(); + frontend + .run_sql_with_session( + session.clone(), + "GRANT parent_role TO grantee_role GRANTED BY current_role", + ) + .await + .unwrap(); + + assert!( + frontend + .run_sql_with_session( + session.clone(), + "REVOKE parent_role FROM grantee_role GRANTED BY session_user", + ) + .await + .is_err(), + "session_user must not be treated as current_role after SET ROLE" + ); + + frontend + .run_sql_with_session( + session.clone(), + "REVOKE parent_role FROM grantee_role GRANTED BY current_role", + ) + .await + .unwrap(); + frontend + .run_sql_with_session(session, "RESET ROLE") + .await + .unwrap(); + + let grantee_role = frontend.user_by_name("grantee_role").unwrap(); + assert!( + frontend + .role_memberships() + .await + .iter() + .all(|membership| membership.member_id != grantee_role.id.as_raw_id()), + "current_role grant should be removable by current_role grantor" + ); +} + +#[tokio::test] +async fn test_set_reset_role_dispatches_and_changes_identity() { + let frontend = LocalFrontend::new(Default::default()).await; + + frontend.run_sql("CREATE ROLE analytics").await.unwrap(); + frontend + .run_sql("CREATE USER app_user WITH PASSWORD 'secret'") + .await + .unwrap(); + frontend + .run_sql("GRANT analytics TO app_user") + .await + .unwrap(); + + let user_id = frontend.user_by_name("app_user").unwrap().id; + let session = frontend.session_user_ref( + DEFAULT_DATABASE_NAME.to_owned(), + "app_user".to_owned(), + user_id, + ); + + let before = session.auth_context(); + assert_eq!(before.session_user_name(), "app_user"); + assert_eq!(before.current_user_name(), "app_user"); + + frontend + .run_sql_with_session(session.clone(), "SET ROLE analytics") + .await + .unwrap(); + + let after_set = session.auth_context(); + assert_eq!(after_set.session_user_name(), "app_user"); + assert_eq!(after_set.current_user_name(), "analytics"); + + frontend + .run_sql_with_session(session.clone(), "RESET ROLE") + .await + .unwrap(); + + let after_reset = session.auth_context(); + assert_eq!(after_reset.session_user_name(), "app_user"); + assert_eq!(after_reset.current_user_name(), "app_user"); + + frontend + .run_sql_with_session(session.clone(), "SET ROLE NONE") + .await + .unwrap(); + + let after_none = session.auth_context(); + assert_eq!(after_none.session_user_name(), "app_user"); + assert_eq!(after_none.current_user_name(), "app_user"); + + frontend + .run_sql_with_session(session.clone(), "START TRANSACTION READ ONLY") + .await + .unwrap(); + frontend + .run_sql_with_session(session.clone(), "SET LOCAL ROLE analytics") + .await + .unwrap(); + let after_local = session.auth_context(); + assert_eq!(after_local.session_user_name(), "app_user"); + assert_eq!(after_local.current_user_name(), "analytics"); + frontend + .run_sql_with_session(session.clone(), "COMMIT") + .await + .unwrap(); + let after_local_commit = session.auth_context(); + assert_eq!(after_local_commit.session_user_name(), "app_user"); + assert_eq!(after_local_commit.current_user_name(), "app_user"); +} + +#[tokio::test] +async fn test_superuser_set_role_uses_session_user_after_role_switch() { + let frontend = LocalFrontend::new(Default::default()).await; + + frontend.run_sql("CREATE ROLE first_role").await.unwrap(); + frontend.run_sql("CREATE ROLE second_role").await.unwrap(); + + let session = frontend.session_ref(); + frontend + .run_sql_with_session(session.clone(), "SET ROLE first_role") + .await + .unwrap(); + assert_eq!(session.auth_context().current_user_name(), "first_role"); + + frontend + .run_sql_with_session(session.clone(), "SET ROLE second_role") + .await + .unwrap(); + assert_eq!(session.auth_context().current_user_name(), "second_role"); +} + +#[tokio::test] +async fn test_set_role_requires_membership() { + let frontend = LocalFrontend::new(Default::default()).await; + + frontend.run_sql("CREATE ROLE analytics").await.unwrap(); + frontend + .run_sql("CREATE USER app_user_2 WITH PASSWORD 'secret'") + .await + .unwrap(); + + let user_id = frontend.user_by_name("app_user_2").unwrap().id; + let session = frontend.session_user_ref( + DEFAULT_DATABASE_NAME.to_owned(), + "app_user_2".to_owned(), + user_id, + ); + let set_role_err = frontend + .run_sql_with_session(session, "SET ROLE analytics") + .await + .unwrap_err() + .to_string(); + assert!( + set_role_err.contains("permission denied to set role"), + "unexpected error: {set_role_err}" + ); +}