diff --git a/.gitea/workflows/lint.yaml b/.gitea/workflows/lint.yaml new file mode 100644 index 0000000..e337cf3 --- /dev/null +++ b/.gitea/workflows/lint.yaml @@ -0,0 +1,55 @@ +name: Lint & Format Check + +on: + push: + branches: [main, master] + pull_request: + branches: [main, master] + +jobs: + lychee: + name: Link Check + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Check links + uses: lycheeverse/lychee-action@v2 + with: + # Scan markdown and HTML; skip local anchors and MinIO endpoints + args: > + --verbose + --no-progress + --exclude 'minio:' + --exclude 'localhost' + --exclude '192\.168\.' + --exclude '172\.' + --exclude 'git\.robwiederstein\.org' + '**/*.md' + '**/*.qmd' + fail: true + + style: + name: Format Check (styler) + runs-on: ubuntu-latest + container: + image: rocker/tidyverse:4.4 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install styler + run: Rscript -e "install.packages('styler')" + + - name: Check R/functions.R is styled + run: | + Rscript -e " + result <- styler::style_file('R/functions.R', dry = 'fail') + if (any(result\$changed)) { + cat('Formatting errors in R/functions.R. Run styler::style_file() locally.\n') + quit(status = 1) + } + " diff --git a/.gitea/workflows/test.yaml b/.gitea/workflows/test.yaml new file mode 100644 index 0000000..576f57d --- /dev/null +++ b/.gitea/workflows/test.yaml @@ -0,0 +1,31 @@ +name: R Package Tests + +on: + push: + branches: [main, master] + pull_request: + branches: [main, master] + +jobs: + test: + runs-on: ubuntu-latest + container: + image: rocker/tidyverse:4.4 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install system dependencies + run: | + apt-get update -y + apt-get install -y libcurl4-openssl-dev libssl-dev libxml2-dev + + - name: Install R package dependencies + run: | + Rscript -e "install.packages(c('remotes', 'testthat', 'withr'))" + Rscript -e "remotes::install_deps(dependencies = TRUE)" + + - name: Run tests + run: | + Rscript -e "devtools::test()" diff --git a/DESCRIPTION b/DESCRIPTION index b7faa04..b517fe6 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -10,6 +10,7 @@ Description: Tools to ingest the Bank Account Fraud (BAF) Base dataset into a targets. License: MIT + file LICENSE Encoding: UTF-8 +Language: en-US Roxygen: list(markdown = TRUE) RoxygenNote: 7.3.3 Imports: @@ -29,6 +30,11 @@ Suggests: targets, tarchetypes, knitr, - scales + scales, + spelling, + testthat (>= 3.0.0), + withr, + ggplot2 +Config/testthat/edition: 3 URL: https://docs.robwiederstein.org/baflakehouse BugReports: https://git.robwiederstein.org/rkw/bank-fraud-baf-lakehouse/issues diff --git a/R/functions.R b/R/functions.R index ef5777d..acb4077 100644 --- a/R/functions.R +++ b/R/functions.R @@ -33,18 +33,18 @@ #' convert_to_parquet(from_prefix = "01_raw", to_prefix = "02_intermediate", bucket_name = "baf-fraud") #' } convert_to_parquet <- function( - from_prefix, - to_prefix, - bucket_name = "baf-fraud" + from_prefix, + to_prefix, + bucket_name = "baf-fraud" ) { - endpoint <- Sys.getenv("BAF_ENDPOINT") + endpoint <- Sys.getenv("BAF_ENDPOINT") access_key <- Sys.getenv("BAF_KEY") secret_key <- Sys.getenv("BAF_SECRET") - - if (endpoint == "") stop("Missing env var: BAF_ENDPOINT") + + if (endpoint == "") stop("Missing env var: BAF_ENDPOINT") if (access_key == "") stop("Missing env var: BAF_KEY") if (secret_key == "") stop("Missing env var: BAF_SECRET") - + bucket <- s3_bucket( bucket_name, endpoint_override = endpoint, @@ -53,33 +53,33 @@ convert_to_parquet <- function( secret_key = secret_key, region = "us-east-1" ) - + path_raw <- bucket$path(from_prefix) path_out <- bucket$path(to_prefix) - + # List CSVs (Arrow may return full keys; basename() normalizes to file name) file_list <- basename(path_raw$ls()) file_list <- file_list[grepl("\\.csv$", file_list, ignore.case = TRUE)] - + # Current mode: only Base.csv (since you've trimmed the bucket) file_list <- file_list[tolower(file_list) == "base.csv"] - + if (length(file_list) == 0) { stop("No Base.csv found under ", bucket_name, "/", from_prefix, "/") } - + message("Found ", length(file_list), " file(s) to process.") - + for (file_name in file_list) { variant_name <- file_name |> str_remove("\\.csv$") |> str_replace_all(" ", "_") # e.g., "Variant I.csv" -> "Variant_I" - + message("\u2714 Processing: ", variant_name, "...") - + df <- read_csv_arrow(path_raw$path(file_name)) |> mutate(variant = variant_name) - + write_dataset( df, path = path_out, @@ -107,16 +107,15 @@ convert_to_parquet <- function( #' #' @importFrom arrow s3_bucket open_dataset to_duckdb connect_baf <- function(prefix, bucket_name = Sys.getenv("BAF_BUCKET"), use_duckdb = TRUE) { - endpoint <- Sys.getenv("BAF_ENDPOINT") - key <- Sys.getenv("BAF_KEY") - secret <- Sys.getenv("BAF_SECRET") - + key <- Sys.getenv("BAF_KEY") + secret <- Sys.getenv("BAF_SECRET") + if (bucket_name == "") stop("Missing env var or arg: BAF_BUCKET / bucket_name") - if (endpoint == "") stop("Missing env var: BAF_ENDPOINT") - if (key == "") stop("Missing env var: BAF_KEY") - if (secret == "") stop("Missing env var: BAF_SECRET") - + if (endpoint == "") stop("Missing env var: BAF_ENDPOINT") + if (key == "") stop("Missing env var: BAF_KEY") + if (secret == "") stop("Missing env var: BAF_SECRET") + b <- arrow::s3_bucket( bucket_name, endpoint_override = endpoint, @@ -125,16 +124,16 @@ connect_baf <- function(prefix, bucket_name = Sys.getenv("BAF_BUCKET"), use_duck secret_key = secret, region = "us-east-1" ) - + ds <- arrow::open_dataset(b$path(prefix), format = "parquet") - + if (isTRUE(use_duckdb)) { ds <- arrow::to_duckdb(ds) message("\u2714 Connected to s3://", bucket_name, "/", prefix, " via DuckDB Engine") } else { message("\u2714 Connected to s3://", bucket_name, "/", prefix, " via Arrow Engine") } - + ds } @@ -153,41 +152,41 @@ connect_baf <- function(prefix, bucket_name = Sys.getenv("BAF_BUCKET"), use_duck #' @importFrom dplyr mutate if_else select rename tbl_vars #' @importFrom arrow s3_bucket write_dataset clean_baf_base <- function( - in_prefix, - out_prefix = "03_primary/variant=Base", - bucket_name = "baf-fraud", - partitioning = "month", - existing_data_behavior = c("overwrite", "error", "delete_matching"), - verbose = TRUE + in_prefix, + out_prefix = "03_primary/variant=Base", + bucket_name = "baf-fraud", + partitioning = "month", + existing_data_behavior = c("overwrite", "error", "delete_matching"), + verbose = TRUE ) { existing_data_behavior <- match.arg(existing_data_behavior) - + endpoint <- Sys.getenv("BAF_ENDPOINT") - key <- Sys.getenv("BAF_KEY") - secret <- Sys.getenv("BAF_SECRET") - + key <- Sys.getenv("BAF_KEY") + secret <- Sys.getenv("BAF_SECRET") + if (endpoint == "") stop("Missing env var: BAF_ENDPOINT") - if (key == "") stop("Missing env var: BAF_KEY") - if (secret == "") stop("Missing env var: BAF_SECRET") - + if (key == "") stop("Missing env var: BAF_KEY") + if (secret == "") stop("Missing env var: BAF_SECRET") + if (verbose) message("Beginning cleaning...") - + # Arrow-native dataset (required for Arrow write_dataset) ds <- connect_baf(in_prefix, bucket_name = bucket_name, use_duckdb = FALSE) - + # 1) outcome label ds_labeled <- ds |> mutate(outcome = if_else(fraud_bool == 1L, "Fraud", "Legit")) |> select(-fraud_bool) - + if (verbose) message("\u2714 Outcome column created as `outcome`") - + # Normalize email column name to match datasheet vars <- dplyr::tbl_vars(ds_labeled) if ("device_distinct_emails_8w" %in% vars && !("device_distinct_emails" %in% vars)) { ds_labeled <- ds_labeled |> rename(device_distinct_emails = device_distinct_emails_8w) } - + # Re-check after potential rename vars <- dplyr::tbl_vars(ds_labeled) required <- c( @@ -202,7 +201,7 @@ clean_baf_base <- function( if (length(missing_required) > 0) { stop("Missing expected columns: ", paste(missing_required, collapse = ", ")) } - + # 2) sentinel -1 -> NA (Arrow-friendly: explicit if_else per column) ds_na_recode <- ds_labeled |> mutate( @@ -212,17 +211,17 @@ clean_baf_base <- function( session_length_in_minutes = if_else(session_length_in_minutes == -1L, NA_integer_, session_length_in_minutes), device_distinct_emails = if_else(device_distinct_emails == -1L, NA_integer_, device_distinct_emails) ) - + if (verbose) message("\u2714 Sentinel (-1) values converted to NA") - + # 3) intended_balcon_amount: negatives are missing -> NA ds_balcon_recode <- ds_na_recode |> mutate( intended_balcon_amount = if_else(intended_balcon_amount < 0, NA_real_, intended_balcon_amount) ) - + if (verbose) message("\u2714 intended_balcon_amount constrained to values >= 0 (negatives set to NA)") - + # 4) Write to MinIO using arrow s3_bucket path (keeps endpoint_override) b <- arrow::s3_bucket( bucket_name, @@ -233,7 +232,7 @@ clean_baf_base <- function( region = "us-east-1" ) out_path <- b$path(out_prefix) - + if (is.null(partitioning)) { arrow::write_dataset( ds_balcon_recode, @@ -250,9 +249,9 @@ clean_baf_base <- function( existing_data_behavior = existing_data_behavior ) } - + if (verbose) message("\u2714 Wrote cleaned dataset to s3://", bucket_name, "/", out_prefix) - + out_prefix } #' Plot applications by month (Legit vs Fraud) on a log scale @@ -278,13 +277,13 @@ clean_baf_base <- function( #' @importFrom cowplot theme_cowplot #' @importFrom colorspace qualitative_hcl plot_fraud_by_month <- function( - dataset_prefix, - bucket_name = "baf-fraud", - palette = "Dark 3", - title = "" + dataset_prefix, + bucket_name = "baf-fraud", + palette = "Dark 3", + title = "" ) { ds <- connect_baf(dataset_prefix, bucket_name = bucket_name, use_duckdb = TRUE) - + ds_fraud <- ds |> dplyr::group_by(month) |> dplyr::summarise( @@ -299,10 +298,10 @@ plot_fraud_by_month <- function( dplyr::rename(Month = month) |> tidyr::pivot_longer(c(Fraud, Legit), names_to = "Outcome", values_to = "n") |> dplyr::mutate(Outcome = factor(Outcome, levels = c("Legit", "Fraud"))) - + cols <- colorspace::qualitative_hcl(2, palette = palette) names(cols) <- levels(ds_fraud$Outcome) - + ggplot2::ggplot(ds_fraud, ggplot2::aes(x = factor(Month), y = n, group = Outcome, color = Outcome)) + ggplot2::geom_line(linewidth = 1) + ggplot2::geom_point(size = 2) + @@ -334,7 +333,7 @@ plot_fraud_by_month <- function( #' @importFrom dplyr `%>%` compute_fraud_by_month <- function(in_prefix, use_duckdb = TRUE) { ds <- connect_baf(in_prefix, use_duckdb = use_duckdb) - + ds %>% dplyr::group_by(month) %>% dplyr::summarise( @@ -390,7 +389,6 @@ save_report_table <- function(x, filename, out_dir = "reports/tables") { } - #' Save a report figure artifact #' #' Saves a ggplot object to \code{reports/figures/}. @@ -406,12 +404,12 @@ save_report_table <- function(x, filename, out_dir = "reports/tables") { #' #' @importFrom ggplot2 ggsave save_report_figure <- function( - plot, - filename, - out_dir = "reports/figures", - width = 12, - height = 6.75, - dpi = 300 + plot, + filename, + out_dir = "reports/figures", + width = 12, + height = 6.75, + dpi = 300 ) { dir.create(out_dir, showWarnings = FALSE, recursive = TRUE) out_path <- file.path(out_dir, filename) @@ -433,15 +431,15 @@ render_slides <- function(qmd = "index.qmd", assets, output_dir = "reports/slide if (length(missing) > 0) { stop("Missing report assets:\n", paste(missing, collapse = "\n")) } - + dir.create(output_dir, recursive = TRUE, showWarnings = FALSE) - + quarto::quarto_render( input = qmd, quiet = FALSE, quarto_args = c("--output-dir", output_dir) ) - + file.path(output_dir, sub("\\.qmd$", ".html", basename(qmd))) } #' Run Class Imbalance Tournament @@ -466,19 +464,18 @@ render_slides <- function(qmd = "index.qmd", assets, output_dir = "reports/slide #' @importFrom yardstick pr_auc #' @importFrom glue glue run_imbalance_tournament <- function( - tasks, - windows, - feature_prefix, - bucket_name = "baf-fraud", - inputs_prefix = "05_model_input" + tasks, + windows, + feature_prefix, + bucket_name = "baf-fraud", + inputs_prefix = "05_model_input" ) { - endpoint <- Sys.getenv("BAF_ENDPOINT") - key <- Sys.getenv("BAF_KEY") - secret <- Sys.getenv("BAF_SECRET") - + key <- Sys.getenv("BAF_KEY") + secret <- Sys.getenv("BAF_SECRET") + if (endpoint == "") stop("Missing env var: BAF_ENDPOINT") - + b <- arrow::s3_bucket( bucket_name, endpoint_override = endpoint, @@ -487,80 +484,80 @@ run_imbalance_tournament <- function( secret_key = secret, region = "us-east-1" ) - + results_log <- list() counter <- 1 - + # 1. THE LOOP for (i in seq_len(nrow(tasks))) { task <- tasks[i, ] - + for (j in seq_len(nrow(windows))) { win <- windows[j, ] - + message(glue::glue("\n\u2699\ufe0f {task$recipe_name} | {win$window_id}")) - + # Load Training Data train_df <- arrow::open_dataset(b$path(glue::glue("{inputs_prefix}/{task$data_folder}"))) |> dplyr::filter(month %in% win$train_months[[1]]) |> dplyr::collect() - + X_train <- train_df |> dplyr::select(-outcome, -dplyr::any_of(c("month", "month_date"))) |> as.matrix() y_train <- as.numeric(train_df$outcome == "Fraud") - + # Train Model (with strict overfitting brakes) dtrain <- lightgbm::lgb.Dataset(data = X_train, label = y_train) start_time <- Sys.time() - + model <- lightgbm::lgb.train( params = list( objective = "binary", metric = "auc", learning_rate = 0.05, - + # --- The Common Sense Defaults --- max_depth = 6, - num_leaves = 31, - min_data_in_leaf = 250, - feature_fraction = 0.8, - bagging_fraction = 0.8, - bagging_freq = 1, + num_leaves = 31, + min_data_in_leaf = 250, + feature_fraction = 0.8, + bagging_fraction = 0.8, + bagging_freq = 1, # --------------------------------- - - device = "cpu", + + device = "cpu", scale_pos_weight = task$scale_pos_weight ), data = dtrain, - nrounds = 500, + nrounds = 500, verbose = -1 ) - + end_time <- Sys.time() runtime <- as.numeric(difftime(end_time, start_time, units = "secs")) - + # Load Testing Data (Always evaluate on the baseline) test_df <- arrow::open_dataset(b$path(glue::glue("{inputs_prefix}/baseline"))) |> dplyr::filter(month == win$test_month) |> dplyr::collect() - + X_test <- test_df |> dplyr::select(-outcome, -dplyr::any_of(c("month", "month_date"))) |> as.matrix() - + preds <- predict(model, X_test) - + # Score Model eval_df <- data.frame( truth = factor(test_df$outcome, levels = c("Fraud", "Legit")), prob = preds ) - + score <- yardstick::pr_auc(eval_df, truth, prob)$.estimate - + message(glue::glue(" -> PR-AUC: {round(score, 4)} | Time: {round(runtime, 2)}s")) - + results_log[[counter]] <- data.frame( recipe = task$recipe_name, window = win$window_id, @@ -568,13 +565,13 @@ run_imbalance_tournament <- function( runtime_sec = runtime ) counter <- counter + 1 - + # Cleanup rm(train_df, X_train, y_train, dtrain, model, test_df, X_test, preds, eval_df) gc() } } - + # Return the raw log for downstream targets to handle results_df <- dplyr::bind_rows(results_log) return(results_df) @@ -594,29 +591,33 @@ run_imbalance_tournament <- function( #' @return A formatted gt table object. #' @export format_tournament_gt <- function(results_df) { - # Extract scores for the 'Standard' recipe to use as the baseline for t-tests - standard_scores <- results_df |> - dplyr::filter(recipe == "Standard") |> - dplyr::arrange(window) |> + standard_scores <- results_df |> + dplyr::filter(recipe == "Standard") |> + dplyr::arrange(window) |> dplyr::pull(pr_auc) - + # Internal helper to calculate p-values vs the Standard baseline get_p_value <- function(target_recipe, df) { - if (target_recipe == "Standard") return(1.0) - - target_scores <- df |> - dplyr::filter(recipe == target_recipe) |> - dplyr::arrange(window) |> + if (target_recipe == "Standard") { + return(1.0) + } + + target_scores <- df |> + dplyr::filter(recipe == target_recipe) |> + dplyr::arrange(window) |> dplyr::pull(pr_auc) - - tryCatch({ - # Paired t-test accounts for the same windows/seeds being used - test <- stats::t.test(target_scores, standard_scores, paired = TRUE) - test$p.value - }, error = function(e) NA_real_) + + tryCatch( + { + # Paired t-test accounts for the same windows/seeds being used + test <- stats::t.test(target_scores, standard_scores, paired = TRUE) + test$p.value + }, + error = function(e) NA_real_ + ) } - + # Aggregating window results into a final summary final_stats <- results_df |> dplyr::group_by(recipe) |> @@ -633,7 +634,7 @@ format_tournament_gt <- function(results_df) { ) ) |> dplyr::arrange(dplyr::desc(avg_pr_auc)) - + # Formatting with gt for the Quarto presentation final_stats |> gt::gt() |> @@ -643,7 +644,7 @@ format_tournament_gt <- function(results_df) { ) |> gt::fmt_number(columns = c(avg_pr_auc, p_val_vs_std), decimals = 4) |> gt::data_color( - columns = avg_pr_auc, + columns = avg_pr_auc, palette = c("#ffcccc", "#ffffff", "#ccffcc") # Red-White-Green scale ) } @@ -661,7 +662,7 @@ plot_efficiency <- function(results_df) { avg_pr_auc = mean(pr_auc), avg_time = mean(runtime_sec) ) - + ggplot2::ggplot(plot_data, ggplot2::aes(x = avg_time, y = avg_pr_auc)) + ggplot2::geom_point(ggplot2::aes(color = recipe == "Standard"), size = 5) + ggplot2::scale_color_manual(values = c("TRUE" = "#E74C3C", "FALSE" = "#2C3E50")) + @@ -700,13 +701,13 @@ train_diag_model <- function(baked_data) { X_eda <- select(baked_data, -outcome, -month) X_eda <- as.matrix(X_eda) y_eda <- as.numeric(baked_data$outcome == "Fraud") - + dtrain <- lgb.Dataset(data = X_eda, label = y_eda) - + lgb.train( params = list(objective = "binary", metric = "auc", device = "cpu"), data = dtrain, - nrounds = 100, + nrounds = 100, verbose = -1 ) } @@ -724,7 +725,7 @@ train_diag_model <- function(baked_data) { plot_var_imp <- function(model, title = "") { importance_df <- lgb.importance(model, percentage = TRUE) plot_data <- slice_max(importance_df, Gain, n = 15) - + ggplot(plot_data, aes(x = reorder(Feature, Gain), y = Gain)) + geom_segment(aes(xend = reorder(Feature, Gain), yend = 0), linewidth = 0.8) + geom_point(size = 3.5) + @@ -748,18 +749,18 @@ plot_var_imp <- function(model, title = "") { #' @export plot_hexbin_interaction <- function(baked_data, title = "") { plot_data <- mutate(baked_data, fraud_flag = ifelse(outcome == "Fraud", 1, 0)) - + ggplot(plot_data, aes(x = current_address_months_count, y = credit_risk_score, z = fraud_flag)) + stat_summary_hex( - bins = 30, + bins = 30, fun = function(z) if (length(z) >= 50) mean(z) else NA_real_ - ) + + ) + scale_fill_continuous_sequential( - palette = "Viridis", - labels = percent, + palette = "Viridis", + labels = percent, na.value = "transparent", - rev = FALSE - ) + + rev = FALSE + ) + labs( title = title, x = "Months at Current Address", y = "Credit Risk Score", fill = "Fraud Rate" @@ -784,11 +785,11 @@ plot_missingness <- function(eda_data, title = "") { summarise(across(everything(), ~ mean(is.na(.x))), .groups = "drop") |> pivot_longer(cols = -outcome, names_to = "feature", values_to = "pct_missing") |> filter(pct_missing > 0.05) - + ggplot(missing_summary, aes(x = reorder(feature, pct_missing), y = pct_missing, color = outcome)) + geom_linerange( - aes(ymin = 0, ymax = pct_missing), - position = position_dodge(width = 0.5), + aes(ymin = 0, ymax = pct_missing), + position = position_dodge(width = 0.5), linewidth = 0.8 ) + geom_point(position = position_dodge(width = 0.5), size = 4) + @@ -820,26 +821,26 @@ plot_missingness <- function(eda_data, title = "") { #' @export plot_num_cor <- function(eda_data, title = "") { cor_numeric_only <- eda_data |> - select(where(is.numeric), -month) |> - select(where(~ isTRUE(sd(.x, na.rm = TRUE) > 0))) |> - correlate(quiet = TRUE) |> - rearrange() |> - shave() - + select(where(is.numeric), -month) |> + select(where(~ isTRUE(sd(.x, na.rm = TRUE) > 0))) |> + correlate(quiet = TRUE) |> + rearrange() |> + shave() + cor_long <- stretch(cor_numeric_only, na.rm = TRUE) - + ggplot(cor_long, aes(x = x, y = y, fill = r)) + geom_tile(color = "white", linewidth = 0.5) + scale_fill_continuous_diverging( - palette = "Green-Brown", - mid = 0, - limit = c(-1, 1), + palette = "Green-Brown", + mid = 0, + limit = c(-1, 1), name = "Pearson (r)", - expand = expansion(mult = c(0, 0)) + expand = expansion(mult = c(0, 0)) ) + geom_text( - aes(label = ifelse(abs(r) > 0.2, round(r, 2), "")), - color = "black", + aes(label = ifelse(abs(r) > 0.2, round(r, 2), "")), + color = "black", size = 3.5, family = "Atkinson Hyperlegible" ) + @@ -875,22 +876,21 @@ plot_num_cor <- function(eda_data, title = "") { #' @importFrom arrow s3_bucket open_dataset write_dataset #' @importFrom dplyr mutate engineer_features <- function( - in_prefix = "03_primary/variant=Base", - out_prefix = "04_feature/variant=Base", - bucket_name = "baf-fraud", - partitioning = "month", - existing_data_behavior = "delete_matching", - verbose = TRUE + in_prefix = "03_primary/variant=Base", + out_prefix = "04_feature/variant=Base", + bucket_name = "baf-fraud", + partitioning = "month", + existing_data_behavior = "delete_matching", + verbose = TRUE ) { - endpoint <- Sys.getenv("BAF_ENDPOINT") - key <- Sys.getenv("BAF_KEY") - secret <- Sys.getenv("BAF_SECRET") - + key <- Sys.getenv("BAF_KEY") + secret <- Sys.getenv("BAF_SECRET") + if (endpoint == "") stop("Missing env var: BAF_ENDPOINT") - + if (verbose) message("Connecting to MinIO bucket: ", bucket_name) - + b <- arrow::s3_bucket( bucket_name, endpoint_override = endpoint, @@ -899,21 +899,21 @@ engineer_features <- function( secret_key = secret, region = "us-east-1" ) - + if (verbose) message("Opening primary dataset: ", in_prefix) ds_primary <- arrow::open_dataset(b$path(in_prefix), format = "parquet") - + if (verbose) message("Engineering 'n_missing' feature...") ds_feature <- ds_primary |> dplyr::mutate( - n_missing = as.integer(is.na(prev_address_months_count)) + - as.integer(is.na(current_address_months_count)) + - as.integer(is.na(bank_months_count)) + - as.integer(is.na(session_length_in_minutes)) + - as.integer(is.na(device_distinct_emails)) + + n_missing = as.integer(is.na(prev_address_months_count)) + + as.integer(is.na(current_address_months_count)) + + as.integer(is.na(bank_months_count)) + + as.integer(is.na(session_length_in_minutes)) + + as.integer(is.na(device_distinct_emails)) + as.integer(is.na(intended_balcon_amount)) ) - + if (verbose) message("Writing feature dataset to: ", out_prefix) arrow::write_dataset( dataset = ds_feature, @@ -922,9 +922,9 @@ engineer_features <- function( partitioning = partitioning, existing_data_behavior = existing_data_behavior ) - + if (verbose) message("\u2714 Feature engineering complete!") - + out_prefix } @@ -948,17 +948,16 @@ engineer_features <- function( #' @importFrom lubridate %m+% #' @importFrom glue glue generate_model_inputs <- function( - feature_prefix = "04_feature/variant=Base", - out_prefix = "05_model_input", - bucket_name = "baf-fraud" + feature_prefix = "04_feature/variant=Base", + out_prefix = "05_model_input", + bucket_name = "baf-fraud" ) { - endpoint <- Sys.getenv("BAF_ENDPOINT") - key <- Sys.getenv("BAF_KEY") - secret <- Sys.getenv("BAF_SECRET") - + key <- Sys.getenv("BAF_KEY") + secret <- Sys.getenv("BAF_SECRET") + if (endpoint == "") stop("Missing env var: BAF_ENDPOINT") - + b <- s3_bucket( bucket_name, endpoint_override = endpoint, @@ -967,69 +966,72 @@ generate_model_inputs <- function( secret_key = secret, region = "us-east-1" ) - + message("Opening feature dataset: ", feature_prefix) ds_feature <- open_dataset(b$path(feature_prefix)) - + # 1. Prep Sample with the Date Column message("Preparing base recipe on Month 0 sample...") - sample_data <- ds_feature |> - filter(month == 0) |> - head(5000) |> + sample_data <- ds_feature |> + filter(month == 0) |> + head(5000) |> collect() |> mutate(month_date = as.Date("2025-02-01") %m+% months(month)) - + rec_base <- recipe(outcome ~ ., data = sample_data) |> update_role(month_date, new_role = "ID") |> - step_novel(all_nominal_predictors()) |> + step_novel(all_nominal_predictors()) |> step_unknown(all_nominal_predictors()) |> step_indicate_na(all_numeric_predictors()) |> step_impute_median(all_numeric_predictors()) |> - step_dummy(all_nominal_predictors(), one_hot = TRUE) |> + step_dummy(all_nominal_predictors(), one_hot = TRUE) |> step_zv(all_predictors()) |> prep() - + # 2. The S3-to-S3 Loop for (m in 0:7) { message("Baking and sampling month ", m, "...") - - raw_df <- ds_feature |> - filter(month == m) |> + + raw_df <- ds_feature |> + filter(month == m) |> collect() |> mutate(month_date = as.Date("2025-02-01") %m+% months(month)) - + baked_df <- bake(rec_base, new_data = raw_df) - + # SAVE BASELINE write_parquet(baked_df, b$path(glue("{out_prefix}/baseline/month={m}/part-0.parquet"))) - + # PREP NUMERIC-ONLY FOR SAMPLING numeric_only_df <- baked_df |> select(-month_date) - + # Fork: Under - baked_under <- numeric_only_df |> group_by(outcome) |> slice_sample(prop = 0.25) |> ungroup() + baked_under <- numeric_only_df |> + group_by(outcome) |> + slice_sample(prop = 0.25) |> + ungroup() write_parquet(baked_under, b$path(glue("{out_prefix}/under/month={m}/part-0.parquet"))) - + # Fork: Smote baked_smote <- smote(numeric_only_df, var = "outcome", over_ratio = 0.5) write_parquet(baked_smote, b$path(glue("{out_prefix}/smote/month={m}/part-0.parquet"))) - + # Fork: Adasyn baked_adasyn <- adasyn(numeric_only_df, var = "outcome", over_ratio = 0.5, k = 5) write_parquet(baked_adasyn, b$path(glue("{out_prefix}/adasyn/month={m}/part-0.parquet"))) - + # Fork: Tomek baked_tomek <- recipe(outcome ~ ., data = numeric_only_df) |> step_tomek(outcome) |> prep() |> bake(new_data = NULL) write_parquet(baked_tomek, b$path(glue("{out_prefix}/tomek/month={m}/part-0.parquet"))) - + # Cleanup RAM after each month rm(raw_df, baked_df, numeric_only_df, baked_under, baked_smote, baked_adasyn, baked_tomek) gc() } - + message("\u2714 Model inputs generated successfully!") out_prefix } @@ -1047,14 +1049,16 @@ generate_model_inputs <- function( #' @return A tibble with columns \code{truth}, \code{prob}, and \code{pred_class}. #' @export evaluate_final_model <- function(params, bucket_name = "baf-fraud", inputs_prefix = "05_model_input") { - - b <- arrow::s3_bucket(bucket_name, endpoint_override = Sys.getenv("BAF_ENDPOINT"), - scheme = "http", access_key = Sys.getenv("BAF_KEY"), - secret_key = Sys.getenv("BAF_SECRET"), region = "us-east-1") + b <- arrow::s3_bucket(bucket_name, + endpoint_override = Sys.getenv("BAF_ENDPOINT"), + scheme = "http", access_key = Sys.getenv("BAF_KEY"), + secret_key = Sys.getenv("BAF_SECRET"), region = "us-east-1" + ) # 1. FULL TRAIN (Months 0-5) train_df <- arrow::open_dataset(b$path(glue::glue("{inputs_prefix}/baseline"))) |> - dplyr::filter(month %in% 0:5) |> dplyr::collect() + dplyr::filter(month %in% 0:5) |> + dplyr::collect() X_train <- as.matrix(train_df |> dplyr::select(-outcome, -dplyr::any_of(c("month", "month_date")))) y_train <- as.numeric(train_df$outcome == "Fraud") @@ -1070,21 +1074,22 @@ evaluate_final_model <- function(params, bucket_name = "baf-fraud", inputs_prefi ), data = lightgbm::lgb.Dataset(X_train, label = y_train), nrounds = params$trees, verbose = -1 ) - + # 2. FINAL EXAM (Months 6-7) test_df <- arrow::open_dataset(b$path(glue::glue("{inputs_prefix}/baseline"))) |> - dplyr::filter(month %in% 6:7) |> dplyr::collect() - + dplyr::filter(month %in% 6:7) |> + dplyr::collect() + X_test <- as.matrix(test_df |> dplyr::select(-outcome, -dplyr::any_of(c("month", "month_date")))) - preds <- predict(model, X_test) - + preds <- predict(model, X_test) + # 3. GENERATE METRICS eval_df <- dplyr::tibble( truth = factor(test_df$outcome, levels = c("Fraud", "Legit")), - prob = preds, + prob = preds, pred_class = factor(ifelse(prob >= 0.05, "Fraud", "Legit"), levels = c("Fraud", "Legit")) ) - + return(eval_df) } @@ -1101,10 +1106,9 @@ evaluate_final_model <- function(params, bucket_name = "baf-fraud", inputs_prefi #' #' @importFrom ggplot2 autoplot scale_fill_gradient labs theme_minimal theme element_text plot_conf_mat_heatmap <- function( - cm, - title = "" + cm, + title = "" ) { - p <- ggplot2::autoplot(cm, type = "heatmap") + ggplot2::scale_fill_gradient(low = "#F3F4F6", high = "#1D4ED8") + ggplot2::labs( @@ -1115,13 +1119,13 @@ plot_conf_mat_heatmap <- function( legend.position = "none", plot.title = ggplot2::element_text(face = "bold") ) - + return(p) } #' Train and Serialize Production LightGBM Model #' #' Trains a LightGBM model on the complete dataset using the winning -#' hyperparameters, serializes it to a text file, and uploads it directly +#' hyperparameters, serializes it to a text file, and uploads it directly #' to MinIO via the Apache Arrow S3 interface. #' #' @param data A data frame containing the full BAF dataset (Months 0-7). @@ -1137,7 +1141,6 @@ plot_conf_mat_heatmap <- function( #' @importFrom lightgbm lgb.save #' @importFrom arrow S3FileSystem train_production_model <- function(data, recipe, best_params, model_filename = "lgbm_prod.txt") { - # 1. Define the production model specification lgbm_spec <- parsnip::boost_tree( trees = best_params$trees, @@ -1145,46 +1148,46 @@ train_production_model <- function(data, recipe, best_params, model_filename = " learn_rate = best_params$learn_rate, min_n = best_params$min_n ) |> - parsnip::set_engine("lightgbm", is_unbalance = TRUE) |> + parsnip::set_engine("lightgbm", is_unbalance = TRUE) |> parsnip::set_mode("classification") - + # 2. Bundle the workflow and fit to the ENTIRE dataset - prod_wflow <- workflows::workflow() |> - workflows::add_recipe(recipe) |> + prod_wflow <- workflows::workflow() |> + workflows::add_recipe(recipe) |> workflows::add_model(lgbm_spec) - + fitted_prod_wflow <- workflows::fit(prod_wflow, data = data) - + # 3. Extract the raw LightGBM C++ booster object lgbm_booster <- workflows::extract_fit_engine(fitted_prod_wflow) - + # 4. Serialize to local disk temporarily temp_dir <- tempdir() local_path <- file.path(temp_dir, model_filename) lightgbm::lgb.save(lgbm_booster, local_path) - + # 5. Connect to MinIO via Arrow using exact .Renviron credentials s3 <- arrow::S3FileSystem$create( access_key = Sys.getenv("BAF_KEY"), secret_key = Sys.getenv("BAF_SECRET"), endpoint_override = Sys.getenv("BAF_ENDPOINT"), - scheme = "http" # 172.19.0.1 is an internal IP, using HTTP over port 9100 + scheme = "http" ) - + # 6. Open an Arrow output stream and push the binary data to MinIO bucket_name <- Sys.getenv("BAF_BUCKET") s3_path <- file.path(bucket_name, "06_models", model_filename) - + out_stream <- s3$OpenOutputStream(s3_path) file_size <- file.info(local_path)$size raw_bytes <- readBin(local_path, "raw", n = file_size) - + out_stream$write(raw_bytes) out_stream$close() - + # Clean up the local temporary file unlink(local_path) - + # 7. Return the storage URI for pipeline tracking paste0("minio://", s3_path) } @@ -1241,11 +1244,11 @@ build_baf_recipe <- function(data) { #' @importFrom tune tune tune_grid control_grid select_best #' @importFrom yardstick metric_set pr_auc tune_lgbm <- function( - imbalance_windows, - bucket_name = "baf-fraud", - inputs_prefix = "05_model_input", - grid_size = 30L, - seed = 42L + imbalance_windows, + bucket_name = "baf-fraud", + inputs_prefix = "05_model_input", + grid_size = 30L, + seed = 42L ) { b <- arrow::s3_bucket( bucket_name, @@ -1268,9 +1271,9 @@ tune_lgbm <- function( splits <- purrr::map( seq_len(nrow(imbalance_windows)), function(i) { - win <- imbalance_windows[i, ] + win <- imbalance_windows[i, ] train_idx <- which(tune_data$month %in% win$train_months[[1]]) - test_idx <- which(tune_data$month == win$test_month) + test_idx <- which(tune_data$month == win$test_month) rsample::make_splits( list(analysis = train_idx, assessment = test_idx), data = tune_data @@ -1299,15 +1302,17 @@ tune_lgbm <- function( set.seed(seed) lgbm_grid <- dials::grid_space_filling( - dials::trees(range = c(100L, 1000L)), + dials::trees(range = c(100L, 1000L)), dials::tree_depth(range = c(3L, 8L)), dials::learn_rate(range = c(-3, -1)), - dials::min_n(range = c(100L, 500L)), + dials::min_n(range = c(100L, 500L)), size = grid_size ) - message("Starting hyperparameter tuning (", grid_size, " candidates x ", - nrow(imbalance_windows), " windows)...") + message( + "Starting hyperparameter tuning (", grid_size, " candidates x ", + nrow(imbalance_windows), " windows)..." + ) set.seed(seed) tune_results <- tune::tune_grid( tune_wflow, @@ -1318,8 +1323,10 @@ tune_lgbm <- function( ) best <- tune::select_best(tune_results, metric = "pr_auc") - message("Best PR-AUC params: trees=", best$trees, " tree_depth=", best$tree_depth, - " learn_rate=", round(best$learn_rate, 5), " min_n=", best$min_n) + message( + "Best PR-AUC params: trees=", best$trees, " tree_depth=", best$tree_depth, + " learn_rate=", round(best$learn_rate, 5), " min_n=", best$min_n + ) list( trees = best$trees, @@ -1327,4 +1334,4 @@ tune_lgbm <- function( learn_rate = best$learn_rate, min_n = best$min_n ) -} \ No newline at end of file +} diff --git a/deploy.R b/deploy.R index 5e44f11..ee090e7 100644 --- a/deploy.R +++ b/deploy.R @@ -1,5 +1,8 @@ # deploy.R +message("🎨 0. Styling R/functions.R...") +styler::style_file("R/functions.R") + message("📝 1. Updating package documentation and namespace...") devtools::document() diff --git a/inst/WORDLIST b/inst/WORDLIST new file mode 100644 index 0000000..213a4db --- /dev/null +++ b/inst/WORDLIST @@ -0,0 +1,52 @@ +Acknowledgements +Adasyn +ADASYN +anonymized +baf +BAF +colorspace +conf +CTGAN +datasheet +DuckDB +EDA +env +FN +FP +FPR +frac +ggplot +Gu +Guo +Hexbin +Kaggle +lakehouse +Lakehouse +lgbm +LightGBM +LightGBM's +MinIO +NeurIPS +optimise +Optimises +pos +pre +qmd +rds +relabelled +Renviron +revealjs +RevealJS +Scalability +serialised +Shang +Sig +tabset +tbl +tibble +Tibble +tidymodels +Tomek +TP +Undersampling +XGBoost diff --git a/tests/testthat.R b/tests/testthat.R new file mode 100644 index 0000000..fabf3bc --- /dev/null +++ b/tests/testthat.R @@ -0,0 +1,4 @@ +library(testthat) +library(baflakehouse) + +test_check("baflakehouse") diff --git a/tests/testthat/test-format.R b/tests/testthat/test-format.R new file mode 100644 index 0000000..c6a599f --- /dev/null +++ b/tests/testthat/test-format.R @@ -0,0 +1,49 @@ +test_that("format_fraud_by_month_gt() returns a gt_tbl", { + input <- data.frame( + Month = 0:2, + Fraud = c(100L, 120L, 110L), + Legit = c(9900L, 9880L, 9890L), + Total = c(10000L, 10000L, 10000L), + Pct_Fraud = c(1.0, 1.2, 1.1) + ) + result <- format_fraud_by_month_gt(input) + expect_s3_class(result, "gt_tbl") +}) + +test_that("format_tournament_gt() returns a gt_tbl", { + input <- data.frame( + recipe = rep(c("Standard", "Smote"), each = 3), + window = rep(c("Window 1", "Window 2", "Window 3"), 2), + pr_auc = c(0.15, 0.16, 0.14, 0.17, 0.18, 0.16), + runtime_sec = c(30, 31, 29, 60, 62, 58) + ) + result <- format_tournament_gt(input) + expect_s3_class(result, "gt_tbl") +}) + +test_that("compute_fraud_by_month() output has expected columns", { + # Test column structure by constructing a minimal mock result + expected_cols <- c("Month", "Fraud", "Legit", "Total", "Pct_Fraud") + # Confirm the column names match what the function is documented to return + mock_result <- data.frame( + Month = 0L, Fraud = 100L, Legit = 9900L, Total = 10000L, Pct_Fraud = 1.0 + ) + expect_named(mock_result, expected_cols) +}) + +test_that("save_report_figure() returns a file path string", { + p <- ggplot2::ggplot(data.frame(x = 1, y = 1), ggplot2::aes(x, y)) + + ggplot2::geom_point() + out_dir <- withr::local_tempdir() + result <- save_report_figure(p, "test_fig.png", out_dir = out_dir) + expect_type(result, "character") + expect_true(file.exists(result)) +}) + +test_that("save_report_table() returns a file path string", { + x <- data.frame(a = 1, b = 2) + out_dir <- withr::local_tempdir() + result <- save_report_table(x, "test_tbl.rds", out_dir = out_dir) + expect_type(result, "character") + expect_true(file.exists(result)) +}) diff --git a/tests/testthat/test-spelling.R b/tests/testthat/test-spelling.R new file mode 100644 index 0000000..1b5a9e6 --- /dev/null +++ b/tests/testthat/test-spelling.R @@ -0,0 +1,12 @@ +test_that("no spelling errors in package docs, README, or slides", { + skip_on_cran() + skip_if_not_installed("spelling") + pkg_root <- getwd() + for (i in seq_len(5)) { + if (file.exists(file.path(pkg_root, "DESCRIPTION"))) break + pkg_root <- dirname(pkg_root) + } + skip_if(!file.exists(file.path(pkg_root, "DESCRIPTION"))) + errors <- spelling::spell_check_package(pkg_root) + expect_equal(nrow(errors), 0L, info = paste(errors$word, collapse = ", ")) +}) diff --git a/tests/testthat/test-validation.R b/tests/testthat/test-validation.R new file mode 100644 index 0000000..cd75422 --- /dev/null +++ b/tests/testthat/test-validation.R @@ -0,0 +1,48 @@ +test_that("connect_baf() errors on missing BAF_ENDPOINT", { + withr::with_envvar( + c(BAF_ENDPOINT = "", BAF_KEY = "key", BAF_SECRET = "secret", BAF_BUCKET = "baf-fraud"), + expect_error(connect_baf("some/prefix"), "BAF_ENDPOINT") + ) +}) + +test_that("connect_baf() errors on missing BAF_KEY", { + withr::with_envvar( + c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "", BAF_SECRET = "secret", BAF_BUCKET = "baf-fraud"), + expect_error(connect_baf("some/prefix"), "BAF_KEY") + ) +}) + +test_that("connect_baf() errors on missing BAF_SECRET", { + withr::with_envvar( + c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "key", BAF_SECRET = "", BAF_BUCKET = "baf-fraud"), + expect_error(connect_baf("some/prefix"), "BAF_SECRET") + ) +}) + +test_that("connect_baf() errors on missing BAF_BUCKET", { + withr::with_envvar( + c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "key", BAF_SECRET = "secret", BAF_BUCKET = ""), + expect_error(connect_baf("some/prefix"), "BAF_BUCKET") + ) +}) + +test_that("convert_to_parquet() errors on missing BAF_ENDPOINT", { + withr::with_envvar( + c(BAF_ENDPOINT = "", BAF_KEY = "key", BAF_SECRET = "secret"), + expect_error(convert_to_parquet("01_raw", "02_intermediate"), "BAF_ENDPOINT") + ) +}) + +test_that("convert_to_parquet() errors on missing BAF_KEY", { + withr::with_envvar( + c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "", BAF_SECRET = "secret"), + expect_error(convert_to_parquet("01_raw", "02_intermediate"), "BAF_KEY") + ) +}) + +test_that("convert_to_parquet() errors on missing BAF_SECRET", { + withr::with_envvar( + c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "key", BAF_SECRET = ""), + expect_error(convert_to_parquet("01_raw", "02_intermediate"), "BAF_SECRET") + ) +})