diff --git a/blueprints/redirector.py b/blueprints/redirector.py index 4fc27ca0..4c8770fe 100644 --- a/blueprints/redirector.py +++ b/blueprints/redirector.py @@ -1,4 +1,3 @@ -import time from flask import ( Blueprint, request, @@ -8,15 +7,12 @@ ) from utils.url_utils import ( BOT_USER_AGENTS, - get_country, get_client_ip, validate_emoji_alias, ) from utils.mongo_utils import ( load_url, - update_url, load_emoji_url, - update_emoji_url, ) from cache import cache_query as cq from cache.cache_url import UrlData @@ -27,25 +23,24 @@ from datetime import datetime, timezone from urllib.parse import unquote import re -import tldextract from crawlerdetect import CrawlerDetect +from workers.stats_publisher import send_to_queue + crawler_detect = CrawlerDetect() -tld_no_cache_extract = tldextract.TLDExtract(cache_dir=None) url_redirector = Blueprint("url_redirector", __name__) @url_redirector.route("/", methods=["GET"]) @limiter.exempt -def redirect_url(short_code): +def redirect_url(short_code: str): user_ip = get_client_ip() projection = { "_id": 1, "url": 1, "password": 1, "max-clicks": 1, - "expiration-time": 1, "total-clicks": 1, "ips": {"$elemMatch": {"$eq": user_ip}}, "block-bots": 1, @@ -53,11 +48,7 @@ def redirect_url(short_code): } short_code = unquote(short_code) - - is_emoji = False - - # Measure redirection time - start_time = time.perf_counter() + is_emoji = validate_emoji_alias(short_code) cached_url_data = cq.get_url_data(short_code) if cached_url_data: @@ -67,8 +58,7 @@ def redirect_url(short_code): "block-bots": cached_url_data.block_bots, } else: - if validate_emoji_alias(short_code): - is_emoji = True + if is_emoji: url_data = load_emoji_url(short_code, projection) else: url_data = load_url(short_code, projection) @@ -134,7 +124,7 @@ def redirect_url(short_code): try: ua = parse(user_agent) - if not ua or not ua.user_agent or not ua.os: + if not ua or not ua.string: return jsonify( { "error_code": "400", @@ -151,45 +141,17 @@ def redirect_url(short_code): 
} ), 400 - os_name = ua.os.family - browser = ua.user_agent.family + os_name = ua.os.family if ua.os else "Unknown" + browser = ua.user_agent.family if ua.user_agent else "Unknown" referrer = request.headers.get("Referer") - country = get_country(user_ip) is_unique_click = url_data.get("ips", None) is None - - if country: - country = country.replace(".", " ") - - updates = {"$inc": {}, "$set": {}, "$addToSet": {}} - - if "ips" not in url_data: - url_data["ips"] = [] - - if referrer: - referrer_raw = tld_no_cache_extract(referrer) - referrer = ( - f"{referrer_raw.domain}.{referrer_raw.suffix}" - if referrer_raw.suffix - else referrer_raw.domain - ) - sanitized_referrer = re.sub(r"[.$\x00-\x1F\x7F-\x9F]", "_", referrer) - - updates["$inc"][f"referrer.{sanitized_referrer}.counts"] = 1 - updates["$addToSet"][f"referrer.{sanitized_referrer}.ips"] = user_ip - - updates["$inc"][f"country.{country}.counts"] = 1 - updates["$addToSet"][f"country.{country}.ips"] = user_ip - - updates["$inc"][f"browser.{browser}.counts"] = 1 - updates["$addToSet"][f"browser.{browser}.ips"] = user_ip - - updates["$inc"][f"os_name.{os_name}.counts"] = 1 - updates["$addToSet"][f"os_name.{os_name}.ips"] = user_ip + bot_name: str | None = None for bot in BOT_USER_AGENTS: bot_re = re.compile(bot, re.IGNORECASE) if bot_re.search(user_agent): + bot_name = bot if url_data.get("block-bots", False): return ( jsonify( @@ -201,11 +163,10 @@ def redirect_url(short_code): ), 403, ) - sanitized_bot = re.sub(r"[.$\x00-\x1F\x7F-\x9F]", "_", bot) - updates["$inc"][f"bots.{sanitized_bot}"] = 1 break else: if crawler_detect.isCrawler(user_agent): + bot_name = crawler_detect.getMatches()[0] if url_data.get("block-bots", False): return ( jsonify( @@ -217,42 +178,25 @@ def redirect_url(short_code): ), 403, ) - updates["$inc"][f"bots.{crawler_detect.getMatches()}"] = 1 - - # increment the counter for the short code - today = str(datetime.now()).split()[0] - updates["$inc"][f"counter.{today}"] = 1 - - if 
is_unique_click: - updates["$inc"][f"unique_counter.{today}"] = 1 - updates["$addToSet"]["ips"] = user_ip - - updates["$inc"]["total-clicks"] = 1 - - updates["$set"]["last-click"] = str( - datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") - ) - updates["$set"]["last-click-browser"] = browser - updates["$set"]["last-click-os"] = os_name - updates["$set"]["last-click-country"] = country - - # Calculate redirection time - end_time = time.perf_counter() - redirection_time = (end_time - start_time) * 1000 - - curr_avg = url_data.get("average_redirection_time", 0) - - # Update Average Redirection Time - alpha = 0.1 # Smoothing factor, adjust as needed - updates["$set"]["average_redirection_time"] = round( - (1 - alpha) * curr_avg + alpha * redirection_time, 2 - ) + message = { + "short_code": short_code, + "os_name": os_name, + "browser": browser, + "referrer": referrer, + "ip": user_ip, + "user_agent": user_agent, + "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"), + "is_unique_click": is_unique_click, + "bot_name": bot_name, + "is_emoji": is_emoji, + } - if is_emoji: - update_emoji_url(short_code, updates) + # send the stats message to the stats queue to be processed later + if request.method == "HEAD": + pass else: - update_url(short_code, updates) + send_to_queue(message) return redirect(url) diff --git a/pyproject.toml b/pyproject.toml index e218c6fb..bf7e9baf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,6 +5,7 @@ description = "Open-Source URL Shortener Written in Flask" readme = "README.md" requires-python = ">=3.9" dependencies = [ + "aio-pika>=9.5.5", "crawlerdetect>=0.3.0", "dicttoxml>=1.7.16", "emoji>=2.14.1", @@ -14,7 +15,9 @@ dependencies = [ "flask-limiter[mongodb]>=3.11.0", "geoip2>=5.1.0", "gunicorn>=23.0.0", + "loguru>=0.7.3", "openpyxl>=3.1.5", + "pika>=1.3.2", "pycountry>=24.6.1", "pymongo>=4.13.0", "python-dotenv>=1.1.0", diff --git a/requirements.txt b/requirements.txt index 72d87a37..54649308 100644 --- 
a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ +aio-pika==9.5.5 aiohappyeyeballs==2.6.1 aiohttp==3.12.4 +aiormq==6.8.1 aiosignal==1.3.2 attrs==25.3.0 blinker==1.9.0 @@ -14,6 +16,7 @@ dicttoxml==1.7.16 dnspython==2.7.0 emoji==2.14.1 et-xmlfile==2.0.0 +exceptiongroup==1.3.0 filelock==3.18.0 flask==3.1.1 flask-caching==2.3.1 @@ -27,6 +30,7 @@ iniconfig==2.1.0 itsdangerous==2.2.0 jinja2==3.1.6 limits==4.2 +loguru==0.7.3 markdown-it-py==3.0.0 markupsafe==3.0.2 maxminddb==2.7.0 @@ -36,6 +40,8 @@ multidict==6.4.4 openpyxl==3.1.5 ordered-set==4.1.0 packaging==24.2 +pamqp==3.3.0 +pika==1.3.2 pluggy==1.6.0 propcache==0.3.1 pycountry==24.6.1 @@ -47,7 +53,7 @@ pytest-mock==3.14.1 python-dotenv==1.1.0 pytz==2025.2 redis==6.2.0 -requests==2.32.4 +requests==2.32.4 requests-file==2.1.0 requests-mock==1.12.1 rich==13.9.4 @@ -62,5 +68,6 @@ urllib3==2.4.0 uv==0.7.8 validators==0.35.0 werkzeug==3.1.3 +win32-setctime==1.2.0 wrapt==1.17.2 yarl==1.20.0 diff --git a/start_worker.py b/start_worker.py new file mode 100644 index 00000000..1fe0fddf --- /dev/null +++ b/start_worker.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +""" +Analytics Worker Runner + +This script starts the asynchronous analytics worker that processes +click data from RabbitMQ and updates MongoDB with analytics information.
+""" + +import sys +import os +import asyncio + +# Ensure project root is on the path +ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, ROOT_DIR) + +from workers.stats_worker import StatsWorker # noqa: E402 + + +def main(): + """Main function to start the analytics worker""" + print("=" * 60) + print("šŸš€ Starting Spoo.me Shortener Stats Worker") + print("=" * 60) + print("This worker will process click analytics asynchronously") + print("and update MongoDB with detailed statistics.") + print() + + try: + # Run the async worker entrypoint + asyncio.run(StatsWorker()) + except KeyboardInterrupt: + print("\nšŸ‘‹ Worker stopped by user") + except Exception as e: + print(f"\nāŒ Worker failed with error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/uv.lock b/uv.lock index 743a0735..d9f334d9 100644 --- a/uv.lock +++ b/uv.lock @@ -2,6 +2,21 @@ version = 1 revision = 2 requires-python = ">=3.9" +[[package]] +name = "aio-pika" +version = "9.5.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiormq" }, + { name = "exceptiongroup" }, + { name = "typing-extensions", marker = "python_full_version < '3.10'" }, + { name = "yarl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/48/00/5391405f15e85bd6cb859186dbe04d99186ca29410a7cdc52848b55a1d72/aio_pika-9.5.5.tar.gz", hash = "sha256:3d2f25838860fa7e209e21fc95555f558401f9b49a832897419489f1c9e1d6a4", size = 48468, upload-time = "2025-02-26T11:15:56.595Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/71/cf/efa5581760bd08263bce8dbf943f32006b6dfd5bc120f43a26257281b546/aio_pika-9.5.5-py3-none-any.whl", hash = "sha256:94e0ac3666398d6a28b0c3b530c1febf4c6d4ececb345620727cfd7bfe1c02e0", size = 54257, upload-time = "2025-02-26T11:15:54.066Z" }, +] + [[package]] name = "aiohappyeyeballs" version = "2.6.1" @@ -114,6 +129,19 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/a3/8b/8d1ca815a0725b66dfc094796c660281ec91ca31e37c788311ecba9bb128/aiohttp-3.12.4-cp39-cp39-win_amd64.whl", hash = "sha256:592086c0ed4fc071fecf097c54acebfac725376a0bdbbd1be31f1cc23cbf84c5", size = 444323, upload-time = "2025-05-29T01:36:54.667Z" }, ] +[[package]] +name = "aiormq" +version = "6.8.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pamqp" }, + { name = "yarl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a4/79/5397756a8782bf3d0dce392b48260c3ec81010f16bef8441ff03505dccb4/aiormq-6.8.1.tar.gz", hash = "sha256:a964ab09634be1da1f9298ce225b310859763d5cf83ef3a7eae1a6dc6bd1da1a", size = 30528, upload-time = "2024-09-04T11:16:38.655Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2e/be/1a613ae1564426f86650ff58c351902895aa969f7e537e74bfd568f5c8bf/aiormq-6.8.1-py3-none-any.whl", hash = "sha256:5da896c8624193708f9409ffad0b20395010e2747f22aa4150593837f40aa017", size = 31174, upload-time = "2024-09-04T11:16:37.238Z" }, +] + [[package]] name = "aiosignal" version = "1.3.2" @@ -616,6 +644,19 @@ mongodb = [ { name = "pymongo" }, ] +[[package]] +name = "loguru" +version = "0.7.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, + { name = "win32-setctime", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3a/05/a1dae3dffd1116099471c643b8924f5aa6524411dc6c63fdae648c4f1aca/loguru-0.7.3.tar.gz", hash = "sha256:19480589e77d47b8d85b2c827ad95d49bf31b0dcde16593892eb51dd18706eb6", size = 63559, upload-time = "2024-12-06T11:20:56.608Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0c/29/0348de65b8cc732daa3e33e67806420b2ae89bdce2b04af740289c5c6c8c/loguru-0.7.3-py3-none-any.whl", hash = "sha256:31a33c10c8e1e10422bfd431aeb5d351c7cf7fa671e3c4df004162264b28220c", size = 61595, upload-time = "2024-12-06T11:20:54.538Z" }, +] + 
[[package]] name = "markdown-it-py" version = "3.0.0" @@ -959,6 +1000,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/88/ef/eb23f262cca3c0c4eb7ab1933c3b1f03d021f2c48f54763065b6f0e321be/packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759", size = 65451, upload-time = "2024-11-08T09:47:44.722Z" }, ] +[[package]] +name = "pamqp" +version = "3.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/fb/62/35bbd3d3021e008606cd0a9532db7850c65741bbf69ac8a3a0d8cfeb7934/pamqp-3.3.0.tar.gz", hash = "sha256:40b8795bd4efcf2b0f8821c1de83d12ca16d5760f4507836267fd7a02b06763b", size = 30993, upload-time = "2024-01-12T20:37:25.085Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ac/8d/c1e93296e109a320e508e38118cf7d1fc2a4d1c2ec64de78565b3c445eb5/pamqp-3.3.0-py2.py3-none-any.whl", hash = "sha256:c901a684794157ae39b52cbf700db8c9aae7a470f13528b9d7b4e5f7202f8eb0", size = 33848, upload-time = "2024-01-12T20:37:21.359Z" }, +] + +[[package]] +name = "pika" +version = "1.3.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/db/db/d4102f356af18f316c67f2cead8ece307f731dd63140e2c71f170ddacf9b/pika-1.3.2.tar.gz", hash = "sha256:b2a327ddddf8570b4965b3576ac77091b850262d34ce8c1d8cb4e4146aa4145f", size = 145029, upload-time = "2023-05-05T14:25:43.368Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f9/f3/f412836ec714d36f0f4ab581b84c491e3f42c6b5b97a6c6ed1817f3c16d0/pika-1.3.2-py3-none-any.whl", hash = "sha256:0779a7c1fafd805672796085560d290213a465e4f6f76a6fb19e378d8041a14f", size = 155415, upload-time = "2023-05-05T14:25:41.484Z" }, +] + [[package]] name = "pluggy" version = "1.6.0" @@ -1320,6 +1379,7 @@ name = "spoo-me" version = "1.0.0" source = { virtual = "." 
} dependencies = [ + { name = "aio-pika" }, { name = "crawlerdetect" }, { name = "dicttoxml" }, { name = "emoji" }, @@ -1329,7 +1389,9 @@ dependencies = [ { name = "flask-limiter", extra = ["mongodb"] }, { name = "geoip2" }, { name = "gunicorn" }, + { name = "loguru" }, { name = "openpyxl" }, + { name = "pika" }, { name = "pycountry" }, { name = "pymongo" }, { name = "python-dotenv" }, @@ -1353,6 +1415,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "aio-pika", specifier = ">=9.5.5" }, { name = "crawlerdetect", specifier = ">=0.3.0" }, { name = "dicttoxml", specifier = ">=1.7.16" }, { name = "emoji", specifier = ">=2.14.1" }, @@ -1362,7 +1425,9 @@ requires-dist = [ { name = "flask-limiter", extras = ["mongodb"], specifier = ">=3.11.0" }, { name = "geoip2", specifier = ">=5.1.0" }, { name = "gunicorn", specifier = ">=23.0.0" }, + { name = "loguru", specifier = ">=0.7.3" }, { name = "openpyxl", specifier = ">=3.1.5" }, + { name = "pika", specifier = ">=1.3.2" }, { name = "pycountry", specifier = ">=24.6.1" }, { name = "pymongo", specifier = ">=4.13.0" }, { name = "python-dotenv", specifier = ">=1.1.0" }, @@ -1551,6 +1616,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/52/24/ab44c871b0f07f491e5d2ad12c9bd7358e527510618cb1b803a88e986db1/werkzeug-3.1.3-py3-none-any.whl", hash = "sha256:54b78bf3716d19a65be4fceccc0d1d7b89e608834989dfae50ea87564639213e", size = 224498, upload-time = "2024-11-08T15:52:16.132Z" }, ] +[[package]] +name = "win32-setctime" +version = "1.2.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b3/8f/705086c9d734d3b663af0e9bb3d4de6578d08f46b1b101c2442fd9aecaa2/win32_setctime-1.2.0.tar.gz", hash = "sha256:ae1fdf948f5640aae05c511ade119313fb6a30d7eabe25fef9764dca5873c4c0", size = 4867, upload-time = "2024-12-07T15:28:28.314Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/e1/07/c6fe3ad3e685340704d314d765b7912993bcb8dc198f0e7a89382d37974b/win32_setctime-1.2.0-py3-none-any.whl", hash = "sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390", size = 4083, upload-time = "2024-12-07T15:28:26.465Z" }, +] + [[package]] name = "wrapt" version = "1.17.2" diff --git a/workers/__init__.py b/workers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workers/async_mongo.py b/workers/async_mongo.py new file mode 100644 index 00000000..0074acb7 --- /dev/null +++ b/workers/async_mongo.py @@ -0,0 +1,24 @@ +import os +from pymongo import AsyncMongoClient +from dotenv import load_dotenv + +_client = None +_db = None + +load_dotenv(override=True) + + +async def init_async_db(uri=None, db_name=None): + global _client, _db + uri = uri or os.environ.get("MONGODB_URI", "mongodb://localhost:27017/") + db_name = db_name or os.environ.get("MONGO_DB_NAME", "url-shortener") + _client = AsyncMongoClient(uri) + # Explicitly connect + await _client.aconnect() + _db = _client[db_name] + + +async def get_async_db(): + if _db is None: + await init_async_db() + return _db diff --git a/workers/stats_handler.py b/workers/stats_handler.py new file mode 100644 index 00000000..14598976 --- /dev/null +++ b/workers/stats_handler.py @@ -0,0 +1,84 @@ +import re +import tldextract +from utils.url_utils import get_country +from workers.async_mongo import get_async_db +from loguru import logger + +# Initialize extractor without disk caching +tld_extractor = tldextract.TLDExtract(cache_dir=None) + + +async def handle_click_event(data: dict): + """ + Async processing of click analytics and updating MongoDB. 
+ """ + # Ensure DB is initialized + db = await get_async_db() + + short_code = data.get("short_code") + os_name = data.get("os_name") + browser = data.get("browser") + referrer = data.get("referrer") + ip = data.get("ip") + timestamp = data.get("timestamp") + unique_click = data.get("is_unique_click") + bot_name = data.get("bot_name") + is_emoji = data.get("is_emoji", False) + + # Enrich country info + country = get_country(ip) or None + if country: + country = country.replace(".", " ") + + # Build update document + updates = {"$inc": {}, "$set": {}, "$addToSet": {}} + + # Process referrer domain + if referrer: + raw = tld_extractor(referrer) + domain = f"{raw.domain}.{raw.suffix}" if raw.suffix else raw.domain + sanitized = re.sub(r"[.$\x00-\x1F\x7F-\x9F]", "_", domain) + updates["$inc"][f"referrer.{sanitized}.counts"] = 1 + updates["$addToSet"][f"referrer.{sanitized}.ips"] = ip + + # Country stats + if country: + updates["$inc"][f"country.{country}.counts"] = 1 + updates["$addToSet"][f"country.{country}.ips"] = ip + + # Browser & OS stats + updates["$inc"][f"browser.{browser}.counts"] = 1 + updates["$addToSet"][f"browser.{browser}.ips"] = ip + updates["$inc"][f"os_name.{os_name}.counts"] = 1 + updates["$addToSet"][f"os_name.{os_name}.ips"] = ip + + # Bot counts + if bot_name: + sanitized_bot = re.sub(r"[.$\x00-\x1F\x7F-\x9F]", "_", bot_name) + updates["$inc"][f"bots.{sanitized_bot}"] = 1 + + # Daily and unique counters + date_str = timestamp.split()[0] + updates["$inc"][f"counter.{date_str}"] = 1 + if unique_click: + updates["$inc"][f"unique_counter.{date_str}"] = 1 + + # Global clicks + updates["$addToSet"]["ips"] = ip + updates["$inc"]["total-clicks"] = 1 + + # Last click info + updates["$set"]["last-click"] = timestamp + updates["$set"]["last-click-browser"] = browser + updates["$set"]["last-click-os"] = os_name + updates["$set"]["last-click-country"] = country + + # Determine correct collection + collection = db.emojis if is_emoji else db.urls + + # Perform 
atomic update + try: + await collection.update_one({"_id": short_code}, updates, upsert=True) + logger.info(f"[āœ“] Processed analytics for {short_code}") + except Exception as e: + logger.error(f"[!] Error updating MongoDB: {e}") diff --git a/workers/stats_publisher.py b/workers/stats_publisher.py new file mode 100644 index 00000000..1fbf1f7d --- /dev/null +++ b/workers/stats_publisher.py @@ -0,0 +1,19 @@ +import pika +import json + + +def send_to_queue(data): + connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) + channel = connection.channel() + + channel.queue_declare(queue="stats_queue", durable=True) + channel.basic_publish( + exchange="", + routing_key="stats_queue", + body=json.dumps(data), + properties=pika.BasicProperties( + delivery_mode=2 # make message persistent + ), + ) + + connection.close() diff --git a/workers/stats_worker.py b/workers/stats_worker.py new file mode 100644 index 00000000..075a7fc6 --- /dev/null +++ b/workers/stats_worker.py @@ -0,0 +1,60 @@ +import asyncio +import json +import aio_pika +from workers.stats_handler import handle_click_event +from workers.async_mongo import init_async_db +import sys +from loguru import logger + +logger.remove() + +logger.add( + sys.stdout, + level="INFO", + enqueue=True, + backtrace=True, + diagnose=False, + colorize=True, + format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} | {message}", +) + +RABBITMQ_URL = "amqp://localhost/" +QUEUE_NAME = "stats_queue" + + +async def StatsWorker(): + # Initialize async MongoDB connection + await init_async_db() + logger.info("[*] Async MongoDB initialized.") + + # Connect to RabbitMQ + connection = await aio_pika.connect_robust(RABBITMQ_URL) + channel = await connection.channel() + await channel.set_qos(prefetch_count=1) + queue = await channel.declare_queue(QUEUE_NAME, durable=True) + + logger.info(f"[*] Connected to RabbitMQ, consuming from '{QUEUE_NAME}'...") + async with queue.iterator() as queue_iter: + 
async for message in queue_iter: + async with message.process(): + try: + data = json.loads(message.body) + logger.info( + f"[x] Received click data for: {data.get('short_code', 'unknown')}" + ) + # Delegate to the async handler + await handle_click_event(data) + except Exception as e: + logger.error(f"[!] Error processing message: {e}") + # The message will be requeued automatically on failure + + # Cleanup on exit + await connection.close() + + +if __name__ == "__main__": + logger.info("[*] Starting Stats Worker...") + try: + asyncio.run(StatsWorker()) + except KeyboardInterrupt: + logger.error(" [!] Stats Worker stopped by user.")