From 443a32fe60b123e37856d84d060cfda529819842 Mon Sep 17 00:00:00 2001
From: baishen <baishen2009@gmail.com>
Date: Wed, 13 Dec 2023 18:13:09 +0800
Subject: [PATCH] feat(query): Support create password policy

---
 src/common/exception/src/exception_code.rs    |   3 +
 src/meta/app/src/principal/mod.rs             |   2 +
 src/meta/app/src/principal/password_policy.rs |  35 +++
 .../src/user_from_to_protobuf_impl.rs         |  56 ++++
 src/meta/proto-conv/src/util.rs               |   1 +
 src/meta/protos/proto/user.proto              |  21 ++
 src/query/ast/src/ast/format/ast_format.rs    |  46 +++
 src/query/ast/src/ast/statements/mod.rs       |   2 +
 .../ast/src/ast/statements/password_policy.rs | 240 +++++++++++++++
 src/query/ast/src/ast/statements/statement.rs |  12 +
 src/query/ast/src/parser/statement.rs         | 185 ++++++++++-
 src/query/ast/src/parser/token.rs             |  24 ++
 src/query/ast/src/visitors/visitor.rs         |  10 +
 src/query/ast/src/visitors/visitor_mut.rs     |  10 +
 src/query/ast/src/visitors/walk.rs            |   5 +
 src/query/ast/src/visitors/walk_mut.rs        |   5 +
 src/query/management/src/lib.rs               |   3 +
 .../management/src/password_policy/mod.rs     |  19 ++
 .../password_policy/password_policy_api.rs    |  35 +++
 .../password_policy/password_policy_mgr.rs    | 175 +++++++++++
 .../access/management_mode_access.rs          |   4 +
 .../interpreters/access/privilege_access.rs   |   5 +
 .../src/interpreters/interpreter_factory.rs   |  17 ++
 .../interpreter_password_policies_show.rs     | 101 ++++++
 .../interpreter_password_policy_alter.rs      | 163 ++++++++++
 .../interpreter_password_policy_create.rs     |  79 +++++
 .../interpreter_password_policy_desc.rs       | 144 +++++++++
 .../interpreter_password_policy_drop.rs       |  60 ++++
 src/query/service/src/interpreters/mod.rs     |  10 +
 src/query/sql/src/planner/binder/binder.rs    |  15 +
 src/query/sql/src/planner/binder/ddl/mod.rs   |   1 +
 .../src/planner/binder/ddl/password_policy.rs | 287 ++++++++++++++++++
 .../sql/src/planner/format/display_plan.rs    |   7 +
 .../sql/src/planner/plans/ddl/account.rs      |  83 +++++
 src/query/sql/src/planner/plans/plan.rs       |  19 +-
 src/query/users/src/lib.rs                    |   1 +
 src/query/users/src/password_policy.rs        | 195 ++++++++++++
 src/query/users/src/user_api.rs               |  12 +
 38 files changed, 2088 insertions(+), 4 deletions(-)
 create mode 100644 src/meta/app/src/principal/password_policy.rs
 create mode 100644 src/query/ast/src/ast/statements/password_policy.rs
 create mode 100644 src/query/management/src/password_policy/mod.rs
 create mode 100644 src/query/management/src/password_policy/password_policy_api.rs
 create mode 100644 src/query/management/src/password_policy/password_policy_mgr.rs
 create mode 100644 src/query/service/src/interpreters/interpreter_password_policies_show.rs
 create mode 100644 src/query/service/src/interpreters/interpreter_password_policy_alter.rs
 create mode 100644 src/query/service/src/interpreters/interpreter_password_policy_create.rs
 create mode 100644 src/query/service/src/interpreters/interpreter_password_policy_desc.rs
 create mode 100644 src/query/service/src/interpreters/interpreter_password_policy_drop.rs
 create mode 100644 src/query/sql/src/planner/binder/ddl/password_policy.rs
 create mode 100644 src/query/users/src/password_policy.rs

diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs
index 137caee6372c7..b04258b1206d8 100644
--- a/src/common/exception/src/exception_code.rs
+++ b/src/common/exception/src/exception_code.rs
@@ -225,6 +225,9 @@ build_exceptions! {
     NetworkPolicyAlreadyExists(2208),
     IllegalNetworkPolicy(2209),
     NetworkPolicyIsUsedByUser(2210),
+    UnknownPasswordPolicy(2211),
+    PasswordPolicyAlreadyExists(2212),
+    IllegalPasswordPolicy(2213),
 
     // Meta api error codes.
     DatabaseAlreadyExists(2301),
diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs
index d4a137c72e179..5fb35d7b67962 100644
--- a/src/meta/app/src/principal/mod.rs
+++ b/src/meta/app/src/principal/mod.rs
@@ -18,6 +18,7 @@ mod connection;
 mod file_format;
 mod network_policy;
 mod ownership_info;
+mod password_policy;
 mod principal_identity;
 mod role_info;
 mod user_auth;
@@ -35,6 +36,7 @@ pub use connection::*;
 pub use file_format::*;
 pub use network_policy::NetworkPolicy;
 pub use ownership_info::OwnershipInfo;
+pub use password_policy::PasswordPolicy;
 pub use principal_identity::PrincipalIdentity;
 pub use role_info::RoleInfo;
 pub use role_info::RoleInfoSerdeError;
diff --git a/src/meta/app/src/principal/password_policy.rs b/src/meta/app/src/principal/password_policy.rs
new file mode 100644
index 0000000000000..7b22e64c75e31
--- /dev/null
+++ b/src/meta/app/src/principal/password_policy.rs
@@ -0,0 +1,35 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use chrono::DateTime;
+use chrono::Utc;
+
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
+pub struct PasswordPolicy {
+    pub name: String,
+    pub min_length: u64,
+    pub max_length: u64,
+    pub min_upper_case_chars: u64,
+    pub min_lower_case_chars: u64,
+    pub min_numeric_chars: u64,
+    pub min_special_chars: u64,
+    pub min_age_days: u64,
+    pub max_age_days: u64,
+    pub max_retries: u64,
+    pub lockout_time_mins: u64,
+    pub history: u64,
+    pub comment: String,
+    pub create_on: DateTime<Utc>,
+    pub update_on: Option<DateTime<Utc>>,
+}
diff --git a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs
index dcb00f0177029..5ce04cf41d83b 100644
--- a/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs
+++ b/src/meta/proto-conv/src/user_from_to_protobuf_impl.rs
@@ -389,3 +389,59 @@ impl FromToProto for mt::principal::NetworkPolicy {
         })
     }
 }
+
+impl FromToProto for mt::principal::PasswordPolicy {
+    type PB = pb::PasswordPolicy;
+    fn get_pb_ver(p: &Self::PB) -> u64 {
+        p.ver
+    }
+    fn from_pb(p: pb::PasswordPolicy) -> Result<Self, Incompatible>
+    where Self: Sized {
+        reader_check_msg(p.ver, p.min_reader_ver)?;
+        Ok(mt::principal::PasswordPolicy {
+            name: p.name.clone(),
+            min_length: p.min_length,
+            max_length: p.max_length,
+            min_upper_case_chars: p.min_upper_case_chars,
+            min_lower_case_chars: p.min_lower_case_chars,
+            min_numeric_chars: p.min_numeric_chars,
+            min_special_chars: p.min_special_chars,
+            min_age_days: p.min_age_days,
+            max_age_days: p.max_age_days,
+            max_retries: p.max_retries,
+            lockout_time_mins: p.lockout_time_mins,
+            history: p.history,
+            comment: p.comment,
+            create_on: DateTime::<Utc>::from_pb(p.create_on)?,
+            update_on: match p.update_on {
+                Some(t) => Some(DateTime::<Utc>::from_pb(t)?),
+                None => None,
+            },
+        })
+    }
+
+    fn to_pb(&self) -> Result<pb::PasswordPolicy, Incompatible> {
+        Ok(pb::PasswordPolicy {
+            ver: VER,
+            min_reader_ver: MIN_READER_VER,
+            name: self.name.clone(),
+            min_length: self.min_length,
+            max_length: self.max_length,
+            min_upper_case_chars: self.min_upper_case_chars,
+            min_lower_case_chars: self.min_lower_case_chars,
+            min_numeric_chars: self.min_numeric_chars,
+            min_special_chars: self.min_special_chars,
+            min_age_days: self.min_age_days,
+            max_age_days: self.max_age_days,
+            max_retries: self.max_retries,
+            lockout_time_mins: self.lockout_time_mins,
+            history: self.history,
+            comment: self.comment.clone(),
+            create_on: self.create_on.to_pb()?,
+            update_on: match &self.update_on {
+                Some(t) => Some(t.to_pb()?),
+                None => None,
+            },
+        })
+    }
+}
diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs
index 267f19361493c..00940d955ed62 100644
--- a/src/meta/proto-conv/src/util.rs
+++ b/src/meta/proto-conv/src/util.rs
@@ -95,6 +95,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
     (63, "2023-10-30: Add: connection.proto"),
     (64, "2023-11-16: Add: user.proto/NDJsonFileFormatParams add field `missing_field_as` and `null_field_as`", ),
     (65, "2023-11-16: Retype: use Datetime<Utc> instead of u64 to in lvt.time", ),
+    (66, "2023-12-13: Add: user.proto/PasswordPolicy", ),
     // Dear developer:
     //      If you're gonna add a new metadata version, you'll have to add a test for it.
     //      You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
diff --git a/src/meta/protos/proto/user.proto b/src/meta/protos/proto/user.proto
index b5cc639d33391..07dd4d02f056d 100644
--- a/src/meta/protos/proto/user.proto
+++ b/src/meta/protos/proto/user.proto
@@ -139,3 +139,24 @@ message NetworkPolicy {
   string create_on = 5;
   optional string update_on = 6;
 }
+
+message PasswordPolicy {
+  uint64 ver = 100;
+  uint64 min_reader_ver = 101;
+
+  string name = 1;
+  uint64 min_length = 2;
+  uint64 max_length = 3;
+  uint64 min_upper_case_chars = 4;
+  uint64 min_lower_case_chars = 5;
+  uint64 min_numeric_chars = 6;
+  uint64 min_special_chars = 7;
+  uint64 min_age_days = 8;
+  uint64 max_age_days = 9;
+  uint64 max_retries = 10;
+  uint64 lockout_time_mins = 11;
+  uint64 history = 12;
+  string comment = 13;
+  string create_on = 14;
+  optional string update_on = 15;
+}
diff --git a/src/query/ast/src/ast/format/ast_format.rs b/src/query/ast/src/ast/format/ast_format.rs
index c112d51155dd1..d5225ba135874 100644
--- a/src/query/ast/src/ast/format/ast_format.rs
+++ b/src/query/ast/src/ast/format/ast_format.rs
@@ -2578,6 +2578,52 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
         self.children.push(node);
     }
 
+    fn visit_create_password_policy(&mut self, stmt: &'ast CreatePasswordPolicyStmt) {
+        let ctx = AstFormatContext::new(format!("PasswordPolicyName {}", stmt.name));
+        let child = FormatTreeNode::new(ctx);
+
+        let name = "CreatePasswordPolicy".to_string();
+        let format_ctx = AstFormatContext::with_children(name, 1);
+        let node = FormatTreeNode::with_children(format_ctx, vec![child]);
+        self.children.push(node);
+    }
+
+    fn visit_alter_password_policy(&mut self, stmt: &'ast AlterPasswordPolicyStmt) {
+        let ctx = AstFormatContext::new(format!("PasswordPolicyName {}", stmt.name));
+        let child = FormatTreeNode::new(ctx);
+
+        let name = "AlterPasswordPolicy".to_string();
+        let format_ctx = AstFormatContext::with_children(name, 1);
+        let node = FormatTreeNode::with_children(format_ctx, vec![child]);
+        self.children.push(node);
+    }
+
+    fn visit_drop_password_policy(&mut self, stmt: &'ast DropPasswordPolicyStmt) {
+        let ctx = AstFormatContext::new(format!("PasswordPolicyName {}", stmt.name));
+        let child = FormatTreeNode::new(ctx);
+
+        let name = "DropPasswordPolicy".to_string();
+        let format_ctx = AstFormatContext::with_children(name, 1);
+        let node = FormatTreeNode::with_children(format_ctx, vec![child]);
+        self.children.push(node);
+    }
+
+    fn visit_desc_password_policy(&mut self, stmt: &'ast DescPasswordPolicyStmt) {
+        let ctx = AstFormatContext::new(format!("PasswordPolicyName {}", stmt.name));
+        let child = FormatTreeNode::new(ctx);
+
+        let name = "DescPasswordPolicy".to_string();
+        let format_ctx = AstFormatContext::with_children(name, 1);
+        let node = FormatTreeNode::with_children(format_ctx, vec![child]);
+        self.children.push(node);
+    }
+
+    fn visit_show_password_policies(&mut self) {
+        let ctx = AstFormatContext::new("ShowPasswordPolicies".to_string());
+        let node = FormatTreeNode::new(ctx);
+        self.children.push(node);
+    }
+
     fn visit_with(&mut self, with: &'ast With) {
         let mut children = Vec::with_capacity(with.ctes.len());
         for cte in with.ctes.iter() {
diff --git a/src/query/ast/src/ast/statements/mod.rs b/src/query/ast/src/ast/statements/mod.rs
index dd550cf0c3d00..a0482e2f349b1 100644
--- a/src/query/ast/src/ast/statements/mod.rs
+++ b/src/query/ast/src/ast/statements/mod.rs
@@ -28,6 +28,7 @@ mod kill;
 mod lock;
 mod merge_into;
 mod network_policy;
+mod password_policy;
 mod pipe;
 mod presign;
 mod replace;
@@ -61,6 +62,7 @@ pub use kill::*;
 pub use lock::*;
 pub use merge_into::*;
 pub use network_policy::*;
+pub use password_policy::*;
 pub use pipe::*;
 pub use presign::*;
 pub use replace::*;
diff --git a/src/query/ast/src/ast/statements/password_policy.rs b/src/query/ast/src/ast/statements/password_policy.rs
new file mode 100644
index 0000000000000..b32fe96d0c055
--- /dev/null
+++ b/src/query/ast/src/ast/statements/password_policy.rs
@@ -0,0 +1,240 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::Display;
+use std::fmt::Formatter;
+
+use crate::ast::Identifier;
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct CreatePasswordPolicyStmt {
+    pub if_not_exists: bool,
+    pub name: String,
+    pub set_options: PasswordSetOptions,
+}
+
+impl Display for CreatePasswordPolicyStmt {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "CREATE PASSWORD POLICY ")?;
+        if self.if_not_exists {
+            write!(f, "IF NOT EXISTS ")?;
+        }
+        write!(f, "{}", self.name)?;
+        write!(f, "{}", self.set_options)?;
+
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct AlterPasswordPolicyStmt {
+    pub if_exists: bool,
+    pub name: String,
+    pub action: AlterPasswordAction,
+}
+
+impl Display for AlterPasswordPolicyStmt {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "ALTER PASSWORD POLICY ")?;
+        if self.if_exists {
+            write!(f, "IF EXISTS ")?;
+        }
+        write!(f, "{} ", self.name)?;
+        write!(f, "{}", self.action)?;
+
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum AlterPasswordAction {
+    RenamePassword { new_name: Identifier },
+    SetOptions(PasswordSetOptions),
+    UnSetOptions(PasswordUnSetOptions),
+}
+
+impl Display for AlterPasswordAction {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        match self {
+            AlterPasswordAction::RenamePassword { new_name } => {
+                write!(f, "RENAME {}", new_name)?;
+            }
+            AlterPasswordAction::SetOptions(set_options) => {
+                write!(f, "SET {}", set_options)?;
+            }
+            AlterPasswordAction::UnSetOptions(unset_options) => {
+                write!(f, "UNSET {}", unset_options)?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct PasswordSetOptions {
+    pub min_length: Option<u64>,
+    pub max_length: Option<u64>,
+    pub min_upper_case_chars: Option<u64>,
+    pub min_lower_case_chars: Option<u64>,
+    pub min_numeric_chars: Option<u64>,
+    pub min_special_chars: Option<u64>,
+    pub min_age_days: Option<u64>,
+    pub max_age_days: Option<u64>,
+    pub max_retries: Option<u64>,
+    pub lockout_time_mins: Option<u64>,
+    pub history: Option<u64>,
+    pub comment: Option<String>,
+}
+
+impl Display for PasswordSetOptions {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        if let Some(min_length) = self.min_length {
+            write!(f, " PASSWORD_MIN_LENGTH = {}", min_length)?;
+        }
+        if let Some(max_length) = self.max_length {
+            write!(f, " PASSWORD_MAX_LENGTH = {}", max_length)?;
+        }
+        if let Some(min_upper_case_chars) = self.min_upper_case_chars {
+            write!(
+                f,
+                " PASSWORD_MIN_UPPER_CASE_CHARS = {}",
+                min_upper_case_chars
+            )?;
+        }
+        if let Some(min_lower_case_chars) = self.min_lower_case_chars {
+            write!(
+                f,
+                " PASSWORD_MIN_LOWER_CASE_CHARS = {}",
+                min_lower_case_chars
+            )?;
+        }
+        if let Some(min_numeric_chars) = self.min_numeric_chars {
+            write!(f, " PASSWORD_MIN_NUMERIC_CHARS = {}", min_numeric_chars)?;
+        }
+        if let Some(min_special_chars) = self.min_special_chars {
+            write!(f, " PASSWORD_MIN_SPECIAL_CHARS = {}", min_special_chars)?;
+        }
+        if let Some(min_age_days) = self.min_age_days {
+            write!(f, " PASSWORD_MIN_AGE_DAYS = {}", min_age_days)?;
+        }
+        if let Some(max_age_days) = self.max_age_days {
+            write!(f, " PASSWORD_MAX_AGE_DAYS = {}", max_age_days)?;
+        }
+        if let Some(max_retries) = self.max_retries {
+            write!(f, " PASSWORD_MAX_RETRIES = {}", max_retries)?;
+        }
+        if let Some(lockout_time_mins) = self.lockout_time_mins {
+            write!(f, " PASSWORD_LOCKOUT_TIME_MINS = {}", lockout_time_mins)?;
+        }
+        if let Some(history) = self.history {
+            write!(f, " PASSWORD_HISTORY = {}", history)?;
+        }
+        if let Some(comment) = &self.comment {
+            write!(f, " COMMENT = '{}'", comment)?;
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct PasswordUnSetOptions {
+    pub min_length: bool,
+    pub max_length: bool,
+    pub min_upper_case_chars: bool,
+    pub min_lower_case_chars: bool,
+    pub min_numeric_chars: bool,
+    pub min_special_chars: bool,
+    pub min_age_days: bool,
+    pub max_age_days: bool,
+    pub max_retries: bool,
+    pub lockout_time_mins: bool,
+    pub history: bool,
+    pub comment: bool,
+}
+
+impl Display for PasswordUnSetOptions {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        if self.min_length {
+            write!(f, " PASSWORD_MIN_LENGTH")?;
+        }
+        if self.max_length {
+            write!(f, " PASSWORD_MAX_LENGTH")?;
+        }
+        if self.min_upper_case_chars {
+            write!(f, " PASSWORD_MIN_UPPER_CASE_CHARS")?;
+        }
+        if self.min_lower_case_chars {
+            write!(f, " PASSWORD_MIN_LOWER_CASE_CHARS")?;
+        }
+        if self.min_numeric_chars {
+            write!(f, " PASSWORD_MIN_NUMERIC_CHARS")?;
+        }
+        if self.min_special_chars {
+            write!(f, " PASSWORD_MIN_SPECIAL_CHARS")?;
+        }
+        if self.min_age_days {
+            write!(f, " PASSWORD_MIN_AGE_DAYS")?;
+        }
+        if self.max_age_days {
+            write!(f, " PASSWORD_MAX_AGE_DAYS")?;
+        }
+        if self.max_retries {
+            write!(f, " PASSWORD_MAX_RETRIES")?;
+        }
+        if self.lockout_time_mins {
+            write!(f, " PASSWORD_LOCKOUT_TIME_MINS")?;
+        }
+        if self.history {
+            write!(f, " PASSWORD_HISTORY")?;
+        }
+        if self.comment {
+            write!(f, " COMMENT")?;
+        }
+
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct DropPasswordPolicyStmt {
+    pub if_exists: bool,
+    pub name: String,
+}
+
+impl Display for DropPasswordPolicyStmt {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "DROP PASSWORD POLICY ")?;
+        if self.if_exists {
+            write!(f, "IF EXISTS ")?;
+        }
+        write!(f, "{}", self.name)?;
+
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct DescPasswordPolicyStmt {
+    pub name: String,
+}
+
+impl Display for DescPasswordPolicyStmt {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "DESCRIBE PASSWORD POLICY {}", self.name)?;
+
+        Ok(())
+    }
+}
diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs
index 6455d38cd1042..c7a8897c69b90 100644
--- a/src/query/ast/src/ast/statements/statement.rs
+++ b/src/query/ast/src/ast/statements/statement.rs
@@ -252,6 +252,13 @@ pub enum Statement {
     DescNetworkPolicy(DescNetworkPolicyStmt),
     ShowNetworkPolicies,
 
+    // password policy
+    CreatePasswordPolicy(CreatePasswordPolicyStmt),
+    AlterPasswordPolicy(AlterPasswordPolicyStmt),
+    DropPasswordPolicy(DropPasswordPolicyStmt),
+    DescPasswordPolicy(DescPasswordPolicyStmt),
+    ShowPasswordPolicies,
+
     // tasks
     CreateTask(CreateTaskStmt),
     AlterTask(AlterTaskStmt),
@@ -588,6 +595,11 @@ impl Display for Statement {
             Statement::DropNetworkPolicy(stmt) => write!(f, "{stmt}")?,
             Statement::DescNetworkPolicy(stmt) => write!(f, "{stmt}")?,
             Statement::ShowNetworkPolicies => write!(f, "SHOW NETWORK POLICIES")?,
+            Statement::CreatePasswordPolicy(stmt) => write!(f, "{stmt}")?,
+            Statement::AlterPasswordPolicy(stmt) => write!(f, "{stmt}")?,
+            Statement::DropPasswordPolicy(stmt) => write!(f, "{stmt}")?,
+            Statement::DescPasswordPolicy(stmt) => write!(f, "{stmt}")?,
+            Statement::ShowPasswordPolicies => write!(f, "SHOW PASSWORD POLICIES")?,
             Statement::CreateTask(stmt) => write!(f, "{stmt}")?,
             Statement::AlterTask(stmt) => write!(f, "{stmt}")?,
             Statement::ExecuteTask(stmt) => write!(f, "{stmt}")?,
diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs
index 33453f6f9082e..2d34cb7a9f6ed 100644
--- a/src/query/ast/src/parser/statement.rs
+++ b/src/query/ast/src/parser/statement.rs
@@ -1597,6 +1597,61 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
         rule! { SHOW ~ NETWORK ~ POLICIES },
     );
 
+    let create_password_policy = map(
+        rule! {
+            CREATE ~ PASSWORD ~ POLICY ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident
+             ~ #password_set_options
+        },
+        |(_, _, _, opt_if_not_exists, name, set_options)| {
+            let stmt = CreatePasswordPolicyStmt {
+                if_not_exists: opt_if_not_exists.is_some(),
+                name: name.to_string(),
+                set_options,
+            };
+            Statement::CreatePasswordPolicy(stmt)
+        },
+    );
+    let alter_password_policy = map(
+        rule! {
+            ALTER ~ PASSWORD ~ POLICY ~ ( IF ~ ^EXISTS )? ~ #ident
+             ~ #alter_password_action
+        },
+        |(_, _, _, opt_if_exists, name, action)| {
+            let stmt = AlterPasswordPolicyStmt {
+                if_exists: opt_if_exists.is_some(),
+                name: name.to_string(),
+                action,
+            };
+            Statement::AlterPasswordPolicy(stmt)
+        },
+    );
+    let drop_password_policy = map(
+        rule! {
+            DROP ~ PASSWORD ~ POLICY ~ ( IF ~ ^EXISTS )? ~ #ident
+        },
+        |(_, _, _, opt_if_exists, name)| {
+            let stmt = DropPasswordPolicyStmt {
+                if_exists: opt_if_exists.is_some(),
+                name: name.to_string(),
+            };
+            Statement::DropPasswordPolicy(stmt)
+        },
+    );
+    let describe_password_policy = map(
+        rule! {
+            ( DESC | DESCRIBE ) ~ PASSWORD ~ POLICY ~ #ident
+        },
+        |(_, _, _, name)| {
+            Statement::DescPasswordPolicy(DescPasswordPolicyStmt {
+                name: name.to_string(),
+            })
+        },
+    );
+    let show_password_policies = value(
+        Statement::ShowPasswordPolicies,
+        rule! { SHOW ~ PASSWORD ~ POLICIES },
+    );
+
     let create_pipe = map(
         rule! {
             CREATE ~ PIPE ~ ( IF ~ ^NOT ~ ^EXISTS )?
@@ -1684,13 +1739,18 @@ pub fn statement(i: Input) -> IResult<StatementWithFormat> {
             | #alter_database : "`ALTER DATABASE [IF EXISTS] <action>`"
             | #use_database : "`USE <database>`"
         ),
-        // network policy
+        // network policy / password policy
         rule!(
             #create_network_policy: "`CREATE NETWORK POLICY [IF NOT EXISTS] name ALLOWED_IP_LIST = ('ip1' [, 'ip2']) [BLOCKED_IP_LIST = ('ip1' [, 'ip2'])] [COMMENT = '<string_literal>']`"
             | #alter_network_policy: "`ALTER NETWORK POLICY [IF EXISTS] name SET [ALLOWED_IP_LIST = ('ip1' [, 'ip2'])] [BLOCKED_IP_LIST = ('ip1' [, 'ip2'])] [COMMENT = '<string_literal>']`"
             | #drop_network_policy: "`DROP NETWORK POLICY [IF EXISTS] name`"
             | #describe_network_policy: "`DESC NETWORK POLICY name`"
             | #show_network_policies: "`SHOW NETWORK POLICIES`"
+            | #create_password_policy: "`CREATE PASSWORD POLICY [IF NOT EXISTS] name [PASSWORD_MIN_LENGTH = <u64_literal>] ... [COMMENT = '<string_literal>']`"
+            | #alter_password_policy: "`ALTER PASSWORD POLICY [IF EXISTS] name SET [PASSWORD_MIN_LENGTH = <u64_literal>] ... [COMMENT = '<string_literal>']`"
+            | #drop_password_policy: "`DROP PASSWORD POLICY [IF EXISTS] name`"
+            | #describe_password_policy: "`DESC PASSWORD POLICY name`"
+            | #show_password_policies: "`SHOW PASSWORD POLICIES`"
         ),
         rule!(
             #insert : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
@@ -3189,3 +3249,126 @@ pub fn merge_update_expr(i: Input) -> IResult<MergeUpdateExpr> {
         |((table, name), _, expr)| MergeUpdateExpr { table, name, expr },
     )(i)
 }
+
+pub fn password_set_options(i: Input) -> IResult<PasswordSetOptions> {
+    map(
+        rule! {
+             ( PASSWORD_MIN_LENGTH ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_MAX_LENGTH ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_MIN_UPPER_CASE_CHARS ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_MIN_LOWER_CASE_CHARS ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_MIN_NUMERIC_CHARS ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_MIN_SPECIAL_CHARS ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_MIN_AGE_DAYS ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_MAX_AGE_DAYS ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_MAX_RETRIES ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_LOCKOUT_TIME_MINS ~ Eq ~ #literal_u64 ) ?
+             ~ ( PASSWORD_HISTORY ~ Eq ~ #literal_u64 ) ?
+             ~ ( COMMENT ~ Eq ~ #literal_string)?
+        },
+        |(
+            opt_min_length,
+            opt_max_length,
+            opt_min_upper_case_chars,
+            opt_min_lower_case_chars,
+            opt_min_numeric_chars,
+            opt_min_special_chars,
+            opt_min_age_days,
+            opt_max_age_days,
+            opt_max_retries,
+            opt_lockout_time_mins,
+            opt_history,
+            opt_comment,
+        )| {
+            PasswordSetOptions {
+                min_length: opt_min_length.map(|opt| opt.2),
+                max_length: opt_max_length.map(|opt| opt.2),
+                min_upper_case_chars: opt_min_upper_case_chars.map(|opt| opt.2),
+                min_lower_case_chars: opt_min_lower_case_chars.map(|opt| opt.2),
+                min_numeric_chars: opt_min_numeric_chars.map(|opt| opt.2),
+                min_special_chars: opt_min_special_chars.map(|opt| opt.2),
+                min_age_days: opt_min_age_days.map(|opt| opt.2),
+                max_age_days: opt_max_age_days.map(|opt| opt.2),
+                max_retries: opt_max_retries.map(|opt| opt.2),
+                lockout_time_mins: opt_lockout_time_mins.map(|opt| opt.2),
+                history: opt_history.map(|opt| opt.2),
+                comment: opt_comment.map(|opt| opt.2),
+            }
+        },
+    )(i)
+}
+
+pub fn password_unset_options(i: Input) -> IResult<PasswordUnSetOptions> {
+    map(
+        rule! {
+             PASSWORD_MIN_LENGTH ?
+             ~ PASSWORD_MAX_LENGTH ?
+             ~ PASSWORD_MIN_UPPER_CASE_CHARS ?
+             ~ PASSWORD_MIN_LOWER_CASE_CHARS ?
+             ~ PASSWORD_MIN_NUMERIC_CHARS ?
+             ~ PASSWORD_MIN_SPECIAL_CHARS ?
+             ~ PASSWORD_MIN_AGE_DAYS ?
+             ~ PASSWORD_MAX_AGE_DAYS ?
+             ~ PASSWORD_MAX_RETRIES ?
+             ~ PASSWORD_LOCKOUT_TIME_MINS ?
+             ~ PASSWORD_HISTORY ?
+             ~ COMMENT ?
+        },
+        |(
+            opt_min_length,
+            opt_max_length,
+            opt_min_upper_case_chars,
+            opt_min_lower_case_chars,
+            opt_min_numeric_chars,
+            opt_min_special_chars,
+            opt_min_age_days,
+            opt_max_age_days,
+            opt_max_retries,
+            opt_lockout_time_mins,
+            opt_history,
+            opt_comment,
+        )| {
+            PasswordUnSetOptions {
+                min_length: opt_min_length.is_some(),
+                max_length: opt_max_length.is_some(),
+                min_upper_case_chars: opt_min_upper_case_chars.is_some(),
+                min_lower_case_chars: opt_min_lower_case_chars.is_some(),
+                min_numeric_chars: opt_min_numeric_chars.is_some(),
+                min_special_chars: opt_min_special_chars.is_some(),
+                min_age_days: opt_min_age_days.is_some(),
+                max_age_days: opt_max_age_days.is_some(),
+                max_retries: opt_max_retries.is_some(),
+                lockout_time_mins: opt_lockout_time_mins.is_some(),
+                history: opt_history.is_some(),
+                comment: opt_comment.is_some(),
+            }
+        },
+    )(i)
+}
+
+pub fn alter_password_action(i: Input) -> IResult<AlterPasswordAction> {
+    let rename_password = map(
+        rule! {
+           RENAME ~ TO ~ #ident
+        },
+        |(_, _, new_name)| AlterPasswordAction::RenamePassword { new_name },
+    );
+    let set_options = map(
+        rule! {
+           SET ~ #password_set_options
+        },
+        |(_, set_options)| AlterPasswordAction::SetOptions(set_options),
+    );
+    let unset_options = map(
+        rule! {
+           UNSET ~ #password_unset_options
+        },
+        |(_, unset_options)| AlterPasswordAction::UnSetOptions(unset_options),
+    );
+
+    rule!(
+        #rename_password
+        | #set_options
+        | #unset_options
+    )(i)
+}
diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs
index 7d934d5ba84b7..67a6df4a697e8 100644
--- a/src/query/ast/src/parser/token.rs
+++ b/src/query/ast/src/parser/token.rs
@@ -747,6 +747,30 @@ pub enum TokenKind {
     PARTITION,
     #[token("PARQUET", ignore(ascii_case))]
     PARQUET,
+    #[token("PASSWORD", ignore(ascii_case))]
+    PASSWORD,
+    #[token("PASSWORD_MIN_LENGTH", ignore(ascii_case))]
+    PASSWORD_MIN_LENGTH,
+    #[token("PASSWORD_MAX_LENGTH", ignore(ascii_case))]
+    PASSWORD_MAX_LENGTH,
+    #[token("PASSWORD_MIN_UPPER_CASE_CHARS", ignore(ascii_case))]
+    PASSWORD_MIN_UPPER_CASE_CHARS,
+    #[token("PASSWORD_MIN_LOWER_CASE_CHARS", ignore(ascii_case))]
+    PASSWORD_MIN_LOWER_CASE_CHARS,
+    #[token("PASSWORD_MIN_NUMERIC_CHARS", ignore(ascii_case))]
+    PASSWORD_MIN_NUMERIC_CHARS,
+    #[token("PASSWORD_MIN_SPECIAL_CHARS", ignore(ascii_case))]
+    PASSWORD_MIN_SPECIAL_CHARS,
+    #[token("PASSWORD_MIN_AGE_DAYS", ignore(ascii_case))]
+    PASSWORD_MIN_AGE_DAYS,
+    #[token("PASSWORD_MAX_AGE_DAYS", ignore(ascii_case))]
+    PASSWORD_MAX_AGE_DAYS,
+    #[token("PASSWORD_MAX_RETRIES", ignore(ascii_case))]
+    PASSWORD_MAX_RETRIES,
+    #[token("PASSWORD_LOCKOUT_TIME_MINS", ignore(ascii_case))]
+    PASSWORD_LOCKOUT_TIME_MINS,
+    #[token("PASSWORD_HISTORY", ignore(ascii_case))]
+    PASSWORD_HISTORY,
     #[token("PATTERN", ignore(ascii_case))]
     PATTERN,
     #[token("PIPELINE", ignore(ascii_case))]
diff --git a/src/query/ast/src/visitors/visitor.rs b/src/query/ast/src/visitors/visitor.rs
index 1571f5107d186..ec6b285f1d771 100644
--- a/src/query/ast/src/visitors/visitor.rs
+++ b/src/query/ast/src/visitors/visitor.rs
@@ -670,6 +670,16 @@ pub trait Visitor<'ast>: Sized {
 
     fn visit_show_network_policies(&mut self) {}
 
+    fn visit_create_password_policy(&mut self, _stmt: &'ast CreatePasswordPolicyStmt) {}
+
+    fn visit_alter_password_policy(&mut self, _stmt: &'ast AlterPasswordPolicyStmt) {}
+
+    fn visit_drop_password_policy(&mut self, _stmt: &'ast DropPasswordPolicyStmt) {}
+
+    fn visit_desc_password_policy(&mut self, _stmt: &'ast DescPasswordPolicyStmt) {}
+
+    fn visit_show_password_policies(&mut self) {}
+
     fn visit_create_task(&mut self, _stmt: &'ast CreateTaskStmt) {}
 
     fn visit_drop_task(&mut self, _stmt: &'ast DropTaskStmt) {}
diff --git a/src/query/ast/src/visitors/visitor_mut.rs b/src/query/ast/src/visitors/visitor_mut.rs
index 9e1a9ccf1219d..3684790c36b73 100644
--- a/src/query/ast/src/visitors/visitor_mut.rs
+++ b/src/query/ast/src/visitors/visitor_mut.rs
@@ -684,6 +684,16 @@ pub trait VisitorMut: Sized {
 
     fn visit_show_network_policies(&mut self) {}
 
+    fn visit_create_password_policy(&mut self, _stmt: &mut CreatePasswordPolicyStmt) {}
+
+    fn visit_alter_password_policy(&mut self, _stmt: &mut AlterPasswordPolicyStmt) {}
+
+    fn visit_drop_password_policy(&mut self, _stmt: &mut DropPasswordPolicyStmt) {}
+
+    fn visit_desc_password_policy(&mut self, _stmt: &mut DescPasswordPolicyStmt) {}
+
+    fn visit_show_password_policies(&mut self) {}
+
     fn visit_create_task(&mut self, _stmt: &mut CreateTaskStmt) {}
 
     fn visit_drop_task(&mut self, _stmt: &mut DropTaskStmt) {}
diff --git a/src/query/ast/src/visitors/walk.rs b/src/query/ast/src/visitors/walk.rs
index 319f5e35dff04..2c55d8db4b852 100644
--- a/src/query/ast/src/visitors/walk.rs
+++ b/src/query/ast/src/visitors/walk.rs
@@ -503,6 +503,11 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
         Statement::DropNetworkPolicy(stmt) => visitor.visit_drop_network_policy(stmt),
         Statement::DescNetworkPolicy(stmt) => visitor.visit_desc_network_policy(stmt),
         Statement::ShowNetworkPolicies => visitor.visit_show_network_policies(),
+        Statement::CreatePasswordPolicy(stmt) => visitor.visit_create_password_policy(stmt),
+        Statement::AlterPasswordPolicy(stmt) => visitor.visit_alter_password_policy(stmt),
+        Statement::DropPasswordPolicy(stmt) => visitor.visit_drop_password_policy(stmt),
+        Statement::DescPasswordPolicy(stmt) => visitor.visit_desc_password_policy(stmt),
+        Statement::ShowPasswordPolicies => visitor.visit_show_password_policies(),
         Statement::CreateTask(stmt) => visitor.visit_create_task(stmt),
         Statement::ExecuteTask(stmt) => visitor.visit_execute_task(stmt),
         Statement::DropTask(stmt) => visitor.visit_drop_task(stmt),
diff --git a/src/query/ast/src/visitors/walk_mut.rs b/src/query/ast/src/visitors/walk_mut.rs
index 97928f849adec..09b283f836d36 100644
--- a/src/query/ast/src/visitors/walk_mut.rs
+++ b/src/query/ast/src/visitors/walk_mut.rs
@@ -508,6 +508,11 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
         Statement::DropNetworkPolicy(stmt) => visitor.visit_drop_network_policy(stmt),
         Statement::DescNetworkPolicy(stmt) => visitor.visit_desc_network_policy(stmt),
         Statement::ShowNetworkPolicies => visitor.visit_show_network_policies(),
+        Statement::CreatePasswordPolicy(stmt) => visitor.visit_create_password_policy(stmt),
+        Statement::AlterPasswordPolicy(stmt) => visitor.visit_alter_password_policy(stmt),
+        Statement::DropPasswordPolicy(stmt) => visitor.visit_drop_password_policy(stmt),
+        Statement::DescPasswordPolicy(stmt) => visitor.visit_desc_password_policy(stmt),
+        Statement::ShowPasswordPolicies => visitor.visit_show_password_policies(),
 
         Statement::CreateTask(stmt) => visitor.visit_create_task(stmt),
         Statement::ExecuteTask(stmt) => visitor.visit_execute_task(stmt),
diff --git a/src/query/management/src/lib.rs b/src/query/management/src/lib.rs
index 35c959bb8b9a5..c70a693fe09bb 100644
--- a/src/query/management/src/lib.rs
+++ b/src/query/management/src/lib.rs
@@ -18,6 +18,7 @@ mod cluster;
 mod connection;
 mod file_format;
 mod network_policy;
+mod password_policy;
 mod quota;
 mod role;
 mod serde;
@@ -34,6 +35,8 @@ pub use file_format::FileFormatApi;
 pub use file_format::FileFormatMgr;
 pub use network_policy::NetworkPolicyApi;
 pub use network_policy::NetworkPolicyMgr;
+pub use password_policy::PasswordPolicyApi;
+pub use password_policy::PasswordPolicyMgr;
 pub use quota::QuotaApi;
 pub use quota::QuotaMgr;
 pub use role::RoleApi;
diff --git a/src/query/management/src/password_policy/mod.rs b/src/query/management/src/password_policy/mod.rs
new file mode 100644
index 0000000000000..33bab1ac7e0be
--- /dev/null
+++ b/src/query/management/src/password_policy/mod.rs
@@ -0,0 +1,19 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod password_policy_api;
+mod password_policy_mgr;
+
+pub use password_policy_api::PasswordPolicyApi;
+pub use password_policy_mgr::PasswordPolicyMgr;
diff --git a/src/query/management/src/password_policy/password_policy_api.rs b/src/query/management/src/password_policy/password_policy_api.rs
new file mode 100644
index 0000000000000..da5d1d4ec101e
--- /dev/null
+++ b/src/query/management/src/password_policy/password_policy_api.rs
@@ -0,0 +1,35 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_exception::Result;
+use common_meta_app::principal::PasswordPolicy;
+use common_meta_types::MatchSeq;
+use common_meta_types::SeqV;
+
+#[async_trait::async_trait]
+pub trait PasswordPolicyApi: Sync + Send {
+    async fn add_password_policy(&self, password_policy: PasswordPolicy) -> Result<u64>;
+
+    async fn update_password_policy(
+        &self,
+        password_policy: PasswordPolicy,
+        seq: MatchSeq,
+    ) -> Result<u64>;
+
+    async fn drop_password_policy(&self, name: &str, seq: MatchSeq) -> Result<()>;
+
+    async fn get_password_policy(&self, name: &str, seq: MatchSeq) -> Result<SeqV<PasswordPolicy>>;
+
+    async fn get_password_policies(&self) -> Result<Vec<PasswordPolicy>>;
+}
diff --git a/src/query/management/src/password_policy/password_policy_mgr.rs b/src/query/management/src/password_policy/password_policy_mgr.rs
new file mode 100644
index 0000000000000..4ab724728ff16
--- /dev/null
+++ b/src/query/management/src/password_policy/password_policy_mgr.rs
@@ -0,0 +1,175 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_base::base::escape_for_key;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_meta_app::principal::PasswordPolicy;
+use common_meta_kvapi::kvapi;
+use common_meta_kvapi::kvapi::UpsertKVReq;
+use common_meta_types::MatchSeq;
+use common_meta_types::MatchSeqExt;
+use common_meta_types::MetaError;
+use common_meta_types::Operation;
+use common_meta_types::SeqV;
+
+use crate::password_policy::password_policy_api::PasswordPolicyApi;
+use crate::serde::deserialize_struct;
+use crate::serde::serialize_struct;
+
+static NETWORK_POLICY_API_KEY_PREFIX: &str = "__fd_password_policies";
+
+pub struct PasswordPolicyMgr {
+    kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
+    password_policy_prefix: String,
+}
+
+impl PasswordPolicyMgr {
+    pub fn create(
+        kv_api: Arc<dyn kvapi::KVApi<Error = MetaError>>,
+        tenant: &str,
+    ) -> Result<Self, ErrorCode> {
+        if tenant.is_empty() {
+            return Err(ErrorCode::TenantIsEmpty(
+                "Tenant can not empty (while create password policy)",
+            ));
+        }
+
+        Ok(PasswordPolicyMgr {
+            kv_api,
+            password_policy_prefix: format!("{}/{}", NETWORK_POLICY_API_KEY_PREFIX, tenant),
+        })
+    }
+
+    fn make_password_policy_key(&self, name: &str) -> Result<String> {
+        Ok(format!(
+            "{}/{}",
+            self.password_policy_prefix,
+            escape_for_key(name)?
+        ))
+    }
+}
+
+#[async_trait::async_trait]
+impl PasswordPolicyApi for PasswordPolicyMgr {
+    #[async_backtrace::framed]
+    #[minitrace::trace]
+    async fn add_password_policy(&self, password_policy: PasswordPolicy) -> Result<u64> {
+        let match_seq = MatchSeq::Exact(0);
+        let key = self.make_password_policy_key(password_policy.name.as_str())?;
+        let value = Operation::Update(serialize_struct(
+            &password_policy,
+            ErrorCode::IllegalPasswordPolicy,
+            || "",
+        )?);
+
+        let kv_api = self.kv_api.clone();
+        let upsert_kv = kv_api.upsert_kv(UpsertKVReq::new(&key, match_seq, value, None));
+
+        let res_seq = upsert_kv.await?.added_seq_or_else(|v| {
+            ErrorCode::PasswordPolicyAlreadyExists(format!(
+                "PasswordPolicy already exists, seq [{}]",
+                v.seq
+            ))
+        })?;
+
+        Ok(res_seq)
+    }
+
+    #[async_backtrace::framed]
+    #[minitrace::trace]
+    async fn update_password_policy(
+        &self,
+        password_policy: PasswordPolicy,
+        match_seq: MatchSeq,
+    ) -> Result<u64> {
+        let key = self.make_password_policy_key(password_policy.name.as_str())?;
+        let value = Operation::Update(serialize_struct(
+            &password_policy,
+            ErrorCode::IllegalPasswordPolicy,
+            || "",
+        )?);
+
+        let kv_api = self.kv_api.clone();
+        let upsert_kv = kv_api
+            .upsert_kv(UpsertKVReq::new(&key, match_seq, value, None))
+            .await?;
+
+        match upsert_kv.result {
+            Some(SeqV { seq: s, .. }) => Ok(s),
+            None => Err(ErrorCode::UnknownPasswordPolicy(format!(
+                "Unknown PasswordPolicy, or seq not match {}",
+                password_policy.name.clone()
+            ))),
+        }
+    }
+
+    #[async_backtrace::framed]
+    #[minitrace::trace]
+    async fn drop_password_policy(&self, name: &str, seq: MatchSeq) -> Result<()> {
+        let key = self.make_password_policy_key(name)?;
+        let kv_api = self.kv_api.clone();
+        let res = kv_api
+            .upsert_kv(UpsertKVReq::new(&key, seq, Operation::Delete, None))
+            .await?;
+        if res.prev.is_some() && res.result.is_none() {
+            Ok(())
+        } else {
+            Err(ErrorCode::UnknownPasswordPolicy(format!(
+                "Unknown PasswordPolicy {}",
+                name
+            )))
+        }
+    }
+
+    #[async_backtrace::framed]
+    #[minitrace::trace]
+    async fn get_password_policy(&self, name: &str, seq: MatchSeq) -> Result<SeqV<PasswordPolicy>> {
+        let key = self.make_password_policy_key(name)?;
+        let res = self.kv_api.get_kv(&key).await?;
+        let seq_value = res.ok_or_else(|| {
+            ErrorCode::UnknownPasswordPolicy(format!("Unknown PasswordPolicy {}", name))
+        })?;
+
+        match seq.match_seq(&seq_value) {
+            Ok(_) => Ok(SeqV::new(
+                seq_value.seq,
+                deserialize_struct(&seq_value.data, ErrorCode::IllegalPasswordPolicy, || "")?,
+            )),
+            Err(_) => Err(ErrorCode::UnknownPasswordPolicy(format!(
+                "Unknown PasswordPolicy {}",
+                name
+            ))),
+        }
+    }
+
+    #[async_backtrace::framed]
+    #[minitrace::trace]
+    async fn get_password_policies(&self) -> Result<Vec<PasswordPolicy>> {
+        let values = self
+            .kv_api
+            .prefix_list_kv(&self.password_policy_prefix)
+            .await?;
+
+        let mut password_policies = Vec::with_capacity(values.len());
+        for (_, value) in values {
+            let password_policy =
+                deserialize_struct(&value.data, ErrorCode::IllegalPasswordPolicy, || "")?;
+            password_policies.push(password_policy);
+        }
+        Ok(password_policies)
+    }
+}
diff --git a/src/query/service/src/interpreters/access/management_mode_access.rs b/src/query/service/src/interpreters/access/management_mode_access.rs
index 389a26d5a8e71..5209d9a52fd59 100644
--- a/src/query/service/src/interpreters/access/management_mode_access.rs
+++ b/src/query/service/src/interpreters/access/management_mode_access.rs
@@ -104,6 +104,10 @@ impl AccessChecker for ManagementModeAccess {
                 | Plan::CreateNetworkPolicy(_)
                 | Plan::AlterNetworkPolicy(_)
                 | Plan::DropNetworkPolicy(_)
+                // Password policy.
+                | Plan::CreatePasswordPolicy(_)
+                | Plan::AlterPasswordPolicy(_)
+                | Plan::DropPasswordPolicy(_)
 
                 // UDF
                 | Plan::CreateUDF(_)
diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs
index 756a39d240109..e9a4a1c6d89bf 100644
--- a/src/query/service/src/interpreters/access/privilege_access.rs
+++ b/src/query/service/src/interpreters/access/privilege_access.rs
@@ -932,6 +932,11 @@ impl AccessChecker for PrivilegeAccess {
             | Plan::DropNetworkPolicy(_)
             | Plan::DescNetworkPolicy(_)
             | Plan::ShowNetworkPolicies(_)
+            | Plan::CreatePasswordPolicy(_)
+            | Plan::AlterPasswordPolicy(_)
+            | Plan::DropPasswordPolicy(_)
+            | Plan::DescPasswordPolicy(_)
+            | Plan::ShowPasswordPolicies(_)
             | Plan::CreateConnection(_)
             | Plan::ShowConnections(_)
             | Plan::DescConnection(_)
diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs
index ec0f435ead5cb..5dca3f15a5db6 100644
--- a/src/query/service/src/interpreters/interpreter_factory.rs
+++ b/src/query/service/src/interpreters/interpreter_factory.rs
@@ -471,6 +471,23 @@ impl InterpreterFactory {
             Plan::ShowNetworkPolicies(_) => {
                 Ok(Arc::new(ShowNetworkPoliciesInterpreter::try_create(ctx)?))
             }
+            Plan::CreatePasswordPolicy(p) => Ok(Arc::new(
+                CreatePasswordPolicyInterpreter::try_create(ctx, *p.clone())?,
+            )),
+            Plan::AlterPasswordPolicy(p) => Ok(Arc::new(
+                AlterPasswordPolicyInterpreter::try_create(ctx, *p.clone())?,
+            )),
+            Plan::DropPasswordPolicy(p) => Ok(Arc::new(DropPasswordPolicyInterpreter::try_create(
+                ctx,
+                *p.clone(),
+            )?)),
+            Plan::DescPasswordPolicy(p) => Ok(Arc::new(DescPasswordPolicyInterpreter::try_create(
+                ctx,
+                *p.clone(),
+            )?)),
+            Plan::ShowPasswordPolicies(_) => {
+                Ok(Arc::new(ShowPasswordPoliciesInterpreter::try_create(ctx)?))
+            }
 
             Plan::CreateTask(p) => Ok(Arc::new(CreateTaskInterpreter::try_create(
                 ctx,
diff --git a/src/query/service/src/interpreters/interpreter_password_policies_show.rs b/src/query/service/src/interpreters/interpreter_password_policies_show.rs
new file mode 100644
index 0000000000000..4e6597e175553
--- /dev/null
+++ b/src/query/service/src/interpreters/interpreter_password_policies_show.rs
@@ -0,0 +1,101 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_exception::Result;
+use common_expression::types::StringType;
+use common_expression::types::TimestampType;
+use common_expression::DataBlock;
+use common_expression::FromData;
+use common_users::UserApiProvider;
+
+use crate::interpreters::Interpreter;
+use crate::pipelines::PipelineBuildResult;
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+
+#[derive(Debug)]
+pub struct ShowPasswordPoliciesInterpreter {
+    ctx: Arc<QueryContext>,
+}
+
+impl ShowPasswordPoliciesInterpreter {
+    pub fn try_create(ctx: Arc<QueryContext>) -> Result<Self> {
+        Ok(ShowPasswordPoliciesInterpreter { ctx })
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for ShowPasswordPoliciesInterpreter {
+    fn name(&self) -> &str {
+        "ShowPasswordPoliciesInterpreter"
+    }
+
+    #[async_backtrace::framed]
+    async fn execute2(&self) -> Result<PipelineBuildResult> {
+        let tenant = self.ctx.get_tenant();
+        let user_mgr = UserApiProvider::instance();
+        let password_policies = user_mgr.get_password_policies(&tenant).await?;
+
+        let mut created_ons = Vec::with_capacity(password_policies.len());
+        let mut names = Vec::with_capacity(password_policies.len());
+        let mut comments = Vec::with_capacity(password_policies.len());
+        let mut options = Vec::with_capacity(password_policies.len());
+        for password_policy in password_policies {
+            created_ons.push(password_policy.create_on.timestamp_micros());
+            names.push(password_policy.name.as_bytes().to_vec());
+            comments.push(password_policy.comment.as_bytes().to_vec());
+
+            let values = vec![
+                format!("PASSWORD_MIN_LENGTH = {}", password_policy.min_length),
+                format!("PASSWORD_MAX_LENGTH = {}", password_policy.max_length),
+                format!(
+                    "PASSWORD_MIN_UPPER_CASE_CHARS = {}",
+                    password_policy.min_upper_case_chars
+                ),
+                format!(
+                    "PASSWORD_MIN_LOWER_CASE_CHARS = {}",
+                    password_policy.min_lower_case_chars
+                ),
+                format!(
+                    "PASSWORD_MIN_NUMERIC_CHARS = {}",
+                    password_policy.min_numeric_chars
+                ),
+                format!(
+                    "PASSWORD_MIN_SPECIAL_CHARS = {}",
+                    password_policy.min_special_chars
+                ),
+                format!("PASSWORD_MIN_AGE_DAYS = {}", password_policy.min_age_days),
+                format!("PASSWORD_MAX_AGE_DAYS = {}", password_policy.max_age_days),
+                format!("PASSWORD_MAX_RETRIES = {}", password_policy.max_retries),
+                format!(
+                    "PASSWORD_LOCKOUT_TIME_MINS = {}",
+                    password_policy.lockout_time_mins
+                ),
+                format!("PASSWORD_HISTORY = {}", password_policy.history),
+            ];
+            let option = values.join(", ");
+            options.push(option.as_bytes().to_vec());
+        }
+
+        // todo owner
+        PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![
+            TimestampType::from_data(created_ons),
+            StringType::from_data(names),
+            StringType::from_data(comments),
+            StringType::from_data(options),
+        ])])
+    }
+}
diff --git a/src/query/service/src/interpreters/interpreter_password_policy_alter.rs b/src/query/service/src/interpreters/interpreter_password_policy_alter.rs
new file mode 100644
index 0000000000000..6133b2c16f4b6
--- /dev/null
+++ b/src/query/service/src/interpreters/interpreter_password_policy_alter.rs
@@ -0,0 +1,163 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_ast::ast::AlterPasswordAction;
+use common_exception::Result;
+use common_sql::plans::AlterPasswordPolicyPlan;
+use common_users::UserApiProvider;
+use log::debug;
+
+use crate::interpreters::Interpreter;
+use crate::pipelines::PipelineBuildResult;
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+
+#[derive(Debug)]
+pub struct AlterPasswordPolicyInterpreter {
+    ctx: Arc<QueryContext>,
+    plan: AlterPasswordPolicyPlan,
+}
+
+impl AlterPasswordPolicyInterpreter {
+    pub fn try_create(ctx: Arc<QueryContext>, plan: AlterPasswordPolicyPlan) -> Result<Self> {
+        Ok(AlterPasswordPolicyInterpreter { ctx, plan })
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for AlterPasswordPolicyInterpreter {
+    fn name(&self) -> &str {
+        "AlterPasswordPolicyInterpreter"
+    }
+
+    #[minitrace::trace]
+    #[async_backtrace::framed]
+    async fn execute2(&self) -> Result<PipelineBuildResult> {
+        debug!("ctx.id" = self.ctx.get_id().as_str(); "alter_password_policy_execute");
+
+        let plan = self.plan.clone();
+        let tenant = self.ctx.get_tenant();
+
+        let user_mgr = UserApiProvider::instance();
+
+        match plan.action {
+            AlterPasswordAction::RenamePassword { .. } => {
+                todo!()
+            }
+            AlterPasswordAction::SetOptions(set_options) => {
+                user_mgr
+                    .update_password_policy(
+                        &tenant,
+                        &plan.name,
+                        set_options.min_length,
+                        set_options.max_length,
+                        set_options.min_upper_case_chars,
+                        set_options.min_lower_case_chars,
+                        set_options.min_numeric_chars,
+                        set_options.min_special_chars,
+                        set_options.min_age_days,
+                        set_options.max_age_days,
+                        set_options.max_retries,
+                        set_options.lockout_time_mins,
+                        set_options.history,
+                        set_options.comment.clone(),
+                        plan.if_exists,
+                    )
+                    .await?;
+            }
+            AlterPasswordAction::UnSetOptions(unset_options) => {
+                // convert unset options to default values
+                let min_length = if unset_options.min_length {
+                    Some(8)
+                } else {
+                    None
+                };
+                let max_length = if unset_options.max_length {
+                    Some(256)
+                } else {
+                    None
+                };
+                let min_upper_case_chars = if unset_options.min_upper_case_chars {
+                    Some(1)
+                } else {
+                    None
+                };
+                let min_lower_case_chars = if unset_options.min_lower_case_chars {
+                    Some(1)
+                } else {
+                    None
+                };
+                let min_numeric_chars = if unset_options.min_numeric_chars {
+                    Some(1)
+                } else {
+                    None
+                };
+                let min_special_chars = if unset_options.min_special_chars {
+                    Some(1)
+                } else {
+                    None
+                };
+                let min_age_days = if unset_options.min_age_days {
+                    Some(0)
+                } else {
+                    None
+                };
+                let max_age_days = if unset_options.max_age_days {
+                    Some(90)
+                } else {
+                    None
+                };
+                let max_retries = if unset_options.max_retries {
+                    Some(5)
+                } else {
+                    None
+                };
+                let lockout_time_mins = if unset_options.lockout_time_mins {
+                    Some(15)
+                } else {
+                    None
+                };
+                let history = if unset_options.history { Some(0) } else { None };
+                let comment = if unset_options.comment {
+                    Some("".to_string())
+                } else {
+                    None
+                };
+                user_mgr
+                    .update_password_policy(
+                        &tenant,
+                        &plan.name,
+                        min_length,
+                        max_length,
+                        min_upper_case_chars,
+                        min_lower_case_chars,
+                        min_numeric_chars,
+                        min_special_chars,
+                        min_age_days,
+                        max_age_days,
+                        max_retries,
+                        lockout_time_mins,
+                        history,
+                        comment,
+                        plan.if_exists,
+                    )
+                    .await?;
+            }
+        }
+
+        Ok(PipelineBuildResult::create())
+    }
+}
diff --git a/src/query/service/src/interpreters/interpreter_password_policy_create.rs b/src/query/service/src/interpreters/interpreter_password_policy_create.rs
new file mode 100644
index 0000000000000..596769368d343
--- /dev/null
+++ b/src/query/service/src/interpreters/interpreter_password_policy_create.rs
@@ -0,0 +1,79 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use chrono::Utc;
+use common_exception::Result;
+use common_meta_app::principal::PasswordPolicy;
+use common_sql::plans::CreatePasswordPolicyPlan;
+use common_users::UserApiProvider;
+use log::debug;
+
+use crate::interpreters::Interpreter;
+use crate::pipelines::PipelineBuildResult;
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+
+#[derive(Debug)]
+pub struct CreatePasswordPolicyInterpreter {
+    ctx: Arc<QueryContext>,
+    plan: CreatePasswordPolicyPlan,
+}
+
+impl CreatePasswordPolicyInterpreter {
+    pub fn try_create(ctx: Arc<QueryContext>, plan: CreatePasswordPolicyPlan) -> Result<Self> {
+        Ok(CreatePasswordPolicyInterpreter { ctx, plan })
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for CreatePasswordPolicyInterpreter {
+    fn name(&self) -> &str {
+        "CreatePasswordPolicyInterpreter"
+    }
+
+    #[minitrace::trace]
+    #[async_backtrace::framed]
+    async fn execute2(&self) -> Result<PipelineBuildResult> {
+        debug!("ctx.id" = self.ctx.get_id().as_str(); "create_password_policy_execute");
+
+        let plan = self.plan.clone();
+        let tenant = self.ctx.get_tenant();
+        let user_mgr = UserApiProvider::instance();
+
+        let password_policy = PasswordPolicy {
+            name: plan.name,
+            min_length: plan.min_length,
+            max_length: plan.max_length,
+            min_upper_case_chars: plan.min_upper_case_chars,
+            min_lower_case_chars: plan.min_lower_case_chars,
+            min_numeric_chars: plan.min_numeric_chars,
+            min_special_chars: plan.min_special_chars,
+            min_age_days: plan.min_age_days,
+            max_age_days: plan.max_age_days,
+            max_retries: plan.max_retries,
+            lockout_time_mins: plan.lockout_time_mins,
+            history: plan.history,
+            comment: plan.comment,
+            create_on: Utc::now(),
+            update_on: None,
+        };
+        user_mgr
+            .add_password_policy(&tenant, password_policy, plan.if_not_exists)
+            .await?;
+
+        Ok(PipelineBuildResult::create())
+    }
+}
diff --git a/src/query/service/src/interpreters/interpreter_password_policy_desc.rs b/src/query/service/src/interpreters/interpreter_password_policy_desc.rs
new file mode 100644
index 0000000000000..d3bd6f7a68612
--- /dev/null
+++ b/src/query/service/src/interpreters/interpreter_password_policy_desc.rs
@@ -0,0 +1,144 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_exception::Result;
+use common_expression::types::StringType;
+use common_expression::types::UInt64Type;
+use common_expression::DataBlock;
+use common_expression::FromData;
+use common_sql::plans::DescPasswordPolicyPlan;
+use common_users::UserApiProvider;
+
+use crate::interpreters::Interpreter;
+use crate::pipelines::PipelineBuildResult;
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+
+#[derive(Debug)]
+pub struct DescPasswordPolicyInterpreter {
+    ctx: Arc<QueryContext>,
+    plan: DescPasswordPolicyPlan,
+}
+
+impl DescPasswordPolicyInterpreter {
+    pub fn try_create(ctx: Arc<QueryContext>, plan: DescPasswordPolicyPlan) -> Result<Self> {
+        Ok(DescPasswordPolicyInterpreter { ctx, plan })
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for DescPasswordPolicyInterpreter {
+    fn name(&self) -> &str {
+        "DescPasswordPolicyInterpreter"
+    }
+
+    #[async_backtrace::framed]
+    async fn execute2(&self) -> Result<PipelineBuildResult> {
+        let tenant = self.ctx.get_tenant();
+        let user_mgr = UserApiProvider::instance();
+
+        let password_policy = user_mgr
+            .get_password_policy(&tenant, self.plan.name.as_str())
+            .await?;
+
+        let properties = vec![
+            "NAME".as_bytes().to_vec(),
+            "OWNER".as_bytes().to_vec(),
+            "COMMENT".as_bytes().to_vec(),
+            "PASSWORD_MIN_LENGTH".as_bytes().to_vec(),
+            "PASSWORD_MAX_LENGTH".as_bytes().to_vec(),
+            "PASSWORD_MIN_UPPER_CASE_CHARS".as_bytes().to_vec(),
+            "PASSWORD_MIN_LOWER_CASE_CHARS".as_bytes().to_vec(),
+            "PASSWORD_MIN_NUMERIC_CHARS".as_bytes().to_vec(),
+            "PASSWORD_MIN_SPECIAL_CHARS".as_bytes().to_vec(),
+            "PASSWORD_MIN_AGE_DAYS".as_bytes().to_vec(),
+            "PASSWORD_MAX_AGE_DAYS".as_bytes().to_vec(),
+            "PASSWORD_MAX_RETRIES".as_bytes().to_vec(),
+            "PASSWORD_LOCKOUT_TIME_MINS".as_bytes().to_vec(),
+            "PASSWORD_HISTORY".as_bytes().to_vec(),
+        ];
+
+        let min_length = format!("{}", password_policy.min_length);
+        let max_length = format!("{}", password_policy.max_length);
+        let min_upper_case_chars = format!("{}", password_policy.min_upper_case_chars);
+        let min_lower_case_chars = format!("{}", password_policy.min_lower_case_chars);
+        let min_numeric_chars = format!("{}", password_policy.min_numeric_chars);
+        let min_special_chars = format!("{}", password_policy.min_special_chars);
+        let min_age_days = format!("{}", password_policy.min_age_days);
+        let max_age_days = format!("{}", password_policy.max_age_days);
+        let max_retries = format!("{}", password_policy.max_retries);
+        let lockout_time_mins = format!("{}", password_policy.lockout_time_mins);
+        let history = format!("{}", password_policy.history);
+
+        let values = vec![
+            password_policy.name.as_bytes().to_vec(),
+            "PROD_ADMIN".as_bytes().to_vec(),
+            password_policy.comment.as_bytes().to_vec(),
+            min_length.as_bytes().to_vec(),
+            max_length.as_bytes().to_vec(),
+            min_upper_case_chars.as_bytes().to_vec(),
+            min_lower_case_chars.as_bytes().to_vec(),
+            min_numeric_chars.as_bytes().to_vec(),
+            min_special_chars.as_bytes().to_vec(),
+            min_age_days.as_bytes().to_vec(),
+            max_age_days.as_bytes().to_vec(),
+            max_retries.as_bytes().to_vec(),
+            lockout_time_mins.as_bytes().to_vec(),
+            history.as_bytes().to_vec(),
+        ];
+
+        let defaults = vec![
+            None,
+            None,
+            None,
+            Some(8u64),
+            Some(256u64),
+            Some(1u64),
+            Some(1u64),
+            Some(1u64),
+            Some(0u64),
+            Some(0u64),
+            Some(90u64),
+            Some(5u64),
+            Some(15u64),
+            Some(24u64),
+        ];
+
+        let descriptions = vec![
+            "Name of password policy.".as_bytes().to_vec(),
+            "Owner of password policy.".as_bytes().to_vec(),
+            "user comment associated to an object in the dictionary.".as_bytes().to_vec(),
+            "Minimum length of new password.".as_bytes().to_vec(),
+            "Maximum length of new password.".as_bytes().to_vec(),
+            "Minimum number of uppercase characters in new password.".as_bytes().to_vec(),
+            "Minimum number of lowercase characters in new password.".as_bytes().to_vec(),
+            "Minimum number of numeric characters in new password.".as_bytes().to_vec(),
+            "Minimum number of special characters in new password.".as_bytes().to_vec(),
+            "Period after a password is changed during which a password cannot be changed again, in days.".as_bytes().to_vec(),
+            "Period after which password must be changed, in days.".as_bytes().to_vec(),
+            "Number of attempts users have to enter the correct password before their account is locked.".as_bytes().to_vec(),
+            "Period of time for which users will be locked after entering their password incorrectly many times (specified by MAX_RETRIES), in minutes.".as_bytes().to_vec(),
+            "Number of most recent passwords that may not be repeated by the user.".as_bytes().to_vec(),
+        ];
+
+        PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![
+            StringType::from_data(properties),
+            StringType::from_data(values),
+            UInt64Type::from_opt_data(defaults),
+            StringType::from_data(descriptions),
+        ])])
+    }
+}
diff --git a/src/query/service/src/interpreters/interpreter_password_policy_drop.rs b/src/query/service/src/interpreters/interpreter_password_policy_drop.rs
new file mode 100644
index 0000000000000..9aadd758012a0
--- /dev/null
+++ b/src/query/service/src/interpreters/interpreter_password_policy_drop.rs
@@ -0,0 +1,60 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_exception::Result;
+use common_sql::plans::DropPasswordPolicyPlan;
+use common_users::UserApiProvider;
+use log::debug;
+
+use crate::interpreters::Interpreter;
+use crate::pipelines::PipelineBuildResult;
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+
+#[derive(Debug)]
+pub struct DropPasswordPolicyInterpreter {
+    ctx: Arc<QueryContext>,
+    plan: DropPasswordPolicyPlan,
+}
+
+impl DropPasswordPolicyInterpreter {
+    pub fn try_create(ctx: Arc<QueryContext>, plan: DropPasswordPolicyPlan) -> Result<Self> {
+        Ok(DropPasswordPolicyInterpreter { ctx, plan })
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for DropPasswordPolicyInterpreter {
+    fn name(&self) -> &str {
+        "DropPasswordPolicyInterpreter"
+    }
+
+    #[minitrace::trace]
+    #[async_backtrace::framed]
+    async fn execute2(&self) -> Result<PipelineBuildResult> {
+        debug!("ctx.id" = self.ctx.get_id().as_str(); "drop_password_policy_execute");
+
+        let plan = self.plan.clone();
+        let tenant = self.ctx.get_tenant();
+
+        let user_mgr = UserApiProvider::instance();
+        user_mgr
+            .drop_password_policy(&tenant, plan.name.as_str(), plan.if_exists)
+            .await?;
+
+        Ok(PipelineBuildResult::create())
+    }
+}
diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs
index 890267e79a894..2bb3eb6bca57d 100644
--- a/src/query/service/src/interpreters/mod.rs
+++ b/src/query/service/src/interpreters/mod.rs
@@ -54,6 +54,11 @@ mod interpreter_network_policy_alter;
 mod interpreter_network_policy_create;
 mod interpreter_network_policy_desc;
 mod interpreter_network_policy_drop;
+mod interpreter_password_policies_show;
+mod interpreter_password_policy_alter;
+mod interpreter_password_policy_create;
+mod interpreter_password_policy_desc;
+mod interpreter_password_policy_drop;
 mod interpreter_presign;
 mod interpreter_privilege_grant;
 mod interpreter_privilege_revoke;
@@ -154,6 +159,11 @@ pub use interpreter_network_policy_alter::AlterNetworkPolicyInterpreter;
 pub use interpreter_network_policy_create::CreateNetworkPolicyInterpreter;
 pub use interpreter_network_policy_desc::DescNetworkPolicyInterpreter;
 pub use interpreter_network_policy_drop::DropNetworkPolicyInterpreter;
+pub use interpreter_password_policies_show::ShowPasswordPoliciesInterpreter;
+pub use interpreter_password_policy_alter::AlterPasswordPolicyInterpreter;
+pub use interpreter_password_policy_create::CreatePasswordPolicyInterpreter;
+pub use interpreter_password_policy_desc::DescPasswordPolicyInterpreter;
+pub use interpreter_password_policy_drop::DropPasswordPolicyInterpreter;
 pub use interpreter_privilege_grant::GrantPrivilegeInterpreter;
 pub use interpreter_privilege_revoke::RevokePrivilegeInterpreter;
 pub use interpreter_replace::ReplaceInterpreter;
diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs
index a5eb0f205eff3..8787c1b1d245c 100644
--- a/src/query/sql/src/planner/binder/binder.rs
+++ b/src/query/sql/src/planner/binder/binder.rs
@@ -556,6 +556,21 @@ impl<'a> Binder {
             Statement::ShowNetworkPolicies => {
                 self.bind_show_network_policies().await?
             }
+            Statement::CreatePasswordPolicy(stmt) => {
+                self.bind_create_password_policy(stmt).await?
+            }
+            Statement::AlterPasswordPolicy(stmt) => {
+                self.bind_alter_password_policy(stmt).await?
+            }
+            Statement::DropPasswordPolicy(stmt) => {
+                self.bind_drop_password_policy(stmt).await?
+            }
+            Statement::DescPasswordPolicy(stmt) => {
+                self.bind_desc_password_policy(stmt).await?
+            }
+            Statement::ShowPasswordPolicies => {
+                self.bind_show_password_policies().await?
+            }
             Statement::CreateTask(stmt) => {
                 self.bind_create_task(stmt).await?
             }
diff --git a/src/query/sql/src/planner/binder/ddl/mod.rs b/src/query/sql/src/planner/binder/ddl/mod.rs
index f20c2c98c2399..b38e29f239691 100644
--- a/src/query/sql/src/planner/binder/ddl/mod.rs
+++ b/src/query/sql/src/planner/binder/ddl/mod.rs
@@ -20,6 +20,7 @@ mod data_mask;
 mod database;
 mod index;
 mod network_policy;
+mod password_policy;
 mod role;
 mod share;
 mod stage;
diff --git a/src/query/sql/src/planner/binder/ddl/password_policy.rs b/src/query/sql/src/planner/binder/ddl/password_policy.rs
new file mode 100644
index 0000000000000..00b10adcda115
--- /dev/null
+++ b/src/query/sql/src/planner/binder/ddl/password_policy.rs
@@ -0,0 +1,287 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_ast::ast::*;
+use common_exception::ErrorCode;
+use common_exception::Result;
+
+use crate::binder::Binder;
+use crate::plans::AlterPasswordPolicyPlan;
+use crate::plans::CreatePasswordPolicyPlan;
+use crate::plans::DescPasswordPolicyPlan;
+use crate::plans::DropPasswordPolicyPlan;
+use crate::plans::Plan;
+use crate::plans::ShowPasswordPoliciesPlan;
+
+impl Binder {
+    #[async_backtrace::framed]
+    pub(in crate::planner::binder) async fn bind_create_password_policy(
+        &mut self,
+        stmt: &CreatePasswordPolicyStmt,
+    ) -> Result<Plan> {
+        let CreatePasswordPolicyStmt {
+            if_not_exists,
+            name,
+            set_options,
+        } = stmt;
+
+        let min_length = set_options.min_length.unwrap_or(8);
+        let max_length = set_options.max_length.unwrap_or(256);
+        let min_upper_case_chars = set_options.min_upper_case_chars.unwrap_or(1);
+        let min_lower_case_chars = set_options.min_lower_case_chars.unwrap_or(1);
+        let min_numeric_chars = set_options.min_numeric_chars.unwrap_or(1);
+        let min_special_chars = set_options.min_special_chars.unwrap_or(1);
+        let min_age_days = set_options.min_age_days.unwrap_or(0);
+        let max_age_days = set_options.max_age_days.unwrap_or(90);
+        let max_retries = set_options.max_retries.unwrap_or(5);
+        let lockout_time_mins = set_options.lockout_time_mins.unwrap_or(15);
+        let history = set_options.history.unwrap_or(0);
+
+        check_password_min_length(min_length)?;
+        check_password_max_length(max_length)?;
+        check_password_min_upper_case_chars(min_upper_case_chars)?;
+        check_password_min_lower_case_chars(min_lower_case_chars)?;
+        check_password_min_numeric_chars(min_numeric_chars)?;
+        check_password_min_special_chars(min_special_chars)?;
+        check_password_min_age_days(min_age_days)?;
+        check_password_max_age_days(max_age_days)?;
+        check_password_max_retries(max_retries)?;
+        check_password_lockout_time_mins(lockout_time_mins)?;
+        check_password_history(history)?;
+
+        let comment = set_options.comment.clone().unwrap_or_default();
+
+        let tenant = self.ctx.get_tenant();
+        let plan = CreatePasswordPolicyPlan {
+            if_not_exists: *if_not_exists,
+            tenant,
+            name: name.to_string(),
+            min_length,
+            max_length,
+            min_upper_case_chars,
+            min_lower_case_chars,
+            min_numeric_chars,
+            min_special_chars,
+            min_age_days,
+            max_age_days,
+            max_retries,
+            lockout_time_mins,
+            history,
+            comment,
+        };
+        Ok(Plan::CreatePasswordPolicy(Box::new(plan)))
+    }
+
+    #[async_backtrace::framed]
+    pub(in crate::planner::binder) async fn bind_alter_password_policy(
+        &mut self,
+        stmt: &AlterPasswordPolicyStmt,
+    ) -> Result<Plan> {
+        let AlterPasswordPolicyStmt {
+            if_exists,
+            name,
+            action,
+        } = stmt;
+
+        if let AlterPasswordAction::SetOptions(set_options) = action {
+            if let Some(min_length) = set_options.min_length {
+                check_password_min_length(min_length)?;
+            }
+            if let Some(max_length) = set_options.max_length {
+                check_password_max_length(max_length)?;
+            }
+            if let Some(min_upper_case_chars) = set_options.min_upper_case_chars {
+                check_password_min_upper_case_chars(min_upper_case_chars)?;
+            }
+            if let Some(min_lower_case_chars) = set_options.min_lower_case_chars {
+                check_password_min_lower_case_chars(min_lower_case_chars)?;
+            }
+            if let Some(min_numeric_chars) = set_options.min_numeric_chars {
+                check_password_min_numeric_chars(min_numeric_chars)?;
+            }
+            if let Some(min_special_chars) = set_options.min_special_chars {
+                check_password_min_special_chars(min_special_chars)?;
+            }
+            if let Some(min_age_days) = set_options.min_age_days {
+                check_password_min_age_days(min_age_days)?;
+            }
+            if let Some(max_age_days) = set_options.max_age_days {
+                check_password_max_age_days(max_age_days)?;
+            }
+            if let Some(max_retries) = set_options.max_retries {
+                check_password_max_retries(max_retries)?;
+            }
+            if let Some(lockout_time_mins) = set_options.lockout_time_mins {
+                check_password_lockout_time_mins(lockout_time_mins)?;
+            }
+            if let Some(history) = set_options.history {
+                check_password_history(history)?;
+            }
+        }
+
+        let tenant = self.ctx.get_tenant();
+        let plan = AlterPasswordPolicyPlan {
+            if_exists: *if_exists,
+            tenant,
+            name: name.to_string(),
+            action: action.clone(),
+        };
+        Ok(Plan::AlterPasswordPolicy(Box::new(plan)))
+    }
+
+    #[async_backtrace::framed]
+    pub(in crate::planner::binder) async fn bind_drop_password_policy(
+        &mut self,
+        stmt: &DropPasswordPolicyStmt,
+    ) -> Result<Plan> {
+        let DropPasswordPolicyStmt { if_exists, name } = stmt;
+
+        let tenant = self.ctx.get_tenant();
+        let plan = DropPasswordPolicyPlan {
+            if_exists: *if_exists,
+            tenant,
+            name: name.to_string(),
+        };
+        Ok(Plan::DropPasswordPolicy(Box::new(plan)))
+    }
+
+    #[async_backtrace::framed]
+    pub(in crate::planner::binder) async fn bind_desc_password_policy(
+        &mut self,
+        stmt: &DescPasswordPolicyStmt,
+    ) -> Result<Plan> {
+        let DescPasswordPolicyStmt { name } = stmt;
+
+        let plan = DescPasswordPolicyPlan {
+            name: name.to_string(),
+        };
+        Ok(Plan::DescPasswordPolicy(Box::new(plan)))
+    }
+
+    #[async_backtrace::framed]
+    pub(in crate::planner::binder) async fn bind_show_password_policies(&mut self) -> Result<Plan> {
+        let plan = ShowPasswordPoliciesPlan {};
+        Ok(Plan::ShowPasswordPolicies(Box::new(plan)))
+    }
+}
+
+fn check_password_min_length(min_length: u64) -> Result<()> {
+    if !(8..=256).contains(&min_length) {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid min length, supported range: 8 to 256, but got {}",
+            min_length
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_max_length(max_length: u64) -> Result<()> {
+    if !(8..=256).contains(&max_length) {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid max length, supported range: 8 to 256, but got {}",
+            max_length
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_min_upper_case_chars(min_upper_case_chars: u64) -> Result<()> {
+    if min_upper_case_chars > 256 {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid min upper case chars, supported range: 0 to 256, but got {}",
+            min_upper_case_chars
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_min_lower_case_chars(min_lower_case_chars: u64) -> Result<()> {
+    if min_lower_case_chars > 256 {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid min lower case chars, supported range: 0 to 256, but got {}",
+            min_lower_case_chars
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_min_numeric_chars(min_numeric_chars: u64) -> Result<()> {
+    if min_numeric_chars > 256 {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid min numeric chars, supported range: 0 to 256, but got {}",
+            min_numeric_chars
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_min_special_chars(min_special_chars: u64) -> Result<()> {
+    if min_special_chars > 256 {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid min special chars, supported range: 0 to 256, but got {}",
+            min_special_chars
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_min_age_days(min_age_days: u64) -> Result<()> {
+    if min_age_days > 999 {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid min age days, supported range: 0 to 999, but got {}",
+            min_age_days
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_max_age_days(max_age_days: u64) -> Result<()> {
+    if max_age_days > 999 {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid max age days, supported range: 0 to 999, but got {}",
+            max_age_days
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_max_retries(max_retries: u64) -> Result<()> {
+    if !(1..=10).contains(&max_retries) {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid max retries, supported range: 1 to 10, but got {}",
+            max_retries
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_lockout_time_mins(lockout_time_mins: u64) -> Result<()> {
+    if !(1..=999).contains(&lockout_time_mins) {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid lockout time mins, supported range: 1 to 999, but got {}",
+            lockout_time_mins
+        )));
+    }
+    Ok(())
+}
+
+fn check_password_history(history: u64) -> Result<()> {
+    if history > 24 {
+        return Err(ErrorCode::SemanticError(format!(
+            "invalid history, supported range: 0 to 24, but got {}",
+            history
+        )));
+    }
+    Ok(())
+}
diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs
index 28493a9069ce9..92cb4288ca8e7 100644
--- a/src/query/sql/src/planner/format/display_plan.rs
+++ b/src/query/sql/src/planner/format/display_plan.rs
@@ -177,6 +177,13 @@ impl Plan {
             Plan::DescNetworkPolicy(_) => Ok("DescNetworkPolicy".to_string()),
             Plan::ShowNetworkPolicies(_) => Ok("ShowNetworkPolicies".to_string()),
 
+            // password policy
+            Plan::CreatePasswordPolicy(_) => Ok("CreatePasswordPolicy".to_string()),
+            Plan::AlterPasswordPolicy(_) => Ok("AlterPasswordPolicy".to_string()),
+            Plan::DropPasswordPolicy(_) => Ok("DropPasswordPolicy".to_string()),
+            Plan::DescPasswordPolicy(_) => Ok("DescPasswordPolicy".to_string()),
+            Plan::ShowPasswordPolicies(_) => Ok("ShowPasswordPolicies".to_string()),
+
             // task
             Plan::CreateTask(_) => Ok("CreateTask".to_string()),
             Plan::DropTask(_) => Ok("DropTask".to_string()),
diff --git a/src/query/sql/src/planner/plans/ddl/account.rs b/src/query/sql/src/planner/plans/ddl/account.rs
index a17d1c5a35050..056b0a8823ccf 100644
--- a/src/query/sql/src/planner/plans/ddl/account.rs
+++ b/src/query/sql/src/planner/plans/ddl/account.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use common_ast::ast::AlterPasswordAction;
 use common_expression::types::DataType;
 use common_expression::types::NumberDataType;
 use common_expression::DataField;
@@ -195,3 +196,85 @@ impl ShowNetworkPoliciesPlan {
         ])
     }
 }
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct CreatePasswordPolicyPlan {
+    pub if_not_exists: bool,
+    pub tenant: String,
+    pub name: String,
+    pub min_length: u64,
+    pub max_length: u64,
+    pub min_upper_case_chars: u64,
+    pub min_lower_case_chars: u64,
+    pub min_numeric_chars: u64,
+    pub min_special_chars: u64,
+    pub min_age_days: u64,
+    pub max_age_days: u64,
+    pub max_retries: u64,
+    pub lockout_time_mins: u64,
+    pub history: u64,
+    pub comment: String,
+}
+
+impl CreatePasswordPolicyPlan {
+    pub fn schema(&self) -> DataSchemaRef {
+        DataSchemaRefExt::create(vec![])
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct AlterPasswordPolicyPlan {
+    pub if_exists: bool,
+    pub tenant: String,
+    pub name: String,
+    pub action: AlterPasswordAction,
+}
+
+impl AlterPasswordPolicyPlan {
+    pub fn schema(&self) -> DataSchemaRef {
+        DataSchemaRefExt::create(vec![])
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct DropPasswordPolicyPlan {
+    pub if_exists: bool,
+    pub tenant: String,
+    pub name: String,
+}
+
+impl DropPasswordPolicyPlan {
+    pub fn schema(&self) -> DataSchemaRef {
+        DataSchemaRefExt::create(vec![])
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct DescPasswordPolicyPlan {
+    pub name: String,
+}
+
+impl DescPasswordPolicyPlan {
+    pub fn schema(&self) -> DataSchemaRef {
+        DataSchemaRefExt::create(vec![
+            DataField::new("Property", DataType::String),
+            DataField::new("Value", DataType::String),
+            DataField::new("Default", DataType::Nullable(Box::new(DataType::String))),
+            DataField::new("Description", DataType::String),
+        ])
+    }
+}
+
+#[derive(Clone, Debug, PartialEq)]
+pub struct ShowPasswordPoliciesPlan {}
+
+impl ShowPasswordPoliciesPlan {
+    pub fn schema(&self) -> DataSchemaRef {
+        DataSchemaRefExt::create(vec![
+            DataField::new("Created on", DataType::Timestamp),
+            DataField::new("Name", DataType::String),
+            DataField::new("Comment", DataType::String),
+            DataField::new("Option", DataType::String),
+        ])
+    }
+}
diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs
index 4dc958c43dba7..6e2e0ebe1bec2 100644
--- a/src/query/sql/src/planner/plans/plan.rs
+++ b/src/query/sql/src/planner/plans/plan.rs
@@ -29,6 +29,7 @@ use crate::optimizer::SExpr;
 use crate::plans::copy_into_location::CopyIntoLocationPlan;
 use crate::plans::AddTableColumnPlan;
 use crate::plans::AlterNetworkPolicyPlan;
+use crate::plans::AlterPasswordPolicyPlan;
 use crate::plans::AlterShareTenantsPlan;
 use crate::plans::AlterTableClusterKeyPlan;
 use crate::plans::AlterTaskPlan;
@@ -46,6 +47,7 @@ use crate::plans::CreateDatamaskPolicyPlan;
 use crate::plans::CreateFileFormatPlan;
 use crate::plans::CreateIndexPlan;
 use crate::plans::CreateNetworkPolicyPlan;
+use crate::plans::CreatePasswordPolicyPlan;
 use crate::plans::CreateRolePlan;
 use crate::plans::CreateShareEndpointPlan;
 use crate::plans::CreateSharePlan;
@@ -61,6 +63,7 @@ use crate::plans::DeletePlan;
 use crate::plans::DescConnectionPlan;
 use crate::plans::DescDatamaskPolicyPlan;
 use crate::plans::DescNetworkPolicyPlan;
+use crate::plans::DescPasswordPolicyPlan;
 use crate::plans::DescSharePlan;
 use crate::plans::DescribeTablePlan;
 use crate::plans::DescribeTaskPlan;
@@ -71,6 +74,7 @@ use crate::plans::DropDatamaskPolicyPlan;
 use crate::plans::DropFileFormatPlan;
 use crate::plans::DropIndexPlan;
 use crate::plans::DropNetworkPolicyPlan;
+use crate::plans::DropPasswordPolicyPlan;
 use crate::plans::DropRolePlan;
 use crate::plans::DropShareEndpointPlan;
 use crate::plans::DropSharePlan;
@@ -119,6 +123,7 @@ use crate::plans::ShowGrantTenantsOfSharePlan;
 use crate::plans::ShowGrantsPlan;
 use crate::plans::ShowNetworkPoliciesPlan;
 use crate::plans::ShowObjectGrantPrivilegesPlan;
+use crate::plans::ShowPasswordPoliciesPlan;
 use crate::plans::ShowRolesPlan;
 use crate::plans::ShowShareEndpointPlan;
 use crate::plans::ShowSharesPlan;
@@ -301,6 +306,13 @@ pub enum Plan {
     DescNetworkPolicy(Box<DescNetworkPolicyPlan>),
     ShowNetworkPolicies(Box<ShowNetworkPoliciesPlan>),
 
+    // Password policy
+    CreatePasswordPolicy(Box<CreatePasswordPolicyPlan>),
+    AlterPasswordPolicy(Box<AlterPasswordPolicyPlan>),
+    DropPasswordPolicy(Box<DropPasswordPolicyPlan>),
+    DescPasswordPolicy(Box<DescPasswordPolicyPlan>),
+    ShowPasswordPolicies(Box<ShowPasswordPoliciesPlan>),
+
     // Task
     CreateTask(Box<CreateTaskPlan>),
     AlterTask(Box<AlterTaskPlan>),
@@ -406,11 +418,10 @@ impl Plan {
             Plan::CreateDatamaskPolicy(plan) => plan.schema(),
             Plan::DropDatamaskPolicy(plan) => plan.schema(),
             Plan::DescDatamaskPolicy(plan) => plan.schema(),
-            Plan::CreateNetworkPolicy(plan) => plan.schema(),
-            Plan::AlterNetworkPolicy(plan) => plan.schema(),
-            Plan::DropNetworkPolicy(plan) => plan.schema(),
             Plan::DescNetworkPolicy(plan) => plan.schema(),
             Plan::ShowNetworkPolicies(plan) => plan.schema(),
+            Plan::DescPasswordPolicy(plan) => plan.schema(),
+            Plan::ShowPasswordPolicies(plan) => plan.schema(),
             Plan::CopyIntoTable(plan) => plan.schema(),
             Plan::MergeInto(plan) => plan.schema(),
             Plan::CreateTask(plan) => plan.schema(),
@@ -454,6 +465,8 @@ impl Plan {
                 | Plan::DescDatamaskPolicy(_)
                 | Plan::DescNetworkPolicy(_)
                 | Plan::ShowNetworkPolicies(_)
+                | Plan::DescPasswordPolicy(_)
+                | Plan::ShowPasswordPolicies(_)
                 | Plan::CopyIntoTable(_)
                 | Plan::ShowTasks(_)
                 | Plan::DescribeTask(_)
diff --git a/src/query/users/src/lib.rs b/src/query/users/src/lib.rs
index f3f605d43f031..9c51e5b842ebe 100644
--- a/src/query/users/src/lib.rs
+++ b/src/query/users/src/lib.rs
@@ -20,6 +20,7 @@ extern crate core;
 
 mod jwt;
 mod network_policy;
+mod password_policy;
 mod role_mgr;
 mod user;
 mod user_api;
diff --git a/src/query/users/src/password_policy.rs b/src/query/users/src/password_policy.rs
new file mode 100644
index 0000000000000..af3b24ef86098
--- /dev/null
+++ b/src/query/users/src/password_policy.rs
@@ -0,0 +1,195 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use chrono::Utc;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_management::PasswordPolicyApi;
+use common_meta_app::principal::PasswordPolicy;
+use common_meta_types::MatchSeq;
+
+use crate::UserApiProvider;
+
+impl UserApiProvider {
+    // Add a new password policy.
+    #[async_backtrace::framed]
+    pub async fn add_password_policy(
+        &self,
+        tenant: &str,
+        password_policy: PasswordPolicy,
+        if_not_exists: bool,
+    ) -> Result<u64> {
+        if if_not_exists
+            && self
+                .exists_password_policy(tenant, password_policy.name.as_str())
+                .await?
+        {
+            return Ok(0);
+        }
+
+        let client = self.get_password_policy_api_client(tenant)?;
+        let add_password_policy = client.add_password_policy(password_policy);
+        match add_password_policy.await {
+            Ok(res) => Ok(res),
+            Err(e) => {
+                if if_not_exists && e.code() == ErrorCode::PASSWORD_POLICY_ALREADY_EXISTS {
+                    Ok(0)
+                } else {
+                    Err(e.add_message_back("(while add password policy)"))
+                }
+            }
+        }
+    }
+
+    // Update password policy.
+    #[async_backtrace::framed]
+    #[allow(clippy::too_many_arguments)]
+    pub async fn update_password_policy(
+        &self,
+        tenant: &str,
+        name: &str,
+        min_length: Option<u64>,
+        max_length: Option<u64>,
+        min_upper_case_chars: Option<u64>,
+        min_lower_case_chars: Option<u64>,
+        min_numeric_chars: Option<u64>,
+        min_special_chars: Option<u64>,
+        min_age_days: Option<u64>,
+        max_age_days: Option<u64>,
+        max_retries: Option<u64>,
+        lockout_time_mins: Option<u64>,
+        history: Option<u64>,
+        comment: Option<String>,
+        if_exists: bool,
+    ) -> Result<Option<u64>> {
+        let client = self.get_password_policy_api_client(tenant)?;
+        let seq_password_policy = match client.get_password_policy(name, MatchSeq::GE(0)).await {
+            Ok(seq_password_policy) => seq_password_policy,
+            Err(e) => {
+                if if_exists && e.code() == ErrorCode::UNKNOWN_PASSWORD_POLICY {
+                    return Ok(None);
+                } else {
+                    return Err(e.add_message_back(" (while alter password policy)"));
+                }
+            }
+        };
+
+        let seq = seq_password_policy.seq;
+        let mut password_policy = seq_password_policy.data;
+        if let Some(min_length) = min_length {
+            password_policy.min_length = min_length;
+        }
+        if let Some(max_length) = max_length {
+            password_policy.max_length = max_length;
+        }
+        if let Some(min_upper_case_chars) = min_upper_case_chars {
+            password_policy.min_upper_case_chars = min_upper_case_chars;
+        }
+        if let Some(min_lower_case_chars) = min_lower_case_chars {
+            password_policy.min_lower_case_chars = min_lower_case_chars;
+        }
+        if let Some(min_numeric_chars) = min_numeric_chars {
+            password_policy.min_numeric_chars = min_numeric_chars;
+        }
+        if let Some(min_special_chars) = min_special_chars {
+            password_policy.min_special_chars = min_special_chars;
+        }
+        if let Some(min_age_days) = min_age_days {
+            password_policy.min_age_days = min_age_days;
+        }
+        if let Some(max_age_days) = max_age_days {
+            password_policy.max_age_days = max_age_days;
+        }
+        if let Some(max_retries) = max_retries {
+            password_policy.max_retries = max_retries;
+        }
+        if let Some(lockout_time_mins) = lockout_time_mins {
+            password_policy.lockout_time_mins = lockout_time_mins;
+        }
+        if let Some(history) = history {
+            password_policy.history = history;
+        }
+        if let Some(comment) = comment {
+            password_policy.comment = comment;
+        }
+        password_policy.update_on = Some(Utc::now());
+
+        match client
+            .update_password_policy(password_policy, MatchSeq::Exact(seq))
+            .await
+        {
+            Ok(res) => Ok(Some(res)),
+            Err(e) => Err(e.add_message_back(" (while alter password policy).")),
+        }
+    }
+
+    // Drop a password policy by name.
+    #[async_backtrace::framed]
+    pub async fn drop_password_policy(
+        &self,
+        tenant: &str,
+        name: &str,
+        if_exists: bool,
+    ) -> Result<()> {
+        // TODO check used by user
+        let client = self.get_password_policy_api_client(tenant)?;
+        match client.drop_password_policy(name, MatchSeq::GE(1)).await {
+            Ok(res) => Ok(res),
+            Err(e) => {
+                if if_exists && e.code() == ErrorCode::UNKNOWN_PASSWORD_POLICY {
+                    Ok(())
+                } else {
+                    Err(e.add_message_back(" (while drop password policy)"))
+                }
+            }
+        }
+    }
+
+    // Check whether a password policy is exist.
+    #[async_backtrace::framed]
+    pub async fn exists_password_policy(&self, tenant: &str, name: &str) -> Result<bool> {
+        match self.get_password_policy(tenant, name).await {
+            Ok(_) => Ok(true),
+            Err(e) => {
+                if e.code() == ErrorCode::UNKNOWN_PASSWORD_POLICY {
+                    Ok(false)
+                } else {
+                    Err(e)
+                }
+            }
+        }
+    }
+
+    // Get a password_policy by tenant.
+    #[async_backtrace::framed]
+    pub async fn get_password_policy(&self, tenant: &str, name: &str) -> Result<PasswordPolicy> {
+        let client = self.get_password_policy_api_client(tenant)?;
+        let password_policy = client
+            .get_password_policy(name, MatchSeq::GE(0))
+            .await?
+            .data;
+        Ok(password_policy)
+    }
+
+    // Get all password policies by tenant.
+    #[async_backtrace::framed]
+    pub async fn get_password_policies(&self, tenant: &str) -> Result<Vec<PasswordPolicy>> {
+        let client = self.get_password_policy_api_client(tenant)?;
+        let password_policies = client
+            .get_password_policies()
+            .await
+            .map_err(|e| e.add_message_back(" (while get password policies)."))?;
+        Ok(password_policies)
+    }
+}
diff --git a/src/query/users/src/user_api.rs b/src/query/users/src/user_api.rs
index f41d61d3c57fa..9bd262056836b 100644
--- a/src/query/users/src/user_api.rs
+++ b/src/query/users/src/user_api.rs
@@ -24,6 +24,8 @@ use common_management::FileFormatApi;
 use common_management::FileFormatMgr;
 use common_management::NetworkPolicyApi;
 use common_management::NetworkPolicyMgr;
+use common_management::PasswordPolicyApi;
+use common_management::PasswordPolicyMgr;
 use common_management::QuotaApi;
 use common_management::QuotaMgr;
 use common_management::RoleApi;
@@ -139,6 +141,16 @@ impl UserApiProvider {
         )?))
     }
 
+    pub fn get_password_policy_api_client(
+        &self,
+        tenant: &str,
+    ) -> Result<Arc<impl PasswordPolicyApi>> {
+        Ok(Arc::new(PasswordPolicyMgr::create(
+            self.client.clone(),
+            tenant,
+        )?))
+    }
+
     pub fn get_meta_store_client(&self) -> Arc<MetaStore> {
         Arc::new(self.meta.clone())
     }