diff --git a/app/custom_domain_utils.py b/app/custom_domain_utils.py index 6724a47a9..3ba36d514 100644 --- a/app/custom_domain_utils.py +++ b/app/custom_domain_utils.py @@ -3,13 +3,21 @@ from dataclasses import dataclass from enum import Enum -from typing import List, Optional +from typing import List, Optional, Type from app.config import JOB_DELETE_DOMAIN from app.db import Session from app.email_utils import get_email_domain_part from app.log import LOG -from app.models import User, CustomDomain, SLDomain, Mailbox, Job, DomainMailbox +from app.models import ( + User, + CustomDomain, + SLDomain, + Mailbox, + Job, + DomainMailbox, + ModelMixin, +) from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction _ALLOWED_DOMAIN_REGEX = re.compile(r"^(?!-)[A-Za-z0-9-]{1,63}(? str: return new_domain -def can_domain_be_used(user: User, domain: str) -> Optional[CannotUseDomainReason]: +def can_domain_be_used( + user: User, domain: str, model_type: Type[ModelMixin] +) -> Optional[CannotUseDomainReason]: if not is_valid_domain(domain): return CannotUseDomainReason.InvalidDomain elif SLDomain.get_by(domain=domain): return CannotUseDomainReason.BuiltinDomain - elif CustomDomain.get_by(domain=domain): + elif model_type.get_by(domain=domain): return CannotUseDomainReason.DomainAlreadyUsed elif get_email_domain_part(user.email) == domain: return CannotUseDomainReason.DomainPartOfUserEmail @@ -116,7 +126,7 @@ def create_custom_domain( ) new_domain = sanitize_domain(domain) - domain_forbidden_cause = can_domain_be_used(user, new_domain) + domain_forbidden_cause = can_domain_be_used(user, new_domain, CustomDomain) if domain_forbidden_cause: return CreateCustomDomainResult( message=domain_forbidden_cause.message(new_domain), message_category="error" diff --git a/app/dashboard/views/setting.py b/app/dashboard/views/setting.py index 6fcec6f26..4c32af90a 100644 --- a/app/dashboard/views/setting.py +++ b/app/dashboard/views/setting.py @@ -20,6 +20,7 @@ ALIAS_RANDOM_SUFFIX_LENGTH, CONNECT_WITH_PROTON, ) +from app.custom_domain_utils import sanitize_domain, can_domain_be_used from app.dashboard.base import dashboard_bp from app.db import Session from app.errors import ProtonPartnerNotSetUp @@ -40,6 +41,7 @@ PartnerUser, PartnerSubscription, UnsubscribeBehaviourEnum, + BlockedDomain, ) from app.proton.proton_partner import get_proton_partner from app.proton.proton_unlink import can_unlink_proton_account @@ -99,6 +101,12 @@ def setting(): else: pending_email = None + blocked_domains = ( + BlockedDomain.filter_by(user_id=current_user.id) + .order_by(BlockedDomain.created_at.desc()) + .all() + ) + if request.method == "POST": if not csrf_form.validate(): flash("Invalid request", "warning") @@ -289,6 +297,33 @@ def setting(): Session.commit() flash("Your preference has been updated", "success") return redirect(url_for("dashboard.setting")) + elif request.form.get("form-name") == "blocked-domains-add": + domain = request.form.get("domain-name") + new_domain = sanitize_domain(domain) + domain_forbidden_cause = can_domain_be_used( + current_user, new_domain, BlockedDomain + ) + + if domain_forbidden_cause: + flash(domain_forbidden_cause.message(new_domain), "error") + return redirect(url_for("dashboard.setting")) + + BlockedDomain.create(user_id=current_user.id, domain=new_domain) + + Session.commit() + flash(f"Added blocked domain [{domain}]", "success") + return redirect(url_for("dashboard.setting")) + elif request.form.get("form-name") == "blocked-domains-remove": + domain_id = request.form.get("domain_id") + domain_name = request.form.get("domain_name") + + domain = BlockedDomain.get(domain_id) + + BlockedDomain.delete(domain.id) + + Session.commit() + flash(f"Deleted blocked domain [{domain_name}]", "success") + return redirect(url_for("dashboard.setting")) manual_sub = ManualSubscription.get_by(user_id=current_user.id) apple_sub = AppleSubscription.get_by(user_id=current_user.id) @@ -325,4 +360,5 @@ def setting(): connect_with_proton=CONNECT_WITH_PROTON, proton_linked_account=proton_linked_account, can_unlink_proton_account=can_unlink_proton_account(current_user), + blocked_domains=blocked_domains, ) diff --git a/app/models.py b/app/models.py index 3032ba550..c69c078a0 100644 --- a/app/models.py +++ b/app/models.py @@ -446,6 +446,8 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle): default_mailbox = orm.relationship("Mailbox", foreign_keys=[default_mailbox_id]) + _blocked_domains = orm.relationship("BlockedDomain", lazy="joined") + # user can set a more strict max_spam score to block spams more aggressively max_spam_score = sa.Column(sa.Integer, nullable=True) @@ -1205,6 +1207,15 @@ def has_used_alias_from_partner(self) -> bool: > 0 ) + def is_domain_blocked(self, domain: str) -> bool: + """checks whether the provided domain is blocked for a given user""" + domain_names = [] + + for blocked_domain in BlockedDomain.filter_by(user_id=self.id): + domain_names.append(blocked_domain.domain) + + return any(domain in domain_name for domain_name in domain_names) + def __repr__(self): return f"" @@ -3090,6 +3101,20 @@ class DomainMailbox(Base, ModelMixin): ) +class BlockedDomain(Base, ModelMixin): + """store the blocked domains for a user""" + + __tablename__ = "blocked_domain" + + __table_args__ = ( + sa.UniqueConstraint("user_id", "domain", name="uq_blocked_domain"), + ) + + domain = sa.Column(sa.String(128), nullable=False) + + user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False) + + _NB_RECOVERY_CODE = 8 _RECOVERY_CODE_LENGTH = 8 diff --git a/email_handler.py b/email_handler.py index f732fb735..87ab0a03b 100644 --- a/email_handler.py +++ b/email_handler.py @@ -582,6 +582,15 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str handle_email_sent_to_ourself(alias, addr, msg, user) return [(True, status.E209)] + mail_from_domain = get_email_domain_part(mail_from) + if user.is_domain_blocked(mail_from_domain): + # by default return 2** instead of 5** to allow user to receive emails again when domain is unblocked + res_status = status.E200 + if user.block_behaviour == BlockBehaviourEnum.return_5xx: + res_status = status.E502 + + return [(True, res_status)] + from_header = get_header_unicode(msg[headers.FROM]) LOG.d("Create or get contact for from_header:%s", from_header) contact = get_or_create_contact(from_header, envelope.mail_from, alias) diff --git a/migrations/versions/2024_121017_32696502b574_.py b/migrations/versions/2024_121017_32696502b574_.py new file mode 100644 index 000000000..6efaa377b --- /dev/null +++ b/migrations/versions/2024_121017_32696502b574_.py @@ -0,0 +1,38 @@ +"""empty message + +Revision ID: 32696502b574 +Revises: 085f77996ce3 +Create Date: 2024-12-10 17:57:01.043030 + +""" +import sqlalchemy_utils +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '32696502b574' +down_revision = '085f77996ce3' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('blocked_domain', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('created_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=False), + sa.Column('updated_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True), + sa.Column('domain', sa.String(length=128), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='cascade'), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('user_id', 'domain', name='uq_blocked_domain') + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('blocked_domain') + # ### end Alembic commands ### diff --git a/templates/dashboard/setting.html b/templates/dashboard/setting.html index bef06dccc..c40a17e09 100644 --- a/templates/dashboard/setting.html +++ b/templates/dashboard/setting.html @@ -545,6 +545,56 @@ + +
+
+
Blocked domains
+
You can add domains to prevent emails forwarded from them to your Mailbox.
+
+

New domain

+
+ {{ csrf_form.csrf_token }} + +
+ +
+ +
+ {% if blocked_domains | length > 0 %} + +
+
+

Blocked domains

+
+ {% for blocked_domain in blocked_domains %} + +
+
+
+
+
{{ blocked_domain.domain }}
+
+
+
+ {{ csrf_form.csrf_token }} + + + + +
+
+
+
+
+ {% endfor %} +
+
+ {% endif %} +
+
+ {% endblock %} {% block script %} diff --git a/tests/test_custom_domain_utils.py b/tests/test_custom_domain_utils.py index ab8c55519..001dfec2e 100644 --- a/tests/test_custom_domain_utils.py +++ b/tests/test_custom_domain_utils.py @@ -12,7 +12,7 @@ CannotSetCustomDomainMailboxesCause, ) from app.db import Session -from app.models import User, CustomDomain, Mailbox, DomainMailbox +from app.models import User, CustomDomain, Mailbox, DomainMailbox, BlockedDomain from tests.utils import create_new_user, random_string, random_domain from tests.utils import get_proton_partner, random_email @@ -50,42 +50,49 @@ def test_is_valid_domain(): # can_domain_be_used def test_can_domain_be_used(): domain = f"{random_string(10)}.com" - res = can_domain_be_used(user, domain) + res = can_domain_be_used(user, domain, CustomDomain) assert res is None def test_can_domain_be_used_existing_domain(): domain = random_domain() CustomDomain.create(user_id=user.id, domain=domain, commit=True) - res = can_domain_be_used(user, domain) + res = can_domain_be_used(user, domain, CustomDomain) assert res is CannotUseDomainReason.DomainAlreadyUsed def test_can_domain_be_used_sl_domain(): domain = ALIAS_DOMAINS[0] - res = can_domain_be_used(user, domain) + res = can_domain_be_used(user, domain, CustomDomain) assert res is CannotUseDomainReason.BuiltinDomain def test_can_domain_be_used_domain_of_user_email(): domain = user.email.split("@")[1] - res = can_domain_be_used(user, domain) + res = can_domain_be_used(user, domain, CustomDomain) assert res is CannotUseDomainReason.DomainPartOfUserEmail def test_can_domain_be_used_domain_of_existing_mailbox(): domain = random_domain() Mailbox.create(user_id=user.id, email=f"email@{domain}", verified=True, commit=True) - res = can_domain_be_used(user, domain) + res = can_domain_be_used(user, domain, CustomDomain) assert res is CannotUseDomainReason.DomainUserInMailbox def test_can_domain_be_used_invalid_domain(): domain = f"{random_string(10)}@lol.com" - res = can_domain_be_used(user, domain) + res = can_domain_be_used(user, domain, CustomDomain) assert res is CannotUseDomainReason.InvalidDomain +def test_can_blocked_domain_be_used_existing_domain(): + domain = random_domain() + BlockedDomain.create(user_id=user.id, domain=domain, commit=True) + res = can_domain_be_used(user, domain, BlockedDomain) + assert res is CannotUseDomainReason.DomainAlreadyUsed + + # sanitize_domain def test_can_sanitize_domain_empty(): assert sanitize_domain("") == "" diff --git a/tests/test_domains.py b/tests/test_domains.py index 58c28e355..f9702fd2c 100644 --- a/tests/test_domains.py +++ b/tests/test_domains.py @@ -1,5 +1,5 @@ from app.db import Session -from app.models import SLDomain, PartnerUser, AliasOptions +from app.models import SLDomain, PartnerUser, AliasOptions, BlockedDomain from app.proton.proton_partner import get_proton_partner from init_app import add_sl_domains from tests.utils import create_new_user, random_token @@ -7,6 +7,7 @@ def setup_module(): Session.query(SLDomain).delete() + Session.query(BlockedDomain).delete() SLDomain.create( domain="hidden", premium_only=False, flush=True, order=5, hidden=True ) @@ -33,6 +34,7 @@ def setup_module(): def teardown_module(): Session.query(SLDomain).delete() + Session.query(BlockedDomain).delete() add_sl_domains() @@ -227,3 +229,16 @@ def test_get_free_partner_and_premium_partner(): assert [d.domain for d in domains] == user.available_sl_domains( alias_options=options ) + + +def test_domain_blocked_for_user(): + user = create_new_user() + BlockedDomain.create( + user_id=user.id, + domain="example.com", + flush=True, + ) + Session.flush() + + assert user.is_domain_blocked("example.com") + assert not user.is_domain_blocked("some-other-example.com") diff --git a/tests/test_email_handler.py b/tests/test_email_handler.py index e77a97144..546c0f6f5 100644 --- a/tests/test_email_handler.py +++ b/tests/test_email_handler.py @@ -21,6 +21,8 @@ VerpType, Contact, SentAlert, + BlockedDomain, + BlockBehaviourEnum, ) from app.utils import random_string, canonicalize_email from email_handler import ( @@ -400,3 +402,36 @@ def test_not_send_to_pending_to_delete_users(flask_client): msg = EmailMessage() result = email_handler.handle(envelope, msg) assert result == status.E504 + + +def test_blocked_domain(flask_client): + user = create_new_user() + alias = Alias.create_new_random(user) + BlockedDomain.create( + user_id=user.id, + domain="rainbow.com", + commit=True, + ) + msg = load_eml_file("reference_encoded.eml") + envelope = Envelope() + envelope.mail_from = "somewhere@rainbow.com" + envelope.rcpt_tos = [alias.email] + result = email_handler.handle(envelope, msg) + assert result == status.E200 + + +def test_blocked_domain_reject_behaviour(flask_client): + user = create_new_user() + user.block_behaviour = BlockBehaviourEnum.return_5xx + alias = Alias.create_new_random(user) + BlockedDomain.create( + user_id=user.id, + domain="rainbow.com", + commit=True, + ) + msg = load_eml_file("reference_encoded.eml") + envelope = Envelope() + envelope.mail_from = "somewhere@rainbow.com" + envelope.rcpt_tos = [alias.email] + result = email_handler.handle(envelope, msg) + assert result == status.E502