Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def get_env_dict(env_var: str) -> dict[str, str]:
HIBP_SCAN_INTERVAL_DAYS = 7
HIBP_API_KEYS = sl_getenv("HIBP_API_KEYS", list) or []
HIBP_MAX_ALIAS_CHECK = 10_000
HIBP_RPM = int(os.environ.get("HIBP_API_RPM", 100))
HIBP_CONCURRENT_TASKS = int(os.environ.get("HIBP_CONCURRENT_TASKS", 2))
HIBP_SKIP_PARTNER_ALIAS = os.environ.get("HIBP_SKIP_PARTNER_ALIAS")

KEEP_OLD_DATA_DAYS = 30
Expand Down
6 changes: 3 additions & 3 deletions app/events/event_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ def send_event(
if dispatcher is None:
dispatcher = GlobalDispatcher.get_dispatcher()
if config.EVENT_WEBHOOK_DISABLE:
LOG.i("Not sending events because webhook is disabled")
LOG.d("Not sending events because webhook is disabled")
return

if not config.EVENT_WEBHOOK and skip_if_webhook_missing:
LOG.i(
LOG.d(
"Not sending events because webhook is not configured and allowed to be empty"
)
return

partner_user = EventDispatcher.__partner_user(user.id)
if not partner_user:
LOG.i(f"Not sending events because there's no partner user for user {user}")
LOG.d(f"Not sending events because there's no partner user for user {user}")
return

event = event_pb2.Event(
Expand Down
129 changes: 65 additions & 64 deletions cron.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import argparse
import asyncio
import time

import aiohttp
import urllib.parse
from typing import List, Tuple

Expand Down Expand Up @@ -997,86 +1000,84 @@ def delete_expired_tokens():
LOG.d("Delete api to cookie tokens older than %s, nb row %s", max_time, nb_row)


async def _hibp_check(api_key: str, queue: asyncio.Queue):
async def _hibp_process_queue_check(api_key: str, queue: asyncio.Queue):
"""
Uses a single API key to check the queue as fast as possible.

This function is meant to be run concurrently (multiple instances of this function, each with a different key, on the same queue) to make maximum use of multiple API keys.
"""
default_rate_sleep = (60.0 / config.HIBP_RPM) + 0.1
rate_sleep = default_rate_sleep
rate_hit_counter = 0
alias_tasks = []
while True:
try:
alias_id = queue.get_nowait()
except asyncio.QueueEmpty:
if len(alias_tasks) > 0:
await asyncio.gather(*alias_tasks, return_exceptions=True)
return

alias = Alias.get(alias_id)
if not alias:
continue
user = alias.user
if user.disabled or not user.is_premium():
# Mark it as hibp done to skip it as if it had been checked
alias.hibp_last_check = arrow.utcnow()
Session.commit()
continue
if alias.flags & Alias.FLAG_PARTNER_CREATED > 0:
# Mark as hibp done
alias.hibp_last_check = arrow.utcnow()
Session.commit()
continue

LOG.d("Checking HIBP for %s", alias)
alias_tasks.append(_hibp_check_alias(alias, api_key))

request_headers = {
"user-agent": "SimpleLogin",
"hibp-api-key": api_key,
}
r = requests.get(
f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(alias.email)}",
headers=request_headers,
)
if r.status_code == 200:
# Breaches found
alias.hibp_breaches = [
Hibp.get_by(name=entry["Name"]) for entry in r.json()
]
if len(alias.hibp_breaches) > 0:
LOG.w("%s appears in HIBP breaches %s", alias, alias.hibp_breaches)
if rate_hit_counter > 0:
rate_hit_counter -= 1
elif r.status_code == 404:
# No breaches found
alias.hibp_breaches = []
elif r.status_code == 429:
# rate limited
LOG.w("HIBP rate limited, check alias %s in the next run", alias)
rate_hit_counter += 1
rate_sleep = default_rate_sleep + (0.2 * rate_hit_counter)
if rate_hit_counter > 10:
LOG.w(f"HIBP rate limited too many times stopping with alias {alias}")
return
# Just sleep for a while
asyncio.sleep(5)
elif r.status_code > 500:
LOG.w("HIBP server 5** error %s", r.status_code)
return
else:
LOG.error(
"An error occurred while checking alias %s: %s - %s",
alias,
r.status_code,
r.text,
)
return
if len(alias_tasks) >= config.HIBP_CONCURRENT_TASKS:
await asyncio.gather(*alias_tasks, return_exceptions=True)
alias_tasks = []

alias.hibp_last_check = arrow.utcnow()
Session.add(alias)
Session.commit()

LOG.d("Updated breach info for %s", alias)
await asyncio.sleep(rate_sleep)
async def _hibp_check_alias(alias: Alias, api_key: str, retries: int = 0):
    """
    Query the HIBP "breachedaccount" endpoint for a single alias and store the result.

    The alias is skipped without any request when its user is disabled or not
    premium, or when the alias carries the FLAG_PARTNER_CREATED flag.

    On 200 the alias' breaches are replaced by the breaches named in the response,
    on 404 they are cleared; in both cases hibp_last_check is set and the session
    is committed. On 429 the coroutine sleeps until just after the next minute
    boundary and retries once. Any other status is only logged.

    :param alias: alias whose email is looked up
    :param api_key: value sent in the "hibp-api-key" request header
    :param retries: number of retries already performed for this alias; a
        rate-limited request is not retried again once this is greater than 0
    """
    user = alias.user
    if user.disabled or not user.is_premium():
        return
    if alias.flags & Alias.FLAG_PARTNER_CREATED > 0:
        return
    LOG.d("Checking HIBP for %s", alias)

    request_headers = {
        "user-agent": "SimpleLogin",
        "hibp-api-key": api_key,
    }
    url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(alias.email)}"
    rate_limited = False
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=request_headers) as response:
            if response.status == 200:
                # Breaches found
                json_response = await response.json()
                alias.hibp_breaches = [
                    Hibp.get_by(name=entry["Name"]) for entry in json_response
                ]
                if len(alias.hibp_breaches) > 0:
                    LOG.w("HIBP: %s appears in breaches %s", alias, alias.hibp_breaches)
                alias.hibp_last_check = arrow.utcnow()
                Session.commit()
                LOG.d("HIBP: Updated new breach info for %s", alias)
            elif response.status == 404:
                # No breaches found
                alias.hibp_breaches = []
                alias.hibp_last_check = arrow.utcnow()
                Session.commit()
                LOG.d("HIBP: Updated no breach info for %s", alias)
            elif response.status == 429:
                # Only remember the fact here; the wait and the retry happen after
                # the response and the client session have been released.
                rate_limited = True
            elif response.status >= 500:
                # ">=" so that a plain 500 is reported as a server error as well
                LOG.w("HIBP: server 5** error %s", response.status)
            else:
                LOG.error(
                    "An error occurred while checking alias %s: %s - %s",
                    alias,
                    response.status,
                    await response.text(),
                )

    if not rate_limited:
        return

    if retries > 0:
        LOG.w(f"HIBP: Cannot retry {alias} more times. Stopping")
        return
    # Just sleep until the next minute (go 1 sec over to make sure we change minute)
    sleep_time = 61 - int(time.time()) % 60
    LOG.w(
        f"HIBP: Rate limited, check alias {alias} in the next run in {sleep_time} seconds"
    )
    await asyncio.sleep(sleep_time)
    await _hibp_check_alias(alias, api_key, retries + 1)


def get_alias_to_check_hibp(
Expand Down Expand Up @@ -1185,7 +1186,7 @@ async def check_hibp():
checkers = []
for i in range(len(config.HIBP_API_KEYS)):
checker = asyncio.create_task(
_hibp_check(
_hibp_process_queue_check(
config.HIBP_API_KEYS[i],
queue,
)
Expand Down
Loading