Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app/blocked_domain_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from app.models import BlockedDomain


def is_domain_blocked(user_id: int, domain: str) -> bool:
"""checks whether the provided domain is blocked for a given user"""
return BlockedDomain.filter_by(user_id=user_id, domain=domain).first() is not None
47 changes: 41 additions & 6 deletions app/custom_domain_utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
import arrow
import re

from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

import arrow

from app.constants import JobType
from app.db import Session
from app.email_utils import get_email_domain_part
from app.log import LOG
from app.models import User, CustomDomain, SLDomain, Mailbox, Job, DomainMailbox, Alias
from app.models import (
User,
CustomDomain,
SLDomain,
Mailbox,
Job,
DomainMailbox,
Alias,
BlockedDomain,
)
from app.user_audit_log_utils import emit_user_audit_log, UserAuditLogAction

_ALLOWED_DOMAIN_REGEX = re.compile(r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$")
Expand Down Expand Up @@ -94,8 +103,6 @@ def can_domain_be_used(user: User, domain: str) -> Optional[CannotUseDomainReaso
return CannotUseDomainReason.InvalidDomain
elif SLDomain.get_by(domain=domain):
return CannotUseDomainReason.BuiltinDomain
elif CustomDomain.get_by(domain=domain):
return CannotUseDomainReason.DomainAlreadyUsed
elif get_email_domain_part(user.email) == domain:
return CannotUseDomainReason.DomainPartOfUserEmail
elif Mailbox.filter(
Expand All @@ -106,6 +113,34 @@ def can_domain_be_used(user: User, domain: str) -> Optional[CannotUseDomainReaso
return None


def can_custom_domain_be_used(
user: User, domain: str
) -> Optional[CannotUseDomainReason]:
reason = can_domain_be_used(user, domain)

if reason is not None:
return reason

if CustomDomain.get_by(domain=domain):
return CannotUseDomainReason.DomainAlreadyUsed
else:
return None


def can_blocked_domain_be_used(
user: User, domain: str
) -> Optional[CannotUseDomainReason]:
reason = can_domain_be_used(user, domain)

if reason is not None:
return reason

if BlockedDomain.get_by(user_id=user.id, domain=domain):
return CannotUseDomainReason.DomainAlreadyUsed
else:
return None


def create_custom_domain(
user: User, domain: str, partner_id: Optional[int] = None
) -> CreateCustomDomainResult:
Expand All @@ -116,7 +151,7 @@ def create_custom_domain(
)

new_domain = sanitize_domain(domain)
domain_forbidden_cause = can_domain_be_used(user, new_domain)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this changed. This would allow users to create a custom domain with an invalid domain or an SL domain.

Copy link
Author

@tozo tozo Oct 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the original code I modified the existing and created 2 separate functions (can_domain_be_used, can_custom_domain_be_used and can_blocked_domain_be_used respectively).

Because of that I also had to change this to call the function can_custom_domain_be_used to still include all the previously executed checks.
But inside of that function I call the can_domain_be_used which should check for the domain validity and SL domain as well and if that reason is not None, then return with that error.

Currently in this branch that call is in line 119:

reason = can_domain_be_used(user, domain)

if reason is not None:
    return reason

domain_forbidden_cause = can_custom_domain_be_used(user, new_domain)
if domain_forbidden_cause:
return CreateCustomDomainResult(
message=domain_forbidden_cause.message(new_domain), message_category="error"
Expand Down
66 changes: 66 additions & 0 deletions app/dashboard/views/setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ALIAS_RANDOM_SUFFIX_LENGTH,
CONNECT_WITH_PROTON,
)
from app.custom_domain_utils import sanitize_domain, can_blocked_domain_be_used
from app.dashboard.base import dashboard_bp
from app.db import Session
from app.extensions import limiter
Expand All @@ -39,6 +40,7 @@
PartnerSubscription,
UnsubscribeBehaviourEnum,
UserAliasDeleteAction,
BlockedDomain,
)
from app.proton.proton_unlink import can_unlink_proton_account
from app.utils import (
Expand Down Expand Up @@ -81,6 +83,12 @@ def setting():
else:
pending_email = None

blocked_domains = (
BlockedDomain.filter_by(user_id=current_user.id)
.order_by(BlockedDomain.created_at.desc())
.all()
)

if request.method == "POST":
if not csrf_form.validate():
flash("Invalid request", "warning")
Expand Down Expand Up @@ -284,6 +292,63 @@ def setting():
return redirect(url_for("dashboard.setting"))
Session.commit()
flash("Your preference has been updated", "success")
elif request.form.get("form-name") == "blocked-domains-add":
domain = request.form.get("domain-name")

if not domain:
flash("Domain name is required", "error")
return redirect(url_for("dashboard.setting"))

new_domain = sanitize_domain(domain)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If domain is None this will fail

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If domain is None it will fail with an exception

domain_forbidden_cause = can_blocked_domain_be_used(
current_user, new_domain
)

if domain_forbidden_cause:
flash(domain_forbidden_cause.message(new_domain), "error")
return redirect(url_for("dashboard.setting"))

BlockedDomain.create(user_id=current_user.id, domain=new_domain)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an user audit log here to keep track on what happened.


LOG.i(
f"A new blocked domain [{new_domain}] was added for user [{current_user.id}]"
)

Session.commit()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missign user audit log on the action done

flash(f"Added blocked domain [{domain}]", "success")
return redirect(url_for("dashboard.setting"))
elif request.form.get("form-name") == "blocked-domains-remove":
domain_id = request.form.get("domain_id")
domain_name = request.form.get("domain_name")

if not domain_id:
flash("Domain Id is missing", "error")
return redirect(url_for("dashboard.setting"))

if not domain_name:
flash("Domain Name is missing", "error")
return redirect(url_for("dashboard.setting"))

domain = BlockedDomain.get(domain_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If domain_id is none this will crash

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If domain_id is None it will fail with an exception

if not domain or domain.user_id != current_user.id:
LOG.e(
f"Blocked domain with id [{domain_id}] not found or not owned by user [{current_user.id}] therefore couldn't be deleted"
)
flash(
"Blocked domain not found or the user doesn't have access to it",
"error",
)
return redirect(url_for("dashboard.setting"))

BlockedDomain.delete(domain.id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any user seems to be able to delete any domain.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now validate if the current user is the owner of the domain before deleting it, let me know if that looks ok
Other option is to have a call like this:

deleted = (
    BlockedDomain.filter_by(
        id=domain_id_int,
        user_id=current_user.id,
    ).delete()
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any user can delete any domain


LOG.i(
f"A blocked domain [{domain}] was deleted by user [{current_user.id}]"
)

Session.commit()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Missing an user audit log.

flash(f"Deleted blocked domain [{domain_name}]", "success")
return redirect(url_for("dashboard.setting"))

manual_sub = ManualSubscription.get_by(user_id=current_user.id)
apple_sub = AppleSubscription.get_by(user_id=current_user.id)
Expand Down Expand Up @@ -318,4 +383,5 @@ def setting():
ALIAS_RAND_SUFFIX_LENGTH=ALIAS_RANDOM_SUFFIX_LENGTH,
connect_with_proton=CONNECT_WITH_PROTON,
can_unlink_proton_account=can_unlink_proton_account(current_user),
blocked_domains=blocked_domains,
)
16 changes: 16 additions & 0 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):

default_mailbox = orm.relationship("Mailbox", foreign_keys=[default_mailbox_id])

_blocked_domains = orm.relationship("BlockedDomain")

# user can set a more strict max_spam score to block spams more aggressively
max_spam_score = sa.Column(sa.Integer, nullable=True)

Expand Down Expand Up @@ -3252,6 +3254,20 @@ class DomainMailbox(Base, ModelMixin):
)


class BlockedDomain(Base, ModelMixin):
"""store the blocked domains for a user"""

__tablename__ = "blocked_domain"

__table_args__ = (
sa.UniqueConstraint("user_id", "domain", name="uq_blocked_domain"),
)

domain = sa.Column(sa.String(128), nullable=False)

user_id = sa.Column(sa.ForeignKey(User.id, ondelete="cascade"), nullable=False)


_NB_RECOVERY_CODE = 8
_RECOVERY_CODE_LENGTH = 8

Expand Down
16 changes: 16 additions & 0 deletions email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
change_alias_status,
get_alias_recipient_name,
)
from app.blocked_domain_utils import is_domain_blocked
from app.config import (
EMAIL_DOMAIN,
URL,
Expand Down Expand Up @@ -613,6 +614,21 @@ def handle_forward(envelope, msg: Message, rcpt_to: str) -> List[Tuple[bool, str
handle_email_sent_to_ourself(alias, addr, msg, user)
return [(True, status.E209)]

mail_from_domain = get_email_domain_part(mail_from)
if is_domain_blocked(user.id, mail_from_domain):
LOG.i(
f"Email [{mail_from}] was ignored for the user [{user.id}] because of a blocked domain [{mail_from_domain}]"
)
# by default return 2** instead of 5** to allow user to receive emails again when domain is unblocked
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log here so that we can track when an email is dropped in the logs.

res_status = status.E200
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a log entry here to mark that the email has been dropped.

if user.block_behaviour == BlockBehaviourEnum.return_5xx:
LOG.i(
f"Email [{mail_from}] was rejected for the user [{user.id}] because of a blocked domain [{mail_from_domain}]"
)
res_status = status.E502

return [(True, res_status)]

from_header = get_header_unicode(msg[headers.FROM])
LOG.d("Create or get contact for from_header:%s", from_header)
contact = get_or_create_contact(from_header, envelope.mail_from, alias)
Expand Down
37 changes: 37 additions & 0 deletions migrations/versions/2025_101020_fdb1b61d9bdb_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""empty message

Revision ID: c18048c40ed9
Revises: 3ee37864eb67
Create Date: 2025-10-10 20:29:32.701784

"""
import sqlalchemy as sa
import sqlalchemy_utils
from alembic import op

# revision identifiers, used by Alembic.
revision = 'c18048c40ed9'
down_revision = '3ee37864eb67'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('blocked_domain',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('created_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=False),
sa.Column('updated_at', sqlalchemy_utils.types.arrow.ArrowType(), nullable=True),
sa.Column('domain', sa.String(length=128), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='cascade'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('user_id', 'domain', name='uq_blocked_domain')
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('blocked_domain')
# ### end Alembic commands ###
50 changes: 50 additions & 0 deletions templates/dashboard/setting.html
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,56 @@
</div>
</div>
<!-- END Alias import/export -->
<!-- Blocked domains -->
<div class="card" id="blocked-domains">
<div class="card-body">
<div class="card-title">Blocked domains</div>
<div class="mb-3">You can add domains to prevent emails forwarded from them to your Mailbox.</div>
<hr />
<h4>New domain</h4>
<form method="post" action="#blocked-domains">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="blocked-domains-add">
<div class="form-group">
<input class="form-control mr-2"
name="domain-name"
placeholder="Domain to be blocked">
</div>
<button class="btn btn-outline-primary mt-2">Add</button>
</form>
{% if blocked_domains | length > 0 %}

<div class="mt-2">
<hr />
<h4>Blocked domains</h4>
<div class="row">
{% for blocked_domain in blocked_domains %}

<div class="col-6">
<div class="my-2 p-2 card">
<div class="row">
<div class="col">
<div class="py-2 font-weight-bold">{{ blocked_domain.domain }}</div>
</div>
<div class="col-2">
<form method="post" action="#blocked-domains">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="blocked-domains-remove">
<input type="hidden" name="domain_name" value="{{ blocked_domain.domain }}">
<input type="hidden" name="domain_id" value="{{ blocked_domain.id }}">
<button class="btn btn-link text-danger">Delete</button>
</form>
</div>
</div>
</div>
</div>
{% endfor %}
</div>
</div>
{% endif %}
</div>
</div>
<!-- END Blocked domains -->
</div>
{% endblock %}
{% block script %}
Expand Down
Loading