diff --git a/app/blocked_domain_utils.py b/app/blocked_domain_utils.py new file mode 100644 index 000000000..93c5b6fd7 --- /dev/null +++ b/app/blocked_domain_utils.py @@ -0,0 +1,6 @@ +from app.models import BlockedDomain + + +def is_domain_blocked(user_id: int, domain: str) -> bool: + """checks whether the provided domain is blocked for a given user""" + return BlockedDomain.filter_by(user_id=user_id, domain=domain).first() is not None diff --git a/app/custom_domain_utils.py b/app/custom_domain_utils.py index 3eb4c111b..cbbc0fd52 100644 --- a/app/custom_domain_utils.py +++ b/app/custom_domain_utils.py @@ -1,15 +1,24 @@ -import arrow import re - from dataclasses import dataclass from enum import Enum from typing import List, Optional +import arrow + from app.constants import JobType from app.db import Session from app.email_utils import get_email_domain_part from app.log import LOG -from app.models import User, CustomDomain, SLDomain, Mailbox, Job, DomainMailbox, Alias +from app.models import ( + User, + CustomDomain, + SLDomain, + Mailbox, + Job, + DomainMailbox, + Alias, + BlockedDomain, +) from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction _ALLOWED_DOMAIN_REGEX = re.compile(r"^(?!-)[A-Za-z0-9-]{1,63}(? Optional[CannotUseDomainReaso return CannotUseDomainReason.InvalidDomain elif SLDomain.get_by(domain=domain): return CannotUseDomainReason.BuiltinDomain - elif CustomDomain.get_by(domain=domain): - return CannotUseDomainReason.DomainAlreadyUsed elif get_email_domain_part(user.email) == domain: return CannotUseDomainReason.DomainPartOfUserEmail elif Mailbox.filter( @@ -106,6 +113,34 @@ def can_domain_be_used(user: User, domain: str) -> Optional[CannotUseDomainReaso return None +def can_custom_domain_be_used( + user: User, domain: str +) -> Optional[CannotUseDomainReason]: + reason = can_domain_be_used(user, domain) + + if reason is not None: + return reason + + if CustomDomain.get_by(domain=domain): + return CannotUseDomainReason.DomainAlreadyUsed + else: + return None + + +def can_blocked_domain_be_used( + user: User, domain: str +) -> Optional[CannotUseDomainReason]: + reason = can_domain_be_used(user, domain) + + if reason is not None: + return reason + + if BlockedDomain.get_by(user_id=user.id, domain=domain): + return CannotUseDomainReason.DomainAlreadyUsed + else: + return None + + def create_custom_domain( user: User, domain: str, partner_id: Optional[int] = None ) -> CreateCustomDomainResult: @@ -116,7 +151,7 @@ def create_custom_domain( ) new_domain = sanitize_domain(domain) - domain_forbidden_cause = can_domain_be_used(user, new_domain) + domain_forbidden_cause = can_custom_domain_be_used(user, new_domain) if domain_forbidden_cause: return CreateCustomDomainResult( message=domain_forbidden_cause.message(new_domain), message_category="error" diff --git a/app/dashboard/views/setting.py b/app/dashboard/views/setting.py index 7fa1d3101..d99820a47 100644 --- a/app/dashboard/views/setting.py +++ b/app/dashboard/views/setting.py @@ -20,6 +20,7 @@ ALIAS_RANDOM_SUFFIX_LENGTH, CONNECT_WITH_PROTON, ) +from app.custom_domain_utils import sanitize_domain, can_blocked_domain_be_used from app.dashboard.base import dashboard_bp from app.db import Session from app.extensions import limiter @@ -39,6 +40,7 @@ PartnerSubscription, UnsubscribeBehaviourEnum, UserAliasDeleteAction, + BlockedDomain, ) from app.proton.proton_unlink import can_unlink_proton_account from app.utils import ( @@ -81,6 +83,12 @@ def setting(): else: pending_email = None + blocked_domains = ( + BlockedDomain.filter_by(user_id=current_user.id) + .order_by(BlockedDomain.created_at.desc()) + .all() + ) + if request.method == "POST": if not csrf_form.validate(): flash("Invalid request", "warning") @@ -284,6 +292,63 @@ def setting(): return redirect(url_for("dashboard.setting")) Session.commit() flash("Your preference has been updated", "success") + elif request.form.get("form-name") == "blocked-domains-add": + domain = request.form.get("domain-name") + + if not domain: + flash("Domain name is required", "error") + return redirect(url_for("dashboard.setting")) + + new_domain = sanitize_domain(domain) + domain_forbidden_cause = can_blocked_domain_be_used( + current_user, new_domain + ) + + if domain_forbidden_cause: + flash(domain_forbidden_cause.message(new_domain), "error") + return redirect(url_for("dashboard.setting")) + + BlockedDomain.create(user_id=current_user.id, domain=new_domain) + + LOG.i( + f"A new blocked domain [{new_domain}] was added for user [{current_user.id}]" + ) + + Session.commit() + flash(f"Added blocked domain [{domain}]", "success") + return redirect(url_for("dashboard.setting")) + elif request.form.get("form-name") == "blocked-domains-remove": + domain_id = request.form.get("domain_id") + domain_name = request.form.get("domain_name") + + if not domain_id: + flash("Domain Id is missing", "error") + return redirect(url_for("dashboard.setting")) + + if not domain_name: + flash("Domain Name is missing", "error") + return redirect(url_for("dashboard.setting")) + + domain = BlockedDomain.get(domain_id) + if not domain or domain.user_id != current_user.id: + LOG.e( + f"Blocked domain with id [{domain_id}] not found or not owned by user [{current_user.id}] therefore couldn't be deleted" + ) + flash( + "Blocked domain not found or the user doesn't have access to it", + "error", + ) + return redirect(url_for("dashboard.setting")) + + BlockedDomain.delete(domain.id) + + LOG.i( + f"A blocked domain [{domain}] was deleted by user [{current_user.id}]" + ) + + Session.commit() + flash(f"Deleted blocked domain [{domain_name}]", "success") + return redirect(url_for("dashboard.setting")) manual_sub = ManualSubscription.get_by(user_id=current_user.id) apple_sub = AppleSubscription.get_by(user_id=current_user.id) @@ -318,4 +383,5 @@ def setting(): ALIAS_RAND_SUFFIX_LENGTH=ALIAS_RANDOM_SUFFIX_LENGTH, connect_with_proton=CONNECT_WITH_PROTON, can_unlink_proton_account=can_unlink_proton_account(current_user), + blocked_domains=blocked_domains, ) diff --git a/app/models.py b/app/models.py index 581b52f6b..bdfc7a2aa 100644 --- a/app/models.py +++ b/app/models.py @@ -503,6 +503,8 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle): default_mailbox = orm.relationship("Mailbox", foreign_keys=[default_mailbox_id]) + _blocked_domains = orm.relationship("BlockedDomain") + # user can set a more strict max_spam score to block spams more aggressively max_spam_score = sa.Column(sa.Integer, nullable=True) @@ -3252,6 +3254,20 @@ class DomainMailbox(Base, ModelMixin): ) +class BlockedDomain(Base, ModelMixin): + """store the blocked domains for a user""" + + __tablename__ = "blocked_domain" + + __table_args__ = ( + sa.UniqueConstraint("user_id", "domain", name="uq_blocked_domain"), + ) + + domain = sa.Column(sa.String(128), nullable=False) + + user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False) + + _NB_RECOVERY_CODE = 8 _RECOVERY_CODE_LENGTH = 8 diff --git a/email_handler.py b/email_handler.py index 437b177fe..e40fe8a1e 100644 --- a/email_handler.py +++ b/email_handler.py @@ -61,6 +61,7 @@ change_alias_status, get_alias_recipient_name, ) +from app.blocked_domain_utils import is_domain_blocked from app.config import ( EMAIL_DOMAIN, URL, @@ -613,6 +614,21 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str handle_email_sent_to_ourself(alias, addr, msg, user) return [(True, status.E209)] + mail_from_domain = get_email_domain_part(mail_from) + if is_domain_blocked(user.id, mail_from_domain): + LOG.i( + f"Email [{mail_from}] was ignored for the user [{user.id}] because of a blocked domain [{mail_from_domain}]" + ) + # by default return 2** instead of 5** to allow user to receive emails again when domain is unblocked + res_status = status.E200 + if user.block_behaviour == BlockBehaviourEnum.return_5xx: + LOG.i( + f"Email [{mail_from}] was rejected for the user [{user.id}] because of a blocked domain [{mail_from_domain}]" + ) + res_status = status.E502 + + return [(True, res_status)] + from_header = get_header_unicode(msg[headers.FROM]) LOG.d("Create or get contact for from_header:%s", from_header) contact = get_or_create_contact(from_header, envelope.mail_from, alias) diff --git a/migrations/versions/2025_101020_fdb1b61d9bdb_.py b/migrations/versions/2025_101020_fdb1b61d9bdb_.py new file mode 100644 index 000000000..486c1b633 --- /dev/null +++ b/migrations/versions/2025_101020_fdb1b61d9bdb_.py @@ -0,0 +1,37 @@ +"""empty message + +Revision ID: c18048c40ed9 +Revises: 3ee37864eb67 +Create Date: 2025-10-10 20:29:32.701784 + +""" +import sqlalchemy as sa +import sqlalchemy_utils +from alembic import op + +# revision identifiers, used by Alembic. +revision = 'c18048c40ed9' +down_revision = '3ee37864eb67' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('blocked_domain', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('created_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=False), + sa.Column('updated_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True), + sa.Column('domain', sa.String(length=128), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='cascade'), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('user_id', 'domain', name='uq_blocked_domain') + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('blocked_domain') + # ### end Alembic commands ### diff --git a/templates/dashboard/setting.html b/templates/dashboard/setting.html index 60a68796f..691a96c12 100644 --- a/templates/dashboard/setting.html +++ b/templates/dashboard/setting.html @@ -538,6 +538,56 @@ + +
+
+
Blocked domains
+
You can add domains to prevent emails forwarded from them to your Mailbox.
+
+

New domain

+
+ {{ csrf_form.csrf_token }} + +
+ +
+ +
+ {% if blocked_domains | length > 0 %} + +
+
+

Blocked domains

+
+ {% for blocked_domain in blocked_domains %} + +
+
+
+
+
{{ blocked_domain.domain }}
+
+
+
+ {{ csrf_form.csrf_token }} + + + + +
+
+
+
+
+ {% endfor %} +
+
+ {% endif %} +
+
+ {% endblock %} {% block script %} diff --git a/tests/dashboard/test_setting.py b/tests/dashboard/test_setting.py index 20accbf93..6c236f541 100644 --- a/tests/dashboard/test_setting.py +++ b/tests/dashboard/test_setting.py @@ -1,7 +1,10 @@ +from unittest.mock import patch + from flask import url_for from app import config -from app.models import EmailChange +from app.db import Session +from app.models import EmailChange, BlockedDomain from app.utils import canonicalize_email from tests.utils import login, random_email, create_new_user @@ -26,3 +29,159 @@ def test_setup_done(flask_client): assert email_change is not None assert email_change.new_email == canonicalize_email(noncanonical_email) config.SKIP_MX_LOOKUP_ON_CHECK = False + + +def test_add_blocked_domain_none(flask_client): + user = create_new_user() + login(flask_client, user) + + # Missing domain-name + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "blocked-domains-add", + }, + follow_redirects=True, + ) + assert r.status_code == 200 + assert BlockedDomain.filter_by(user_id=user.id).count() == 0 + + +def test_add_blocked_domain_empty(flask_client): + user = create_new_user() + login(flask_client, user) + + # Empty domain-name + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "blocked-domains-add", + "domain-name": "", + }, + follow_redirects=True, + ) + assert r.status_code == 200 + assert BlockedDomain.filter_by(user_id=user.id).count() == 0 + + +def test_add_blocked_domain_success(flask_client): + user = create_new_user() + login(flask_client, user) + + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "blocked-domains-add", + "domain-name": " ExAmple.COM ", + }, + follow_redirects=True, + ) + assert r.status_code == 200 + + blocked_domains = BlockedDomain.filter_by(user_id=user.id).all() + assert len(blocked_domains) == 1 + assert blocked_domains[0].domain == "example.com" + + +def test_remove_blocked_domain_no_id(flask_client): + user = create_new_user() + + login(flask_client, user) + + # Missing domain_id + with patch("app.dashboard.views.setting.BlockedDomain.delete") as mock_delete: + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "blocked-domains-remove", + "domain_name": "example.com", + }, + follow_redirects=True, + ) + assert r.status_code == 200 + mock_delete.assert_not_called() + + +def test_remove_blocked_domain_invalid_id(flask_client): + user = create_new_user() + login(flask_client, user) + + # Invalid domain_id + with patch("app.dashboard.views.setting.BlockedDomain.delete") as mock_delete: + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "blocked-domains-remove", + "domain_id": 9999, + "domain_name": "example.com", + }, + follow_redirects=True, + ) + assert r.status_code == 200 + mock_delete.assert_not_called() + + +def test_remove_blocked_domain_success(flask_client): + user = create_new_user() + bd = BlockedDomain.create(user_id=user.id, domain="example.com") + Session.commit() + + login(flask_client, user) + + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "blocked-domains-remove", + "domain_id": bd.id, + "domain_name": bd.domain, + }, + follow_redirects=True, + ) + assert r.status_code == 200 + assert BlockedDomain.get(bd.id) is None + + +def test_remove_blocked_domain_not_owned(flask_client): + user1 = create_new_user() + user2 = create_new_user() + + bd = BlockedDomain.create(user_id=user1.id, domain="example.com") + Session.commit() + + login(flask_client, user2) + + # Try to remove user1's blocked domain as user2 + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "blocked-domains-remove", + "domain_id": bd.id, + "domain_name": "example.com", + }, + follow_redirects=True, + ) + assert r.status_code == 200 + assert BlockedDomain.get(bd.id) is not None + + +def test_remove_blocked_domain_name_missing(flask_client): + user = create_new_user() + bd = BlockedDomain.create(user_id=user.id, domain="example.com") + Session.commit() + + login(flask_client, user) + + # Missing domain_name + with patch("app.dashboard.views.setting.BlockedDomain.delete") as mock_delete: + r = flask_client.post( + url_for("dashboard.setting"), + data={ + "form-name": "blocked-domains-remove", + "domain_id": bd.id, + }, + follow_redirects=True, + ) + assert r.status_code == 200 + # Check that it didn't delete the domain + assert BlockedDomain.get(bd.id) is not None + mock_delete.assert_not_called() diff --git a/tests/test_blocked_domain_utils.py b/tests/test_blocked_domain_utils.py new file mode 100644 index 000000000..ed2913108 --- /dev/null +++ b/tests/test_blocked_domain_utils.py @@ -0,0 +1,31 @@ +from app.blocked_domain_utils import is_domain_blocked +from app.db import Session +from app.models import BlockedDomain +from tests.utils import create_new_user + + +def setup(): + user = create_new_user() + BlockedDomain.create( + user_id=user.id, + domain="example.com", + flush=True, + ) + Session.flush() + + +def teardown_module(): + Session.query(BlockedDomain).delete() + + +def test_domain_blocked_for_user(): + user = create_new_user() + BlockedDomain.create( + user_id=user.id, + domain="example.com", + flush=True, + ) + Session.flush() + + assert is_domain_blocked(user.id, "example.com") + assert not is_domain_blocked(user.id, "some-other-example.com") diff --git a/tests/test_custom_domain_utils.py b/tests/test_custom_domain_utils.py index 51904d580..7272279d9 100644 --- a/tests/test_custom_domain_utils.py +++ b/tests/test_custom_domain_utils.py @@ -12,9 +12,11 @@ CannotUseDomainReason, CannotSetCustomDomainMailboxesCause, count_custom_domain_aliases, + can_blocked_domain_be_used, + can_custom_domain_be_used, ) from app.db import Session -from app.models import User, CustomDomain, Mailbox, DomainMailbox, Alias +from app.models import User, CustomDomain, Mailbox, DomainMailbox, Alias, BlockedDomain from tests.utils import create_new_user, random_string, random_domain from tests.utils import get_proton_partner, random_email @@ -56,13 +58,6 @@ def test_can_domain_be_used(): assert res is None -def test_can_domain_be_used_existing_domain(): - domain = random_domain() - CustomDomain.create(user_id=user.id, domain=domain, commit=True) - res = can_domain_be_used(user, domain) - assert res is CannotUseDomainReason.DomainAlreadyUsed - - def test_can_domain_be_used_sl_domain(): domain = ALIAS_DOMAINS[0] res = can_domain_be_used(user, domain) @@ -88,6 +83,100 @@ def test_can_domain_be_used_invalid_domain(): assert res is CannotUseDomainReason.InvalidDomain +# can_custom_domain_be_used +def test_can_custom_domain_be_used(): + domain = f"{random_string(10)}.com" + res = can_custom_domain_be_used(user, domain) + assert res is None + + +def test_can_custom_domain_be_used_sl_domain(): + domain = ALIAS_DOMAINS[0] + res = can_custom_domain_be_used(user, domain) + assert res is CannotUseDomainReason.BuiltinDomain + + +def test_can_custom_domain_be_used_domain_of_user_email(): + domain = user.email.split("@")[1] + res = can_custom_domain_be_used(user, domain) + assert res is CannotUseDomainReason.DomainPartOfUserEmail + + +def test_can_custom_domain_be_used_domain_of_existing_mailbox(): + domain = random_domain() + Mailbox.create(user_id=user.id, email=f"email@{domain}", verified=True, commit=True) + res = can_custom_domain_be_used(user, domain) + assert res is CannotUseDomainReason.DomainUserInMailbox + + +def test_can_custom_domain_be_used_invalid_domain(): + domain = f"{random_string(10)}@lol.com" + res = can_custom_domain_be_used(user, domain) + assert res is CannotUseDomainReason.InvalidDomain + + +def test_can_custom_domain_be_used_existing_domain(): + domain = random_domain() + CustomDomain.create(user_id=user.id, domain=domain, commit=True) + res = can_custom_domain_be_used(user, domain) + assert res is CannotUseDomainReason.DomainAlreadyUsed + + +# can_blocked_domain_be_used +def test_can_blocked_domain_be_used(): + domain = f"{random_string(10)}.com" + res = can_blocked_domain_be_used(user, domain) + assert res is None + + +def test_can_blocked_domain_be_used_sl_domain(): + domain = ALIAS_DOMAINS[0] + res = can_blocked_domain_be_used(user, domain) + assert res is CannotUseDomainReason.BuiltinDomain + + +def test_can_blocked_domain_be_used_domain_of_user_email(): + domain = user.email.split("@")[1] + res = can_blocked_domain_be_used(user, domain) + assert res is CannotUseDomainReason.DomainPartOfUserEmail + + +def test_can_blocked_domain_be_used_domain_of_existing_mailbox(): + domain = random_domain() + Mailbox.create(user_id=user.id, email=f"email@{domain}", verified=True, commit=True) + res = can_blocked_domain_be_used(user, domain) + assert res is CannotUseDomainReason.DomainUserInMailbox + + +def test_can_blocked_domain_be_used_invalid_domain(): + domain = f"{random_string(10)}@lol.com" + res = can_blocked_domain_be_used(user, domain) + assert res is CannotUseDomainReason.InvalidDomain + + +def test_can_blocked_domain_be_used_existing_domain(): + domain = random_domain() + BlockedDomain.create(user_id=user.id, domain=domain, commit=True) + res = can_blocked_domain_be_used(user, domain) + assert res is CannotUseDomainReason.DomainAlreadyUsed + + +def test_can_blocked_domain_be_used_different_users(): + domain = random_domain() + other_user = create_new_user() + + # User 1 blocks the domain + BlockedDomain.create(user_id=user.id, domain=domain, commit=True) + + # User 1 should not be able to block it again + res1 = can_blocked_domain_be_used(user, domain) + assert res1 is CannotUseDomainReason.DomainAlreadyUsed + + # User 2 should be able to block it + res2 = can_blocked_domain_be_used(other_user, domain) + assert res2 is None + + # sanitize_domain def test_can_sanitize_domain_empty(): assert sanitize_domain("") == "" diff --git a/tests/test_email_handler.py b/tests/test_email_handler.py index 1e37e0704..7af5b659d 100644 --- a/tests/test_email_handler.py +++ b/tests/test_email_handler.py @@ -24,6 +24,8 @@ VerpType, Contact, SentAlert, + BlockedDomain, + BlockBehaviourEnum, ) from app.utils import random_string, canonicalize_email from email_handler import ( @@ -395,6 +397,39 @@ def test_preserve_headers(flask_client): assert msg[header] == header + "keep" +def test_blocked_domain(flask_client): + user = create_new_user() + alias = Alias.create_new_random(user) + BlockedDomain.create( + user_id=user.id, + domain="rainbow.com", + commit=True, + ) + msg = load_eml_file("reference_encoded.eml") + envelope = Envelope() + envelope.mail_from = "somewhere@rainbow.com" + envelope.rcpt_tos = [alias.email] + result = email_handler.handle(envelope, msg) + assert result == status.E200 + + +def test_blocked_domain_reject_behaviour(flask_client): + user = create_new_user() + user.block_behaviour = BlockBehaviourEnum.return_5xx + alias = Alias.create_new_random(user) + BlockedDomain.create( + user_id=user.id, + domain="rainbow.com", + commit=True, + ) + msg = load_eml_file("reference_encoded.eml") + envelope = Envelope() + envelope.mail_from = "somewhere@rainbow.com" + envelope.rcpt_tos = [alias.email] + result = email_handler.handle(envelope, msg) + assert result == status.E502 + + def test_not_send_to_pending_to_delete_users(flask_client): user = create_new_user() alias = Alias.create_new_random(user)