Source code for middoe.krnl_expera

# # krnl_expera.py
#
# from middoe.krnl_simula import simula
# import numpy as np
# import pandas as pd
# from pathlib import Path
# import os
#
# def expera(system, models, insilicos, design_decisions, expr, swps=None):
#     """
#     Run an in-silico experiment, produce simulation results, and prepare them for reporting.
#
#     This function:
#     1. Sets up paths and simulation parameters.
#     2. Constructs scaled and unscaled variables, parameters, and switching points as needed.
#     3. Calls the in-silico simulator to generate results.
#     4. Returns the path to an Excel file and the resulting simulation DataFrame.
#
#     Parameters
#     ----------
#     system : dict
#         Structure of the model, including variables and their properties.
#         It contains keys like 'tv_iphi', 'tv_ophi', 'ti_iphi', 'ti_ophi',
#         and 'independant_variables' each with 'var' and 'attributes' sub-dicts.
#
#     models : dict
#         User-defined settings related to the modelling process, including 'theta_parameters'.
#
#     insilicos : dict
#         User-defined settings for the simulator, including:
#         - 'insilico_model': str, name of the model function to be simulated.
#         - 'classic-des': dict, containing default or "classic" design values.
#         - 'nd2': int, number of points for time discretization.
#
#     design_decisions : dict
#         Design decisions for the experiment. May contain keys like 'St', 'tii', 'tvi', 't_values'.
#
#     expr : int
#         Index for the current experiment.
#
#     swps : dict, optional
#         Switching points for the time-variant variables, unscaled. If None, empty dicts are used.
#
#     Returns
#     -------
#     tuple
#         (excel_path, df_combined)
#         excel_path : str
#             The path to the Excel file where results are to be stored.
#         df_combined : pandas.DataFrame
#             The combined results of the in-silico experiment.
#     """
#     # Extract scaling constant for independent variables
#     tsc = system['t_s'][1]
#
#
#     # Construct piecewise functions for initial and DOE cases
#     cvp_initial = {
#         var: 'no_CVP'
#         for var in system['tvi'].keys()
#     }
#     cvp_doe = {
#         var: system['tvi'][var]['cvp']
#         for var in system['tvi'].keys()
#     }
#
#     # Retrieve the model name and related simulation parameters
#     model_name = insilicos.get('tr_m')
#     errortype = insilicos.get('errt', 'abs')
#     classic_des = insilicos['prels']
#     # theta_parameters = models['theta']
#     theta_parameters = insilicos['theta']
#
#     # Standard deviations for measured tv_ophi variables
#     std_dev = {
#         var: system['tvo'][var]['unc']
#         for var in system['tvo'].keys()
#         if system['tvo'][var].get('meas', True)
#     }
#
#     # If no design decisions are provided, run the "initial" case
#     if not design_decisions:
#         # Time values for each measured variable
#         # Step 1: Create tlin
#         dt_real = system['t_r']
#         nodes = int(round(tsc / dt_real)) + 1
#         tlin = np.linspace(0, 1, nodes)
#
#         # Step 2: Generate t_values, snapping each to the closest value in tlin
#         t_values = {
#             var: [float(tlin[np.argmin(np.abs(tlin - t))]) for t in
#                   np.linspace(0, 1, system['tvo'][var]['sp'])]
#             for var in system['tvo']
#             if system['tvo'][var].get('meas', True)
#         }
#
#         # Flatten all time points
#         t_values_flat = [tp for times in t_values.values() for tp in times]
#
#         # Create unique accumulated time points for simulation
#         t_acc_unique = np.unique(np.concatenate((tlin, t_values_flat))).tolist()
#
#         # Construct variables and parameters for the "initial" case
#         phi, phit, phisc, phitsc = _construct_var(system, classic_des, expr)
#         theta, thetac = _construct_par(model_name, theta_parameters)
#
#         if swps is None:
#             swps, swpsu = {}, {}
#
#         case = 'initial'
#         df_combined = _inssimulator(
#             t_values, swps, swpsu, phi, phisc, phit, phitsc, tsc, theta, thetac,
#             cvp_initial, std_dev, t_acc_unique, case, model_name,
#             system, models, errortype
#         )
#
#     else:
#         # When design decisions are provided, run the "doe" case
#         St = design_decisions['St']
#         t_values = {var: np.array(St[var]) / tsc for var in St}
#
#         # Accumulated time points for DOE
#         t_acc_unique = np.array(design_decisions['t_values'])
#         case = 'doe'
#
#         # Construct scaling and parameters
#         _, _, phisc, phitsc = _construct_var(system, classic_des, expr)
#         theta, thetac = _construct_par(model_name, theta_parameters)
#
#         phi = design_decisions['tii']
#         phit = design_decisions['tvi']
#         swpsu = swps if swps is not None else {}
#
#         # Scale tii and tvi
#         phi_scaled = {
#             var: np.array(phi[var]) / phisc[var]
#             for var in phi if var in system['tii'].keys()
#         }
#         phit_scaled = {
#             var: np.array(phit[var])
#             for var in phit if var in system['tvi'].keys()
#         }
#
#         # Scale swps
#         swps_scaled = {}
#         if swpsu:
#             for var in swpsu:
#                 if var.endswith('l'):
#                     # Length-based scaling
#                     var_base = var.rstrip('l')
#                     swps_scaled[var] = np.array(swpsu[var]) / phitsc[var_base]
#                 elif var.endswith('t'):
#                     # Time-based scaling
#                     swps_scaled[var] = np.array(swpsu[var]) / tsc
#                 else:
#                     raise ValueError(f"Unrecognized variable format for {var} in swps.")
#
#         df_combined = _inssimulator(
#             t_values, swps_scaled, swpsu, phi_scaled, phisc, phit_scaled, phitsc, tsc, theta, thetac,
#             cvp_doe, std_dev, t_acc_unique, case, model_name, system, models, errortype)
#
#     # Define Excel path for in-silico data
#     excel_path = Path.cwd() / 'data.xlsx'
#
#     experiment_number = str(expr)  # Ensure experiment_number is a string
#     # Check if the file already exists
#     if os.path.isfile(excel_path):
#         # Open the file in append mode if it exists
#         with pd.ExcelWriter(excel_path, mode='a', engine='openpyxl') as writer:
#             existing_sheets = writer.book.sheetnames
#             if experiment_number in existing_sheets:
#                 # Append to the existing sheet
#                 df_combined.to_excel(writer, sheet_name=experiment_number, index=False)
#             else:
#                 # Create a new sheet
#                 df_combined.to_excel(writer, sheet_name=experiment_number, index=False)
#     else:
#         # If the file does not exist, create it and write the data
#         with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
#             df_combined.to_excel(writer, sheet_name=experiment_number, index=False)
#
#     # Return Excel path and DataFrame, with summary print
#     print(f"\n[✓] In-silico data saved to: {excel_path}")
#     print(f"[INFO] Model used         : {model_name}")
#     print(f"[INFO] Design case        : {'classic/preliminary' if case == 'initial' else 'MBDoE'}")
#     print(f"[INFO] Responses simulated:")
#
#     for response in system.get('tvo', {}):
#         meas = system['tvo'][response].get('meas', True)
#         unc = system['tvo'][response].get('unc', 'N/A')
#         status = "measurable" if meas else "non-measurable"
#         print(f"   - {response:<10} | {status:<15} | std.dev = {unc}")
#
#     return excel_path, df_combined
#
#     # return df_combined
#
#
# def _construct_var(system, classic_des, j):
#     """
#     Construct and scale variable dictionaries for the simulation.
#
#     Parameters
#     ----------
#     system : dict
#         Dictionary describing the model structure.
#     classic_des : dict
#         Dictionary containing "classic" design values for variables.
#     j : int
#         Index of the current experiment.
#
#     Returns
#     -------
#     tuple
#         (tii, tvi, phisc, phitsc):
#         - tii : dict
#             Time-invariant variable values scaled by their maxima.
#         - tvi : dict
#             Time-variant variable values scaled by their maxima.
#         - phisc : dict
#             Scaling factors (max values) for time-invariant variables.
#         - phitsc : dict
#             Scaling factors (max values) for time-variant variables.
#     """
#     phit = {}
#     phisc = {}
#     phitsc = {}
#     phi = {}
#
#     # Time-variant input variables (tv_iphi)
#     for var in system['tvi'].keys():
#         max_val = system['tvi'][var]['max']
#         classic_value = classic_des.get(str(j), {}).get(var, max_val)
#         phitsc[var] = max_val
#         phit[var] = classic_value / max_val
#
#     # Time-invariant input variables (ti_iphi)
#     for var in system['tii'].keys():
#         max_val = system['tii'][var]['max']
#         classic_value = classic_des.get(str(j), {}).get(var, max_val)
#         phisc[var] = max_val
#         phi[var] = classic_value / max_val
#
#     return phi, phit, phisc, phitsc
#
#
# def _construct_par(model_name, theta_parameters):
#     """
#     Construct parameter arrays for the simulation, given the model name and theta parameters.
#
#     Parameters
#     ----------
#     model_name : str
#         The name of the model.
#     theta_parameters : dict
#         Dictionary mapping model names to theta scaling parameters.
#
#     Returns
#     -------
#     tuple
#         (theta, thetac):
#         - theta : list
#             Normalized parameter values (e.g., [1, 1, ...]).
#         - thetac : list
#             Actual parameter scaling values for the given model.
#
#     Raises
#     ------
#     ValueError
#         If parameters cannot be constructed or accessed.
#     """
#     try:
#         # thetac = theta_parameters.get(model_name, [])
#         thetac = theta_parameters
#         theta = [1] * len(thetac)
#         return theta, thetac
#     except Exception as e:
#         raise ValueError(f"Unable to adjust theta or thetac for model {model_name}: {e}")
#
#
# def _inssimulator(t_values, swps, swpsu, phi, phisc, phit, phitsc, tsc, theta, thetac, cvp, std_dev,
#                   t_valuesc, case, model_name, system, models, errortype):
#     """
#     Conduct the in-silico experiments by setting up the simulation environment,
#     running the simulations, adding noise, and saving the results.
#
#     Returns one combined DataFrame with sections:
#     - var_t, var_noisy for each tv_ophi variable (each with its own length).
#     - t_global, tii(s), tvi(s)/phit_interp(s), ti_ophi (aligned with t_valuesc).
#     - piecewise_func in a separate section.
#     - swpsu in another separate section.
#
#     All in one DataFrame, separated by blank columns.
#
#     Parameters
#     ----------
#     t_values : dict
#         Dictionary of time values keyed by dependent variable name,
#         e.g. {'y1': [0, 0.25, 0.5, 1.0], 'y2': [0, 0.5, 1.0]}.
#     swps : dict
#         Scaled switching points for the time-variant variables.
#     swpsu : dict
#         Unscaled switching points for the time-variant variables.
#     phi, phisc : dict
#         Dictionaries of time-invariant variables and their scaling factors.
#     phit, phitsc : dict
#         Dictionaries of time-variant variables and their scaling factors.
#     tsc : float
#         Scaling factor for time.
#     theta, thetac : list
#         Parameter vectors.
#     cvp : dict
#         Dictionary of piecewise functions defining variable trajectories.
#     std_dev : dict
#         Dictionary of standard deviations keyed by variable name.
#     t_valuesc : list
#         Global set of time points for simulation.
#     case : str
#         Indicates the type of run ('initial', 'doe', etc.).
#     model_name : str
#         The name of the model being simulated.
#     system : dict
#         Defines the structure of the model, including variable attributes.
#     models : dict
#         Additional modelling settings.
#     errortype : str
#         Type of error to be applied ('abs' for absolute, 'rel' for relative).
#
#     Returns
#     -------
#     pd.DataFrame
#         A combined DataFrame with simulation results, noisy measurements, piecewise function info, and switching points.
#
#     Raises
#     ------
#     ValueError
#         If the case is unsupported or if the number of independent variables does not match the dependent variables.
#     KeyError
#         If a dependent variable is missing in the simulation results.
#     """
#
#     # Run the model to get tv_ophi, ti_ophi, phit_interp
#     if case == 'initial':
#         tv_ophi, ti_ophi, phit_interp = simula(
#             t_valuesc, {}, phi, phisc, phitsc, tsc, theta, thetac,
#             cvp, phit, model_name, system, models
#         )
#     elif case == 'doe':
#         tv_ophi, ti_ophi, phit_interp = simula(
#             t_valuesc, swps, phi, phisc, phitsc, tsc, theta, thetac,
#             cvp, phit, model_name, system, models
#         )
#     else:
#         raise ValueError(f"Unsupported case: {case}")
#
#     # Identify measured variables from system
#     measured_vars = [
#         var for var in system['tvo'].keys()
#         if system['tvo'][var].get('measured', True)
#     ]
#
#     # Prepare lists of independent and dependent variables
#     # Assuming that there is a one-to-one mapping defined externally,
#     # we match the number of measured vars with the number of independent vars
#     indep_vars = list(range(len(measured_vars)))
#     dep_vars = measured_vars
#
#     if len(indep_vars) != len(dep_vars):
#         raise ValueError("The number of independent variables must match the number of dependent variables.")
#
#     indep_to_dep_mapping = dict(zip([f"X{i+1}" for i in indep_vars], dep_vars))
#
#     # Check that all dependent variables exist in tv_ophi
#     for indep_var, dep_var in indep_to_dep_mapping.items():
#         if dep_var not in tv_ophi:
#             raise KeyError(
#                 f"Dependent variable '{dep_var}' is missing in tv_ophi. Available keys: {list(tv_ophi.keys())}"
#             )
#
#     # 1. Per-variable DataFrames for tv_ophi
#     var_dfs = []
#     for indep_var, dep_var in indep_to_dep_mapping.items():
#         var_times = [t * tsc for t in t_values[dep_var]]  # Times for the dependent variable
#         var_results = []
#
#         for t in var_times:
#             # Find the closest index in the scaled time vector
#             closest_idx = np.argmin(np.abs(np.array(t_valuesc) * tsc - t))
#             original_val = tv_ophi[dep_var][closest_idx]
#
#             # Compute noise based on selected error type
#             if errortype == 'abs':
#                 sigma = std_dev[dep_var]  # constant std deviation
#             elif errortype == 'rel':
#                 sigma = std_dev[dep_var] * abs(original_val)  # signal-dependent
#             else:
#                 raise ValueError(f"Unsupported errortype: {errortype}")
#
#             noisy_val = np.random.normal(loc=original_val, scale=sigma)
#             noisy_val = max(noisy_val, 0.0)
#             var_results.append([t, noisy_val, sigma])
#
#         df_var = pd.DataFrame(var_results, columns=[f"MES_X:{dep_var}", f"MES_Y:{dep_var}", f"MES_E:{dep_var}"])
#         var_dfs.append(df_var)
#
#     # 2. Global DataFrame (t_global, tii, tvi/phit_interp, ti_ophi)
#     phi_order = list(phi.keys())
#     ti_ophi_order = list(ti_ophi.keys())
#     if case == 'classic':
#         phit_order = list(phit.keys())
#     else:
#         phit_order = list(phit_interp.keys())
#
#     # Noise added once for ti_ophi
#     ti_ophi_noisy_once = {
#         var: np.random.normal(val[0], abs(std_dev.get(var, 0) * val[0]))
#         for var, val in ti_ophi.items()
#     }
#
#     global_results = []
#     for i, t_global in enumerate(t_valuesc):
#         row = [t_global * tsc]
#
#         # tii
#         phi_vals = [phi[v] * phisc[v] for v in phi_order]
#         row.extend(phi_vals)
#
#         # tvi or phit_interp
#         if case == 'classic':
#             phit_vals = [phit[v] * phitsc[v] for v in phit_order]
#         else:
#             phit_vals = [phit_interp[v][i] for v in phit_order]
#         row.extend(phit_vals)
#
#         # ti_ophi noisy (constant)
#         ti_ophi_vals = [ti_ophi_noisy_once[v] for v in ti_ophi_order]
#         row.extend(ti_ophi_vals)
#
#         global_results.append(row)
#
#     global_columns = (['X:all']
#                       + phi_order
#                       + phit_order
#                       + ti_ophi_order)
#     df_global = pd.DataFrame(global_results, columns=global_columns)
#
#     # Combine var_dfs and df_global horizontally
#     all_dfs = var_dfs + [df_global]
#     df_main = pd.concat(all_dfs, axis=1)
#
#     # 3. piecewise_func DataFrame
#     df_piecewise = pd.DataFrame([{
#         f"CVP:{var}": method for var, method in cvp.items()
#     }])
#
#     # Combine all into one DataFrame
#     df_combined = pd.concat([df_main, df_piecewise], axis=1)
#
#     # 4. swpsu DataFrame
#     if case == 'doe':
#         swpsu_order = list(swpsu.keys())
#         swps_data = {key: swpsu[key] for key in swpsu_order}
#         df_swps = pd.DataFrame(swps_data)
#
#         # Insert blank columns to separate sections
#         max_length = max(len(df_main), len(df_piecewise), len(df_swps))
#         df_blank_1 = pd.DataFrame({' ': [None] * max_length})
#         df_blank_2 = pd.DataFrame({'  ': [None] * max_length})
#
#         # Combine all into one DataFrame
#         df_combined = pd.concat([df_main, df_blank_1, df_piecewise, df_blank_2, df_swps], axis=1)
#
#     return df_combined


# krnl_expera.py

from middoe.krnl_simula import simula
import numpy as np
import pandas as pd
from pathlib import Path
import os


[docs] def expera(system, models, insilicos, design_decisions, expr, swps=None): r""" Execute in-silico experiment and generate synthetic measurement data with noise. This function orchestrates the complete in-silico experimentation workflow: setting up simulation parameters, running the forward model, adding realistic measurement noise, and saving results to Excel. It supports both preliminary (classic) designs and optimized MBDoE designs. The function handles time-variant and time-invariant inputs/outputs, control variable parameterizations, and switching profiles. Parameters ---------- system : dict System configuration defining model structure: - 't_s' : tuple[float, float] Time span (start, end). - 't_r' : float Time resolution for discretization. - 'tvi' : dict Time-variant input definitions: * 'max' : float — upper bound * 'cvp' : str — control variable parameterization method - 'tii' : dict Time-invariant input definitions. - 'tvo' : dict Time-variant output definitions: * 'meas' : bool — measurement flag * 'unc' : float — measurement uncertainty (std dev) * 'sp' : int — number of sampling points - 'tio' : dict Time-invariant output definitions. models : dict Model definitions (currently unused but passed for extensibility). insilicos : dict In-silico experiment configuration: - 'tr_m' : str True model name (ground truth for simulation). - 'theta' : list[float] True parameter values (ground truth). - 'prels' : dict Preliminary/classic design values. - 'errt' : str, optional Error type: 'abs' (absolute) or 'rel' (relative, default: 'abs'). design_decisions : dict or None MBDoE design decisions. If None, runs preliminary design. If provided: - 'St' : dict[str, np.ndarray] Sampling times for each output variable (unscaled). - 't_values' : np.ndarray Global time vector for simulation. - 'tii' : dict[str, float] Time-invariant input values. - 'tvi' : dict[str, np.ndarray] Time-variant input profiles. expr : int Experiment number (used for Excel sheet naming). swps : dict, optional Switching points for time-variant inputs (unscaled): - '{var}t' : np.ndarray — switching times - '{var}l' : np.ndarray — switching levels Returns ------- excel_path : Path Path to the Excel file containing experiment data ('data.xlsx'). df_combined : pd.DataFrame Combined DataFrame with all simulation results and metadata: - Per-variable sections: 'MES_X:{var}', 'MES_Y:{var}', 'MES_E:{var}' - Global time vector: 'X:all' - Input values: time-invariant and time-variant - Output values: time-invariant (noisy) - CVP metadata: 'CVP:{var}' (control parameterization methods) - Switching points: '{var}t', '{var}l' (if applicable) Notes ----- **Experimental Workflow**: 1. **Setup**: Extract scaling factors, construct variable/parameter dictionaries 2. **Time Discretization**: Create global time vector based on sampling requirements 3. **Simulation**: Call forward model (simula) with specified inputs 4. **Noise Addition**: Add measurement noise according to error type 5. **Data Organization**: Structure results in standardized DataFrame format 6. **Export**: Save to Excel with experiment number as sheet name **Preliminary vs MBDoE Designs**: - **Preliminary** (design_decisions=None): * Uses classic_des values from insilicos['prels'] * Uniform sampling in time for each output * Simple input profiles (constant or default CVP) - **MBDoE** (design_decisions provided): * Uses optimized sampling times from St * Optimized input profiles from tvi * Advanced CVP methods (step, ramp, etc.) **Measurement Noise**: Two error models supported: - **Absolute ('abs')**: \( y_{noisy} = y_{true} + \mathcal{N}(0, \sigma^2) \) - **Relative ('rel')**: \( y_{noisy} = y_{true} + \mathcal{N}(0, (\sigma \cdot y_{true})^2) \) Negative values are clipped to zero (physical constraint). **DataFrame Structure**: The output DataFrame contains multiple sections separated by blank columns: 1. **Per-variable measurements**: Separate columns for each output variable with sampling times (MES_X), noisy values (MES_Y), and uncertainties (MES_E) 2. **Global simulation**: Time vector (X:all), all inputs, time-invariant outputs 3. **CVP metadata**: Control variable parameterization methods 4. **Switching points**: Only for MBDoE designs with switching profiles **Excel Output**: Results are appended to 'data.xlsx' in the current directory. Each experiment creates/updates a sheet named by the experiment number. This allows accumulation of multiple experiments in a single file for batch processing. **Time Scaling**: All time values are normalized to [0, 1] internally, then scaled back to physical units (tsc = system['t_s'][1]) for output. This improves numerical conditioning. References ---------- .. [1] Franceschini, G., & Macchietto, S. (2008). Model-based design of experiments for parameter precision: State of the art. *Chemical Engineering Science*, 63(19), 4846–4872. .. [2] Bard, Y. (1974). *Nonlinear Parameter Estimation*. Academic Press. See Also -------- simula : Forward model simulation kernel. _construct_var : Variable scaling and organization. _construct_par : Parameter vector construction. _inssimulator : Core simulation and noise addition routine. Examples -------- >>> # Preliminary design (uniform sampling) >>> excel_path, df = expera(system, models, insilicos, None, expr=1) >>> print(f"Experiment saved to: {excel_path}") >>> print(f"Measurements: {df.filter(like='MES_Y').columns.tolist()}") >>> # MBDoE design (optimized sampling) >>> design_decisions = { ... 'St': {'y1': np.array([0, 50, 100]), 'y2': np.array([0, 75, 100])}, ... 't_values': np.array([0, 0.5, 0.75, 1.0]), ... 'tii': {'P': 1.5, 'T0': 298}, ... 'tvi': {'T': np.array([300, 320, 310, 300])} ... } >>> swps = {'Tt': np.array([0, 33, 67]), 'Tl': np.array([300, 320, 310])} >>> excel_path, df = expera(system, models, insilicos, design_decisions, ... expr=2, swps=swps) """ # Extract scaling constant for independent variables tsc = system['t_s'][1] # Construct piecewise functions for initial and DOE cases cvp_initial = {var: 'no_CVP' for var in system['tvi'].keys()} cvp_doe = {var: system['tvi'][var]['cvp'] for var in system['tvi'].keys()} # Retrieve the model name and related simulation parameters model_name = insilicos.get('tr_m') errortype = insilicos.get('errt', 'abs') classic_des = insilicos['prels'] theta_parameters = insilicos['theta'] # Standard deviations for measured tv_ophi variables std_dev = { var: system['tvo'][var]['unc'] for var in system['tvo'].keys() if system['tvo'][var].get('meas', True) } # If no design decisions are provided, run the "initial" case if not design_decisions: # Time values for each measured variable dt_real = system['t_r'] nodes = int(round(tsc / dt_real)) + 1 tlin = np.linspace(0, 1, nodes) # Generate t_values, snapping each to the closest value in tlin t_values = { var: [float(tlin[np.argmin(np.abs(tlin - t))]) for t in np.linspace(0, 1, system['tvo'][var]['sp'])] for var in system['tvo'] if system['tvo'][var].get('meas', True) } # Flatten all time points t_values_flat = [tp for times in t_values.values() for tp in times] # Create unique accumulated time points for simulation t_acc_unique = np.unique(np.concatenate((tlin, t_values_flat))).tolist() # Construct variables and parameters for the "initial" case phi, phit, phisc, phitsc = _construct_var(system, classic_des, expr) theta, thetac = _construct_par(model_name, theta_parameters) if swps is None: swps, swpsu = {}, {} case = 'initial' df_combined = _inssimulator( t_values, swps, swpsu, phi, phisc, phit, phitsc, tsc, theta, thetac, cvp_initial, std_dev, t_acc_unique, case, model_name, system, models, errortype ) else: # When design decisions are provided, run the "doe" case St = design_decisions['St'] t_values = {var: np.array(St[var]) / tsc for var in St} # Accumulated time points for DOE t_acc_unique = np.array(design_decisions['t_values']) case = 'doe' # Construct scaling and parameters _, _, phisc, phitsc = _construct_var(system, classic_des, expr) theta, thetac = _construct_par(model_name, theta_parameters) phi = design_decisions['tii'] phit = design_decisions['tvi'] swpsu = swps if swps is not None else {} # Scale tii and tvi phi_scaled = { var: np.array(phi[var]) / phisc[var] for var in phi if var in system['tii'].keys() } phit_scaled = { var: np.array(phit[var]) for var in phit if var in system['tvi'].keys() } # Scale swps swps_scaled = {} if swpsu: for var in swpsu: if var.endswith('l'): # Length-based scaling var_base = var.rstrip('l') swps_scaled[var] = np.array(swpsu[var]) / phitsc[var_base] elif var.endswith('t'): # Time-based scaling swps_scaled[var] = np.array(swpsu[var]) / tsc else: raise ValueError(f"Unrecognized variable format for {var} in swps.") df_combined = _inssimulator( t_values, swps_scaled, swpsu, phi_scaled, phisc, phit_scaled, phitsc, tsc, theta, thetac, cvp_doe, std_dev, t_acc_unique, case, model_name, system, models, errortype ) # Define Excel path for in-silico data excel_path = Path.cwd() / 'data.xlsx' experiment_number = str(expr) # Check if the file already exists if os.path.isfile(excel_path): # Open the file in append mode if it exists with pd.ExcelWriter(excel_path, mode='a', engine='openpyxl') as writer: existing_sheets = writer.book.sheetnames if experiment_number in existing_sheets: # Append to the existing sheet df_combined.to_excel(writer, sheet_name=experiment_number, index=False) else: # Create a new sheet df_combined.to_excel(writer, sheet_name=experiment_number, index=False) else: # If the file does not exist, create it and write the data with pd.ExcelWriter(excel_path, engine='openpyxl') as writer: df_combined.to_excel(writer, sheet_name=experiment_number, index=False) # Return Excel path and DataFrame, with summary print print(f"[✓] In-silico data saved to: {excel_path}") print(f"[INFO] Model used : {model_name}") print(f"[INFO] Design case : {'classic/preliminary' if case == 'initial' else 'MBDoE'}") print(f"[INFO] Responses simulated:") for response in system.get('tvo', {}): meas = system['tvo'][response].get('meas', True) unc = system['tvo'][response].get('unc', 'N/A') status = "measurable" if meas else "non-measurable" print(f" - {response:<10} | {status:<15} | std.dev = {unc}") return excel_path, df_combined
def _construct_var(system, classic_des, j): r""" Construct and normalize variable dictionaries from system configuration. This helper function extracts variable values from preliminary designs and normalizes them by their maximum bounds. It handles both time-invariant and time-variant inputs, preparing them for simulation in normalized coordinates. Parameters ---------- system : dict System configuration with variable definitions: - 'tvi' : dict Time-variant inputs with 'max' bounds. - 'tii' : dict Time-invariant inputs with 'max' bounds. classic_des : dict Preliminary design values: - Keys: experiment indices (str) - Values: dicts mapping variable names to values j : int Experiment index for extracting values from classic_des. Returns ------- phi : dict[str, float] Normalized time-invariant input values (phi / phisc). phit : dict[str, float] Normalized time-variant input values (phit / phitsc). phisc : dict[str, float] Scaling factors (max values) for time-invariant inputs. phitsc : dict[str, float] Scaling factors (max values) for time-variant inputs. Notes ----- **Normalization**: All inputs are normalized to [0, 1] by dividing by their maximum values: \[ \phi_{normalized} = \frac{\phi}{\phi_{max}} \] This improves numerical conditioning and allows bounds-free optimization. **Default Values**: If a variable is not present in classic_des[j], its maximum value is used as default. This ensures all variables have valid values even if partially specified. **Use Cases**: - Preliminary design: Uses classic_des values directly - MBDoE design: Returns only scaling factors (phisc, phitsc) See Also -------- expera : Main function that calls this for variable setup. _construct_par : Analogous function for parameters. Examples -------- >>> system = { ... 'tvi': {'T': {'max': 400}}, ... 'tii': {'P': {'max': 5.0}} ... } >>> classic_des = {'1': {'T': 350, 'P': 3.0}} >>> phi, phit, phisc, phitsc = _construct_var(system, classic_des, 1) >>> print(phi) # {'P': 0.6} (3.0 / 5.0) >>> print(phit) # {'T': 0.875} (350 / 400) >>> print(phisc) # {'P': 5.0} >>> print(phitsc) # {'T': 400} """ phit = {} phisc = {} phitsc = {} phi = {} # Time-variant input variables (tv_iphi) for var in system['tvi'].keys(): max_val = system['tvi'][var]['max'] classic_value = classic_des.get(str(j), {}).get(var, max_val) phitsc[var] = max_val phit[var] = classic_value / max_val # Time-invariant input variables (ti_iphi) for var in system['tii'].keys(): max_val = system['tii'][var]['max'] classic_value = classic_des.get(str(j), {}).get(var, max_val) phisc[var] = max_val phi[var] = classic_value / max_val return phi, phit, phisc, phitsc def _construct_par(model_name, theta_parameters): r""" Construct normalized and scaled parameter vectors for simulation. This helper function prepares parameter vectors for forward model simulation. Parameters are normalized to [1, 1, ...] with separate scaling factors (thetac) following the same convention used in parameter estimation routines. Parameters ---------- model_name : str Model name (currently unused but passed for extensibility). theta_parameters : list[float] True parameter values (ground truth for in-silico experiments). Returns ------- theta : list[float] Normalized parameter vector (all ones: [1, 1, ..., 1]). thetac : list[float] Scaling factors (true parameter values). Raises ------ ValueError If theta_parameters is invalid or cannot be accessed. Notes ----- **Normalization Convention**: Following the pattern used throughout the package: \[ \theta_{physical} = \theta_{normalized} \cdot \theta_c \] where \( \theta_{normalized} = [1, 1, ..., 1] \) for the true model. **True Model Simulation**: In-silico experiments use the "true" parameter values from insilicos['theta']. These represent the ground truth that parameter estimation aims to recover. **Consistency with Estimation**: This normalization matches the convention in parmest(), allowing estimated parameters (theta * thetac) to be directly compared to ground truth (thetac). See Also -------- expera : Main function that calls this for parameter setup. _construct_var : Analogous function for variables. parmest : Parameter estimation routine using same normalization. Examples -------- >>> theta_true = [1.5, 2.3, 0.8] # True parameters >>> theta, thetac = _construct_par('M1', theta_true) >>> print(theta) # [1, 1, 1] >>> print(thetac) # [1.5, 2.3, 0.8] >>> # Physical parameters: theta * thetac = [1.5, 2.3, 0.8] """ try: thetac = theta_parameters theta = [1] * len(thetac) return theta, thetac except Exception as e: raise ValueError(f"Unable to adjust theta or thetac for model {model_name}: {e}") def _inssimulator(t_values, swps, swpsu, phi, phisc, phit, phitsc, tsc, theta, thetac, cvp, std_dev, t_valuesc, case, model_name, system, models, errortype): r""" Execute forward simulation and add measurement noise to create synthetic experimental data. This is the core simulation routine that calls the forward model, generates noisy measurements, and organizes results into a structured DataFrame. It handles both time-variant and time-invariant outputs, interpolates input profiles, and applies realistic measurement noise according to specified error models. Parameters ---------- t_values : dict[str, list[float]] Measurement times for each output variable (normalized [0,1]): - Keys: output variable names (e.g., 'y1', 'y2') - Values: lists of normalized sampling times swps : dict[str, np.ndarray] Scaled switching points: - '{var}t': switching times (normalized) - '{var}l': switching levels (normalized) swpsu : dict[str, np.ndarray] Unscaled switching points (for recording in output). phi : dict[str, float] Normalized time-invariant inputs. phisc : dict[str, float] Scaling factors for time-invariant inputs. phit : dict[str, float] or dict[str, np.ndarray] Time-variant inputs (constant or profile). phitsc : dict[str, float] Scaling factors for time-variant inputs. tsc : float Time scaling factor (physical time = normalized time * tsc). theta : list[float] Normalized parameter vector. thetac : list[float] Parameter scaling factors. cvp : dict[str, str] Control variable parameterization methods for each time-variant input. std_dev : dict[str, float] Measurement standard deviations for each output variable. t_valuesc : np.ndarray Global time vector for simulation (normalized). case : str Experiment type: 'initial' (preliminary) or 'doe' (MBDoE). model_name : str Name of the forward model to simulate. system : dict System configuration. models : dict Model definitions. errortype : str Error type: 'abs' (absolute noise) or 'rel' (relative noise). Returns ------- df_combined : pd.DataFrame Comprehensive DataFrame with all experimental data organized in sections: **Section 1 - Per-variable measurements**: - 'MES_X:{var}': Sampling times (physical units) - 'MES_Y:{var}': Noisy measured values - 'MES_E:{var}': Measurement uncertainties **Section 2 - Global simulation**: - 'X:all': Full time vector - Time-invariant inputs (phi variables) - Time-variant inputs (phit variables, interpolated) - Time-invariant outputs (noisy, constant) **Section 3 - CVP metadata**: - 'CVP:{var}': Control parameterization method for each input **Section 4 - Switching points** (DOE only): - '{var}t': Switching times (unscaled) - '{var}l': Switching levels (unscaled) Raises ------ ValueError If case is not 'initial' or 'doe', or if errortype is invalid. KeyError If a measured variable is missing from simulation results. Notes ----- **Simulation Workflow**: 1. Call forward model (simula) with normalized inputs/parameters 2. Extract predictions at measurement times via interpolation 3. Add Gaussian noise according to error type 4. Clip negative values to zero (physical constraint) 5. Organize into standardized DataFrame format **Noise Models**: - **Absolute** ('abs'): \[ y_{noisy} \sim \mathcal{N}(y_{true}, \sigma^2) \] Constant variance across signal range. - **Relative** ('rel'): \[ y_{noisy} \sim \mathcal{N}(y_{true}, (\sigma \cdot |y_{true}|)^2) \] Signal-dependent variance (heteroscedastic). **Time-Invariant Output Noise**: For time-invariant outputs, noise is added once and replicated across all time points. This reflects that these quantities are measured once per experiment, not continuously. **DataFrame Organization**: Multiple sections are combined horizontally with blank columns as separators. This mimics the structure expected by read_excel() during parameter estimation, ensuring consistency between data generation and analysis. **Negative Value Handling**: Physical quantities (concentrations, temperatures, etc.) cannot be negative. Noisy values are clipped: \( y_{noisy} = \max(y_{noisy}, 0) \). See Also -------- expera : Main function that calls this routine. simula : Forward model simulation kernel. Examples -------- >>> # Typically called internally by expera() >>> df = _inssimulator( ... t_values={'y1': [0, 0.5, 1.0]}, ... swps={}, swpsu={}, ... phi={'P': 0.6}, phisc={'P': 5.0}, ... phit={'T': 0.875}, phitsc={'T': 400}, ... tsc=100, theta=[1, 1], thetac=[1.5, 2.3], ... cvp={'T': 'no_CVP'}, std_dev={'y1': 0.05}, ... t_valuesc=np.array([0, 0.5, 1.0]), ... case='initial', model_name='M1', ... system=system, models=models, errortype='abs' ... ) >>> print(df.filter(like='MES_Y').columns) # ['MES_Y:y1'] """ # Run the model to get tv_ophi, ti_ophi, phit_interp if case == 'initial': tv_ophi, ti_ophi, phit_interp = simula( t_valuesc, {}, phi, phisc, phitsc, tsc, theta, thetac, cvp, phit, model_name, system, models ) elif case == 'doe': tv_ophi, ti_ophi, phit_interp = simula( t_valuesc, swps, phi, phisc, phitsc, tsc, theta, thetac, cvp, phit, model_name, system, models ) else: raise ValueError(f"Unsupported case: {case}") # Identify measured variables from system measured_vars = [ var for var in system['tvo'].keys() if system['tvo'][var].get('measured', True) ] # Prepare lists of independent and dependent variables indep_vars = list(range(len(measured_vars))) dep_vars = measured_vars if len(indep_vars) != len(dep_vars): raise ValueError("The number of independent variables must match the number of dependent variables.") indep_to_dep_mapping = dict(zip([f"X{i+1}" for i in indep_vars], dep_vars)) # Check that all dependent variables exist in tv_ophi for indep_var, dep_var in indep_to_dep_mapping.items(): if dep_var not in tv_ophi: raise KeyError( f"Dependent variable '{dep_var}' is missing in tv_ophi. Available keys: {list(tv_ophi.keys())}" ) # 1. Per-variable DataFrames for tv_ophi var_dfs = [] for indep_var, dep_var in indep_to_dep_mapping.items(): var_times = [t * tsc for t in t_values[dep_var]] var_results = [] for t in var_times: # Find the closest index in the scaled time vector closest_idx = np.argmin(np.abs(np.array(t_valuesc) * tsc - t)) original_val = tv_ophi[dep_var][closest_idx] # Compute noise based on selected error type if errortype == 'abs': sigma = std_dev[dep_var] elif errortype == 'rel': sigma = std_dev[dep_var] * abs(original_val) else: raise ValueError(f"Unsupported errortype: {errortype}") noisy_val = np.random.normal(loc=original_val, scale=sigma) noisy_val = max(noisy_val, 0.0) var_results.append([t, noisy_val, sigma]) df_var = pd.DataFrame(var_results, columns=[f"MES_X:{dep_var}", f"MES_Y:{dep_var}", f"MES_E:{dep_var}"]) var_dfs.append(df_var) # 2. Global DataFrame (t_global, tii, tvi/phit_interp, ti_ophi) phi_order = list(phi.keys()) ti_ophi_order = list(ti_ophi.keys()) if case == 'classic': phit_order = list(phit.keys()) else: phit_order = list(phit_interp.keys()) # Noise added once for ti_ophi ti_ophi_noisy_once = { var: np.random.normal(val[0], abs(std_dev.get(var, 0) * val[0])) for var, val in ti_ophi.items() } global_results = [] for i, t_global in enumerate(t_valuesc): row = [t_global * tsc] # tii phi_vals = [phi[v] * phisc[v] for v in phi_order] row.extend(phi_vals) # tvi or phit_interp if case == 'classic': phit_vals = [phit[v] * phitsc[v] for v in phit_order] else: phit_vals = [phit_interp[v][i] for v in phit_order] row.extend(phit_vals) # ti_ophi noisy (constant) ti_ophi_vals = [ti_ophi_noisy_once[v] for v in ti_ophi_order] row.extend(ti_ophi_vals) global_results.append(row) global_columns = (['X:all'] + phi_order + phit_order + ti_ophi_order) df_global = pd.DataFrame(global_results, columns=global_columns) # Combine var_dfs and df_global horizontally all_dfs = var_dfs + [df_global] df_main = pd.concat(all_dfs, axis=1) # 3. piecewise_func DataFrame df_piecewise = pd.DataFrame([{f"CVP:{var}": method for var, method in cvp.items()}]) # Combine all into one DataFrame df_combined = pd.concat([df_main, df_piecewise], axis=1) # 4. swpsu DataFrame if case == 'doe': swpsu_order = list(swpsu.keys()) swps_data = {key: swpsu[key] for key in swpsu_order} df_swps = pd.DataFrame(swps_data) # Insert blank columns to separate sections max_length = max(len(df_main), len(df_piecewise), len(df_swps)) df_blank_1 = pd.DataFrame({' ': [None] * max_length}) df_blank_2 = pd.DataFrame({' ': [None] * max_length}) # Combine all into one DataFrame df_combined = pd.concat([df_main, df_blank_1, df_piecewise, df_blank_2, df_swps], axis=1) return df_combined