Refactor bucket structure: baf-fraud/ prefix under lake bucket
All functions now default to bucket_name = "lake" with "baf-fraud/" prepended to all layer prefixes, matching the contemporary lakehouse naming convention (one bucket per environment, project as prefix). Migration: copy baf-fraud/ data to lake/baf-fraud/ on analyticsvm, update BAF_BUCKET env var from "baf-fraud" to "lake". Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -13,7 +13,7 @@
|
||||
#'
|
||||
#' @param from_prefix Character. Prefix/key under the bucket containing CSVs (e.g. \code{"01_raw"}).
|
||||
#' @param to_prefix Character. Prefix/key under the bucket to write Parquet dataset (e.g. \code{"02_intermediate"}).
|
||||
#' @param bucket_name Character. Bucket name. Default \code{"baf-fraud"}.
|
||||
#' @param bucket_name Character. Bucket name. Default \code{"lake"}.
|
||||
#'
|
||||
#' @return A character string giving the destination dataset prefix (typically \code{to_prefix}).
|
||||
#'
|
||||
@@ -30,12 +30,12 @@
|
||||
#' BAF_KEY = "YOUR_ACCESS_KEY",
|
||||
#' BAF_SECRET = "YOUR_SECRET_KEY"
|
||||
#' )
|
||||
#' convert_to_parquet(from_prefix = "01_raw", to_prefix = "02_intermediate", bucket_name = "baf-fraud")
|
||||
#' convert_to_parquet(from_prefix = "01_raw", to_prefix = "02_intermediate", bucket_name = "lake")
|
||||
#' }
|
||||
convert_to_parquet <- function(
|
||||
from_prefix,
|
||||
to_prefix,
|
||||
bucket_name = "baf-fraud"
|
||||
bucket_name = "lake"
|
||||
) {
|
||||
endpoint <- Sys.getenv("BAF_ENDPOINT")
|
||||
access_key <- Sys.getenv("BAF_KEY")
|
||||
@@ -141,7 +141,7 @@ connect_baf <- function(prefix, bucket_name = Sys.getenv("BAF_BUCKET"), use_duck
|
||||
#'
|
||||
#' @param in_prefix Character. Input dataset prefix inside bucket (e.g. "02_intermediate/variant=Base").
|
||||
#' @param out_prefix Character. Output dataset prefix inside bucket (e.g. "03_primary/variant=Base").
|
||||
#' @param bucket_name Character. Bucket name. Default "baf-fraud".
|
||||
#' @param bucket_name Character. Bucket name. Default "lake".
|
||||
#' @param partitioning Character vector of columns to partition by. Default "month". Set NULL to disable.
|
||||
#' @param existing_data_behavior One of "overwrite", "error", "delete_matching". Default "overwrite".
|
||||
#' @param verbose Logical. Emit progress messages. Default TRUE.
|
||||
@@ -153,8 +153,8 @@ connect_baf <- function(prefix, bucket_name = Sys.getenv("BAF_BUCKET"), use_duck
|
||||
#' @importFrom arrow s3_bucket write_dataset
|
||||
clean_baf_base <- function(
|
||||
in_prefix,
|
||||
out_prefix = "03_primary/variant=Base",
|
||||
bucket_name = "baf-fraud",
|
||||
out_prefix = "baf-fraud/03_primary/variant=Base",
|
||||
bucket_name = "lake",
|
||||
partitioning = "month",
|
||||
existing_data_behavior = c("overwrite", "error", "delete_matching"),
|
||||
verbose = TRUE
|
||||
@@ -264,7 +264,7 @@ clean_baf_base <- function(
|
||||
#' stored in MinIO/S3, accessed via \code{connect_baf()}.
|
||||
#'
|
||||
#' @param dataset_prefix Character. Prefix inside the bucket, e.g. "03_primary/variant=Base".
|
||||
#' @param bucket_name Character. Bucket name. Default "baf-fraud".
|
||||
#' @param bucket_name Character. Bucket name. Default "lake".
|
||||
#' @param palette Character. colorspace qualitative palette name. Default "Dark 3".
|
||||
#' @param title Character. Plot title. Default "".
|
||||
#'
|
||||
@@ -278,7 +278,7 @@ clean_baf_base <- function(
|
||||
#' @importFrom colorspace qualitative_hcl
|
||||
plot_fraud_by_month <- function(
|
||||
dataset_prefix,
|
||||
bucket_name = "baf-fraud",
|
||||
bucket_name = "lake",
|
||||
palette = "Dark 3",
|
||||
title = ""
|
||||
) {
|
||||
@@ -452,7 +452,7 @@ render_slides <- function(qmd = "index.qmd", assets, output_dir = "reports/slide
|
||||
#' @param tasks A tibble containing recipe_name, data_folder, and scale_pos_weight.
|
||||
#' @param windows A tibble containing window_id, train_months, and test_month.
|
||||
#' @param feature_prefix Character. The upstream dependency prefix (used to force DAG execution).
|
||||
#' @param bucket_name Character. Bucket name. Default "baf-fraud".
|
||||
#' @param bucket_name Character. Bucket name. Default "lake".
|
||||
#' @param inputs_prefix Character. The folder containing the sampled data. Default "05_model_input".
|
||||
#'
|
||||
#' @return A tibble with the summarized tournament results.
|
||||
@@ -467,8 +467,8 @@ run_imbalance_tournament <- function(
|
||||
tasks,
|
||||
windows,
|
||||
feature_prefix,
|
||||
bucket_name = "baf-fraud",
|
||||
inputs_prefix = "05_model_input"
|
||||
bucket_name = "lake",
|
||||
inputs_prefix = "baf-fraud/05_model_input"
|
||||
) {
|
||||
endpoint <- Sys.getenv("BAF_ENDPOINT")
|
||||
key <- Sys.getenv("BAF_KEY")
|
||||
@@ -865,7 +865,7 @@ plot_num_cor <- function(eda_data, title = "") {
|
||||
#'
|
||||
#' @param in_prefix Character. Input dataset prefix (e.g., "03_primary/variant=Base").
|
||||
#' @param out_prefix Character. Output dataset prefix (e.g., "04_feature/variant=Base").
|
||||
#' @param bucket_name Character. The S3/MinIO bucket name. Default "baf-fraud".
|
||||
#' @param bucket_name Character. The S3/MinIO bucket name. Default "lake".
|
||||
#' @param partitioning Character vector. Columns to partition by. Default "month".
|
||||
#' @param existing_data_behavior Character. Behavior when data exists. Default "delete_matching".
|
||||
#' @param verbose Logical. Whether to print progress messages. Default TRUE.
|
||||
@@ -876,9 +876,9 @@ plot_num_cor <- function(eda_data, title = "") {
|
||||
#' @importFrom arrow s3_bucket open_dataset write_dataset
|
||||
#' @importFrom dplyr mutate
|
||||
engineer_features <- function(
|
||||
in_prefix = "03_primary/variant=Base",
|
||||
out_prefix = "04_feature/variant=Base",
|
||||
bucket_name = "baf-fraud",
|
||||
in_prefix = "baf-fraud/03_primary/variant=Base",
|
||||
out_prefix = "baf-fraud/04_feature/variant=Base",
|
||||
bucket_name = "lake",
|
||||
partitioning = "month",
|
||||
existing_data_behavior = "delete_matching",
|
||||
verbose = TRUE
|
||||
@@ -936,7 +936,7 @@ engineer_features <- function(
|
||||
#'
|
||||
#' @param feature_prefix Character. Input prefix (e.g., "04_feature/variant=Base").
|
||||
#' @param out_prefix Character. Output prefix base (e.g., "05_model_input").
|
||||
#' @param bucket_name Character. Bucket name. Default "baf-fraud".
|
||||
#' @param bucket_name Character. Bucket name. Default "lake".
|
||||
#'
|
||||
#' @return Character. The output prefix (for targets dependency tracking).
|
||||
#' @export
|
||||
@@ -948,9 +948,9 @@ engineer_features <- function(
|
||||
#' @importFrom lubridate %m+%
|
||||
#' @importFrom glue glue
|
||||
generate_model_inputs <- function(
|
||||
feature_prefix = "04_feature/variant=Base",
|
||||
out_prefix = "05_model_input",
|
||||
bucket_name = "baf-fraud"
|
||||
feature_prefix = "baf-fraud/04_feature/variant=Base",
|
||||
out_prefix = "baf-fraud/05_model_input",
|
||||
bucket_name = "lake"
|
||||
) {
|
||||
endpoint <- Sys.getenv("BAF_ENDPOINT")
|
||||
key <- Sys.getenv("BAF_KEY")
|
||||
@@ -1043,12 +1043,12 @@ generate_model_inputs <- function(
|
||||
#'
|
||||
#' @param params A named list of LightGBM hyperparameters with elements:
|
||||
#' \code{trees}, \code{tree_depth}, \code{learn_rate}, \code{loss_reduction}, \code{min_n}.
|
||||
#' @param bucket_name Character. Bucket name. Default "baf-fraud".
|
||||
#' @param bucket_name Character. Bucket name. Default "lake".
|
||||
#' @param inputs_prefix Character. Model input prefix. Default "05_model_input".
|
||||
#'
|
||||
#' @return A tibble with columns \code{truth}, \code{prob}, and \code{pred_class}.
|
||||
#' @export
|
||||
evaluate_final_model <- function(params, bucket_name = "baf-fraud", inputs_prefix = "05_model_input") {
|
||||
evaluate_final_model <- function(params, bucket_name = "lake", inputs_prefix = "baf-fraud/05_model_input") {
|
||||
b <- arrow::s3_bucket(bucket_name,
|
||||
endpoint_override = Sys.getenv("BAF_ENDPOINT"),
|
||||
scheme = "http", access_key = Sys.getenv("BAF_KEY"),
|
||||
@@ -1176,7 +1176,7 @@ train_production_model <- function(data, recipe, best_params, model_filename = "
|
||||
|
||||
# 6. Open an Arrow output stream and push the binary data to MinIO
|
||||
bucket_name <- Sys.getenv("BAF_BUCKET")
|
||||
s3_path <- file.path(bucket_name, "06_models", model_filename)
|
||||
s3_path <- file.path(bucket_name, "baf-fraud/06_models", model_filename)
|
||||
|
||||
out_stream <- s3$OpenOutputStream(s3_path)
|
||||
file_size <- file.info(local_path)$size
|
||||
@@ -1223,7 +1223,7 @@ build_baf_recipe <- function(data) {
|
||||
#' @param imbalance_windows A tibble with columns \code{window_id},
|
||||
#' \code{train_months}, and \code{test_month}, as produced by the
|
||||
#' \code{imbalance_windows} target.
|
||||
#' @param bucket_name Character. MinIO bucket name. Default \code{"baf-fraud"}.
|
||||
#' @param bucket_name Character. MinIO bucket name. Default \code{"lake"}.
|
||||
#' @param inputs_prefix Character. Prefix for the model input layer.
|
||||
#' Default \code{"05_model_input"}.
|
||||
#' @param grid_size Integer. Number of space-filling candidates. Default \code{30}.
|
||||
@@ -1245,8 +1245,8 @@ build_baf_recipe <- function(data) {
|
||||
#' @importFrom yardstick metric_set pr_auc
|
||||
tune_lgbm <- function(
|
||||
imbalance_windows,
|
||||
bucket_name = "baf-fraud",
|
||||
inputs_prefix = "05_model_input",
|
||||
bucket_name = "lake",
|
||||
inputs_prefix = "baf-fraud/05_model_input",
|
||||
grid_size = 30L,
|
||||
seed = 42L
|
||||
) {
|
||||
|
||||
20
_targets.R
20
_targets.R
@@ -32,9 +32,9 @@ list(
|
||||
tar_target(
|
||||
baf_parquet_prefix,
|
||||
convert_to_parquet(
|
||||
from_prefix = "01_raw",
|
||||
to_prefix = "02_intermediate",
|
||||
bucket_name = "baf-fraud"
|
||||
from_prefix = "baf-fraud/01_raw",
|
||||
to_prefix = "baf-fraud/02_intermediate",
|
||||
bucket_name = "lake"
|
||||
)
|
||||
),
|
||||
|
||||
@@ -42,8 +42,8 @@ list(
|
||||
baf_primary_prefix,
|
||||
clean_baf_base(
|
||||
in_prefix = baf_parquet_prefix,
|
||||
out_prefix = "03_primary/variant=Base",
|
||||
bucket_name = "baf-fraud",
|
||||
out_prefix = "baf-fraud/03_primary/variant=Base",
|
||||
bucket_name = "lake",
|
||||
partitioning = "month",
|
||||
existing_data_behavior = "delete_matching",
|
||||
verbose = TRUE
|
||||
@@ -54,8 +54,8 @@ list(
|
||||
baf_feature_prefix,
|
||||
engineer_features(
|
||||
in_prefix = baf_primary_prefix,
|
||||
out_prefix = "04_feature/variant=Base",
|
||||
bucket_name = "baf-fraud",
|
||||
out_prefix = "baf-fraud/04_feature/variant=Base",
|
||||
bucket_name = "lake",
|
||||
partitioning = "month",
|
||||
existing_data_behavior = "delete_matching",
|
||||
verbose = TRUE
|
||||
@@ -67,8 +67,8 @@ list(
|
||||
baf_model_input_prefix,
|
||||
generate_model_inputs(
|
||||
feature_prefix = baf_feature_prefix,
|
||||
out_prefix = "05_model_input",
|
||||
bucket_name = "baf-fraud"
|
||||
out_prefix = "baf-fraud/05_model_input",
|
||||
bucket_name = "lake"
|
||||
)
|
||||
),
|
||||
|
||||
@@ -104,7 +104,7 @@ list(
|
||||
# ---- Fraud Prevalence ----
|
||||
tar_target(
|
||||
fig_fraud_by_month,
|
||||
plot_fraud_by_month(baf_primary_prefix, bucket_name = "baf-fraud")
|
||||
plot_fraud_by_month(baf_primary_prefix, bucket_name = "lake")
|
||||
),
|
||||
|
||||
tar_target(
|
||||
|
||||
@@ -6,8 +6,8 @@
|
||||
\usage{
|
||||
clean_baf_base(
|
||||
in_prefix,
|
||||
out_prefix = "03_primary/variant=Base",
|
||||
bucket_name = "baf-fraud",
|
||||
out_prefix = "baf-fraud/03_primary/variant=Base",
|
||||
bucket_name = "lake",
|
||||
partitioning = "month",
|
||||
existing_data_behavior = c("overwrite", "error", "delete_matching"),
|
||||
verbose = TRUE
|
||||
@@ -18,7 +18,7 @@ clean_baf_base(
|
||||
|
||||
\item{out_prefix}{Character. Output dataset prefix inside bucket (e.g. "03_primary/variant=Base").}
|
||||
|
||||
\item{bucket_name}{Character. Bucket name. Default "baf-fraud".}
|
||||
\item{bucket_name}{Character. Bucket name. Default "lake".}
|
||||
|
||||
\item{partitioning}{Character vector of columns to partition by. Default "month". Set NULL to disable.}
|
||||
|
||||
|
||||
@@ -4,14 +4,14 @@
|
||||
\alias{convert_to_parquet}
|
||||
\title{Convert BAF CSV to partitioned Parquet in MinIO (S3)}
|
||||
\usage{
|
||||
convert_to_parquet(from_prefix, to_prefix, bucket_name = "baf-fraud")
|
||||
convert_to_parquet(from_prefix, to_prefix, bucket_name = "lake")
|
||||
}
|
||||
\arguments{
|
||||
\item{from_prefix}{Character. Prefix/key under the bucket containing CSVs (e.g. \code{"01_raw"}).}
|
||||
|
||||
\item{to_prefix}{Character. Prefix/key under the bucket to write Parquet dataset (e.g. \code{"02_intermediate"}).}
|
||||
|
||||
\item{bucket_name}{Character. Bucket name. Default \code{"baf-fraud"}.}
|
||||
\item{bucket_name}{Character. Bucket name. Default \code{"lake"}.}
|
||||
}
|
||||
\value{
|
||||
A character string giving the destination dataset prefix (typically \code{to_prefix}).
|
||||
@@ -36,6 +36,6 @@ Sys.setenv(
|
||||
BAF_KEY = "YOUR_ACCESS_KEY",
|
||||
BAF_SECRET = "YOUR_SECRET_KEY"
|
||||
)
|
||||
convert_to_parquet(from_prefix = "01_raw", to_prefix = "02_intermediate", bucket_name = "baf-fraud")
|
||||
convert_to_parquet(from_prefix = "01_raw", to_prefix = "02_intermediate", bucket_name = "lake")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,9 +5,9 @@
|
||||
\title{Engineer features for the BAF dataset}
|
||||
\usage{
|
||||
engineer_features(
|
||||
in_prefix = "03_primary/variant=Base",
|
||||
out_prefix = "04_feature/variant=Base",
|
||||
bucket_name = "baf-fraud",
|
||||
in_prefix = "baf-fraud/03_primary/variant=Base",
|
||||
out_prefix = "baf-fraud/04_feature/variant=Base",
|
||||
bucket_name = "lake",
|
||||
partitioning = "month",
|
||||
existing_data_behavior = "delete_matching",
|
||||
verbose = TRUE
|
||||
@@ -18,7 +18,7 @@ engineer_features(
|
||||
|
||||
\item{out_prefix}{Character. Output dataset prefix (e.g., "04_feature/variant=Base").}
|
||||
|
||||
\item{bucket_name}{Character. The S3/MinIO bucket name. Default "baf-fraud".}
|
||||
\item{bucket_name}{Character. The S3/MinIO bucket name. Default "lake".}
|
||||
|
||||
\item{partitioning}{Character vector. Columns to partition by. Default "month".}
|
||||
|
||||
|
||||
@@ -6,15 +6,15 @@
|
||||
\usage{
|
||||
evaluate_final_model(
|
||||
params,
|
||||
bucket_name = "baf-fraud",
|
||||
inputs_prefix = "05_model_input"
|
||||
bucket_name = "lake",
|
||||
inputs_prefix = "baf-fraud/05_model_input"
|
||||
)
|
||||
}
|
||||
\arguments{
|
||||
\item{params}{A named list of LightGBM hyperparameters with elements:
|
||||
\code{trees}, \code{tree_depth}, \code{learn_rate}, \code{loss_reduction}, \code{min_n}.}
|
||||
|
||||
\item{bucket_name}{Character. Bucket name. Default "baf-fraud".}
|
||||
\item{bucket_name}{Character. Bucket name. Default "lake".}
|
||||
|
||||
\item{inputs_prefix}{Character. Model input prefix. Default "05_model_input".}
|
||||
}
|
||||
|
||||
@@ -5,9 +5,9 @@
|
||||
\title{Generate Resampled Model Inputs}
|
||||
\usage{
|
||||
generate_model_inputs(
|
||||
feature_prefix = "04_feature/variant=Base",
|
||||
out_prefix = "05_model_input",
|
||||
bucket_name = "baf-fraud"
|
||||
feature_prefix = "baf-fraud/04_feature/variant=Base",
|
||||
out_prefix = "baf-fraud/05_model_input",
|
||||
bucket_name = "lake"
|
||||
)
|
||||
}
|
||||
\arguments{
|
||||
@@ -15,7 +15,7 @@ generate_model_inputs(
|
||||
|
||||
\item{out_prefix}{Character. Output prefix base (e.g., "05_model_input").}
|
||||
|
||||
\item{bucket_name}{Character. Bucket name. Default "baf-fraud".}
|
||||
\item{bucket_name}{Character. Bucket name. Default "lake".}
|
||||
}
|
||||
\value{
|
||||
Character. The output prefix (for targets dependency tracking).
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
\usage{
|
||||
plot_fraud_by_month(
|
||||
dataset_prefix,
|
||||
bucket_name = "baf-fraud",
|
||||
bucket_name = "lake",
|
||||
palette = "Dark 3",
|
||||
title = ""
|
||||
)
|
||||
@@ -14,7 +14,7 @@ plot_fraud_by_month(
|
||||
\arguments{
|
||||
\item{dataset_prefix}{Character. Prefix inside the bucket, e.g. "03_primary/variant=Base".}
|
||||
|
||||
\item{bucket_name}{Character. Bucket name. Default "baf-fraud".}
|
||||
\item{bucket_name}{Character. Bucket name. Default "lake".}
|
||||
|
||||
\item{palette}{Character. colorspace qualitative palette name. Default "Dark 3".}
|
||||
|
||||
|
||||
@@ -8,8 +8,8 @@ run_imbalance_tournament(
|
||||
tasks,
|
||||
windows,
|
||||
feature_prefix,
|
||||
bucket_name = "baf-fraud",
|
||||
inputs_prefix = "05_model_input"
|
||||
bucket_name = "lake",
|
||||
inputs_prefix = "baf-fraud/05_model_input"
|
||||
)
|
||||
}
|
||||
\arguments{
|
||||
@@ -19,7 +19,7 @@ run_imbalance_tournament(
|
||||
|
||||
\item{feature_prefix}{Character. The upstream dependency prefix (used to force DAG execution).}
|
||||
|
||||
\item{bucket_name}{Character. Bucket name. Default "baf-fraud".}
|
||||
\item{bucket_name}{Character. Bucket name. Default "lake".}
|
||||
|
||||
\item{inputs_prefix}{Character. The folder containing the sampled data. Default "05_model_input".}
|
||||
}
|
||||
|
||||
@@ -6,8 +6,8 @@
|
||||
\usage{
|
||||
tune_lgbm(
|
||||
imbalance_windows,
|
||||
bucket_name = "baf-fraud",
|
||||
inputs_prefix = "05_model_input",
|
||||
bucket_name = "lake",
|
||||
inputs_prefix = "baf-fraud/05_model_input",
|
||||
grid_size = 30L,
|
||||
seed = 42L
|
||||
)
|
||||
@@ -17,7 +17,7 @@ tune_lgbm(
|
||||
\code{train_months}, and \code{test_month}, as produced by the
|
||||
\code{imbalance_windows} target.}
|
||||
|
||||
\item{bucket_name}{Character. MinIO bucket name. Default \code{"baf-fraud"}.}
|
||||
\item{bucket_name}{Character. MinIO bucket name. Default \code{"lake"}.}
|
||||
|
||||
\item{inputs_prefix}{Character. Prefix for the model input layer.
|
||||
Default \code{"05_model_input"}.}
|
||||
|
||||
@@ -1,20 +1,20 @@
|
||||
test_that("connect_baf() errors on missing BAF_ENDPOINT", {
|
||||
withr::with_envvar(
|
||||
c(BAF_ENDPOINT = "", BAF_KEY = "key", BAF_SECRET = "secret", BAF_BUCKET = "baf-fraud"),
|
||||
c(BAF_ENDPOINT = "", BAF_KEY = "key", BAF_SECRET = "secret", BAF_BUCKET = "lake"),
|
||||
expect_error(connect_baf("some/prefix"), "BAF_ENDPOINT")
|
||||
)
|
||||
})
|
||||
|
||||
test_that("connect_baf() errors on missing BAF_KEY", {
|
||||
withr::with_envvar(
|
||||
c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "", BAF_SECRET = "secret", BAF_BUCKET = "baf-fraud"),
|
||||
c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "", BAF_SECRET = "secret", BAF_BUCKET = "lake"),
|
||||
expect_error(connect_baf("some/prefix"), "BAF_KEY")
|
||||
)
|
||||
})
|
||||
|
||||
test_that("connect_baf() errors on missing BAF_SECRET", {
|
||||
withr::with_envvar(
|
||||
c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "key", BAF_SECRET = "", BAF_BUCKET = "baf-fraud"),
|
||||
c(BAF_ENDPOINT = "minio:9000", BAF_KEY = "key", BAF_SECRET = "", BAF_BUCKET = "lake"),
|
||||
expect_error(connect_baf("some/prefix"), "BAF_SECRET")
|
||||
)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user