"""Source code for src.train: trains the hydraulic condition-monitoring model."""

import json
import os
from pathlib import Path

import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import train_test_split
from sklearn.multioutput import MultiOutputClassifier

import mlflow

# All runtime configuration is read from environment variables with sensible
# defaults, so the script works both locally and inside CI/containers.
INPUT_PATH = os.getenv("INPUT_PATH", "data/processed/hydraulic_clean.csv")   # cleaned feature table (CSV)
MODEL_PATH = os.getenv("MODEL_PATH", "models/model.pkl")                     # where the trained model is pickled
METRICS_PATH = os.getenv("METRICS_PATH", "reports/model_metrics.json")       # per-target metrics JSON

# Sensor columns used as model inputs.
# Naming suggests pressure (PS*), motor power (EPS1), flow (FS*),
# temperature (TS*), vibration (VS1), and efficiency-related (CE/CP/SE)
# channels — presumably from the UCI hydraulic systems dataset; verify
# against the preprocessing step that produces INPUT_PATH.
FEATURES = [
    "PS1",
    "PS2",
    "PS3",
    "PS4",
    "PS5",
    "PS6",
    "EPS1",
    "FS1",
    "FS2",
    "TS1",
    "TS2",
    "TS3",
    "TS4",
    "VS1",
    "CE",
    "CP",
    "SE",
]

# Target columns: one classification problem per hydraulic subsystem,
# trained jointly via MultiOutputClassifier.
TARGETS = [
    "cooler_condition",
    "valve_condition",
    "pump_leakage",
    "accumulator_pressure",
]

# Model / split hyperparameters (env-overridable; strings cast explicitly).
N_ESTIMATORS = int(os.getenv("N_ESTIMATORS", "100"))
TEST_SIZE = float(os.getenv("TEST_SIZE", "0.2"))
RANDOM_STATE = int(os.getenv("RANDOM_STATE", "42"))


def train() -> None:
    """Train a multi-output RandomForest on the hydraulic dataset and log to MLflow.

    Reads the cleaned CSV from ``INPUT_PATH``, fits one RandomForest per target
    via ``MultiOutputClassifier``, evaluates on a held-out split, then:

    * logs params/metrics and the model to MLflow,
    * pickles the model to ``MODEL_PATH``,
    * writes per-target metrics (incl. confusion matrices) to ``METRICS_PATH``,
    * prints a classification report per target.

    Raises:
        FileNotFoundError: if ``INPUT_PATH`` does not exist.
        KeyError: if expected FEATURES/TARGETS columns are missing from the CSV.
    """
    tracking_uri = os.getenv("MLFLOW_TRACKING_URI", "./mlruns")
    mlflow.set_tracking_uri(tracking_uri)
    mlflow.set_experiment("hydraulic-condition-monitoring")

    df = pd.read_csv(INPUT_PATH)
    X = df[FEATURES].values
    y = df[TARGETS].values

    # Stratify on the first target only (sklearn's splitter accepts a single
    # label vector); the other targets are assumed correlated enough that this
    # keeps class balance acceptable — TODO confirm if targets are imbalanced.
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=TEST_SIZE,
        random_state=RANDOM_STATE,
        stratify=y[:, 0],
    )

    model = MultiOutputClassifier(
        RandomForestClassifier(
            n_estimators=N_ESTIMATORS,
            random_state=RANDOM_STATE,
        )
    )

    # Derive output directories from the configured paths instead of the
    # hard-coded "models"/"reports" literals — otherwise overriding MODEL_PATH
    # or METRICS_PATH via env to another directory makes the writes below fail.
    Path(MODEL_PATH).parent.mkdir(parents=True, exist_ok=True)
    Path(METRICS_PATH).parent.mkdir(parents=True, exist_ok=True)

    metrics: dict[str, dict] = {}
    with mlflow.start_run():
        mlflow.log_params(
            {
                "n_estimators": N_ESTIMATORS,
                "test_size": TEST_SIZE,
                "random_state": RANDOM_STATE,
                "n_features": len(FEATURES),
                "n_targets": len(TARGETS),
                "train_samples": len(X_train),
                "test_samples": len(X_test),
            }
        )

        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        # Evaluate each target column independently; zero_division=0 keeps
        # metrics defined when a class is absent from the test fold.
        for i, target in enumerate(TARGETS):
            accuracy = accuracy_score(y_test[:, i], y_pred[:, i])
            precision = precision_score(
                y_test[:, i],
                y_pred[:, i],
                average="weighted",
                zero_division=0,
            )
            recall = recall_score(
                y_test[:, i],
                y_pred[:, i],
                average="weighted",
                zero_division=0,
            )
            f1_macro = f1_score(
                y_test[:, i],
                y_pred[:, i],
                average="macro",
                zero_division=0,
            )
            f1_weighted = f1_score(
                y_test[:, i],
                y_pred[:, i],
                average="weighted",
                zero_division=0,
            )
            cm = confusion_matrix(y_test[:, i], y_pred[:, i]).tolist()
            report_dict = classification_report(
                y_test[:, i],
                y_pred[:, i],
                output_dict=True,
                zero_division=0,
            )

            mlflow.log_metric(f"accuracy_{target}", accuracy)
            mlflow.log_metric(f"precision_weighted_{target}", precision)
            mlflow.log_metric(f"recall_weighted_{target}", recall)
            mlflow.log_metric(f"f1_macro_{target}", f1_macro)
            mlflow.log_metric(f"f1_weighted_{target}", f1_weighted)

            metrics[target] = {
                "accuracy": accuracy,
                "precision_weighted": precision,
                "recall_weighted": recall,
                "f1_macro": f1_macro,
                "f1_weighted": f1_weighted,
                "confusion_matrix": cm,
                "classification_report": report_dict,
            }

        joblib.dump(model, MODEL_PATH)
        mlflow.sklearn.log_model(model, "model")

        with open(METRICS_PATH, "w", encoding="utf-8") as f:
            json.dump(metrics, f, indent=2)
        mlflow.log_artifact(METRICS_PATH, artifact_path="reports")

    print(f"Tracking URI: {tracking_uri}")
    print(f"Model saved to: {MODEL_PATH}")
    print(f"Metrics saved to: {METRICS_PATH}\n")
    for i, target in enumerate(TARGETS):
        print(f"── {target} ──")
        print(classification_report(y_test[:, i], y_pred[:, i], digits=3))
        print(f"Confusion matrix:\n{confusion_matrix(y_test[:, i], y_pred[:, i])}\n")
# Script entry point: run training only when executed directly.
if __name__ == "__main__":
    train()