Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ services:
- .:/app
environment:
- PYTHONPATH=/app
- LOG_LEVEL=DEBUG
- UVICORN_LOG_LEVEL=debug
- PYTHONUNBUFFERED=1
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
Expand Down
213 changes: 206 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
import logging
import traceback
from datetime import datetime
from scipy import stats

# Configure logging
log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=logging.INFO,
level=getattr(logging, log_level),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
logger.info(f"Starting application with log level: {log_level}")

# Create directories if they don't exist
os.makedirs("static", exist_ok=True)
Expand Down Expand Up @@ -66,6 +69,20 @@ def run_conversion_experiment(self, sim_count: int = 100_000, show=False):
)

self.conversion_results = self.conversion_test.evaluate()
# Create the posterior distributions for conversion rates
# For binary data, the posterior is a Beta distribution with parameters:
# alpha = a_prior + positives, beta = b_prior + (totals - positives)
self.conversion_distributions = {}
for variant_name, variant_data in zip(
self.conversion_test.variant_names, self.conversion_results
):
a_prior = 0.5 # Default prior in BinaryDataTest
b_prior = 0.5 # Default prior in BinaryDataTest
alpha = a_prior + variant_data["positives"]
beta = b_prior + (variant_data["totals"] - variant_data["positives"])
# Create a Beta distribution with these parameters
self.conversion_distributions[variant_name] = stats.beta(alpha, beta)

if show:
print(
pd.DataFrame(self.conversion_results).to_markdown(
Expand All @@ -76,17 +93,90 @@ def run_conversion_experiment(self, sim_count: int = 100_000, show=False):
def run_arpu_experiment(self, sim_count: int = 100_000, show=False):
self.arpu_test: DeltaLognormalDataTest = DeltaLognormalDataTest()
for v in self.variants:
rev_logs = [np.log(v.revenue / v.conversions)] * v.conversions
rev_logs = (
[np.log(v.revenue / v.conversions)] * v.conversions
if v.conversions > 0
else []
)
self.arpu_test.add_variant_data_agg(
v.name,
totals=v.impressions,
positives=v.conversions,
sum_values=v.revenue,
sum_logs=sum(rev_logs),
sum_logs_2=sum([np.square(l) for l in rev_logs]),
sum_logs=sum(rev_logs) if rev_logs else 0,
sum_logs_2=sum([np.square(l) for l in rev_logs]) if rev_logs else 0,
)

self.arpu_results = self.arpu_test.evaluate()

# Create the posterior distributions for ARPU
# For delta-lognormal data, we need to simulate from the model
self.arpu_distributions = {}

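# Run the DeltaLognormalDataTest simulation, which returns probabilities and expected loss.
# NOTE(review): pbbs and loss do not appear to be used by the per-variant sampling loop
# that follows (it samples directly with scipy.stats) — confirm whether this call is needed.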
pbbs, loss = self.arpu_test.eval_simulation(sim_count=1000, seed=42)

# For each variant, we'll generate samples from the posterior distribution
for variant_name in self.arpu_test.variant_names:
variant_idx = self.arpu_test.variant_names.index(variant_name)

# Get the parameters for this variant
totals = self.arpu_test.totals[variant_idx]
positives = self.arpu_test.positives[variant_idx]
sum_logs = self.arpu_test.sum_logs[variant_idx]
sum_logs_2 = self.arpu_test.sum_logs_2[variant_idx]

# Get the priors
a_prior_beta = self.arpu_test.a_priors_beta[variant_idx]
b_prior_beta = self.arpu_test.b_priors_beta[variant_idx]
m_prior = self.arpu_test.m_priors[variant_idx]
a_prior_ig = self.arpu_test.a_priors_ig[variant_idx]
b_prior_ig = self.arpu_test.b_priors_ig[variant_idx]
w_prior = self.arpu_test.w_priors[variant_idx]

# Generate samples from the posterior distribution
# First, sample from the Beta distribution for conversion rate
np.random.seed(42 + variant_idx) # Different seed for each variant
conversion_rate = stats.beta(
a_prior_beta + positives, b_prior_beta + (totals - positives)
).rvs(size=1000)

# For positive values, we need to sample from the log-normal distribution
# The parameters for the log-normal are derived from the data
if positives > 0:
# Calculate posterior parameters for the log-normal distribution
n = positives
w_n = w_prior + n
m_n = (w_prior * m_prior + sum_logs) / w_n
a_n = a_prior_ig + n / 2
b_n = b_prior_ig + 0.5 * (
sum_logs_2 - 2 * m_n * sum_logs + w_n * m_n**2
)

# Sample from the inverse gamma for variance
np.random.seed(42 + variant_idx + 100) # Different seed
variance = stats.invgamma(a_n, scale=b_n).rvs(size=1000)

# Sample from the normal for mean
np.random.seed(42 + variant_idx + 200) # Different seed
mean = stats.norm(m_n, np.sqrt(variance / w_n)).rvs(size=1000)

# Now sample from the log-normal with these parameters
np.random.seed(42 + variant_idx + 300) # Different seed
log_normal_samples = np.exp(
stats.norm(mean, np.sqrt(variance)).rvs(size=1000)
)

# Combine with conversion rate to get ARPU
arpu_samples = conversion_rate * log_normal_samples
else:
# If no conversions, ARPU is 0
arpu_samples = np.zeros(1000)

# Store the samples
self.arpu_distributions[variant_name] = arpu_samples.tolist()

if show:
print(
pd.DataFrame(self.arpu_results).to_markdown(
Expand Down Expand Up @@ -200,6 +290,16 @@ def compile_full_data(
def get_reports(self, probs_precision: int = 4):
self.compile_full_data()

# Debug: Print the structure of revenue_per_sale_results
print(
"Revenue per sale results structure:",
(
self.revenue_per_sale_results[0]
if self.revenue_per_sale_results
else "No results"
),
)

summaries = []
conv_stats = []
arpu_stats = []
Expand All @@ -216,6 +316,13 @@ def get_reports(self, probs_precision: int = 4):

conv = {"variant": variant.get("variant")}
conv.update(variant.get("conversion"))

# Find the posterior_mean from the conversion_results
for result in self.conversion_results:
if result["variant"] == variant.get("variant"):
conv.update({"posterior_mean": result["posterior_mean"]})
break

conv.update(
{
"lift": round(
Expand All @@ -230,6 +337,13 @@ def get_reports(self, probs_precision: int = 4):

arpu = {"variant": variant.get("variant")}
arpu.update(variant.get("arpu"))

# Find the posterior_mean for ARPU from the arpu_results
for result in self.arpu_results:
if result["variant"] == variant.get("variant"):
arpu.update({"posterior_mean": result["avg_values"]})
break

arpu.update(
{
"lift": round(
Expand All @@ -242,6 +356,13 @@ def get_reports(self, probs_precision: int = 4):

rev_per_sale = {"variant": variant.get("variant")}
rev_per_sale.update(variant.get("revenue_per_sale"))

# Find the posterior_mean for revenue per sale from the revenue_per_sale_results
for result in self.revenue_per_sale_results:
if result["variant"] == variant.get("variant"):
rev_per_sale.update({"posterior_mean": result["posterior_mean"]})
break

baseline_avg_ticket = baseline_res.get("summary").get("avg_ticket")
variant_avg_ticket = summary["avg_ticket"]
# Handle division by zero
Expand All @@ -263,7 +384,62 @@ def get_reports(self, probs_precision: int = 4):
_df_arpu = pd.DataFrame(arpu_stats)
_df_rev_per_sale = pd.DataFrame(rev_per_sale_stats)

return _df_summary, _df_conv, _df_arpu, _df_rev_per_sale
# Get conversion distributions
conversion_distributions = {}
for variant_name, distribution in self.conversion_distributions.items():
# Sample 1000 points from the distribution for visualization
# Using a fixed random seed for reproducibility
np.random.seed(42)
conversion_distributions[variant_name] = distribution.rvs(
size=1000
).tolist()

# Get ARPU distributions
arpu_distributions = self.arpu_distributions

# Get revenue per sale distributions
# For exponential data, the posterior is a Gamma distribution with parameters:
# alpha = a_prior + totals, beta = b_prior + sum_values
revenue_per_sale_distributions = {}
for variant_name, variant_data in zip(
self.revenue_per_sale_test.variant_names, self.revenue_per_sale_results
):
# Get the parameters for the Gamma distribution
# In ExponentialDataTest, the prior is Gamma(1, 0)
a_prior = 1 # Default prior in ExponentialDataTest
b_prior = 0 # Default prior in ExponentialDataTest

# Get the data for this variant
totals = variant_data["totals"]
sum_values = variant_data["sum_values"]

# Calculate the parameters for the posterior Gamma distribution
alpha = a_prior + totals
beta = b_prior + sum_values

# Create samples from the Gamma distribution
np.random.seed(
42 + self.revenue_per_sale_test.variant_names.index(variant_name)
)
if totals > 0: # Only generate samples if there are conversions
revenue_per_sale_distributions[variant_name] = (
stats.gamma(alpha, scale=1 / beta if beta > 0 else 1)
.rvs(size=1000)
.tolist()
)
else:
# If no conversions, fall back to an all-zero placeholder sample (1000 zeros)
revenue_per_sale_distributions[variant_name] = np.zeros(1000).tolist()

return (
_df_summary,
_df_conv,
_df_arpu,
_df_rev_per_sale,
conversion_distributions,
arpu_distributions,
revenue_per_sale_distributions,
)


# Pydantic models for API
Expand Down Expand Up @@ -321,7 +497,15 @@ async def analyze_experiment(experiment_input: ExperimentInput):

# Get reports
logger.info("Generating experiment reports")
df_summary, df_conv, df_arpu, df_rev_per_sale = experiment.get_reports()
(
df_summary,
df_conv,
df_arpu,
df_rev_per_sale,
conversion_distributions,
arpu_distributions,
revenue_per_sale_distributions,
) = experiment.get_reports()

# Convert DataFrames to dictionaries
summary_dict = df_summary.to_dict(orient="records")
Expand All @@ -335,12 +519,27 @@ async def analyze_experiment(experiment_input: ExperimentInput):
"conversion_stats": conv_dict,
"arpu_stats": arpu_dict,
"revenue_per_sale_stats": rev_per_sale_dict,
"conversion_distributions": conversion_distributions,
"arpu_distributions": arpu_distributions,
"revenue_per_sale_distributions": revenue_per_sale_distributions,
}
except Exception as e:
error_msg = f"Error in experiment analysis: {str(e)}"
stack_trace = traceback.format_exc()
logger.error(f"{error_msg}\n{stack_trace}")
raise HTTPException(status_code=400, detail=error_msg)

# Log more details about the input data for debugging
logger.error(f"Input data that caused the error: {experiment_input.dict()}")

# Return more detailed error information
raise HTTPException(
status_code=400,
detail={
"error": error_msg,
"traceback": stack_trace,
"input_data": experiment_input.dict(),
},
)


@app.get("/health")
Expand Down
Loading
Loading