Scenarios API

Package Exports

Scenario helpers for PARE.

PAREScenario: base class for all PARE scenarios, re-exported from pare.scenarios.scenario. Documented in full under Base Scenario and Validation Types below.

Base Scenario and Validation Types

PAREScenario

Bases: Scenario

Base class for all PARE scenarios.

Source code in pare/scenarios/scenario.py
class PAREScenario(Scenario):
    """Base class for all PARE scenarios."""

    def __post_init__(self) -> None:
        super().__post_init__()
        # Copy class-level field overrides to instance attributes.
        # This allows subclasses to define fields like `additional_system_prompt` as class
        # attributes, which would otherwise be shadowed by the dataclass field defaults.
        for f in fields(self):
            class_value = getattr(self.__class__, f.name, None)
            if class_value is not None:
                setattr(self, f.name, class_value)

    def initialize(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the scenario with all events and noise configurations."""
        if self._initialized:  # type: ignore[has-type]
            return

        # Initialize apps with the context
        self.init_and_populate_apps(*args, **kwargs)

        # Set the seed for each app
        if self.apps is not None:
            for app in self.apps:
                app.set_seed(self.seed)

        self.apply_augmentation_configs()

        # Preserve the initial state of the apps.
        self._initial_apps = {
            app.name: {
                "class_name": app.__class__.__name__,
                "serialized_state": json.dumps(app.get_state(), cls=EnumEncoder),
            }
            for app in self.apps or []
        }

        self.build_events_flow()

        if self.env_events_config is not None:
            augmentation_data_path_relative = os.getenv(
                "ENV_AUGMENTATION_DATA_PATH", "data/metaare_augmentation_data.json"
            )
            augmentation_data_path = PROJECT_ROOT / augmentation_data_path_relative
            if not augmentation_data_path.exists():
                raise ValueError(
                    f"ENV_AUGMENTATION_DATA_PATH is not set, but Environmental Noise is enabled. Expected path: {augmentation_data_path}"
                )
            with open(augmentation_data_path) as f:
                augmentation_data = json.load(f)
            self.augmentation_data = augmentation_data
            expander = PAREEnvEventsExpander(env_events_config=self.env_events_config)
            expander.add_env_events_to_scenario(scenario=self, apps_augmentation_data=self.augmentation_data["apps"])

        self._initialized = True

    def apply_augmentation_configs(self) -> None:
        """Apply the augmentation configurations to the scenario."""
        # We don't apply any augmentation to the system or agent UI apps.
        apps_to_filter = ["PAREAgentUserInterface", "HomeScreenSystemApp"]
        filtered_apps = [app for app in self.apps or [] if app.name not in apps_to_filter]
        if self.tool_augmentation_config is not None and self.apps is not None:
            for app in filtered_apps:
                app.set_failure_probability(self.tool_augmentation_config.tool_failure_probability)

            if self.augmentation_data is not None:
                name_map = self.augmentation_data.get("tool_names_mapping", {})
                desc_map = self.augmentation_data.get("tool_descriptions_mapping", {})

                for app in filtered_apps:
                    for tool in app.get_tools():
                        if self.tool_augmentation_config.apply_tool_name_augmentation:
                            tool._public_name = name_map.get(tool.name, tool.name)

                        if self.tool_augmentation_config.apply_tool_description_augmentation:
                            tool._public_description = desc_map.get(tool.name, tool.function_description)

apply_augmentation_configs()

Apply the augmentation configurations to the scenario.

Source code in pare/scenarios/scenario.py
def apply_augmentation_configs(self) -> None:
    """Apply the augmentation configurations to the scenario."""
    # We don't apply any augmentation to the system or agent UI apps.
    apps_to_filter = ["PAREAgentUserInterface", "HomeScreenSystemApp"]
    filtered_apps = [app for app in self.apps or [] if app.name not in apps_to_filter]
    if self.tool_augmentation_config is not None and self.apps is not None:
        for app in filtered_apps:
            app.set_failure_probability(self.tool_augmentation_config.tool_failure_probability)

        if self.augmentation_data is not None:
            name_map = self.augmentation_data.get("tool_names_mapping", {})
            desc_map = self.augmentation_data.get("tool_descriptions_mapping", {})

            for app in filtered_apps:
                for tool in app.get_tools():
                    if self.tool_augmentation_config.apply_tool_name_augmentation:
                        tool._public_name = name_map.get(tool.name, tool.name)

                    if self.tool_augmentation_config.apply_tool_description_augmentation:
                        tool._public_description = desc_map.get(tool.name, tool.function_description)

initialize(*args, **kwargs)

Initialize the scenario with all events and noise configurations.

Source code in pare/scenarios/scenario.py
def initialize(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the scenario with all events and noise configurations."""
    if self._initialized:  # type: ignore[has-type]
        return

    # Initialize apps with the context
    self.init_and_populate_apps(*args, **kwargs)

    # Set the seed for each app
    if self.apps is not None:
        for app in self.apps:
            app.set_seed(self.seed)

    self.apply_augmentation_configs()

    # Preserve the initial state of the apps.
    self._initial_apps = {
        app.name: {
            "class_name": app.__class__.__name__,
            "serialized_state": json.dumps(app.get_state(), cls=EnumEncoder),
        }
        for app in self.apps or []
    }

    self.build_events_flow()

    if self.env_events_config is not None:
        augmentation_data_path_relative = os.getenv(
            "ENV_AUGMENTATION_DATA_PATH", "data/metaare_augmentation_data.json"
        )
        augmentation_data_path = PROJECT_ROOT / augmentation_data_path_relative
        if not augmentation_data_path.exists():
            raise ValueError(
                f"ENV_AUGMENTATION_DATA_PATH is not set, but Environmental Noise is enabled. Expected path: {augmentation_data_path}"
            )
        with open(augmentation_data_path) as f:
            augmentation_data = json.load(f)
        self.augmentation_data = augmentation_data
        expander = PAREEnvEventsExpander(env_events_config=self.env_events_config)
        expander.add_env_events_to_scenario(scenario=self, apps_augmentation_data=self.augmentation_data["apps"])

    self._initialized = True
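
To make the override-and-initialize flow concrete, here is a minimal sketch of a subclass. The scenario id, prompt text, and method bodies are hypothetical; only the hooks that initialize() calls (init_and_populate_apps, build_events_flow) and the _initialized guard come from the source above.

# Hypothetical sketch: the scenario id, prompt, and app wiring are invented.
from pare.scenarios.registry import register_scenario
from pare.scenarios.scenario import PAREScenario


@register_scenario("demo_calendar_scenario")
class DemoCalendarScenario(PAREScenario):
    # Class-level field override: __post_init__ copies this onto the instance
    # so the dataclass field default does not shadow it.
    additional_system_prompt = "Prefer read-only tools unless the user asks."

    def init_and_populate_apps(self, *args, **kwargs) -> None:
        # Populate self.apps with the stateful apps this scenario needs.
        ...

    def build_events_flow(self) -> None:
        # Wire up the scripted user/agent/environment events.
        ...


# Assuming the inherited dataclass fields all have defaults:
scenario = DemoCalendarScenario()
scenario.initialize()  # builds apps, seeds them, snapshots initial state
scenario.initialize()  # no-op: the _initialized guard makes this idempotent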

PARE-specific validation result classes for scenario execution.

PAREMultiScenarioValidationResult dataclass

PARE-specific multi-scenario validation result with proactive agent metrics.

Source code in pare/scenarios/validation_result.py
@dataclass
class PAREMultiScenarioValidationResult:
    """PARE-specific multi-scenario validation result with proactive agent metrics."""

    run_config: MultiScenarioRunnerConfig

    # Dictionary mapping (base_scenario_id, run_number) tuples to their respective validation results
    scenario_results: dict[tuple[str, int | None], PAREScenarioValidationResult] = field(default_factory=dict)

    # Duration of the entire validation run in seconds
    duration: float = 0.0

    # Counts of different scenario outcomes
    successful_count: int = 0
    failed_count: int = 0
    exception_count: int = 0
    no_validation_count: int = 0

    @property
    def total_proposals(self) -> int:
        """Total number of proposals across all scenarios."""
        return sum(result.proposal_count for result in self.scenario_results.values())

    @property
    def total_acceptances(self) -> int:
        """Total number of accepted proposals across all scenarios."""
        return sum(result.acceptance_count for result in self.scenario_results.values())

    @property
    def total_turns(self) -> int:
        """Total number of turns across all scenarios."""
        return sum(result.number_of_turns for result in self.scenario_results.values())

    @property
    def total_read_only_actions(self) -> int:
        """Total number of read-only actions across all scenarios."""
        return sum(result.read_only_actions for result in self.scenario_results.values())

    @property
    def total_write_actions(self) -> int:
        """Total number of write actions across all scenarios."""
        return sum(result.write_actions for result in self.scenario_results.values())

    @property
    def aggregate_proposal_rate(self) -> float:
        """Overall proposals per turn across all scenarios."""
        if self.total_turns == 0:
            return 0.0
        return self.total_proposals / self.total_turns

    @property
    def aggregate_acceptance_rate(self) -> float:
        """Overall accepted proposals / total proposals across all scenarios."""
        if self.total_proposals == 0:
            return 0.0
        return self.total_acceptances / self.total_proposals

    @property
    def success_rate(self) -> float:
        """Overall success rate across all scenarios."""
        total_validations = self.successful_count + self.failed_count + self.exception_count + self.no_validation_count
        if total_validations == 0:
            return 0.0
        return self.successful_count / total_validations

    def success_rate_updated(self) -> float:
        """Overall success rate across all scenarios."""
        total_validations = self.successful_count + self.failed_count + self.exception_count + self.no_validation_count
        if total_validations == 0:
            return 0.0
        return self.successful_count / total_validations

    def add_result(self, result: PAREScenarioValidationResult, scenario_id: str, run_number: int | None = None) -> None:
        """Add a scenario validation result to the multi-scenario results.

        Args:
            result: The PAREScenarioValidationResult to add.
            scenario_id: The base scenario ID.
            run_number: The run number (optional).
        """
        self.scenario_results[(scenario_id, run_number)] = result

        # Update counts based on the result's success status
        if result.success is True:
            self.successful_count += 1
        elif result.success is False:
            self.failed_count += 1
        elif result.exception is not None:
            self.exception_count += 1
        else:
            self.no_validation_count += 1

    def to_polars(self, extra_columns: dict[str, str] | None = None) -> pl.DataFrame:
        """Convert the multi-scenario validation results to a Polars DataFrame.

        Args:
            extra_columns: Additional columns to add to each row (e.g., phase_name, config, etc.)

        Returns:
            Polars DataFrame with one row per scenario run.
        """
        rows = []

        for scenario_key, scenario_result in self.scenario_results.items():
            base_scenario_id, run_number = scenario_key

            # Convert success to numeric (1.0 for True, 0.0 for False, None for exception)
            success_numeric = (
                1.0 if scenario_result.success is True else 0.0 if scenario_result.success is False else None
            )

            # Determine status
            if scenario_result.success is True:
                status = "success"
            elif scenario_result.success is False:
                status = "failed"
            elif scenario_result.exception is not None:
                status = "exception"
            else:
                status = "no_validation"

            row = {
                # Scenario identification
                "base_scenario_id": base_scenario_id,
                "run_number": run_number,
                # Success fields
                "success_numeric": success_numeric,
                "success_bool": scenario_result.success,
                "status": status,
                # Exception fields
                "has_exception": scenario_result.exception is not None,
                "exception_type": type(scenario_result.exception).__name__ if scenario_result.exception else None,
                "exception_message": str(scenario_result.exception) if scenario_result.exception else None,
                # Other base fields
                "rationale": scenario_result.rationale,
                "export_path": scenario_result.export_path,
                "run_duration": scenario_result.duration,
                "job_duration": self.duration,
                # Model configuration (PARE has 3 agents) - use aliases for human-readable names
                "user_model": self.run_config.user_model_alias,
                "user_provider": self.run_config.user_engine_config.provider,
                "observe_model": self.run_config.observe_model_alias,
                "observe_provider": self.run_config.observe_engine_config.provider,
                "execute_model": self.run_config.execute_model_alias,
                "execute_provider": self.run_config.execute_engine_config.provider,
                # Agent type and proactive model identifier (for aggregation key)
                "agent_type": self.run_config.agent_type,
                "proactive_model": f"{self.run_config.agent_type}_{self.run_config.observe_model_alias}_{self.run_config.execute_model_alias}",
                # Noise configuration
                "tool_failure_probability": (
                    self.run_config.tool_augmentation_config.tool_failure_probability
                    if self.run_config.tool_augmentation_config is not None
                    else 0.0
                ),
                "num_env_events_per_minute": (
                    self.run_config.env_events_config.num_env_events_per_minute
                    if self.run_config.env_events_config is not None
                    else 0
                ),
                # PARE-specific metrics
                "proposal_count": scenario_result.proposal_count,
                "acceptance_count": scenario_result.acceptance_count,
                "read_only_actions": scenario_result.read_only_actions,
                "write_actions": scenario_result.write_actions,
                "number_of_turns": scenario_result.number_of_turns,
                "proposal_rate": scenario_result.proposal_rate,
                "acceptance_rate": scenario_result.acceptance_rate,
            }

            # Add any extra columns provided (cast all values to string to ensure consistent schema)
            if extra_columns:
                row.update({k: str(v) for k, v in extra_columns.items()})
            rows.append(row)

        # Build schema from the module-level constant, adding any extra columns
        schema = dict(PARE_RESULT_SCHEMA)
        if extra_columns:
            for col_name in extra_columns:
                if col_name not in schema:
                    schema[col_name] = pl.Utf8

        return pl.DataFrame(rows, schema=schema)

    def description(
        self,
        split: str = "unknown",
        weight_per_app_class: dict[str, float] | None = None,
    ) -> str:
        """Generate human-readable summary with PARE metrics.

        Uses the reporting infrastructure for consistency with combined reports.

        Args:
            split: Dataset split name (e.g., "full", "ablation").
            weight_per_app_class: Weight per app class from EnvEventsConfig.

        Returns:
            Formatted report string.
        """
        # Import inside method to avoid circular import
        from pare.benchmark.report_stats import generate_validation_report

        df = self.to_polars()
        return generate_validation_report(df, split, weight_per_app_class)

aggregate_acceptance_rate property

Overall accepted proposals / total proposals across all scenarios.

aggregate_proposal_rate property

Overall proposals per turn across all scenarios.

success_rate property

Overall success rate across all scenarios.

total_acceptances property

Total number of accepted proposals across all scenarios.

total_proposals property

Total number of proposals across all scenarios.

total_read_only_actions property

Total number of read-only actions across all scenarios.

total_turns property

Total number of turns across all scenarios.

total_write_actions property

Total number of write actions across all scenarios.

add_result(result, scenario_id, run_number=None)

Add a scenario validation result to the multi-scenario results.

Parameters:

- result (PAREScenarioValidationResult, required): The PAREScenarioValidationResult to add.
- scenario_id (str, required): The base scenario ID.
- run_number (int | None, default None): The run number (optional).
Source code in pare/scenarios/validation_result.py
def add_result(self, result: PAREScenarioValidationResult, scenario_id: str, run_number: int | None = None) -> None:
    """Add a scenario validation result to the multi-scenario results.

    Args:
        result: The PAREScenarioValidationResult to add.
        scenario_id: The base scenario ID.
        run_number: The run number (optional).
    """
    self.scenario_results[(scenario_id, run_number)] = result

    # Update counts based on the result's success status
    if result.success is True:
        self.successful_count += 1
    elif result.success is False:
        self.failed_count += 1
    elif result.exception is not None:
        self.exception_count += 1
    else:
        self.no_validation_count += 1
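
As a quick illustration of the counting logic, a hedged sketch follows (scenario ids and metrics are invented, and run_config is assumed to be an existing MultiScenarioRunnerConfig). Note that exception and no-validation results count toward the success_rate denominator.

from pare.scenarios.validation_result import (
    PAREMultiScenarioValidationResult,
    PAREScenarioValidationResult,
)

results = PAREMultiScenarioValidationResult(run_config=run_config)
results.add_result(
    PAREScenarioValidationResult(success=True, proposal_count=2, acceptance_count=1, number_of_turns=4),
    scenario_id="calendar_001",
    run_number=0,
)
results.add_result(PAREScenarioValidationResult(success=False), scenario_id="calendar_001", run_number=1)
results.add_result(
    PAREScenarioValidationResult(success=None, exception=TimeoutError("run timed out")),
    scenario_id="email_002",
)

assert (results.successful_count, results.failed_count, results.exception_count) == (1, 1, 1)
assert results.success_rate == 1 / 3  # exceptions stay in the denominator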

description(split='unknown', weight_per_app_class=None)

Generate human-readable summary with PARE metrics.

Uses the reporting infrastructure for consistency with combined reports.

Parameters:

- split (str, default 'unknown'): Dataset split name (e.g., "full", "ablation").
- weight_per_app_class (dict[str, float] | None, default None): Weight per app class from EnvEventsConfig.

Returns:

- str: Formatted report string.

Source code in pare/scenarios/validation_result.py
def description(
    self,
    split: str = "unknown",
    weight_per_app_class: dict[str, float] | None = None,
) -> str:
    """Generate human-readable summary with PARE metrics.

    Uses the reporting infrastructure for consistency with combined reports.

    Args:
        split: Dataset split name (e.g., "full", "ablation").
        weight_per_app_class: Weight per app class from EnvEventsConfig.

    Returns:
        Formatted report string.
    """
    # Import inside method to avoid circular import
    from pare.benchmark.report_stats import generate_validation_report

    df = self.to_polars()
    return generate_validation_report(df, split, weight_per_app_class)

success_rate_updated()

Overall success rate across all scenarios.

Source code in pare/scenarios/validation_result.py
def success_rate_updated(self) -> float:
    """Overall success rate across all scenarios."""
    total_validations = self.successful_count + self.failed_count + self.exception_count + self.no_validation_count
    if total_validations == 0:
        return 0.0
    return self.successful_count / total_validations

to_polars(extra_columns=None)

Convert the multi-scenario validation results to a Polars DataFrame.

Parameters:

- extra_columns (dict[str, str] | None, default None): Additional columns to add to each row (e.g., phase_name, config, etc.)

Returns:

- DataFrame: Polars DataFrame with one row per scenario run.

Source code in pare/scenarios/validation_result.py
def to_polars(self, extra_columns: dict[str, str] | None = None) -> pl.DataFrame:
    """Convert the multi-scenario validation results to a Polars DataFrame.

    Args:
        extra_columns: Additional columns to add to each row (e.g., phase_name, config, etc.)

    Returns:
        Polars DataFrame with one row per scenario run.
    """
    rows = []

    for scenario_key, scenario_result in self.scenario_results.items():
        base_scenario_id, run_number = scenario_key

        # Convert success to numeric (1.0 for True, 0.0 for False, None for exception)
        success_numeric = (
            1.0 if scenario_result.success is True else 0.0 if scenario_result.success is False else None
        )

        # Determine status
        if scenario_result.success is True:
            status = "success"
        elif scenario_result.success is False:
            status = "failed"
        elif scenario_result.exception is not None:
            status = "exception"
        else:
            status = "no_validation"

        row = {
            # Scenario identification
            "base_scenario_id": base_scenario_id,
            "run_number": run_number,
            # Success fields
            "success_numeric": success_numeric,
            "success_bool": scenario_result.success,
            "status": status,
            # Exception fields
            "has_exception": scenario_result.exception is not None,
            "exception_type": type(scenario_result.exception).__name__ if scenario_result.exception else None,
            "exception_message": str(scenario_result.exception) if scenario_result.exception else None,
            # Other base fields
            "rationale": scenario_result.rationale,
            "export_path": scenario_result.export_path,
            "run_duration": scenario_result.duration,
            "job_duration": self.duration,
            # Model configuration (PARE has 3 agents) - use aliases for human-readable names
            "user_model": self.run_config.user_model_alias,
            "user_provider": self.run_config.user_engine_config.provider,
            "observe_model": self.run_config.observe_model_alias,
            "observe_provider": self.run_config.observe_engine_config.provider,
            "execute_model": self.run_config.execute_model_alias,
            "execute_provider": self.run_config.execute_engine_config.provider,
            # Agent type and proactive model identifier (for aggregation key)
            "agent_type": self.run_config.agent_type,
            "proactive_model": f"{self.run_config.agent_type}_{self.run_config.observe_model_alias}_{self.run_config.execute_model_alias}",
            # Noise configuration
            "tool_failure_probability": (
                self.run_config.tool_augmentation_config.tool_failure_probability
                if self.run_config.tool_augmentation_config is not None
                else 0.0
            ),
            "num_env_events_per_minute": (
                self.run_config.env_events_config.num_env_events_per_minute
                if self.run_config.env_events_config is not None
                else 0
            ),
            # PARE-specific metrics
            "proposal_count": scenario_result.proposal_count,
            "acceptance_count": scenario_result.acceptance_count,
            "read_only_actions": scenario_result.read_only_actions,
            "write_actions": scenario_result.write_actions,
            "number_of_turns": scenario_result.number_of_turns,
            "proposal_rate": scenario_result.proposal_rate,
            "acceptance_rate": scenario_result.acceptance_rate,
        }

        # Add any extra columns provided (cast all values to string to ensure consistent schema)
        if extra_columns:
            row.update({k: str(v) for k, v in extra_columns.items()})
        rows.append(row)

    # Build schema from the module-level constant, adding any extra columns
    schema = dict(PARE_RESULT_SCHEMA)
    if extra_columns:
        for col_name in extra_columns:
            if col_name not in schema:
                schema[col_name] = pl.Utf8

    return pl.DataFrame(rows, schema=schema)
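
Continuing the sketch above, the exported DataFrame makes per-scenario aggregation straightforward. The phase_name and sweep_seed columns are invented examples of extra_columns; note that their values are stringified and typed as Utf8 so the schema stays consistent.

import polars as pl

df = results.to_polars(extra_columns={"phase_name": "ablation", "sweep_seed": 7})
# sweep_seed arrives as the string "7" under the Utf8 schema.
summary = df.group_by("base_scenario_id").agg(
    pl.col("success_numeric").mean().alias("success_rate"),
    pl.col("proposal_rate").mean().alias("mean_proposal_rate"),
)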

PAREScenarioValidationResult dataclass

PARE-specific scenario validation result with proactive agent metrics.

Standalone dataclass (not extending Meta-ARE's ScenarioValidationResult) to avoid dataclass inheritance issues.

Source code in pare/scenarios/validation_result.py
@dataclass
class PAREScenarioValidationResult:
    """PARE-specific scenario validation result with proactive agent metrics.

    Standalone dataclass (not extending Meta-ARE's ScenarioValidationResult)
    to avoid dataclass inheritance issues.
    """

    # Base fields (mirrored from Meta-ARE's ScenarioValidationResult)

    # Flag indicating whether the scenario validation was successful.
    # None indicates that the judge or run failed (an exception occurred).
    success: bool | None

    # Optional exception that occurred during validation, if any.
    exception: Exception | None = None

    # Optional path to exported traces, if applicable.
    export_path: str | None = None

    # Optional description of the rationale.
    rationale: str | None = None

    # Duration of the run in seconds.
    duration: float | None = None

    # PARE-specific stored fields
    proposal_count: int = 0
    acceptance_count: int = 0
    read_only_actions: int = 0
    write_actions: int = 0
    number_of_turns: int = 0

    @property
    def proposal_rate(self) -> float:
        """Proposals per turn."""
        if self.number_of_turns == 0:
            return 0.0
        return self.proposal_count / self.number_of_turns

    @property
    def acceptance_rate(self) -> float:
        """Accepted proposals / total proposals."""
        if self.proposal_count == 0:
            return 0.0
        return self.acceptance_count / self.proposal_count

acceptance_rate property

Accepted proposals / total proposals.

proposal_rate property

Proposals per turn.
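
A small worked example of the derived rates (values invented). Both properties guard their denominators, so empty runs report 0.0 instead of raising.

from pare.scenarios.validation_result import PAREScenarioValidationResult

result = PAREScenarioValidationResult(
    success=True,
    duration=42.5,
    proposal_count=3,
    acceptance_count=2,
    number_of_turns=6,
)
assert result.proposal_rate == 0.5      # 3 proposals over 6 turns
assert result.acceptance_rate == 2 / 3  # 2 accepted out of 3 proposed

# Zero-turn / zero-proposal runs fall back to 0.0 rather than dividing by zero.
assert PAREScenarioValidationResult(success=None).proposal_rate == 0.0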

Registration and Discovery

Registration module for PARE user scenarios.

This module follows Meta-ARE's pattern for auto-registering scenarios. It is loaded via the entry point system when the scenario registry is initialized.

The scenarios directory can be configured via the PARE_SCENARIOS_DIR environment variable.

register_pare_scenarios(registry)

Register all PARE user scenarios with the provided registry.

This function is called by Meta-ARE's scenario registry when it discovers the PARE scenarios entry point. It imports all scenario modules from the configured scenarios directory (or benchmark by default).

The scenarios directory can be configured via the PARE_SCENARIOS_DIR environment variable:

- Relative path (e.g., "benchmark", "generator")
- Multiple directories separated by commas (e.g., "benchmark,generator")

Parameters:

- registry (ScenarioRegistry, required): The ScenarioRegistry instance to register with.
Source code in pare/scenarios/registration.py
def register_pare_scenarios(registry: ScenarioRegistry) -> None:
    """Register all PARE user scenarios with the provided registry.

    This function is called by Meta-ARE's scenario registry when it discovers
    the PARE scenarios entry point. It imports all scenario modules from the
    configured scenarios directory (or benchmark by default).

    The scenarios directory can be configured via PARE_SCENARIOS_DIR environment variable:
    - Relative path (e.g., "benchmark", "generator")
    - Multiple directories separated by commas (e.g., "benchmark,generator")

    Args:
        registry: The ScenarioRegistry instance to register with.
    """
    logger.info("Registering PARE scenarios")

    # Get the base scenarios directory (parent of this file)
    base_scenarios_dir = Path(__file__).parent

    # Get scenarios directory from environment variable or use default
    scenarios_dirs_config = os.getenv("PARE_SCENARIOS_DIR", "benchmark")

    # Support multiple directories separated by commas
    scenarios_dirs = [d.strip() for d in scenarios_dirs_config.split(",")]

    total_imported = 0

    for dir_name in scenarios_dirs:
        # Resolve relative path from base scenarios directory
        scenarios_dir = base_scenarios_dir / dir_name

        if not scenarios_dir.exists():
            logger.warning(f"Scenarios directory not found: {scenarios_dir} (from PARE_SCENARIOS_DIR={dir_name})")
            continue

        logger.info(f"Discovering scenarios in: {scenarios_dir}")

        # Import all Python files in the scenarios directory
        imported_count = 0
        for file_path in scenarios_dir.glob("*.py"):
            # Skip __init__.py
            if file_path.name == "__init__.py":
                continue

            # Get module name - construct full import path
            # Convert path relative to pare/scenarios to module path
            rel_path = file_path.relative_to(base_scenarios_dir)
            module_parts = [*list(rel_path.parts[:-1]), rel_path.stem]
            module_name = f"pare.scenarios.{'.'.join(module_parts)}"

            try:
                # Import the module (triggers @register_scenario decorator)
                importlib.import_module(module_name)
                imported_count += 1
                logger.debug(f"Imported PARE scenario module: {module_name}")
            except Exception as e:
                logger.warning(f"Failed to import PARE scenario module {module_name}: {e}", exc_info=True)

        logger.info(f"Registered {imported_count} scenarios from {scenarios_dir}")
        total_imported += imported_count

    logger.info(f"Total PARE scenarios registered: {total_imported}")

Standalone scenario registry for PARE.

This module provides PARE's own scenario registry that is completely independent of Meta-ARE's scenario registry. PARE scenarios are registered exclusively here.

ScenarioRegistry

Bases: ScenarioRegistry

Standalone scenario registry for PARE.

This registry extends Meta-ARE's ScenarioRegistry but operates completely independently. It only registers PARE scenarios and never loads Meta-ARE's built-in scenarios.

Source code in pare/scenarios/utils/registry.py
class ScenarioRegistry(BaseScenarioRegistry):
    """Standalone scenario registry for PARE.

    This registry extends Meta-ARE's ScenarioRegistry but operates completely independently.
    It only registers PARE scenarios and never loads Meta-ARE's built-in scenarios.
    """

    def _discover_and_import_scenarios(self) -> None:
        """Discover and import PARE scenario modules using entry points.

        This method overrides the parent to skip Meta-ARE's built-in scenarios entirely.
        Only PARE scenarios from entry points are loaded.
        """
        if self._scenarios_discovered:  # type: ignore[has-type]
            return

        # Count how many entry points we've loaded
        loaded_entry_points = 0

        # Discover scenarios via entry points (PARE scenarios only)
        for entry_point in importlib_metadata.entry_points(group=SCENARIO_ENTRY_POINT_GROUP):
            try:
                logger.info(f"Loading scenario entry point: {entry_point.name} from {entry_point.dist}")

                # Load the entry point
                scenario_loader = entry_point.load()

                # If it's a callable, call it with this registry
                if callable(scenario_loader):
                    scenario_loader(self)
                    loaded_entry_points += 1
                else:
                    logger.warning(f"Entry point {entry_point.name} is not callable, skipping")
            except Exception as e:
                logger.warning(
                    f"Failed to load scenario entry point {entry_point.name}: {e}",
                    exc_info=True,
                )

        self._scenarios_discovered = True
        logger.info(f"Discovered and loaded {loaded_entry_points} PARE scenario entry points")

register_scenario(scenario_id)

Decorator to register a scenario with PARE registry.

This decorator is PARE's standalone alternative to Meta-ARE's @register_scenario. It registers scenarios exclusively to the PARE registry, keeping it separate from Meta-ARE's global registry.

Usage

from pare.scenarios.registry import register_scenario

@register_scenario('my_scenario_id')
class MyScenario(Scenario):
    ...

Parameters:

- scenario_id (str, required): The ID to register the scenario under.

Returns:

- Callable[[type[T]], type[T]]: A decorator function that registers the scenario class.

Source code in pare/scenarios/utils/registry.py
def register_scenario(scenario_id: str) -> Callable[[type[T]], type[T]]:
    """Decorator to register a scenario with PARE registry.

    This decorator is PARE's standalone alternative to Meta-ARE's @register_scenario.
    It registers scenarios exclusively to the PARE registry, keeping it separate from
    Meta-ARE's global registry.

    Usage:
        from pare.scenarios.registry import register_scenario

        @register_scenario('my_scenario_id')
        class MyScenario(Scenario):
            ...

    Args:
        scenario_id: The ID to register the scenario under.

    Returns:
        A decorator function that registers the scenario class.
    """
    return registry.register(scenario_id)

Runner Config and Expansion Helpers

MultiScenarioRunnerConfig

Bases: ScenarioRunnerConfig

Configuration for running multiple PARE scenarios in parallel.

Source code in pare/scenarios/config.py
class MultiScenarioRunnerConfig(ScenarioRunnerConfig):
    """Configuration for running multiple PARE scenarios in parallel."""

    # Maximum number of concurrent scenarios to run. If not specified, this is set automatically based on the number of CPUs.
    max_concurrent_scenarios: int | None = None

    # Timeout for individual scenarios in seconds. If not specified, no timeout is applied.
    timeout_seconds: int | None = None

    # Type of executor to use for running scenarios, options: "sequential", "thread", "process"
    executor_type: str = "thread"

    # Logging Level to use for the runner and worker threads
    log_level: str = "INFO"

    # Whether to log to file
    log_to_file: bool = True

    # Directory for log files. This is the parent-level logs directory.
    logs_dir: str = "logs"

    # Enable scenario result caching to skip re-running identical scenarios
    enable_caching: bool = True

    # Experiment name for organizing logs and outputs
    experiment_name: str = "default"

    @model_validator(mode="after")
    def maybe_build_logs_dir(self) -> MultiScenarioRunnerConfig:
        """Maybe build the full logs directory after validation."""
        if self.log_to_file and self.executor_type == "thread":
            import warnings

            warnings.warn(
                "log_to_file is True but executor_type is 'thread' - skipping log directory build", stacklevel=2
            )
            return self
        self._build_logs_dir_internal()
        return self

    def _build_logs_dir_internal(self) -> None:
        """Build the full logs directory based on experiment name.

        Structure: {logs_dir}/{experiment_name}_{config_params}/{proactive_model}_{timestamp}
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # build config suffix from relevant params - use aliases for human-readable names
        config_suffix = f"{self.experiment_name}_user_{self.user_type}_{self.user_model_alias}_proactive_{self.agent_type}_mt_{self.max_turns}_umi_{self.user_max_iterations}_ome_{self.observe_max_iterations}_exe_{self.execute_max_iterations}"

        # Add noise params if set
        if self.tool_augmentation_config is not None:
            tfp = getattr(self.tool_augmentation_config, "tool_failure_probability", 0.0)
            config_suffix += f"_tfp_{tfp}"

        if self.env_events_config is not None:
            enmi = getattr(self.env_events_config, "num_env_events_per_minute", 0.0)
            config_suffix += f"_enmi_{enmi}"

        # ! TODO: Make it general, get the proactive model identifier from registry and should depend on agent_type
        # Proactive model identifier - use aliases for human-readable names
        proactive_model = f"obs_{self.observe_model_alias}_exec_{self.execute_model_alias}"

        # Build full path
        base_dir = Path(self.logs_dir)
        full_path = base_dir / f"{config_suffix}" / f"{proactive_model}_{timestamp}"
        self.logs_dir = str(full_path)

    def build_logs_dir(self, experiment_name: str | None = None) -> None:
        """Explicitly build the full logs directory path with a new experiment name. Should be called before running scenarios.

        Args:
            experiment_name: The experiment name to use. If None, uses the existing experiment_name in the config.
        """
        if self.log_to_file and self.executor_type == "thread":
            import warnings

            warnings.warn(
                "log_to_file is True but executor_type is 'thread' - skipping log directory build", stacklevel=2
            )
            return
        if experiment_name is not None:
            self.experiment_name = experiment_name
        self._build_logs_dir_internal()

build_logs_dir(experiment_name=None)

Explicitly build the full logs directory path with a new experiment name. Should be called before running scenarios.

Parameters:

- experiment_name (str | None, default None): The experiment name to use. If None, uses the existing experiment_name in the config.
Source code in pare/scenarios/config.py
def build_logs_dir(self, experiment_name: str | None = None) -> None:
    """Explicitly build the full logs directory path with a new experiment name. Should be called before running scenarios.

    Args:
        experiment_name: The experiment name to use. If None, uses the existing experiment_name in the config.
    """
    if self.log_to_file and self.executor_type == "thread":
        import warnings

        warnings.warn(
            "log_to_file is True but executor_type is 'thread' - skipping log directory build", stacklevel=2
        )
        return
    if experiment_name is not None:
        self.experiment_name = experiment_name
    self._build_logs_dir_internal()

maybe_build_logs_dir()

Maybe build the full logs directory after validation.

Source code in pare/scenarios/config.py
@model_validator(mode="after")
def maybe_build_logs_dir(self) -> MultiScenarioRunnerConfig:
    """Maybe build the full logs directory after validation."""
    if self.log_to_file and self.executor_type == "thread":
        import warnings

        warnings.warn(
            "log_to_file is True but executor_type is 'thread' - skipping log directory build", stacklevel=2
        )
        return self
    self._build_logs_dir_internal()
    return self
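
A hedged construction sketch (experiment name and limits are invented). With the default log_to_file=True and a non-thread executor, the model validator expands logs_dir at construction time to {logs_dir}/{experiment_name}_{config_params}/{proactive_model}_{timestamp}. Be aware that calling build_logs_dir() afterwards expands again relative to the already-built path, since _build_logs_dir_internal uses the current logs_dir as its base.

from pare.scenarios.config import MultiScenarioRunnerConfig

config = MultiScenarioRunnerConfig(
    executor_type="process",      # "thread" with log_to_file=True skips the build
    max_concurrent_scenarios=8,
    timeout_seconds=900,
    experiment_name="noise_sweep",
)
# config.logs_dir now points at the fully expanded experiment directory.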

ScenarioRunnerConfig

Bases: BaseModel

Configuration for running a single PARE scenario.

Source code in pare/scenarios/config.py
class ScenarioRunnerConfig(BaseModel):
    """Configuration for running a single PARE scenario."""

    # User Agent LLM Configuration (default: gpt-5-mini)
    user_engine_config: LLMEngineConfig = Field(
        default_factory=lambda: LLMEngineConfig(
            model_name="gpt-5-mini", provider="openai", description="LLM configuration for the user agent"
        )
    )

    # Maximum number of iterations the user agent can take per turn (default: 1)
    user_max_iterations: int | None = 1

    # Agent architecture type (Default: "observe-execute")
    agent_type: str = "observe-execute"

    # User Agent Type (Default: "default")
    user_type: str = "default"

    # Proactive Observe Agent LLM configuration (default: gpt-5)
    observe_engine_config: LLMEngineConfig = Field(
        default_factory=lambda: LLMEngineConfig(
            model_name="gpt-5", provider="openai", description="LLM configuration for the observe agent"
        )
    )

    # Maximum number of iterations the observe agent can take per turn (default: 1)
    observe_max_iterations: int | None = 1

    # Proactive Execute Agent LLM configuration (default: gpt-5)
    execute_engine_config: LLMEngineConfig = Field(
        default_factory=lambda: LLMEngineConfig(
            model_name="gpt-5", provider="openai", description="LLM configuration for the execute agent"
        )
    )

    # Maximum number of iterations the execute agent can take per turn (default: 1)
    execute_max_iterations: int | None = 1

    # Flag indicating whether to run the scenarios in Oracle Mode, where oracle events (i.e., user-defined agent events) are run. (default: False)
    oracle: bool = False

    # Maximum number of turns of the conversation between the user and the agent. (default: 10)
    max_turns: int | None = 10

    # Flag indicating whether to export traces to a JSON file (default: False)
    export: bool = False

    # Directory to output the scenario states, traces and logs (default: None)
    output_dir: str | None = None

    # Scenario JSON export format -- must be one of "hf" or "lite" (default: "hf")
    trace_dump_format: str = "hf"

    # Whether to use the custom logger in the agent (default: True)
    use_custom_logger: bool = True

    # Simulated generation time mode (default: "measured")
    simulated_generation_time_mode: str = "measured"

    # Tool augmentation configuration for noise injection
    tool_augmentation_config: ToolAugmentationConfig | None = None

    # Environment events configuration for noise injection
    env_events_config: EnvEventsConfig | None = None

    # ! TODO: Judge mode is not fully supported yet
    # Whether to run only the judge for scenarios.
    judge_only: bool = False

    # Judge engine configuration
    judge_engine_config: LLMEngineConfig | None = None

    # Maximum scenario duration in seconds (default: 600)
    max_scenario_duration: int = MAX_SCENARIO_DURATION

    # Human-readable model aliases (used for caching, display, results)
    # These are the canonical identifiers - deployment paths may change but aliases stay consistent
    user_model_alias: str | None = None
    observe_model_alias: str | None = None
    execute_model_alias: str | None = None

    @model_validator(mode="after")
    def fill_model_aliases(self) -> ScenarioRunnerConfig:
        """Fill in model aliases from engine configs if not explicitly set."""
        if self.user_model_alias is None:
            self.user_model_alias = self.user_engine_config.model_name
        if self.observe_model_alias is None:
            self.observe_model_alias = self.observe_engine_config.model_name
        if self.execute_model_alias is None:
            self.execute_model_alias = self.execute_engine_config.model_name
        return self

    def get_config_hash(self) -> str:
        """Generate a hash of the relevant config parameters that affect scenario execution.

        Excludes parameters that only affect:
        - Parallel execution (max_concurrent_scenarios, timeout_seconds, executor_type)
        - Logging (log_level, log_to_file, logs_dir, use_custom_logger)
        - Output location (output_dir, export, trace_dump_format)
        - Caching meta-config (enable_caching)
        - Engine configs (replaced by model aliases for consistent caching)

        Uses model aliases as canonical identifiers. Aliases are always set via
        the model validator (filled from engine configs if not explicitly provided).

        This enables cache reuse across experiments with different output directories
        and when model deployments change but the logical model is the same.
        """
        exclude_fields = {
            # Parallel execution
            "max_concurrent_scenarios",
            "timeout_seconds",
            "executor_type",
            # Logging
            "log_level",
            "log_to_file",
            "logs_dir",
            "use_custom_logger",
            "experiment_name",
            # Output location
            "output_dir",
            "export",
            "trace_dump_format",
            # Caching meta-config
            "enable_caching",
            # Engine configs (we use aliases instead for consistent caching)
            "user_engine_config",
            "observe_engine_config",
            "execute_engine_config",
            "judge_engine_config",
        }

        # Use pydantic's model_dump with exclude parameter, then serialize to JSON
        # Model aliases are included and always set via the model validator
        config_dict = self.model_dump(exclude=exclude_fields)
        config_str = json.dumps(config_dict, sort_keys=True, default=str)
        return hashlib.md5(config_str.encode()).hexdigest()[:8]  # noqa: S324

fill_model_aliases()

Fill in model aliases from engine configs if not explicitly set.

Source code in pare/scenarios/config.py
@model_validator(mode="after")
def fill_model_aliases(self) -> ScenarioRunnerConfig:
    """Fill in model aliases from engine configs if not explicitly set."""
    if self.user_model_alias is None:
        self.user_model_alias = self.user_engine_config.model_name
    if self.observe_model_alias is None:
        self.observe_model_alias = self.observe_engine_config.model_name
    if self.execute_model_alias is None:
        self.execute_model_alias = self.execute_engine_config.model_name
    return self

get_config_hash()

Generate a hash of the relevant config parameters that affect scenario execution.

Excludes parameters that only affect:

- Parallel execution (max_concurrent_scenarios, timeout_seconds, executor_type)
- Logging (log_level, log_to_file, logs_dir, use_custom_logger)
- Output location (output_dir, export, trace_dump_format)
- Caching meta-config (enable_caching)
- Engine configs (replaced by model aliases for consistent caching)

Uses model aliases as canonical identifiers. Aliases are always set via the model validator (filled from engine configs if not explicitly provided).

This enables cache reuse across experiments with different output directories and when model deployments change but the logical model is the same.

Source code in pare/scenarios/config.py
def get_config_hash(self) -> str:
    """Generate a hash of the relevant config parameters that affect scenario execution.

    Excludes parameters that only affect:
    - Parallel execution (max_concurrent_scenarios, timeout_seconds, executor_type)
    - Logging (log_level, log_to_file, logs_dir, use_custom_logger, experiment_name)
    - Output location (output_dir, export, trace_dump_format)
    - Caching meta-config (enable_caching)
    - Engine configs (replaced by model aliases for consistent caching)

    Uses model aliases as canonical identifiers. Aliases are always set via
    the model validator (filled from engine configs if not explicitly provided).

    This enables cache reuse across experiments with different output directories
    and when model deployments change but the logical model is the same.
    """
    exclude_fields = {
        # Parallel execution
        "max_concurrent_scenarios",
        "timeout_seconds",
        "executor_type",
        # Logging
        "log_level",
        "log_to_file",
        "logs_dir",
        "use_custom_logger",
        "experiment_name",
        # Output location
        "output_dir",
        "export",
        "trace_dump_format",
        # Caching meta-config
        "enable_caching",
        # Engine configs (we use aliases instead for consistent caching)
        "user_engine_config",
        "observe_engine_config",
        "execute_engine_config",
        "judge_engine_config",
    }

    # Use pydantic's model_dump with exclude parameter, then serialize to JSON
    # Model aliases are included and always set via the model validator
    config_dict = self.model_dump(exclude=exclude_fields)
    config_str = json.dumps(config_dict, sort_keys=True, default=str)
    return hashlib.md5(config_str.encode()).hexdigest()[:8]  # noqa: S324
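
To see why the exclusions matter, here is a minimal sketch of the same hashing scheme on a toy model; `MiniConfig` and its fields are examples, not the real `ScenarioRunnerConfig`:

import hashlib
import json

from pydantic import BaseModel


class MiniConfig(BaseModel):
    seed: int = 0
    output_dir: str = "runs/default"  # excluded below: affects output location only

    def get_config_hash(self) -> str:
        # Same recipe as above: dump minus excluded fields, stable JSON, short md5.
        config_dict = self.model_dump(exclude={"output_dir"})
        config_str = json.dumps(config_dict, sort_keys=True, default=str)
        return hashlib.md5(config_str.encode()).hexdigest()[:8]  # noqa: S324


a = MiniConfig(output_dir="runs/exp_a")
b = MiniConfig(output_dir="runs/exp_b")
assert a.get_config_hash() == b.get_config_hash()  # relocating output reuses the cache
assert MiniConfig(seed=1).get_config_hash() != a.get_config_hash()  # execution change -> new key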

PAREEnvEventsExpander

Bases: EnvEventsExpander

Environmental events expander compatible with PARE Apps and Scenarios.

Overrides the add_env_events_to_scenario method to work with PARE Stateful App types instead of Meta-ARE base app types.

Source code in pare/scenarios/utils/scenario_expander.py
class PAREEnvEventsExpander(EnvEventsExpander):
    """Environmental events expander compatible with PARE Apps and Scenarios.

    Overrides the `add_env_events_to_scenario` method to work with PARE Stateful App types instead of Meta-ARE base app types.
    """

    def get_num_env_events_per_app(self, num_env_events: int) -> dict[str, int]:
        """Get the number of environmental events per app for PARE Env Events Expander."""
        # Calculate the number of events per app
        num_env_events_per_app = {}
        total_weight = sum(
            self.config.weight_per_app_class.get(self.resolved_app_names[app], 0) for app in self.resolved_app_names
        )

        for app in self.resolved_app_names:
            weight = self.config.weight_per_app_class.get(self.resolved_app_names[app], 0)
            num_env_events_per_app[app] = int((weight / total_weight) * num_env_events)
        return num_env_events_per_app

    def _resolve_app_names(self, app_names: list[str]) -> dict[str, str]:
        """Resolve app names to their canonical form for PARE Env Events Expander."""
        # Import here to avoid circular import
        from pare.constants import APP_ALIAS

        resolved_names = {}
        for app in app_names:
            for canonical_name, aliases in APP_ALIAS.items():
                if app == canonical_name or app in (aliases if isinstance(aliases, list) else [aliases]):
                    resolved_names[app] = canonical_name
                    break
        return resolved_names

    def add_env_events_to_scenario(self, scenario: Scenario, apps_augmentation_data: list[dict[str, Any]]) -> None:
        """Add environmental noise to a PARE Scenario.

        This override replaces Meta-ARE app type casts with PARE Stateful App types. Additionally, the noisy events do not depend on a start event from the scenario. They are scheduled to start at the beginning of the scenario.

        Args:
            scenario: The PARE Scenario to add environmental noise to.
            apps_augmentation_data: The augmentation data for the apps in the scenario.
        """
        scenario_app_class_names = [app.__class__.__name__ for app in scenario.apps]
        augmentation_app_names = [d["name"] for d in apps_augmentation_data]

        resolved_aug_names = self._resolve_app_names(augmentation_app_names)
        # Only keep the augmentation app names that are in the scenario
        self.resolved_app_names = {
            aug_name: resolved_aug_names.get(aug_name)
            for aug_name in augmentation_app_names
            if resolved_aug_names.get(aug_name) in scenario_app_class_names
        }

        duration = scenario.duration if scenario.duration else ENV_EVENT_DEFAULT_HORIZON

        np_rng = np.random.default_rng(self.config.env_events_seed)
        rng = random.Random(self.config.env_events_seed)  # noqa: S311

        num_env_events = int(self.config.num_env_events_per_minute * duration / 60)
        num_env_events_per_app = self.get_num_env_events_per_app(num_env_events)

        # Define app type mappings
        messaging_apps = ["StatefulMessagingApp", "Messages", "Chats"]
        email_apps = ["StatefulEmailApp", "Email", "Emails"]
        shopping_apps = ["StatefulShoppingApp", "Shopping"]

        d_events: dict[str, Any] = {}

        with EventRegisterer.capture_mode():
            for d in apps_augmentation_data:
                app_name = self.resolved_app_names.get(d["name"], "")
                if not app_name:
                    continue

                # Handle messaging events - use StatefulMessagingApp
                if d["name"] in messaging_apps:
                    self._add_messaging_events(
                        scenario=scenario,
                        app_name=d["name"],
                        app_data=d["app_state"],
                        d_events=d_events,
                        duration=duration,
                        num_events=num_env_events_per_app[d["name"]],
                        np_rng=np_rng,
                        rng=rng,
                    )

                # Handle email events - use StatefulEmailApp
                if d["name"] in email_apps:
                    self._add_email_events(
                        scenario=scenario,
                        app_name=d["name"],
                        app_data=d["app_state"],
                        d_events=d_events,
                        duration=duration,
                        num_events=num_env_events_per_app[d["name"]],
                        np_rng=np_rng,
                        rng=rng,
                    )

                # Handle shopping events - use StatefulShoppingApp
                if d["name"] in shopping_apps:
                    self._add_shopping_events(
                        scenario=scenario,
                        app_name=d["name"],
                        app_data=d["app_state"],
                        d_events=d_events,
                        duration=duration,
                        num_events=num_env_events_per_app[d["name"]],
                        np_rng=np_rng,
                        rng=rng,
                    )

            scenario.events += [e.with_id(f"{ENV_EVENT_EXPANSION_TAG}_{key}") for key, e in d_events.items()]

            logger.warning(f"Added {len(d_events)} env events to the scenario, total {len(scenario.events)} events")

    def _add_messaging_events(
        self,
        scenario: Scenario,
        app_name: str,
        app_data: dict[str, Any],
        d_events: dict[str, Any],
        duration: float,
        num_events: int,
        np_rng: np.random.Generator,
        rng: random.Random,
    ) -> None:
        # try getting the app from scenario, if it fails, don't add events for this app since it is not in the scenario
        try:
            app = cast("StatefulMessagingApp", scenario.get_app(app_name))
        except ValueError:
            logger.warning(f"App {app_name} not found in scenario, skipping environmental noise events")
            return

        conversations = list(app_data["conversations"].values())
        # Cap the number of seeded conversations by both the event budget and
        # the conversations available in the augmentation data.
        n_conversation_events = min(
            num_events // self.config.n_message_events_per_conversation,
            len(conversations),
        )
        conversations = rng.sample(conversations, k=n_conversation_events)
        average_rate = n_conversation_events / duration
        inter_arrival_times = np_rng.exponential(scale=1 / average_rate, size=n_conversation_events)
        ticks = np.cumsum(inter_arrival_times)
        for tick, conversation in zip(ticks, conversations, strict=False):
            if tick > duration:
                break
            n_messages = len(conversation["messages"])
            if n_messages == 0:
                continue
            n_message_events = min(n_messages, self.config.n_message_events_per_conversation)
            message_average_rate = n_message_events / (duration - tick)
            message_inter_arrival_times = np_rng.exponential(scale=1 / message_average_rate, size=n_message_events)
            # Use a distinct inner index so it does not shadow the outer loop variable.
            for j, message in enumerate(conversation["messages"]):
                if j >= n_message_events:
                    break
                key = f"{app_name}_{conversation['conversation_id']}_{j}"
                d_events[key] = app.create_and_add_message(
                    conversation_id=conversation["conversation_id"],
                    sender_id=message["sender_id"],
                    content=message["content"],
                )
                if j == 0:
                    # The first message is offset from the scenario start.
                    d_events[key].depends_on(None, delay_seconds=tick)
                else:
                    # Later messages chain off the previous one.
                    d_events[key].depends_on(
                        d_events[f"{app_name}_{conversation['conversation_id']}_{j - 1}"],
                        delay_seconds=message_inter_arrival_times[j - 1],
                    )

    def _add_email_events(
        self,
        scenario: Scenario,
        app_name: str,
        app_data: dict[str, Any],
        d_events: dict[str, Any],
        duration: float,
        num_events: int,
        np_rng: np.random.Generator,
        rng: random.Random,
    ) -> None:
        try:
            app = cast("StatefulEmailApp", scenario.get_app(app_name))
        except ValueError:
            logger.warning(f"App {app_name} not found in scenario, skipping environmental noise events")
            return
        emails = list(app_data["folders"]["INBOX"]["emails"])
        rng.shuffle(emails)
        n_emails = len(emails)
        if n_emails == 0:
            return

        n_events = min(n_emails, num_events)
        average_rate = n_events / duration
        inter_arrival_times = np_rng.exponential(scale=1 / average_rate, size=n_events)
        ticks = np.cumsum(inter_arrival_times)
        for tick, email in zip(ticks, emails, strict=False):
            d_events[f"email_{email['email_id']}"] = app.create_and_add_email(
                sender=email["sender"],
                recipients=email["recipients"],
                subject=email["subject"],
                content=email["content"],
                folder_name="INBOX",
            ).depends_on(None, delay_seconds=tick)

    def _add_shopping_events(
        self,
        scenario: Scenario,
        app_name: str,
        app_data: dict[str, Any],
        d_events: dict[str, Any],
        duration: float,
        num_events: int,
        np_rng: np.random.Generator,
        rng: random.Random,
    ) -> None:
        # ! TODO: Uncomment following lines when we have a ShoppingApp in PARE
        # try:
        #     app = cast("StatefulShoppingApp", scenario.get_app(app_name))
        # except ValueError:
        #     logger.warning(f"App {app_name} not found in scenario, skipping environmental noise events")
        #     return
        # n_products = len(app_data["products"])
        # products_list = list(app_data["products"].values())
        # rng.shuffle(products_list)
        # if n_products == 0:
        #     return

        # n_events = min(n_products, num_events // self.config.n_item_events_per_product)
        # average_rate = n_events / duration
        # inter_arrival_times = np_rng.exponential(scale=1 / average_rate, size=n_events)
        # ticks = np.cumsum(inter_arrival_times)
        # for i, (tick, product) in enumerate(zip(ticks, products_list, strict=False)):
        #     if tick > duration:
        #         break
        #     d_events[f"shopping_product_{product['product_id']}"] = app.add_product(
        #         name=product["name"],
        #     ).depends_on(None, delay_seconds=tick)

        #     n_items = len(product["variants"])
        #     if n_items == 0:
        #         continue
        #     n_item_events = min(n_items, self.config.n_item_events_per_product)
        #     item_average_rate = n_item_events / (duration - tick)
        #     item_inter_arrival_times = np_rng.exponential(scale=1 / item_average_rate, size=n_item_events)
        #     item_ticks = np.cumsum(item_inter_arrival_times)
        #     for i, (item_tick, item) in enumerate(zip(item_ticks, product["variants"].values(), strict=False)):
        #         d_events[f"shopping_item_{item['item_id']}"] = app.add_item_to_product(
        #             product_id=f"{{{{{ENV_EVENT_EXPANSION_TAG}_shopping_product_{product['product_id']}}}}}",
        #             price=item["price"],
        #             available=item["available"],
        #             options=item["options"],
        #         ).depends_on(d_events[f"shopping_product_{product['product_id']}"], delay_seconds=item_tick)

        # for i, (item_id, discount_codes) in enumerate(d["app_state"]["discount_codes"].items()):
        #     discount_codes = cast("dict[str, float]", discount_codes)
        #     discount_codes = {str(k): float(v) for k, v in discount_codes.items()}
        #     delay_tick = np_rng.exponential(scale=duration // 2, size=1)[0]
        #     if f"shopping_item_{item_id}" in d_events:
        #         for code, value in discount_codes.items():
        #             discount_code = {code: value}
        #             d_events[f"shopping_discount_code_{item_id}_{code}"] = app.add_discount_code(
        #                 item_id=f"{{{{{ENV_EVENT_EXPANSION_TAG}_shopping_item_{item_id}}}}}",
        #                 discount_code=discount_code,
        #             ).depends_on(d_events[f"shopping_item_{item_id}"], delay_seconds=delay_tick)
        pass
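
All three `_add_*_events` helpers share one scheduling technique: draw exponential inter-arrival gaps at the target average rate (a Poisson process), cumulative-sum them into absolute ticks, and attach each event with a `delay_seconds` offset from the scenario start. A standalone sketch with arbitrary example numbers:

import numpy as np

duration = 600.0  # scenario horizon in seconds (example value)
num_events = 8    # per-app event budget (example value)

np_rng = np.random.default_rng(42)
average_rate = num_events / duration
inter_arrival_times = np_rng.exponential(scale=1 / average_rate, size=num_events)
ticks = np.cumsum(inter_arrival_times)

# Events past the horizon are dropped; the rest would be registered with
# depends_on(None, delay_seconds=tick), i.e. relative to the scenario start.
scheduled = [float(t) for t in ticks if t <= duration]
print(f"scheduled {len(scheduled)}/{num_events} events within {duration:.0f}s")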

add_env_events_to_scenario(scenario, apps_augmentation_data)

Add environmental noise to a PARE Scenario.

This override replaces Meta-ARE app type casts with PARE Stateful App types. Additionally, the noisy events do not depend on a start event from the scenario. They are scheduled to start at the beginning of the scenario.

Parameters:

    scenario (Scenario): The PARE Scenario to add environmental noise to. Required.
    apps_augmentation_data (list[dict[str, Any]]): The augmentation data for the apps in the scenario. Required.

Source code in pare/scenarios/utils/scenario_expander.py
def add_env_events_to_scenario(self, scenario: Scenario, apps_augmentation_data: list[dict[str, Any]]) -> None:
    """Add environmental noise to a PARE Scenario.

    This override replaces Meta-ARE app type casts with PARE Stateful App types. Additionally, the noisy events do not depend on a start event from the scenario. They are scheduled to start at the beginning of the scenario.

    Args:
        scenario: The PARE Scenario to add environmental noise to.
        apps_augmentation_data: The augmentation data for the apps in the scenario.
    """
    scenario_app_class_names = [app.__class__.__name__ for app in scenario.apps]
    augmentation_app_names = [d["name"] for d in apps_augmentation_data]

    resolved_aug_names = self._resolve_app_names(augmentation_app_names)
    # Only keep the augmentation app names that are in the scenario
    self.resolved_app_names = {
        aug_name: resolved_aug_names.get(aug_name)
        for aug_name in augmentation_app_names
        if resolved_aug_names.get(aug_name) in scenario_app_class_names
    }

    duration = scenario.duration if scenario.duration else ENV_EVENT_DEFAULT_HORIZON

    np_rng = np.random.default_rng(self.config.env_events_seed)
    rng = random.Random(self.config.env_events_seed)  # noqa: S311

    num_env_events = int(self.config.num_env_events_per_minute * duration / 60)
    num_env_events_per_app = self.get_num_env_events_per_app(num_env_events)

    # Define app type mappings
    messaging_apps = ["StatefulMessagingApp", "Messages", "Chats"]
    email_apps = ["StatefulEmailApp", "Email", "Emails"]
    shopping_apps = ["StatefulShoppingApp", "Shopping"]

    d_events: dict[str, Any] = {}

    with EventRegisterer.capture_mode():
        for d in apps_augmentation_data:
            app_name = self.resolved_app_names.get(d["name"], "")
            if not app_name:
                continue

            # Handle messaging events - use StatefulMessagingApp
            if d["name"] in messaging_apps:
                self._add_messaging_events(
                    scenario=scenario,
                    app_name=d["name"],
                    app_data=d["app_state"],
                    d_events=d_events,
                    duration=duration,
                    num_events=num_env_events_per_app[d["name"]],
                    np_rng=np_rng,
                    rng=rng,
                )

            # Handle email events - use StatefulEmailApp
            if d["name"] in email_apps:
                self._add_email_events(
                    scenario=scenario,
                    app_name=d["name"],
                    app_data=d["app_state"],
                    d_events=d_events,
                    duration=duration,
                    num_events=num_env_events_per_app[d["name"]],
                    np_rng=np_rng,
                    rng=rng,
                )

            # Handle shopping events - use StatefulShoppingApp
            if d["name"] in shopping_apps:
                self._add_shopping_events(
                    scenario=scenario,
                    app_name=d["name"],
                    app_data=d["app_state"],
                    d_events=d_events,
                    duration=duration,
                    num_events=num_env_events_per_app[d["name"]],
                    np_rng=np_rng,
                    rng=rng,
                )

        scenario.events += [e.with_id(f"{ENV_EVENT_EXPANSION_TAG}_{key}") for key, e in d_events.items()]

        logger.warning(f"Added {len(d_events)} env events to the scenario, total {len(scenario.events)} events")
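
The `_resolve_app_names` step maps augmentation-data names such as "Email" or "Chats" back to canonical app class names before filtering against the scenario. A sketch of that lookup; the `APP_ALIAS` contents below are assumed for illustration and may not match `pare.constants`:

from __future__ import annotations

# Assumed alias table; the real mapping lives in pare.constants.APP_ALIAS.
APP_ALIAS: dict[str, list[str] | str] = {
    "StatefulEmailApp": ["Email", "Emails"],
    "StatefulMessagingApp": ["Messages", "Chats"],
}


def resolve_app_names(app_names: list[str]) -> dict[str, str]:
    resolved = {}
    for app in app_names:
        for canonical_name, aliases in APP_ALIAS.items():
            # An app matches its canonical name or any of its aliases.
            if app == canonical_name or app in (aliases if isinstance(aliases, list) else [aliases]):
                resolved[app] = canonical_name
                break
    return resolved


print(resolve_app_names(["Email", "Chats", "Unknown"]))
# {'Email': 'StatefulEmailApp', 'Chats': 'StatefulMessagingApp'} -- unknown names drop out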

get_num_env_events_per_app(num_env_events)

Get the number of environmental events per app for PARE Env Events Expander.

Source code in pare/scenarios/utils/scenario_expander.py
def get_num_env_events_per_app(self, num_env_events: int) -> dict[str, int]:
    """Get the number of environmental events per app for PARE Env Events Expander."""
    # Calculate the number of events per app
    num_env_events_per_app = {}
    total_weight = sum(
        self.config.weight_per_app_class.get(self.resolved_app_names[app], 0) for app in self.resolved_app_names
    )

    for app in self.resolved_app_names:
        weight = self.config.weight_per_app_class.get(self.resolved_app_names[app], 0)
        num_env_events_per_app[app] = int((weight / total_weight) * num_env_events)
    return num_env_events_per_app
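
A worked example of the split above, using the default weights shown below and an arbitrary budget of 7 events:

weights = {"StatefulEmailApp": 1.0, "StatefulMessagingApp": 1.0}
budget = 7
total = sum(weights.values())
per_app = {name: int((weight / total) * budget) for name, weight in weights.items()}
print(per_app)  # {'StatefulEmailApp': 3, 'StatefulMessagingApp': 3}

Note that int() floors each share, so an odd budget can leave events unallocated (one of the 7 here).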

default_weight_per_app_class()

Default weight per app class for PARE Env Events Expander.

Source code in pare/scenarios/utils/scenario_expander.py
def default_weight_per_app_class() -> dict[str, float]:
    """Default weight per app class for PARE Env Events Expander."""
    return {
        "StatefulEmailApp": 1.0,
        "StatefulMessagingApp": 1.0,
        "StatefulShoppingApp": 1.0,
    }
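
Because the expander reads weight_per_app_class, num_env_events_per_minute, and env_events_seed from its config (all referenced in the source above), biasing the noise mix is a matter of overriding the weights. A hedged construction sketch; EnvEventsConfig is an assumed class name and its exact signature may differ:

env_events_config = EnvEventsConfig(  # assumed constructor
    env_events_seed=7,
    num_env_events_per_minute=2.0,
    weight_per_app_class={
        "StatefulEmailApp": 3.0,       # three times the default share of noise
        "StatefulMessagingApp": 1.0,
        "StatefulShoppingApp": 0.0,    # zero weight: no events for this app
    },
)
expander = PAREEnvEventsExpander(env_events_config=env_events_config)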