Skip to content

Scenario Generator API

Generator Entry and Prompt Context Utilities

build_all_tools_block(app_instances, target_apps)

Describe all event-registered tools for the selected apps.

Source code in pare/scenarios/generator/scenario_generator.py
567
568
569
570
571
572
573
574
575
576
577
578
def build_all_tools_block(app_instances: dict[str, object], target_apps: list[str]) -> str:
    """Describe all event-registered tools for the selected apps.

    Produces one section per app that has registered tools, or "(none)"
    when no app contributes anything.
    """
    sections: list[str] = []
    for app_name in target_apps:
        instance = app_instances.get(app_name)
        if instance is None:
            # Unknown or unavailable app: contribute nothing.
            continue
        tool_entries = _gather_event_registered_entries(instance)
        if not tool_entries:
            continue
        bullet_list = "\n".join(f"  - {item}" for item in tool_entries)
        sections.append(f"{app_name}:\n{bullet_list}")
    if not sections:
        return "(none)"
    return "\n\n".join(sections)

build_engine(model, provider, endpoint)

Deprecated: the multi-step generator now uses claude-agent-sdk directly.

Source code in pare/scenarios/generator/scenario_generator.py
32
33
34
35
36
37
def build_engine(model: str, provider: str | None, endpoint: str | None) -> None:
    """Deprecated: the multi-step generator now uses `claude-agent-sdk` directly."""
    raise RuntimeError(
        "build_engine() is deprecated for the multi-step scenario generator. "
        "Claude Agent SDK is used instead of the Meta-ARE LLMEngine.",
    )

build_import_instructions_block(app_names)

Return a formatted block with import instructions for the selected apps.

Uses the hard-coded APP_IMPORT_INSTRUCTIONS mapping so prompts stay stable even if the underlying packages change.

Source code in pare/scenarios/generator/scenario_generator.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def build_import_instructions_block(app_names: list[str]) -> str:
    """Return a formatted block with import instructions for the selected apps.

    Uses the hard-coded `APP_IMPORT_INSTRUCTIONS` mapping so prompts stay stable
    even if the underlying packages change.
    """
    # De-duplicate while preserving the caller's ordering.
    unique_names = list(dict.fromkeys(app_names))

    output_lines: list[str] = []
    for app_name in unique_names:
        spec = APP_IMPORT_INSTRUCTIONS.get(app_name)
        if spec is None:
            continue
        raw = spec.get("import instruction")
        if not raw:
            continue

        # A spec may carry one import statement or several; normalize to a list
        # of strings either way.
        if isinstance(raw, str):
            statements = [raw]
        elif isinstance(raw, (list, tuple, set)):
            statements = [str(entry) for entry in raw]
        else:
            statements = [str(raw)]

        output_lines.append(f"{app_name}:")
        output_lines.extend(f"  - {stmt}" for stmt in statements)

    return "\n".join(output_lines) if output_lines else "(none)"

build_non_oracle_block(app_instances, selected_apps)

Describe non-oracle notification methods per selected app.

Source code in pare/scenarios/generator/scenario_generator.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def build_non_oracle_block(app_instances: dict[str, object], selected_apps: list[str]) -> str:
    """Describe non-oracle notification methods per selected app.

    For each selected app, collects notification methods registered under the
    "user" and "agent" scopes and renders one formatted tool entry per method,
    annotated with the scopes that expose it. Returns "(none)" when nothing
    matches.
    """
    sections: list[str] = []
    for app_name in selected_apps:
        instance = app_instances.get(app_name)

        # Map method name -> set of scopes exposing it, and remember the first
        # template seen per method as a description fallback.
        method_scopes: dict[str, set[str]] = {}
        method_templates: dict[str, str] = {}
        for scope in ("user", "agent"):
            scope_entries = NOTIFICATION_TEMPLATES.get(scope, {}).get(app_name, {})
            for method_name, template in scope_entries.items():
                method_scopes.setdefault(method_name, set()).add(scope)
                method_templates.setdefault(method_name, template)
        if not method_scopes:
            continue

        entries: list[str] = []
        for method_name in sorted(method_scopes):
            bound = getattr(instance, method_name, None) if instance else None
            # Prefer the live docstring; fall back to the template text.
            raw_doc = getattr(bound, "__doc__", None) if bound else None
            summary = _summarize_docstring(raw_doc or method_templates.get(method_name, ""))
            sig = _signature_from_callable(method_name, bound)
            args = _args_from_callable(bound)
            if bound is not None:
                ret = _return_info_from_callable(bound)
            else:
                ret = {"description": "", "type": "Any"}
            scope_note = "notification scopes: " + ", ".join(sorted(method_scopes[method_name]))
            entries.append(
                _format_tool_entry(
                    sig,
                    description=summary,
                    args_dict=args,
                    return_info=ret,
                    note=scope_note,
                )
            )
        if entries:
            bullet_list = "\n".join(f"  - {entry}" for entry in entries)
            sections.append(f"{app_name}:\n{bullet_list}")
    return "\n\n".join(sections) if sections else "(none)"

build_oracle_block(app_instances, selected_apps)

Describe oracle-style app tools that can be invoked during events.

Source code in pare/scenarios/generator/scenario_generator.py
456
457
458
459
460
461
462
463
464
465
466
467
def build_oracle_block(app_instances: dict[str, object], selected_apps: list[str]) -> str:
    """Describe oracle-style app tools that can be invoked during events.

    Returns one section per app with oracle entries, or "(none)" when no
    selected app contributes any.
    """
    sections: list[str] = []
    for app_name in selected_apps:
        instance = app_instances.get(app_name)
        if instance is None:
            continue
        oracle_entries = _gather_oracle_entries(instance)
        if not oracle_entries:
            continue
        bullet_list = "\n".join(f"  - {item}" for item in oracle_entries)
        sections.append(f"{app_name}:\n{bullet_list}")
    return "\n\n".join(sections) if sections else "(none)"

build_selected_tools_block(app_instances, target_apps)

Describe event-registered tools for the selected apps in a brief, narrative-oriented format.

Source code in pare/scenarios/generator/scenario_generator.py
581
582
583
584
585
586
587
588
589
590
591
592
def build_selected_tools_block(app_instances: dict[str, object], target_apps: list[str]) -> str:
    """Describe event-registered tools for the selected apps in a brief, narrative-oriented format."""
    sections = []
    for app_name in target_apps:
        instance = app_instances.get(app_name)
        if instance is None:
            continue
        brief_entries = _gather_event_registered_brief_entries(instance)
        if not brief_entries:
            continue
        bullet_list = "\n".join(f"  - {entry}" for entry in brief_entries)
        sections.append(f"{app_name}:\n{bullet_list}")
    if not sections:
        return "(none)"
    return "\n\n".join(sections)

build_tool_descriptions(app_def_scenario, target_apps)

Summarize tools for the given apps so the LLM knows what it can call.

Source code in pare/scenarios/generator/scenario_generator.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def build_tool_descriptions(app_def_scenario: object, target_apps: list[str]) -> str:
    """Summarize tools for the given apps so the LLM knows what it can call.

    Args:
        app_def_scenario: Scenario object expected to expose ``get_tools()``.
        target_apps: App class names whose tools should be included.

    Returns:
        A rendered tool-description block, or ``"(none)"`` when no tool
        belongs to one of the target apps.
    """
    try:
        tools = app_def_scenario.get_tools()  # type: ignore[attr-defined]  # naq: app_def_scenario is a PAREScenario
    except Exception:
        # Scenarios without a usable get_tools() simply contribute no tools.
        tools = []
    filtered = []
    target_set = set(target_apps)
    for tool in tools:
        inst = getattr(tool, "class_instance", None)
        # Fix: an explicit None check instead of truthiness, so tools whose
        # app instance is falsy (defines __bool__/__len__) are still matched.
        # The previous getattr fallback for `__class__` was dead code: every
        # object has `__class__`, and the fallback allocated a throwaway
        # anonymous class per iteration.
        if inst is not None and inst.__class__.__name__ in target_set:
            filtered.append(tool)
    if not filtered:
        return "(none)"
    toolbox = Toolbox(tools=[AppToolAdapter(t) for t in filtered])
    return toolbox.show_tool_descriptions(DEFAULT_TOOL_DESCRIPTION_TEMPLATE)

determine_selected_apps(app_instances, requested)

Choose which apps to expose to the generator based on availability and CLI overrides.

Source code in pare/scenarios/generator/scenario_generator.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def determine_selected_apps(app_instances: dict[str, object], requested: Iterable[str] | None) -> list[str]:
    """Choose which apps to expose to the generator based on availability and CLI overrides."""
    available = [name for name in app_instances if name not in SYSTEM_APPS]
    available.sort()
    if not available:
        return []
    if not requested:
        return available
    requested_unique: list[str] = []
    for item in requested:
        if item not in requested_unique:
            requested_unique.append(item)
    valid = [name for name in requested_unique if name in available]
    invalid = sorted(set(requested_unique) - set(valid))
    if invalid:
        logging.warning("Ignoring unknown apps: %s (available: %s)", ", ".join(invalid), ", ".join(available))
    return valid or available

main()

CLI entry point for the multi-step scenario generator.

Source code in pare/scenarios/generator/scenario_generator.py
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
def _build_arg_parser() -> argparse.ArgumentParser:
    """Construct the CLI argument parser for the multi-step scenario generator."""
    parser = argparse.ArgumentParser("multi-steps-scenario-generator")
    parser.add_argument(
        "--output-dir",
        dest="output_dir",
        type=Path,
        default=None,
        help="Directory where intermediate step files should be written.",
    )
    # NOTE(review): --model/--provider/--endpoint are parsed but not read by
    # main() below (build_engine is deprecated); kept for CLI compatibility.
    parser.add_argument(
        "--model",
        dest="model",
        default="gpt-5-mini-2025-08-07",
        help="LLM model identifier supported by the configured provider.",
    )
    parser.add_argument(
        "--provider",
        dest="provider",
        default="openai",
        help="LLM provider name (e.g., openai, azure, anthropic).",
    )
    parser.add_argument(
        "--endpoint",
        dest="endpoint",
        default=None,
        help="Optional custom endpoint for the provider.",
    )
    parser.add_argument(
        "--max-iterations",
        dest="max_iterations",
        type=int,
        default=2,
        help="Maximum number of attempts per step.",
    )
    parser.add_argument(
        "--resume-from-step2",
        dest="resume_from_step2",
        action="store_true",
        default=False,
        help=(
            "[DEPRECATED] Reuse an existing Step 1 description from the output "
            "directory and start the pipeline at Step 2 (apps & data). "
            "Prefer --resume-from-step step2 instead."
        ),
    )
    parser.add_argument(
        "--resume-from-step",
        dest="resume_from_step",
        choices=["step2", "step3", "step4"],
        default=None,
        help=(
            "Resume the pipeline from a specific step. "
            "Valid values: step2 (reuse Step 1 narrative), "
            "step3 (reuse Step 1 narrative and Step 2 code), "
            "step4 (reuse narrative and code from Steps 2 and 3). "
            "Takes precedence over --resume-from-step2 if both are provided."
        ),
    )
    parser.add_argument(
        "--trajectory-dir",
        dest="trajectory_dir",
        type=Path,
        default=None,
        help=(
            "Optional path to a step trajectory directory "
            "(e.g., pare/scenarios/generator/step_trajectory/trajectory_YYYYMMDDTHHMMSS). "
            "If not provided, a new directory will be created under "
            "pare/scenarios/generator/step_trajectory for this run."
        ),
    )
    parser.add_argument(
        "--num-scenarios",
        dest="num_scenarios",
        type=int,
        default=1,
        help=(
            "Number of distinct scenarios to generate in this invocation. "
            "Each scenario runs a separate multi-step pipeline. Defaults to 1."
        ),
    )
    parser.add_argument(
        "--debug-prompts",
        dest="debug_prompts",
        action="store_true",
        default=False,
        help="If set, skip LLM calls and print the prompts for all agents instead.",
    )
    parser.add_argument(
        "--apps",
        dest="selected_apps",
        nargs="*",
        default=["StatefulMessagingApp", "StatefulContactsApp", "StatefulCalendarApp", "StatefulEmailApp"],
        help=(
            "Explicit list of app class names to include (PAREAgentUserInterface and "
            "HomeScreenSystemApp are always available). Defaults to all apps in the app definition scenario."
        ),
    )
    return parser


def main() -> None:
    """CLI entry point for the multi-step scenario generator.

    Parses CLI arguments, prepares the prompt context from the app definition
    scenario, and runs one orchestrator pipeline per requested scenario,
    logging a summary when more than one run was requested.
    """
    logging.basicConfig(level=logging.INFO)
    args = _build_arg_parser().parse_args()

    # Load environment variables
    load_dotenv()

    app_def_scenario = ScenarioWithAllPAREApps()
    app_def_scenario.initialize()
    app_instances = {app.__class__.__name__: app for app in getattr(app_def_scenario, "apps", [])}
    selected_apps = determine_selected_apps(app_instances, args.selected_apps)
    if not selected_apps:
        logging.warning("No selectable apps found; continuing with system apps only.")
    prompt_context = prepare_prompt_context_data(app_def_scenario, selected_apps)

    # Emit a warning if the deprecated flag is used without the new one.
    if args.resume_from_step2 and args.resume_from_step is None:
        logging.warning(
            "--resume-from-step2 is deprecated; prefer --resume-from-step step2 instead.",
        )

    num_scenarios = max(1, args.num_scenarios)
    results: list[dict[str, Any]] = []
    for idx in range(num_scenarios):
        # For multiple scenarios with an explicit trajectory_dir, create
        # per-run subdirectories so snapshots and JSONL logs do not collide.
        if args.trajectory_dir is None:
            trajectory_dir = None
        else:
            trajectory_dir = args.trajectory_dir
            if num_scenarios > 1:
                trajectory_dir = trajectory_dir / f"run_{idx + 1}"

        agent = ScenarioGeneratingAgentOrchestrator(
            output_dir=args.output_dir,
            max_iterations=args.max_iterations,
            trajectory_dir=trajectory_dir,
            prompt_context=prompt_context,
            debug_prompts=args.debug_prompts,
            resume_from_step2=args.resume_from_step2,
            resume_from_step=args.resume_from_step,
        )
        try:
            result = agent.run()
        except Exception as exc:
            # A failed run is recorded but does not abort the remaining runs.
            logging.exception(
                "Scenario generation failed for run %s/%s (trajectory_dir=%s). Continuing.",
                idx + 1,
                num_scenarios,
                trajectory_dir,
            )
            results.append({
                "run_index": idx + 1,
                "status": "failed",
                "error": str(exc),
                "trajectory_dir": str(trajectory_dir) if trajectory_dir is not None else None,
            })
            continue

        results.append({
            "run_index": idx + 1,
            "status": "success",
            **result,
        })

    if num_scenarios > 1:
        failed = [r for r in results if r.get("status") != "success"]
        logging.info(
            "Completed %s scenario runs: %s success, %s failed.",
            num_scenarios,
            num_scenarios - len(failed),
            len(failed),
        )

prepare_prompt_context_data(app_def_scenario, selected_apps)

Assemble all dynamic prompt blocks used by the multi-step generator.

Source code in pare/scenarios/generator/scenario_generator.py
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
def prepare_prompt_context_data(app_def_scenario: object, selected_apps: list[str]) -> dict[str, str]:
    """Assemble all dynamic prompt blocks used by the multi-step generator."""
    app_instances = {app.__class__.__name__: app for app in getattr(app_def_scenario, "apps", [])}
    # System apps that are actually present are always appended to the selection.
    system_extras = [name for name in SYSTEM_APPS if name in app_instances]
    all_apps = selected_apps + system_extras

    # Build each block in a fixed order before assembling the context dict.
    import_instructions = build_import_instructions_block(all_apps)
    tool_descriptions = build_tool_descriptions(app_def_scenario, all_apps)
    non_oracle_block = build_non_oracle_block(app_instances, selected_apps)
    oracle_block = build_oracle_block(app_instances, selected_apps)
    all_tools_block = build_all_tools_block(app_instances, all_apps)
    selected_tools_description = build_selected_tools_block(app_instances, selected_apps)
    app_init_block = build_app_initialization_block(all_apps)
    selected_display = ", ".join(all_apps) if all_apps else "(none)"

    return {
        "selected_apps": selected_display,
        "import_instructions": import_instructions,
        "tool_descriptions": tool_descriptions,
        "allowed_non_oracle_block": non_oracle_block,
        "allowed_oracle_block": oracle_block,
        "allowed_all_tools_block": all_tools_block,
        "app_initialization_block": app_init_block,
        "selected_tools_description": selected_tools_description,
    }

Orchestrator

RunCheckResult dataclass

Summary of a single scenario run used to gate multi-step progress.

Source code in pare/scenarios/generator/agent/scenario_generating_agent_orchestrator.py
30
31
32
33
34
35
36
37
38
@dataclass
class RunCheckResult:
    """Summary of a single scenario run used to gate multi-step progress."""

    # Overall gate flag: whether this run counts as passing.
    passed: bool
    # Free-text feedback about the run (presumably fed back into the next
    # generation attempt -- confirm against the orchestrator's usage).
    feedback: str
    # Whether the run hit a runtime error.
    runtime_error: bool
    # Whether the validation phase was reached at all.
    validation_reached: bool
    # Whether the validation phase succeeded.
    validation_success: bool

ScenarioGeneratingAgentOrchestrator

Coordinates the dedicated step agents to build a proactive scenario.

Source code in pare/scenarios/generator/agent/scenario_generating_agent_orchestrator.py
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
class ScenarioGeneratingAgentOrchestrator:
    """Coordinates the dedicated step agents to build a proactive scenario."""

    _ALWAYS_INCLUDED_APPS = {"PAREAgentUserInterface", "HomeScreenSystemApp"}  # noqa: RUF012

    def __init__(
        self,
        *,
        output_dir: str | Path | None = None,
        max_iterations: int = 3,
        trajectory_dir: str | Path | None = None,
        prompt_context: dict[str, str] | None = None,
        debug_prompts: bool = False,
        resume_from_step2: bool = False,
        resume_from_step: str | None = None,
        claude_filesystem_config: ClaudeFilesystemConfig | None = None,
    ) -> None:
        """Initialize the orchestrator and supporting step agents.

        Args:
            output_dir: Destination for intermediate markdown artifacts;
                defaults to the per-run trajectory directory.
            max_iterations: Per-step retry budget forwarded to each step agent.
            trajectory_dir: Existing trajectory directory to reuse; when None a
                timestamped directory is created under `step_trajectory/`.
            prompt_context: Dynamic prompt context (e.g. `selected_apps`); also
                forwarded to `configure_dynamic_context`.
            debug_prompts: When True, Claude calls are skipped and prompts are
                only logged (no files are modified by the pipeline steps).
            resume_from_step2: Legacy boolean; treated as
                `resume_from_step="step2"` when `resume_from_step` is not given.
            resume_from_step: Explicit step name to resume from
                ("step2"/"step3"/"step4"); takes precedence over the boolean.
            claude_filesystem_config: Filesystem policy override; when None a
                default policy (repo read-only, working scenario file editable)
                is constructed.

        Side effects: creates the trajectory/output directories, reads the
        global scenario metadata, and may write a filtered metadata JSON into
        the trajectory directory.
        """
        self.max_iterations = max_iterations
        self.debug_prompts = debug_prompts
        # Backwards compatibility: boolean resume_from_step2 maps to "step2" unless
        # an explicit resume_from_step value is provided.
        self.resume_from_step = resume_from_step or ("step2" if resume_from_step2 else None)
        # This file lives under `pare/scenarios/generator/agent/...`.
        # - generator_dir: pare/scenarios/generator
        # - scenarios_dir: pare/scenarios
        # - pare_dir:       pare
        generator_dir = Path(__file__).resolve().parents[1]
        scenarios_dir = generator_dir.parent
        pas_dir = scenarios_dir.parent

        # Keep repo_root aligned to the `pare/` package directory (so relative paths
        # like repo_root/"scenarios"/... resolve under `pare/scenarios/`).
        self.repo_root = pas_dir

        # Directory that tracks the per-step trajectory for this run, e.g.,
        # pare/scenario_generator/step_trajectory/trajectory_YYYYMMDDTHHMMSS.
        trajectory_root = generator_dir / "step_trajectory"
        if trajectory_dir is not None:
            self.trajectory_dir = Path(trajectory_dir)
        else:
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # datetime.now(timezone.utc) yields the same strftime output here.
            timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%S")
            self.trajectory_dir = trajectory_root / f"trajectory_{timestamp}"
        self.trajectory_dir.mkdir(parents=True, exist_ok=True)

        # Directory where intermediate markdown artifacts live. We no longer write
        # to `pare/scenario_generator/generated_scenarios/`; keep artifacts scoped
        # to the trajectory directory by default.
        self.output_dir = Path(output_dir) if output_dir is not None else self.trajectory_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Directory that holds the single editable working copy plus the final
        # exported scenarios produced by the multi-step generator.
        #
        # IMPORTANT: this directory must live directly under `pare/scenarios/`
        # so that `PARE_SCENARIOS_DIR=generator` can discover and import the
        # working file as `pare.scenarios.generator.<module>`.
        self.seed_scenarios_dir = generator_dir
        self.seed_scenarios_dir.mkdir(parents=True, exist_ok=True)

        # Use the editable_seed_scenario-based working file so Claude Agent can
        # repeatedly edit a single, stable filename. The original seed template
        # remains read-only for reference.
        self.scenario_file = self.seed_scenarios_dir / "editable_seed_scenario.py"

        # Global scenario metadata used for uniqueness checks and analysis.
        # Stored under `pare/scenarios/scenario_metadata.json` so it is shared
        # across runs and not tied to a particular output directory.
        self.scenario_metadata_path = self.repo_root / "scenarios" / "scenario_metadata.json"

        # Dynamic prompt context (selected apps/tools) for this run.
        # IMPORTANT: must be set before any helper that reads `_prompt_context`.
        self._prompt_context: dict[str, str] = prompt_context or {}

        # Result of the most recent dynamic scenario check; consulted when a
        # run fails so the persisted failure snapshot can be classified.
        self._last_check_result: RunCheckResult | None = None
        # Declarative filesystem policy for Claude Agent SDK usage. Enforcement
        # will be wired via hooks and tool options in a follow-up change.
        if claude_filesystem_config is None:
            self.claude_filesystem_config = ClaudeFilesystemConfig(
                read_only_roots=[self.repo_root],
                editable_files=[self.scenario_file],
            )
        else:
            self.claude_filesystem_config = claude_filesystem_config
        self._historical_descriptions = self._read_scenario_metadata()

        # For Step 0/1 prompting we often want to scope uniqueness comparisons to
        # scenarios that use the same core app combination as this run (excluding
        # the always-present PAREAgentUserInterface + HomeScreenSystemApp).
        self.scenario_metadata_path_for_prompt = self.scenario_metadata_path
        self._historical_descriptions_for_prompt = self._historical_descriptions
        filtered_path, filtered_entries = self._maybe_write_filtered_metadata_for_prompt()
        if filtered_path is not None:
            self.scenario_metadata_path_for_prompt = filtered_path
            self._historical_descriptions_for_prompt = filtered_entries

        # Per-step Claude runtime configurations. Narrative and uniqueness
        # checks do not need code-editing tools, while Steps 2-4 use Read/Write
        # to modify the seed_scenario file.
        self._claude_config_uniqueness = ClaudeAgentRuntimeConfig(
            cwd=self.repo_root,
            allowed_tools=["Read"],
            permission_mode="acceptEdits",
            filesystem=self.claude_filesystem_config,
        )
        self._claude_config_step1 = ClaudeAgentRuntimeConfig(
            cwd=self.repo_root,
            allowed_tools=["Read"],
            permission_mode="acceptEdits",
            filesystem=self.claude_filesystem_config,
        )
        self._claude_config_code_steps = ClaudeAgentRuntimeConfig(
            cwd=self.repo_root,
            allowed_tools=["Read", "Write"],
            permission_mode="acceptEdits",
            filesystem=self.claude_filesystem_config,
        )

        if prompt_context is not None:
            configure_dynamic_context(**prompt_context)

        if self.debug_prompts:
            logger.info(
                "Debug prompts mode enabled for multi-step scenario generator; all Claude calls "
                "will be skipped. Prompts and planned file operations will be logged instead.",
            )

        # Use the canonical original seed template with explicit start/end markers
        # from the PARE scenarios package so we can safely strip any
        # natural-language preamble/epilogue that Claude might emit around the
        # template body.
        self.seed_template_path = generator_dir / "utils" / "original_seed_scenario.py"
        self.seed_template_text = self._safe_read_text(self.seed_template_path)

        if self.debug_prompts:
            logger.info("Scenario working file: %s", self.scenario_file)
            logger.info("Seed template path: %s", self.seed_template_path)
            logger.info("Scenario metadata path: %s", self.scenario_metadata_path)
            logger.info(
                "Claude filesystem config: read_only_roots=%s, editable_files=%s",
                self.claude_filesystem_config.read_only_roots,
                self.claude_filesystem_config.editable_files,
            )
            if prompt_context is not None:
                logger.info("Selected apps for this run: %s", prompt_context.get("selected_apps", "(unknown)"))

        # Step agents are constructed last so they receive the fully-resolved
        # metadata paths and per-step Claude runtime configurations.
        self.uniqueness_agent = ScenarioUniquenessCheckAgent(
            historical_descriptions=self._historical_descriptions_for_prompt,
            scenario_metadata_path=str(self.scenario_metadata_path_for_prompt),
            debug_prompts=debug_prompts,
            claude_runtime_config=self._claude_config_uniqueness,
        )
        self.step1_agent = StepEditAgent(
            step_name="Step 1: Scenario Description",
            step_kind="description",
            system_prompt=prompt_module.SCENARIO_DESCRIPTION_SYSTEM_PROMPT,
            max_iterations=max_iterations,
            uniqueness_agent=self.uniqueness_agent,
            debug_prompts=debug_prompts,
            claude_runtime_config=self._claude_config_step1,
        )
        self.step2_agent = StepEditAgent(
            step_name="Step 2: Apps & Data Setup",
            step_kind="apps_and_data",
            system_prompt=prompt_module.APPS_AND_DATA_SYSTEM_PROMPT,
            max_iterations=max_iterations,
            debug_prompts=debug_prompts,
            claude_runtime_config=self._claude_config_code_steps,
        )
        self.step3_agent = StepEditAgent(
            step_name="Step 3: Events Flow",
            step_kind="events_flow",
            system_prompt=prompt_module.EVENTS_FLOW_SYSTEM_PROMPT,
            max_iterations=max_iterations,
            debug_prompts=debug_prompts,
            claude_runtime_config=self._claude_config_code_steps,
        )
        self.step4_agent = StepEditAgent(
            step_name="Step 4: Validation Conditions",
            step_kind="validation",
            system_prompt=prompt_module.VALIDATION_SYSTEM_PROMPT,
            max_iterations=max_iterations,
            debug_prompts=debug_prompts,
            claude_runtime_config=self._claude_config_code_steps,
        )

    @classmethod
    def _dedupe_scenario_id(cls, scenario_id: str, existing_ids: set[str]) -> str:
        """Return a scenario_id that does not collide with `existing_ids`.

        Appends the smallest numeric suffix (`_2`, `_3`, ...) needed to make
        the id unique. The Step 1 <= 40 char constraint is deliberately NOT
        enforced here; this is only a post-processing safety shim that keeps
        an existing scenario from being overwritten.
        """
        candidate = scenario_id
        counter = 2
        while candidate in existing_ids:
            candidate = f"{scenario_id}_{counter}"
            counter += 1
        return candidate

    @staticmethod
    def _dedupe_class_name(class_name: str, existing_class_names: set[str]) -> str:
        """Return a class name that does not collide with `existing_class_names`.

        Appends a bare numeric suffix (Foo2, Foo3, ...) so the result remains a
        valid PascalCase identifier.
        """
        candidate = class_name
        counter = 2
        while candidate in existing_class_names:
            candidate = f"{class_name}{counter}"
            counter += 1
        return candidate

    def _ensure_unique_step1_identifiers(self, *, scenario_id: str, class_name: str) -> tuple[str, str, dict[str, Any]]:
        """Ensure Step 1 identifiers won't silently overwrite existing artifacts.

        Returns the (possibly deduped) scenario_id and class_name plus a notes
        dict describing any renames that were applied.
        """
        known_ids: set[str] = set()
        known_class_names: set[str] = set()
        for entry in self._read_scenario_metadata():
            if not isinstance(entry, dict):
                continue
            if entry.get("scenario_id"):
                known_ids.add(str(entry.get("scenario_id")).strip())
            if entry.get("class_name"):
                known_class_names.add(str(entry.get("class_name")).strip())

        # Filenames already present in the canonical generated scenarios dir
        # also count as taken class names (prevents file collisions).
        try:
            known_class_names.update(path.stem for path in self.seed_scenarios_dir.glob("*.py"))
        except Exception:
            logger.exception("Failed to scan existing scenario filenames under %s", self.seed_scenarios_dir)

        proposed = {"scenario_id": scenario_id, "class_name": class_name}
        final_id = self._dedupe_scenario_id(scenario_id, known_ids)
        # Reserve the (possibly deduped) id so later bookkeeping stays consistent.
        known_ids.add(final_id)

        final_class = self._dedupe_class_name(class_name, known_class_names)
        notes: dict[str, Any] = {}
        if final_id != scenario_id:
            notes["scenario_id_deduped_from"] = scenario_id
        if final_class != class_name:
            notes["class_name_deduped_from"] = class_name
        if notes:
            notes["original_identifiers"] = proposed
            notes["deduped_identifiers"] = {"scenario_id": final_id, "class_name": final_class}
            logger.warning(
                "Deduped Step 1 identifiers to avoid overwriting existing scenarios: %s -> %s",
                proposed,
                notes["deduped_identifiers"],
            )
        return final_id, final_class, notes

    def run(self) -> dict[str, Any]:  # noqa: C901
        """Execute the four-step pipeline and return artifact metadata.

        Honors `self.resume_from_step` ("step2"/"step3"/"step4") by restoring
        the working scenario file from the matching trajectory snapshot and
        skipping the already-completed steps. All resume modes converge at
        Step 4 (validation), which always runs.

        Returns:
            A dict with `description_path`, `scenario_file_path`,
            `trajectory_dir`, and the list of per-step `StepResult`s.

        Raises:
            Exception: Re-raises any step failure after persisting a
                failed-scenario snapshot for debugging.
        """
        logger.info("Starting multi-step scenario generation.")

        try:
            step1_path = self.output_dir / "step1_scenario_description.md"
            resume_mode = self.resume_from_step

            # If resuming from later steps, restore the working scenario file
            # from the appropriate trajectory snapshot when available. This
            # keeps the single editable_seed_scenario.py in sync with the code
            # that previously passed validation for that step.
            if not self.debug_prompts:
                if resume_mode == "step2":
                    # Restore the scenario file as it looked after Step 1
                    # header updates.
                    self._restore_scenario_from_trajectory("step1")
                elif resume_mode == "step3":
                    # Restore the scenario as of the end of Step 2.
                    self._restore_scenario_from_trajectory("step2")
                elif resume_mode == "step4":
                    # Restore the scenario as of the end of Step 3.
                    self._restore_scenario_from_trajectory("step3")

            # For fresh runs (no resume) we start from a pristine copy of the
            # original seed scenario so that Step 1 can update its header
            # (scenario id, class name, and docstring) deterministically.
            if resume_mode not in {"step2", "step3", "step4"} and not self.debug_prompts:
                self._initialize_working_scenario_from_seed()

            if resume_mode in {"step2", "step3", "step4"} and not self.debug_prompts:
                step1 = self._load_existing_step1_result(step1_path)
            else:

                def step1_check(description: str, iteration: int) -> tuple[bool, str]:
                    # This callback itself does not write any files. Step 1 side
                    # effects (updating `valid_descriptions.json` and the
                    # editable_seed_scenario.py header) are applied by the
                    # orchestrator after the step completes.
                    # Both arguments are intentionally unused; the callback
                    # always accepts the description.
                    return True, ""

                check1 = None if self.debug_prompts else step1_check

                step1 = self.step1_agent.run(
                    scenario_metadata_path=str(self.scenario_metadata_path_for_prompt),
                    check_callback=check1,
                )
                logger.info("Step 1 completed with %s iterations.", step1.iterations)
                if not self.debug_prompts:
                    scenario_id, class_name, description = self._parse_step1_output(step1.content)
                    # Only persist metadata + update headers when Step 1 produced
                    # a parseable identifier and a non-empty description.
                    if scenario_id is None or class_name is None or not description.strip():
                        logger.warning(
                            "Step 1 output did not include a parseable Scenario ID/Class Name/Description. "
                            "Skipping metadata/header update for this run."
                        )
                    else:
                        # Avoid silent overwrites when the generator proposes identifiers that already exist.
                        scenario_id, class_name, dedupe_notes = self._ensure_unique_step1_identifiers(
                            scenario_id=scenario_id, class_name=class_name
                        )
                        self._append_scenario_metadata(
                            scenario_id=scenario_id,
                            class_name=class_name,
                            description=description,
                        )
                        self._update_scenario_header(
                            scenario_id=scenario_id, class_name=class_name, description=description
                        )
                        self._append_step_trajectory("step1", step1)
                        if dedupe_notes:
                            # Best-effort: persist dedupe info next to the trajectory for debugging.
                            try:
                                (self.trajectory_dir / "step1_identifier_dedupe.json").write_text(
                                    json.dumps(dedupe_notes, indent=2), encoding="utf-8"
                                )
                            except Exception:
                                logger.exception("Failed to write Step 1 identifier dedupe notes")
                        # Snapshot the scenario after Step 1 header updates so users
                        # can inspect the early state if Step 2 fails.
                        self._snapshot_scenario("step1")

            # Step 2: Apps & Data Setup
            if resume_mode in {"step3", "step4"} and not self.debug_prompts:
                logger.info("Resuming from %s: skipping Step 2 generation.", resume_mode)
                scenario_seed_content = self._safe_read_text(self.scenario_file)
                scenario_after_step2 = scenario_seed_content
                # Synthesize a StepResult so downstream steps and the returned
                # artifact metadata have a uniform shape even when resuming.
                step2 = StepResult(
                    name="Step 2: Apps & Data Setup (resumed)",
                    content=scenario_seed_content,
                    iterations=0,
                    notes={"resumed_from_disk": True},
                    conversation=[],
                )
                if not self.debug_prompts:
                    self._append_step_trajectory("step2", step2)
            else:
                # For fresh runs, Step 1 has already initialized and updated
                # the working scenario file. For resumed runs that reach this
                # branch, use the existing editable_seed_scenario.py on disk.
                scenario_seed_content = self._get_or_initialize_scenario_file()
                check2 = (
                    None
                    if self.debug_prompts
                    else functools.partial(
                        self._step_check_callback,
                        step_label="apps-data-check",
                        guardrail_feedback=(
                            "[apps-data-check] Your previous edits did not yield a "
                            "complete Python scenario file. Use the code editing tools "
                            "to update only the imports and init_and_populate_apps() "
                            "within the existing template, ensuring the file still "
                            "contains the original template start/end markers and "
                            "a @register_scenario(...) decorator."
                        ),
                        require_validation_success=False,
                    )
                )

                step2 = self.step2_agent.run(
                    scenario_description=step1.content,
                    apps_and_data=None if False else None,
                ) if False else self.step2_agent.run(
                    scenario_description=step1.content,
                    scenario_file_path=str(self.scenario_file),
                    check_callback=check2,
                )
                logger.info("Step 2 completed with %s iterations.", step2.iterations)
                if not self.debug_prompts:
                    self._append_step_trajectory("step2", step2)

                scenario_after_step2 = (
                    self._debug_placeholder_content("scenario_after_step2", step2.content)
                    if self.debug_prompts
                    else self._safe_read_text(self.scenario_file)
                )

                # Snapshot the scenario after Step 2 completes successfully.
                if not self.debug_prompts:
                    self._snapshot_scenario("step2")

            # Step 3: Events Flow
            if resume_mode == "step4" and not self.debug_prompts:
                logger.info("Resuming from step4: skipping Step 3 generation.")
                scenario_after_step3 = self._safe_read_text(self.scenario_file)
                step3 = StepResult(
                    name="Step 3: Events Flow (resumed)",
                    content=scenario_after_step3,
                    iterations=0,
                    notes={"resumed_from_disk": True},
                    conversation=[],
                )
                if not self.debug_prompts:
                    self._append_step_trajectory("step3", step3)
            else:
                check3 = (
                    None
                    if self.debug_prompts
                    else functools.partial(
                        self._step_check_callback,
                        step_label="events-flow-check",
                        guardrail_feedback=(
                            "[events-flow-check] Your previous edits did not yield a "
                            "complete Python scenario file. Use the code editing tools "
                            "to update only build_events_flow() within the existing "
                            "template, preserving the template markers and "
                            "@register_scenario(...) decorator."
                        ),
                        require_validation_success=False,
                    )
                )

                step3 = self.step3_agent.run(
                    scenario_description=step1.content,
                    apps_and_data=step2.content,
                    scenario_file_path=str(self.scenario_file),
                    check_callback=check3,
                )
                logger.info("Step 3 completed with %s iterations.", step3.iterations)
                if not self.debug_prompts:
                    self._append_step_trajectory("step3", step3)

                scenario_after_step3 = (
                    self._debug_placeholder_content("scenario_after_step3", step3.content)
                    if self.debug_prompts
                    else self._safe_read_text(self.scenario_file)
                )

                # Snapshot the scenario after Step 3 completes successfully.
                if not self.debug_prompts:
                    self._snapshot_scenario("step3")

            # Step 4: Validation Conditions (runs in every mode; only this step
            # requires the dynamic check to reach validation successfully).
            check4 = (
                None
                if self.debug_prompts
                else functools.partial(
                    self._step_check_callback,
                    step_label="validation-check",
                    guardrail_feedback=(
                        "[validation-check] Your previous edits did not yield a "
                        "complete Python scenario file. Use the code editing tools "
                        "to focus only on validate() inside the existing template, "
                        "preserving the template markers and @register_scenario(...)."
                    ),
                    require_validation_success=True,
                )
            )

            step4 = self.step4_agent.run(
                scenario_description=step1.content,
                events_flow=step3.content,
                scenario_file_path=str(self.scenario_file),
                check_callback=check4,
            )
            logger.info("Step 4 completed with %s iterations.", step4.iterations)

            if not self.debug_prompts:
                self._append_step_trajectory("step4", step4)
                # Export a class-named copy (guarded by validation success) and
                # reset the working file back to the pristine seed template so
                # the next run starts from a clean slate.
                self._export_final_scenario_and_reset()

            logger.info("Multi-step scenario generation pipeline complete.")
            return {
                "description_path": str(self.scenario_metadata_path),
                "scenario_file_path": str(self.scenario_file),
                "trajectory_dir": str(self.trajectory_dir),
                "steps": [
                    step1,
                    step2,
                    step3,
                    step4,
                ],
            }
        except Exception as exc:
            logger.exception("Multi-step generation failed")
            # Classify the failure for the persisted snapshot: assume a runtime
            # error unless the last dynamic check reached validation.
            runtime_error = True
            validation_reached = False
            if self._last_check_result is not None:
                runtime_error = self._last_check_result.runtime_error or not self._last_check_result.validation_reached
                validation_reached = self._last_check_result.validation_reached
            self._persist_failed_scenario(str(exc), runtime_error=runtime_error, validation_reached=validation_reached)
            raise

    def _maybe_write_filtered_metadata_for_prompt(self) -> tuple[Path | None, list[dict[str, Any]]]:
        """Optionally write a filtered metadata JSON for Step 0/1 prompts.

        When the run targets a specific core app combination (e.g., --apps
        StatefulCalendarApp StatefulMessagingApp), uniqueness should only be
        enforced against scenarios sharing that exact core combination, not
        against scenarios built on other apps. Returns (path, entries) of the
        filtered file, or (None, full_metadata) when no filtering applies or
        the write fails.
        """
        selected = self._parse_selected_apps_from_prompt_context()
        core_selected = selected - self._ALWAYS_INCLUDED_APPS
        # No explicit core app selection -> keep the full metadata.
        if not core_selected:
            return None, self._historical_descriptions

        matching = self._filter_metadata_by_core_apps(self._historical_descriptions, core_selected)
        # Write the file even when `matching` is empty: an explicit core app
        # selection should yield deterministic prompting behavior and avoid
        # surprising "other app" rejections.
        target = self.trajectory_dir / "scenario_metadata.filtered.json"
        try:
            target.write_text(json.dumps(matching, indent=2), encoding="utf-8")
        except Exception:
            logger.exception(
                "Failed to write filtered scenario metadata to %s; falling back to full metadata", target
            )
            return None, self._historical_descriptions
        return target, matching

    def _parse_selected_apps_from_prompt_context(self) -> set[str]:
        """Parse selected app class names from the provided dynamic prompt context."""
        raw = (self._prompt_context or {}).get("selected_apps", "") or ""
        if not raw.strip():
            return set()
        # The prompt context may be word-wrapped, which can split long class
        # names across newlines (e.g., "StatefulCale\ndarApp"). Collapse all
        # whitespace before splitting on commas, then keep only tokens that
        # look like Python identifiers.
        compact = re.sub(r"\s+", "", raw)
        return {
            token
            for token in compact.split(",")
            if token and re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", token)
        }

    @classmethod
    def _filter_metadata_by_core_apps(
        cls,
        entries: list[dict[str, Any]],
        core_selected: set[str],
    ) -> list[dict[str, Any]]:
        """Return metadata entries whose *core* app set exactly matches `core_selected`."""

        def core_apps(entry: dict[str, Any]) -> set[str] | None:
            # Entries with a missing or non-list `apps` field never match.
            apps = entry.get("apps") or []
            if not isinstance(apps, list):
                return None
            return {app for app in apps if isinstance(app, str)} - cls._ALWAYS_INCLUDED_APPS

        return [entry for entry in entries if core_apps(entry) == core_selected]

    def _write_output(
        self,
        *,
        content: str,
        path: Path,
        header: str,
        append: bool,
        include_header: bool = True,
    ) -> None:
        """Persist model output to disk, inserting lightweight headers for traceability."""
        path.parent.mkdir(parents=True, exist_ok=True)
        body = content.strip()

        # The working scenario file must stay importable: defensively drop any
        # natural-language preamble/epilogue emitted around the template.
        if path == self.scenario_file:
            body = self._strip_outside_template_markers(body)

        if not include_header:
            # Headerless output always replaces the file wholesale.
            path.write_text(f"{body}\n", encoding="utf-8")
            return

        payload = f"# {header}\n{body}\n"
        if append:
            # Separate appended blocks with a blank line, but only when the
            # file already has content.
            prefix = "\n\n" if path.exists() else ""
            with path.open("a", encoding="utf-8") as handle:
                handle.write(prefix + payload)
        else:
            path.write_text(payload, encoding="utf-8")

    def _initialize_working_scenario_from_seed(self) -> None:
        """Initialize the editable working scenario file from the original seed."""
        if not self.seed_template_text:
            # Template missing or unreadable: still guarantee the working file exists.
            self.scenario_file.touch(exist_ok=True)
            return
        self.scenario_file.write_text(self.seed_template_text, encoding="utf-8")

    @staticmethod
    def _strip_outside_template_markers(text: str) -> str:
        """Keep only the portion of the file between the template start/end markers.

        Claude may think "out loud" around the template body; trimming prose
        outside the first start marker and the last end marker keeps the
        persisted Python file importable. When either marker is missing (e.g.,
        older templates), the text is returned unchanged.
        """
        start_marker = '"""start of the template to build scenario for Proactive Agent."""'
        end_marker = '"""end of the template to build scenario for Proactive Agent."""'

        rows = text.splitlines()
        first_start: int | None = None
        last_end: int | None = None
        for position, row in enumerate(rows):
            if first_start is None and start_marker in row:
                first_start = position
            if end_marker in row:
                last_end = position

        if first_start is None or last_end is None or first_start > last_end:
            # Fallback: markers absent or out of order; leave text as-is.
            return text

        return "\n".join(rows[first_start : last_end + 1]).strip()

    @staticmethod
    def _looks_like_complete_scenario(text: str) -> bool:
        """Heuristic check that a Claude reply is a full scenario file, not just prose."""
        body = text.strip()
        if not body:
            return False

        # Both template markers plus a register_scenario decorator are the
        # minimum evidence that the model returned an edited scenario rather
        # than analysis-only prose.
        required_fragments = (
            '"""start of the template to build scenario for Proactive Agent."""',
            '"""end of the template to build scenario for Proactive Agent."""',
            "@register_scenario(",
        )
        return all(fragment in body for fragment in required_fragments)

    def _step_check_callback(
        self,
        response: str,
        iteration: int,
        *,
        step_label: str,
        guardrail_feedback: str,
        require_validation_success: bool = False,
    ) -> tuple[bool, str]:
        """Shared `check_callback` implementation for Steps 2-4.

        Claude Agent SDK typically edits scenario files via tools rather than returning full code in the
        assistant message, so this check validates `editable_seed_scenario.py` on disk instead of the
        raw model reply.
        """
        del response  # the on-disk scenario file, not the reply text, is the source of truth

        on_disk = self._safe_read_text(self.scenario_file)
        if not self._looks_like_complete_scenario(on_disk):
            # Structural guardrail only; no runtime feedback is available yet.
            return False, guardrail_feedback

        # Run the dynamic scenario check (TwoAgentScenarioRunner). On failure
        # its concrete runtime summary — not the static guardrail text — is
        # threaded back so the next iteration gets a precise signal.
        outcome = self._run_step_check(
            step_label,
            self.scenario_file,
            require_validation_success=require_validation_success,
        )
        return outcome.passed, outcome.feedback

    @staticmethod
    def _sanitize_docstring_text(text: str) -> str:
        """Ensure the description can safely live inside a triple-quoted docstring."""
        return text.replace('"""', '\\"""')

    def _parse_step1_output(self, text: str) -> tuple[str | None, str | None, str]:  # noqa: C901
        """Parse Step 1 agent output into (scenario_id, class_name, description)."""
        raw = text.strip()
        if not raw:
            return None, None, ""

        lines = raw.splitlines()
        scenario_id: str | None = None
        class_name: str | None = None
        description_lines: list[str] = []
        in_description = False
        in_explanation = False

        def _strip_md_label_prefix(s: str) -> str:
            # Normalize common markdown variants like "**Scenario ID:** foo"
            # to "Scenario ID: foo" so parsing is robust.
            s = s.strip()
            # Remove leading list markers (e.g., "-", "*", "1.")
            s = re.sub(r"^\s*(?:[-*]|\d+\.)\s+", "", s)
            # Remove surrounding **bold** markers around the label portion.
            s = re.sub(r"^\*\*(.+?)\*\*\s*$", r"\1", s)
            return s

        def _match_labeled_value(label: str, s: str) -> str | None:
            # Accept "Label: value" and "**Label:** value" and "Label:** value" variants.
            # Returns the value if matched.
            normalized = s.strip()
            # Handle "**Label:** value" (colon inside the bold span)
            m = re.match(
                rf"^\*\*\s*{re.escape(label)}\s*:\s*\*\*\s*(.+)$",
                normalized,
                flags=re.IGNORECASE,
            )
            if m:
                return m.group(1).strip() or None
            # Handle "**Label**: value" (colon outside the bold span)
            m = re.match(
                rf"^\*\*\s*{re.escape(label)}\s*\*\*\s*:\s*(.+)$",
                normalized,
                flags=re.IGNORECASE,
            )
            if m:
                return m.group(1).strip() or None
            # Handle "Label: value" (with optional bold markers around label)
            m = re.match(rf"^{re.escape(label)}\s*:\s*(.+)$", _strip_md_label_prefix(normalized), flags=re.IGNORECASE)
            if m:
                return m.group(1).strip() or None
            return None

        for _idx, line in enumerate(lines):
            stripped = line.strip()
            lower = stripped.lower()
            if not in_description:
                scenario_id_val = _match_labeled_value("Scenario ID", stripped)
                if scenario_id_val is not None:
                    scenario_id = scenario_id_val
                class_name_val = _match_labeled_value("Class Name", stripped)
                if class_name_val is not None:
                    class_name = class_name_val
                # Description section header: allow "Description:" or "**Description:**"
                if (
                    re.match(r"^\s*\*\*\s*Description\s*:\s*\*\*\s*$", stripped, flags=re.IGNORECASE)
                    or re.match(r"^\s*\*\*\s*Description\s*\*\*\s*:\s*$", stripped, flags=re.IGNORECASE)
                    or (_strip_md_label_prefix(stripped).lower() == "description:")
                ):
                    in_description = True
                    continue
                # Description header with content on same line: "Description: foo"
                desc_inline = _match_labeled_value("Description", stripped)
                if desc_inline is not None:
                    in_description = True
                    description_lines.append(desc_inline)
                    continue
                continue
            if in_description and not in_explanation:
                # Stop the description at the start of an optional Explanation section.
                if (
                    re.match(r"^\s*\*\*\s*Explanation\s*:\s*\*\*\s*$", stripped, flags=re.IGNORECASE)
                    or re.match(r"^\s*\*\*\s*Explanation\s*\*\*\s*:\s*$", stripped, flags=re.IGNORECASE)
                    or re.match(r"^\s*(?:\*\*\s*)?explanation(?:\s*\*\*)?\s*:\s*$", stripped, flags=re.IGNORECASE)
                    or lower.startswith("explanation:")
                ):
                    in_explanation = True
                    continue
                # Safety: some models accidentally paste additional drafts (including new Scenario ID/Class Name blocks)
                # inside the Description section. Stop capturing at the start of a new header to avoid polluting metadata.
                if re.match(r"^\s*\*{0,2}\s*Scenario ID\s*:\s*", stripped, flags=re.IGNORECASE) or re.match(
                    r"^\s*\*{0,2}\s*Class Name\s*:\s*", stripped, flags=re.IGNORECASE
                ):
                    break
                description_lines.append(line)

        if not in_description:
            # Fallback: treat the whole text as description.
            return None, None, raw

        description = "\n".join(description_lines).strip()
        return scenario_id, class_name, description

    def _update_scenario_header(  # noqa: C901
        self,
        *,
        scenario_id: str | None,
        class_name: str | None,
        description: str,
    ) -> None:
        """Apply Step 1 outputs to the working scenario header (id, class, docstring).

        Rewrites three pieces of `self.scenario_file` in place: the
        `@register_scenario("...")` decorator argument, the
        `class <Name>(PAREScenario):` class name, and the class docstring
        (preferring the `<<scenario_description>>` placeholder when present).
        Silently no-ops when the description is blank or when either
        identifier is missing; raises RuntimeError (after restoring the
        previous on-disk contents) if the rewritten module no longer parses
        as valid Python.
        """
        if not description.strip():
            return

        original_code = self._safe_read_text(self.scenario_file)
        code = original_code
        if not code.strip():
            # Empty working file: start from the seed template, or give up if
            # no template text is available either.
            if self.seed_template_text:
                code = self.seed_template_text
            else:
                return

        # If the Step 1 agent omitted id or class name, skip header updates to
        # avoid writing partially configured identifiers.
        if scenario_id is None or class_name is None:
            return

        # Update @register_scenario("<id>") — only the first occurrence (count=1).
        def _replace_register(match: re.Match[str]) -> str:
            # The match object is unused: the replacement is fully determined
            # by the new scenario_id.
            return f'@register_scenario("{scenario_id}")'

        code = re.sub(
            r'@register_scenario\(\s*["\']([^"\']+)["\']\s*\)',
            _replace_register,
            code,
            count=1,
        )

        # Update `class <Name>(PAREScenario):` — only the first occurrence.
        def _replace_class(match: re.Match[str]) -> str:
            return f"class {class_name}(PAREScenario):"

        code = re.sub(
            r"class\s+(\w+)\s*\(PAREScenario\):",
            _replace_class,
            code,
            count=1,
        )

        # Update the class docstring to reflect the full scenario description.
        # Prefer replacing the <<scenario_description>> placeholder (keeps template
        # indentation stable) rather than wholesale replacing the docstring.
        try:
            class_idx = code.index("class ")
        except ValueError:
            # No class statement found; fall back to the first docstring in the file.
            class_idx = 0
        doc_start = code.find('"""', class_idx)
        doc_end = code.find('"""', doc_start + 3) if doc_start != -1 else -1
        if doc_start != -1 and doc_end != -1:
            sanitized = self._sanitize_docstring_text(description.strip())
            doc_body = code[doc_start + 3 : doc_end]
            if "<<scenario_description>>" in doc_body:
                new_body = doc_body.replace("<<scenario_description>>", sanitized)
            else:
                new_body = sanitized

            # IMPORTANT: `code[:doc_start]` already contains the indentation
            # prefix for the docstring line, so do NOT prepend indentation again
            # (doing so leads to an `IndentationError` later when class-level
            # fields like `start_time` return to the correct indentation).
            new_doc = f'"""{new_body}"""'
            code = code[:doc_start] + new_doc + code[doc_end + 3 :]

        # Defensive: never persist an invalid Python file; syntax errors here
        # can cascade into confusing registry/runtime failures later.
        try:
            ast.parse(code)
        except SyntaxError as exc:
            # Revert to the original on-disk content (best effort) and fail fast.
            if original_code:
                self.scenario_file.write_text(original_code, encoding="utf-8")
            raise RuntimeError(f"Invalid Python after Step 1 header update: {exc}") from exc

        self.scenario_file.write_text(code, encoding="utf-8")

    def _snapshot_scenario(self, step_label: str) -> None:
        """Save a point-in-time copy of the editable seed scenario after a step.

        For example, after Step 2 and Step 3 complete successfully we capture
        `editable_seed_scenario_step2.py` and `editable_seed_scenario_step3.py`
        under the step_trajectory/trajectory_*/ directory so users can inspect
        or resume from those artifacts if needed. Snapshot failures are logged
        but never abort the pipeline.
        """
        try:
            # `shutil` is imported at module level (it is used bare by the
            # other copy2 call sites in this class), so no local import here.
            snapshot_name = f"editable_seed_scenario_{step_label}.py"
            snapshot_path = self.trajectory_dir / snapshot_name
            shutil.copy2(self.scenario_file, snapshot_path)
            logger.info("Snapshot for %s written to %s", step_label, snapshot_path)
        except Exception:  # pragma: no cover - snapshot failures are non-fatal
            logger.exception("Failed to snapshot scenario after %s", step_label)

    def _restore_scenario_from_trajectory(self, step_label: str) -> str:
        """Restore the working scenario file from a previously snapshotted step.

        Used when resuming the pipeline so the scenario code as of a given
        step (e.g., 'step2' or 'step3') can be reused. When no snapshot is
        available, a warning is logged and the existing working file contents
        are left untouched and returned.
        """
        snapshot_path = self.trajectory_dir / f"editable_seed_scenario_{step_label}.py"
        restored = self._safe_read_text(snapshot_path)
        if restored:
            self.scenario_file.write_text(restored, encoding="utf-8")
            logger.info("Restored scenario file for %s from trajectory snapshot %s", step_label, snapshot_path)
            return restored

        logger.warning(
            "Requested resume from %s but no snapshot found at %s; continuing with existing scenario file at %s",
            step_label,
            snapshot_path,
            self.scenario_file,
        )
        return self._safe_read_text(self.scenario_file)

    def _append_step_trajectory(self, step_label: str, step_result: StepResult) -> None:
        """Append a single step's trajectory record to the trajectory directory.

        One JSON object per line is appended to `steps.jsonl` under
        `trajectory_dir`, carrying the step label, a UTC timestamp, and the
        full step result (including the LLM conversation). Failures are
        logged but do not stop the main pipeline.
        """
        try:
            payload = json.dumps({
                "step_label": step_label,
                "timestamp": datetime.utcnow().isoformat(),
                "step": asdict(step_result),
            })
            jsonl_path = self.trajectory_dir / "steps.jsonl"
            with jsonl_path.open("a", encoding="utf-8") as handle:
                handle.write(payload)
                handle.write("\n")
        except Exception:  # pragma: no cover - trajectory logging is best-effort
            logger.exception("Failed to append step trajectory for %s", step_label)

    def _export_final_scenario_and_reset(self) -> None:
        """Export the final scenario by class name, then reset the working file.

        After all four steps and checks have passed, this method:
        1. Reads `editable_seed_scenario.py` and extracts the PAREScenario class
           name (e.g., `MyScenarioName`).
        2. Copies the final scenario into
           `pare/scenarios/generator/MyScenarioName.py`.
        3. Resets `editable_seed_scenario.py` back to the original seed template
           so the next multi-step run starts from a clean, canonical file.

        The export additionally requires that `self._last_check_result` (set by
        `_run_step_check`) reached validation and succeeded; otherwise only the
        reset is performed.
        """
        code = self._safe_read_text(self.scenario_file)
        if not code.strip():
            logger.warning("Final scenario export skipped: working scenario file is empty.")
            return

        # The class name determines the exported filename.
        match = re.search(r"class\s+(\w+)\s*\(PAREScenario\):", code)
        if not match:
            logger.warning(
                "Final scenario export skipped: could not parse PAREScenario class name from %s",
                self.scenario_file,
            )
            return

        class_name = match.group(1)
        target_path = self.seed_scenarios_dir / f"{class_name}.py"
        if target_path.exists():
            # Safety guard: avoid silently overwriting an existing scenario file.
            # Prefer adding a numeric suffix to the filename (class name inside the file remains unchanged).
            i = 2
            while True:
                candidate = self.seed_scenarios_dir / f"{class_name}{i}.py"
                if not candidate.exists():
                    logger.warning(
                        "Target scenario file %s already exists; exporting to %s instead to avoid overwrite.",
                        target_path,
                        candidate,
                    )
                    target_path = candidate
                    break
                i += 1

        # Safety guard: never export into the default generation output directory
        # unless the most recent run check reached validation and succeeded.
        if (
            self._last_check_result is None
            or self._last_check_result.runtime_error
            or not self._last_check_result.validation_reached
            or not self._last_check_result.validation_success
        ):
            logger.warning(
                "Skipping final scenario export for class %s: last run check did not validate successfully.",
                class_name,
            )
            # Still reset the working file so subsequent runs start clean.
            self._initialize_working_scenario_from_seed()
            return

        try:
            shutil.copy2(self.scenario_file, target_path)
            logger.info(
                "Exported final scenario for class %s to %s",
                class_name,
                target_path,
            )
        except Exception:  # pragma: no cover - export failures are non-fatal
            logger.exception("Failed to export final scenario for class %s", class_name)

        # Reset the editable working file back to the original seed template so
        # subsequent runs begin from a pristine scenario skeleton.
        try:
            self._initialize_working_scenario_from_seed()
            logger.info("Reset working scenario file %s from original seed template.", self.scenario_file)
        except Exception:  # pragma: no cover - reset failures are non-fatal
            logger.exception("Failed to reset working scenario file %s", self.scenario_file)

    def _load_existing_step1_result(self, step1_path: Path) -> StepResult:
        """Load a previously generated Step 1 description from disk.

        This is used when resuming the pipeline from Step 2 after fixing issues
        downstream, so we can reuse the narrative without re-running the LLM.
        """
        # Prefer the legacy markdown artifact when it has content (backward
        # compatible); otherwise fall back to the newest entry recorded in
        # `valid_descriptions.json`.
        description: str | None = None
        markdown_text = self._safe_read_text(step1_path)
        if markdown_text.strip():
            # Ignore header/comment lines such as "# Step 1 - Scenario Description".
            body = "\n".join(
                line for line in markdown_text.splitlines() if not line.lstrip().startswith("#")
            ).strip()
            if body:
                description = body

        if description is None:
            metadata_entries = self._read_scenario_metadata()
            if metadata_entries:
                latest = (metadata_entries[-1].get("description") or "").strip()
                if latest:
                    description = latest

        if not description:
            raise RuntimeError(
                "Cannot resume from Step 2: missing markdown and no valid description found in valid_descriptions.json"
            )

        return StepResult(
            name="Step 1: Scenario Description (resumed)",
            content=description,
            iterations=0,
            notes={
                "resumed_from_disk": True,
                "source_path": str(step1_path),
            },
            conversation=[],
        )

    def _run_step_check(  # noqa: C901
        self,
        label: str,
        artifact_path: Path,
        require_validation_success: bool = False,
    ) -> RunCheckResult:
        """Run the generated scenario and summarize the result.

        This helper is the canonical integration point between the Claude-backed
        step agents (which edit the scenario file) and the PARE/meta-ARE runner,
        which validates that the scenario can be imported, executed, and passes
        its `validate()` checks.

        Args:
            label: Human-readable step label used in log/feedback messages.
            artifact_path: Scenario file to check (usually the working
                `editable_seed_scenario.py`).
            require_validation_success: When True, a run that executes cleanly
                but fails validation is still reported as not passed.

        Side effects: may extend `PARE_SCENARIOS_DIR`, evict the working module
        from `sys.modules`, and always stores the outcome in
        `self._last_check_result`.
        """
        code = self._safe_read_text(artifact_path)
        scenario_id = self._extract_scenario_id(code)
        if scenario_id is None:
            result = RunCheckResult(
                passed=False,
                feedback=f"[{label}] Failed to parse scenario_id from file {artifact_path}",
                runtime_error=True,
                validation_reached=False,
                validation_success=False,
            )
            self._last_check_result = result
            return result

        # Ensure the PARE scenario registry can see the working scenario file.
        # We do this by updating PARE_SCENARIOS_DIR to include the directory
        # that contains `editable_seed_scenario.py` (or any other artifact
        # passed in). The PARE registry lazily discovers scenarios based on
        # this environment variable when `registry.get_scenario(...)` is first
        # called inside `run_demo`.
        scenarios_dir_name = artifact_path.parent.name
        existing_dirs = os.getenv("PARE_SCENARIOS_DIR", "benchmark")
        dirs = [d.strip() for d in existing_dirs.split(",") if d.strip()]
        if scenarios_dir_name not in dirs:
            dirs.append(scenarios_dir_name)
            os.environ["PARE_SCENARIOS_DIR"] = ",".join(dirs)

        # IMPORTANT: `editable_seed_scenario.py` is imported as a module during
        # PARE scenario discovery. When generating multiple scenarios in the same
        # Python process (e.g., `--num-scenarios > 1`), Python's module cache and
        # the PARE registry's `_scenarios_discovered` flag can prevent the updated
        # decorator/class from being re-imported, leading to:
        #   "No scenario registered with ID '<new_id>'"
        # We force a best-effort refresh of just the working module and trigger
        # the registry to re-discover scenarios.
        try:
            from pare.scenarios.utils.registry import registry as pas_registry

            module_name = f"pare.scenarios.{scenarios_dir_name}.{artifact_path.stem}"
            if module_name in sys.modules:
                del sys.modules[module_name]
            importlib.invalidate_caches()
            if hasattr(pas_registry, "_scenarios_discovered"):
                pas_registry._scenarios_discovered = False
        except Exception:
            logger.exception("Failed to refresh PARE scenario registry/module cache before run check")

        logger.info(
            "Running scenario check '%s' for scenario_id='%s' using artifact '%s' via TwoAgentScenarioRunner",
            label,
            scenario_id,
            artifact_path,
        )
        # Use the two-agent demo runner in oracle mode (no LLM calls) to execute
        # the scenario deterministically and obtain a ScenarioValidationResult.
        try:
            validation_result = run_scenarios(
                scenario_names=[scenario_id],
                oracle_mode=True,
                max_turns=None,
                tool_failure_prob=0.0,
                env_events_per_min=0.0,
                env_events_seed=42,
            )
        except Exception as exc:  # pragma: no cover - runtime failure path
            runtime_error = True
            validation_reached = False
            validation_success = False
            passed = False
            # Ensure we always surface a meaningful message, even when the
            # exception has an empty string representation.
            exc_msg = str(exc).strip() or repr(exc)
            feedback = (
                f"[{label}] FAILED run for scenario '{scenario_id}'.\n"
                f"Runtime error while executing scenario via TwoAgentScenarioRunner: {exc_msg}"
            )
            result = RunCheckResult(
                passed=passed,
                feedback=feedback,
                runtime_error=runtime_error,
                validation_reached=validation_reached,
                validation_success=validation_success,
            )
            self._last_check_result = result
            return result

        # `run_demo` returns the ScenarioValidationResult from the runner.
        runtime_error = validation_result.results[0].exception is not None if validation_result.results else False
        validation_reached = True
        validation_success = getattr(validation_result, "passed", 0) > 0

        passed = True
        if runtime_error or (require_validation_success and not validation_success):
            passed = False

        # Build a concise feedback summary; detailed logs are already emitted by
        # the runner and its logging configuration.
        status_line = "SUCCESS" if validation_success else "FAILED"
        rationale = getattr(validation_result, "rationale", None)
        exception = getattr(validation_result, "exception", None)
        export_path = getattr(validation_result, "export_path", None)
        details: list[str] = [f"Validation: {status_line}"]
        if rationale:
            details.append(f"Rationale: {rationale}")
        if exception:
            details.append(f"Exception: {exception}")
        if export_path:
            details.append(f"Trace export path: {export_path}")
        summary = "\n".join(details) if details else "No additional validation details."

        # Use "PARES" as the pass marker for consistency with the status tokens
        # emitted by `_summarize_run_output` (a previous version emitted the
        # inconsistent token "PARESED" here).
        feedback = f"[{label}] {'PARES' if passed else 'FAILED'} run for scenario '{scenario_id}'.\n{summary}"
        result = RunCheckResult(
            passed=passed,
            feedback=feedback,
            runtime_error=runtime_error,
            validation_reached=validation_reached,
            validation_success=validation_success,
        )
        self._last_check_result = result
        return result

    def _get_or_initialize_scenario_file(self) -> str:
        """Return current scenario file contents, seeding from template if missing."""
        if self.scenario_file.exists():
            return self._safe_read_text(self.scenario_file)
        # No working file yet: seed it from the template when one is available,
        # otherwise create an empty placeholder so later writes have a target.
        seed = self.seed_template_text
        if seed:
            self.scenario_file.write_text(seed, encoding="utf-8")
            return seed
        self.scenario_file.touch()
        return ""

    @staticmethod
    def _safe_read_text(path: Path) -> str:
        try:
            return path.read_text(encoding="utf-8")
        except FileNotFoundError:
            return ""

    def _read_scenario_metadata(self) -> list[dict[str, Any]]:
        """Return the list of stored scenario metadata entries.

        Each entry is a dict that includes at least `description` and
        `timestamp`, plus any additional fields (scenario_id, class_name,
        apps, file_path, etc.). A missing or malformed metadata file yields
        an empty list.
        """
        raw = self._safe_read_text(self.scenario_metadata_path).strip()
        if not raw:
            return []
        try:
            entries = json.loads(raw)
        except json.JSONDecodeError:
            return []
        # Guard against a malformed top-level value (e.g., a dict or string).
        return entries if isinstance(entries, list) else []

    @staticmethod
    def _extract_scenario_id(code_text: str) -> str | None:
        match = re.search(r'@register_scenario\(\s*["\']([^"\']+)["\']\s*\)', code_text)
        if match:
            return match.group(1).strip()
        return None

    @staticmethod
    def _summarize_run_output(
        output: str,
        *,
        registration_ok: bool,
        runtime_error: bool,
        validation_reached: bool,
        validation_success: bool,
    ) -> str:
        lines = [line.strip() for line in output.strip().splitlines() if line.strip()]

        def find_line(pattern: str) -> str | None:
            for line in lines:
                if pattern in line:
                    return line
            return None

        registration_summary = (
            "PARES - Scenario registered successfully."
            if registration_ok
            else "FAILED - Scenario did not reach execution phase."
        )

        runtime_summary = "PARES - No runtime errors detected."
        if runtime_error:
            error_lines = [line for line in lines if "ERROR" in line or "Exception" in line]
            snippet = "\n".join(error_lines[:3]) or "\n".join(lines[-5:])
            runtime_summary = f"FAILED - Runtime issues observed:\n{snippet}"

        validation_summary = "NOT RUN - Validation step not reached."
        if validation_reached and validation_success:
            validation_summary = "PARES - ScenarioValidationResult(success=True)."
        elif validation_reached:
            val_line = find_line("ScenarioValidationResult(") or ""
            validation_summary = f"FAILED - {val_line}"

        return "\n".join([
            f"Registration: {registration_summary}",
            f"Runtime: {runtime_summary}",
            f"Validation: {validation_summary}",
        ])

    def _append_scenario_metadata(
        self,
        *,
        scenario_id: str | None,
        class_name: str | None,
        description: str,
    ) -> None:
        """Append a metadata record for the current scenario.

        Captures description, timestamp, apps used, and optional identifiers so
        downstream analysis and uniqueness checks have a single source of
        truth. Blank descriptions are ignored.
        """
        if not description.strip():
            return

        # Recover the app list from the prompt context's display string,
        # skipping the placeholder values used when nothing was selected.
        apps: list[str] = []
        apps_display = (self._prompt_context or {}).get("selected_apps", "")
        if apps_display not in {"", "(none)", "(unknown)"}:
            apps = [name.strip() for name in apps_display.split(",") if name.strip()]

        records = self._read_scenario_metadata()
        records.append({
            "scenario_id": scenario_id,
            "class_name": class_name,
            "description": description,
            "apps": apps,
            "timestamp": datetime.utcnow().isoformat(),
        })
        self.scenario_metadata_path.parent.mkdir(parents=True, exist_ok=True)
        self.scenario_metadata_path.write_text(json.dumps(records, indent=2), encoding="utf-8")
        self._historical_descriptions = records

    def _persist_failed_scenario(
        self, reason: str, runtime_error: bool = True, validation_reached: bool = False
    ) -> None:
        """Persist failure details under the trajectory directory.

        NOTE: We intentionally do NOT write into `pare/scenario_generator/generated_scenarios/`
        anymore (that directory is noisy to clean up). The working scenario file
        and per-step snapshots already live under the trajectory directory.
        """
        # These flags are accepted for caller compatibility but unused here.
        _ = runtime_error
        _ = validation_reached
        # Best-effort: snapshot the failed working scenario so users can
        # inspect the exact on-disk contents that caused the failure (e.g.,
        # after Step 2/3 guardrails reject the edits).
        try:
            failed_snapshot = self.trajectory_dir / "editable_seed_scenario_failed.py"
            if self.scenario_file.exists():
                shutil.copy2(self.scenario_file, failed_snapshot)
            (self.trajectory_dir / "failure_reason.txt").write_text(f"{reason}\n", encoding="utf-8")
            logger.info("Snapshot for failed scenario written to %s", failed_snapshot)
        except Exception:  # pragma: no cover - trajectory snapshots are best-effort
            logger.exception("Failed to snapshot failed scenario to trajectory directory")

    def _debug_print(self, message: str) -> None:
        """Forward a debug/progress message to the module logger at INFO level."""
        logger.info(message)

    @staticmethod
    def _debug_placeholder_content(label: str, detail: str | None = None) -> str:
        detail_block = f"\n{detail}" if detail else ""
        return f"# [DEBUG PLACEHOLDER] {label}{detail_block}"

__init__(*, output_dir=None, max_iterations=3, trajectory_dir=None, prompt_context=None, debug_prompts=False, resume_from_step2=False, resume_from_step=None, claude_filesystem_config=None)

Initialize the orchestrator and supporting step agents.

Source code in pare/scenarios/generator/agent/scenario_generating_agent_orchestrator.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
def __init__(
    self,
    *,
    output_dir: str | Path | None = None,
    max_iterations: int = 3,
    trajectory_dir: str | Path | None = None,
    prompt_context: dict[str, str] | None = None,
    debug_prompts: bool = False,
    resume_from_step2: bool = False,
    resume_from_step: str | None = None,
    claude_filesystem_config: ClaudeFilesystemConfig | None = None,
) -> None:
    """Initialize the orchestrator and supporting step agents.

    Args:
        output_dir: Where intermediate markdown artifacts are written;
            defaults to the per-run trajectory directory.
        max_iterations: Retry budget forwarded to every step agent.
        trajectory_dir: Explicit trajectory directory; when omitted a
            timestamped one is created under ``step_trajectory/``.
        prompt_context: Dynamic prompt context (selected apps/tools) that is
            also forwarded to ``configure_dynamic_context``.
        debug_prompts: When True, all Claude calls are skipped and prompts /
            planned file operations are logged instead.
        resume_from_step2: Deprecated boolean alias for
            ``resume_from_step="step2"``.
        resume_from_step: Optional step name (e.g. "step2"/"step3"/"step4")
            to resume the pipeline from; takes precedence over
            ``resume_from_step2``.
        claude_filesystem_config: Filesystem policy for the Claude Agent SDK;
            a default policy (repo read-only, scenario file editable) is
            built when omitted.
    """
    self.max_iterations = max_iterations
    self.debug_prompts = debug_prompts
    # Backwards compatibility: boolean resume_from_step2 maps to "step2" unless
    # an explicit resume_from_step value is provided.
    self.resume_from_step = resume_from_step or ("step2" if resume_from_step2 else None)
    # This file lives under `pare/scenarios/generator/agent/...`.
    # - generator_dir: pare/scenarios/generator
    # - scenarios_dir: pare/scenarios
    # - pas_dir:       pare  (stored below as `repo_root`)
    generator_dir = Path(__file__).resolve().parents[1]
    scenarios_dir = generator_dir.parent
    pas_dir = scenarios_dir.parent

    # Keep repo_root aligned to the `pare/` package directory (so relative paths
    # like repo_root/"scenarios"/... resolve under `pare/scenarios/`).
    self.repo_root = pas_dir

    # Directory that tracks the per-step trajectory for this run, e.g.,
    # pare/scenarios/generator/step_trajectory/trajectory_YYYYMMDDTHHMMSS.
    trajectory_root = generator_dir / "step_trajectory"
    if trajectory_dir is not None:
        self.trajectory_dir = Path(trajectory_dir)
    else:
        # NOTE(review): datetime.utcnow() is deprecated in Python 3.12+;
        # consider datetime.now(timezone.utc) — confirm against the file's
        # datetime import before changing.
        timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%S")
        self.trajectory_dir = trajectory_root / f"trajectory_{timestamp}"
    self.trajectory_dir.mkdir(parents=True, exist_ok=True)

    # Directory where intermediate markdown artifacts live. We no longer write
    # to `pare/scenario_generator/generated_scenarios/`; keep artifacts scoped
    # to the trajectory directory by default.
    self.output_dir = Path(output_dir) if output_dir is not None else self.trajectory_dir
    self.output_dir.mkdir(parents=True, exist_ok=True)

    # Directory that holds the single editable working copy plus the final
    # exported scenarios produced by the multi-step generator.
    #
    # IMPORTANT: this directory must live directly under `pare/scenarios/`
    # so that `PARE_SCENARIOS_DIR=generator` can discover and import the
    # working file as `pare.scenarios.generator.<module>`.
    self.seed_scenarios_dir = generator_dir
    self.seed_scenarios_dir.mkdir(parents=True, exist_ok=True)

    # Use the editable_seed_scenario-based working file so Claude Agent can
    # repeatedly edit a single, stable filename. The original seed template
    # remains read-only for reference.
    self.scenario_file = self.seed_scenarios_dir / "editable_seed_scenario.py"

    # Global scenario metadata used for uniqueness checks and analysis.
    # Stored under `pare/scenarios/scenario_metadata.json` so it is shared
    # across runs and not tied to a particular output directory.
    self.scenario_metadata_path = self.repo_root / "scenarios" / "scenario_metadata.json"

    # Dynamic prompt context (selected apps/tools) for this run.
    # IMPORTANT: must be set before any helper that reads `_prompt_context`.
    self._prompt_context: dict[str, str] = prompt_context or {}

    self._last_check_result: RunCheckResult | None = None
    # Declarative filesystem policy for Claude Agent SDK usage. Enforcement
    # will be wired via hooks and tool options in a follow-up change.
    if claude_filesystem_config is None:
        self.claude_filesystem_config = ClaudeFilesystemConfig(
            read_only_roots=[self.repo_root],
            editable_files=[self.scenario_file],
        )
    else:
        self.claude_filesystem_config = claude_filesystem_config
    self._historical_descriptions = self._read_scenario_metadata()

    # For Step 0/1 prompting we often want to scope uniqueness comparisons to
    # scenarios that use the same core app combination as this run (excluding
    # the always-present PAREAgentUserInterface + HomeScreenSystemApp).
    self.scenario_metadata_path_for_prompt = self.scenario_metadata_path
    self._historical_descriptions_for_prompt = self._historical_descriptions
    filtered_path, filtered_entries = self._maybe_write_filtered_metadata_for_prompt()
    if filtered_path is not None:
        self.scenario_metadata_path_for_prompt = filtered_path
        self._historical_descriptions_for_prompt = filtered_entries

    # Per-step Claude runtime configurations. Narrative and uniqueness
    # checks do not need code-editing tools, while Steps 2-4 use Read/Write
    # to modify the seed_scenario file.
    self._claude_config_uniqueness = ClaudeAgentRuntimeConfig(
        cwd=self.repo_root,
        allowed_tools=["Read"],
        permission_mode="acceptEdits",
        filesystem=self.claude_filesystem_config,
    )
    self._claude_config_step1 = ClaudeAgentRuntimeConfig(
        cwd=self.repo_root,
        allowed_tools=["Read"],
        permission_mode="acceptEdits",
        filesystem=self.claude_filesystem_config,
    )
    self._claude_config_code_steps = ClaudeAgentRuntimeConfig(
        cwd=self.repo_root,
        allowed_tools=["Read", "Write"],
        permission_mode="acceptEdits",
        filesystem=self.claude_filesystem_config,
    )

    if prompt_context is not None:
        configure_dynamic_context(**prompt_context)

    if self.debug_prompts:
        logger.info(
            "Debug prompts mode enabled for multi-step scenario generator; all Claude calls "
            "will be skipped. Prompts and planned file operations will be logged instead.",
        )

    # Use the canonical original seed template with explicit start/end markers
    # from the PARE scenarios package so we can safely strip any
    # natural-language preamble/epilogue that Claude might emit around the
    # template body.
    self.seed_template_path = generator_dir / "utils" / "original_seed_scenario.py"
    self.seed_template_text = self._safe_read_text(self.seed_template_path)

    if self.debug_prompts:
        logger.info("Scenario working file: %s", self.scenario_file)
        logger.info("Seed template path: %s", self.seed_template_path)
        logger.info("Scenario metadata path: %s", self.scenario_metadata_path)
        logger.info(
            "Claude filesystem config: read_only_roots=%s, editable_files=%s",
            self.claude_filesystem_config.read_only_roots,
            self.claude_filesystem_config.editable_files,
        )
        if prompt_context is not None:
            logger.info("Selected apps for this run: %s", prompt_context.get("selected_apps", "(unknown)"))

    # One uniqueness reviewer shared by Step 1; four step agents, one per phase.
    self.uniqueness_agent = ScenarioUniquenessCheckAgent(
        historical_descriptions=self._historical_descriptions_for_prompt,
        scenario_metadata_path=str(self.scenario_metadata_path_for_prompt),
        debug_prompts=debug_prompts,
        claude_runtime_config=self._claude_config_uniqueness,
    )
    self.step1_agent = StepEditAgent(
        step_name="Step 1: Scenario Description",
        step_kind="description",
        system_prompt=prompt_module.SCENARIO_DESCRIPTION_SYSTEM_PROMPT,
        max_iterations=max_iterations,
        uniqueness_agent=self.uniqueness_agent,
        debug_prompts=debug_prompts,
        claude_runtime_config=self._claude_config_step1,
    )
    self.step2_agent = StepEditAgent(
        step_name="Step 2: Apps & Data Setup",
        step_kind="apps_and_data",
        system_prompt=prompt_module.APPS_AND_DATA_SYSTEM_PROMPT,
        max_iterations=max_iterations,
        debug_prompts=debug_prompts,
        claude_runtime_config=self._claude_config_code_steps,
    )
    self.step3_agent = StepEditAgent(
        step_name="Step 3: Events Flow",
        step_kind="events_flow",
        system_prompt=prompt_module.EVENTS_FLOW_SYSTEM_PROMPT,
        max_iterations=max_iterations,
        debug_prompts=debug_prompts,
        claude_runtime_config=self._claude_config_code_steps,
    )
    self.step4_agent = StepEditAgent(
        step_name="Step 4: Validation Conditions",
        step_kind="validation",
        system_prompt=prompt_module.VALIDATION_SYSTEM_PROMPT,
        max_iterations=max_iterations,
        debug_prompts=debug_prompts,
        claude_runtime_config=self._claude_config_code_steps,
    )

run()

Execute the four-step pipeline and return artifact metadata.

Source code in pare/scenarios/generator/agent/scenario_generating_agent_orchestrator.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
def run(self) -> dict[str, Any]:  # noqa: C901
    """Execute the four-step pipeline and return artifact metadata.

    Runs Step 1 (description), Step 2 (apps & data), Step 3 (events flow),
    and Step 4 (validation) in order, honoring ``resume_from_step`` and
    ``debug_prompts``.

    Returns:
        A dict with the scenario metadata path, the working scenario file
        path, the trajectory directory, and the four per-step StepResult
        objects under ``"steps"``.

    Raises:
        Whatever the failing step raised; before re-raising, a best-effort
        snapshot of the failing scenario is persisted via
        ``_persist_failed_scenario``.
    """
    logger.info("Starting multi-step scenario generation.")

    try:
        step1_path = self.output_dir / "step1_scenario_description.md"
        resume_mode = self.resume_from_step

        # If resuming from later steps, restore the working scenario file
        # from the appropriate trajectory snapshot when available. This
        # keeps the single editable_seed_scenario.py in sync with the code
        # that previously passed validation for that step.
        if not self.debug_prompts:
            if resume_mode == "step2":
                # Restore the scenario file as it looked after Step 1
                # header updates.
                self._restore_scenario_from_trajectory("step1")
            elif resume_mode == "step3":
                # Restore the scenario as of the end of Step 2.
                self._restore_scenario_from_trajectory("step2")
            elif resume_mode == "step4":
                # Restore the scenario as of the end of Step 3.
                self._restore_scenario_from_trajectory("step3")

        # For fresh runs (no resume) we start from a pristine copy of the
        # original seed scenario so that Step 1 can update its header
        # (scenario id, class name, and docstring) deterministically.
        if resume_mode not in {"step2", "step3", "step4"} and not self.debug_prompts:
            self._initialize_working_scenario_from_seed()

        if resume_mode in {"step2", "step3", "step4"} and not self.debug_prompts:
            step1 = self._load_existing_step1_result(step1_path)
        else:

            def step1_check(description: str, iteration: int) -> tuple[bool, str]:
                # This callback itself does not write any files. Step 1 side
                # effects (updating `valid_descriptions.json` and the
                # editable_seed_scenario.py header) are applied by the
                # orchestrator after the step completes.
                return True, ""

            check1 = None if self.debug_prompts else step1_check

            step1 = self.step1_agent.run(
                scenario_metadata_path=str(self.scenario_metadata_path_for_prompt),
                check_callback=check1,
            )
            logger.info("Step 1 completed with %s iterations.", step1.iterations)
            if not self.debug_prompts:
                scenario_id, class_name, description = self._parse_step1_output(step1.content)
                # Only persist metadata + update headers when Step 1 produced
                # a parseable identifier and a non-empty description.
                if scenario_id is None or class_name is None or not description.strip():
                    logger.warning(
                        "Step 1 output did not include a parseable Scenario ID/Class Name/Description. "
                        "Skipping metadata/header update for this run."
                    )
                else:
                    # Avoid silent overwrites when the generator proposes identifiers that already exist.
                    scenario_id, class_name, dedupe_notes = self._ensure_unique_step1_identifiers(
                        scenario_id=scenario_id, class_name=class_name
                    )
                    self._append_scenario_metadata(
                        scenario_id=scenario_id,
                        class_name=class_name,
                        description=description,
                    )
                    self._update_scenario_header(
                        scenario_id=scenario_id, class_name=class_name, description=description
                    )
                    self._append_step_trajectory("step1", step1)
                    if dedupe_notes:
                        # Best-effort: persist dedupe info next to the trajectory for debugging.
                        try:
                            (self.trajectory_dir / "step1_identifier_dedupe.json").write_text(
                                json.dumps(dedupe_notes, indent=2), encoding="utf-8"
                            )
                        except Exception:
                            logger.exception("Failed to write Step 1 identifier dedupe notes")
                    # Snapshot the scenario after Step 1 header updates so users
                    # can inspect the early state if Step 2 fails.
                    self._snapshot_scenario("step1")

        # Step 2: Apps & Data Setup
        if resume_mode in {"step3", "step4"} and not self.debug_prompts:
            logger.info("Resuming from %s: skipping Step 2 generation.", resume_mode)
            scenario_seed_content = self._safe_read_text(self.scenario_file)
            step2 = StepResult(
                name="Step 2: Apps & Data Setup (resumed)",
                content=scenario_seed_content,
                iterations=0,
                notes={"resumed_from_disk": True},
                conversation=[],
            )
            # This branch already requires `not self.debug_prompts`, so the
            # trajectory entry is recorded unconditionally.
            self._append_step_trajectory("step2", step2)
        else:
            # For fresh runs, Step 1 has already initialized and updated
            # the working scenario file. For resumed runs that reach this
            # branch, use the existing editable_seed_scenario.py on disk.
            scenario_seed_content = self._get_or_initialize_scenario_file()
            check2 = (
                None
                if self.debug_prompts
                else functools.partial(
                    self._step_check_callback,
                    step_label="apps-data-check",
                    guardrail_feedback=(
                        "[apps-data-check] Your previous edits did not yield a "
                        "complete Python scenario file. Use the code editing tools "
                        "to update only the imports and init_and_populate_apps() "
                        "within the existing template, ensuring the file still "
                        "contains the original template start/end markers and "
                        "a @register_scenario(...) decorator."
                    ),
                    require_validation_success=False,
                )
            )

            step2 = self.step2_agent.run(
                scenario_description=step1.content,
                scenario_file_path=str(self.scenario_file),
                check_callback=check2,
            )
            logger.info("Step 2 completed with %s iterations.", step2.iterations)
            if not self.debug_prompts:
                self._append_step_trajectory("step2", step2)
                # Snapshot the scenario after Step 2 completes successfully.
                self._snapshot_scenario("step2")

        # Step 3: Events Flow
        if resume_mode == "step4" and not self.debug_prompts:
            logger.info("Resuming from step4: skipping Step 3 generation.")
            scenario_after_step3 = self._safe_read_text(self.scenario_file)
            step3 = StepResult(
                name="Step 3: Events Flow (resumed)",
                content=scenario_after_step3,
                iterations=0,
                notes={"resumed_from_disk": True},
                conversation=[],
            )
            # The branch condition already guarantees `not self.debug_prompts`.
            self._append_step_trajectory("step3", step3)
        else:
            check3 = (
                None
                if self.debug_prompts
                else functools.partial(
                    self._step_check_callback,
                    step_label="events-flow-check",
                    guardrail_feedback=(
                        "[events-flow-check] Your previous edits did not yield a "
                        "complete Python scenario file. Use the code editing tools "
                        "to update only build_events_flow() within the existing "
                        "template, preserving the template markers and "
                        "@register_scenario(...) decorator."
                    ),
                    require_validation_success=False,
                )
            )

            step3 = self.step3_agent.run(
                scenario_description=step1.content,
                apps_and_data=step2.content,
                scenario_file_path=str(self.scenario_file),
                check_callback=check3,
            )
            logger.info("Step 3 completed with %s iterations.", step3.iterations)
            if not self.debug_prompts:
                self._append_step_trajectory("step3", step3)
                # Snapshot the scenario after Step 3 completes successfully.
                self._snapshot_scenario("step3")

        # Step 4: Validation Conditions — the only step that requires the
        # scenario's validate() to actually succeed.
        check4 = (
            None
            if self.debug_prompts
            else functools.partial(
                self._step_check_callback,
                step_label="validation-check",
                guardrail_feedback=(
                    "[validation-check] Your previous edits did not yield a "
                    "complete Python scenario file. Use the code editing tools "
                    "to focus only on validate() inside the existing template, "
                    "preserving the template markers and @register_scenario(...)."
                ),
                require_validation_success=True,
            )
        )

        step4 = self.step4_agent.run(
            scenario_description=step1.content,
            events_flow=step3.content,
            scenario_file_path=str(self.scenario_file),
            check_callback=check4,
        )
        logger.info("Step 4 completed with %s iterations.", step4.iterations)

        if not self.debug_prompts:
            self._append_step_trajectory("step4", step4)
            # Export a class-named copy (guarded by validation success) and
            # reset the working file back to the pristine seed template so
            # the next run starts from a clean slate.
            self._export_final_scenario_and_reset()

        logger.info("Multi-step scenario generation pipeline complete.")
        return {
            "description_path": str(self.scenario_metadata_path),
            "scenario_file_path": str(self.scenario_file),
            "trajectory_dir": str(self.trajectory_dir),
            "steps": [
                step1,
                step2,
                step3,
                step4,
            ],
        }
    except Exception as exc:
        logger.exception("Multi-step generation failed")
        # Default to treating the failure as a runtime error unless the last
        # recorded check result says otherwise.
        runtime_error = True
        validation_reached = False
        if self._last_check_result is not None:
            runtime_error = self._last_check_result.runtime_error or not self._last_check_result.validation_reached
            validation_reached = self._last_check_result.validation_reached
        self._persist_failed_scenario(str(exc), runtime_error=runtime_error, validation_reached=validation_reached)
        raise

Step Agents

BaseStepAgent

Base helper that wraps LLM calls and retry/check logic for a pipeline step.

Source code in pare/scenarios/generator/agent/step_agents.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
class BaseStepAgent:
    """Base helper that wraps LLM calls and retry/check logic for a pipeline step."""

    def __init__(
        self,
        *,
        name: str,
        system_prompt: str,
        max_iterations: int = 3,
        uniqueness_agent: ScenarioUniquenessCheckAgent | None = None,
        debug_prompts: bool = False,
        claude_runtime_config: ClaudeAgentRuntimeConfig | None = None,
    ) -> None:
        """Configure shared settings for a single multi-step generation phase."""
        self.name = name
        self.system_prompt = system_prompt
        self.max_iterations = max_iterations
        self.uniqueness_agent = uniqueness_agent
        self.debug_prompts = debug_prompts
        self._claude_config = claude_runtime_config

    def _run_with_prompt(
        self,
        *,
        user_prompt: str,
        check_callback: Callable[[str, int], tuple[bool, str]] | None = None,
        debug_response_builder: Callable[[str], str] | None = None,
    ) -> StepResult:
        """Run up to ``max_iterations`` LLM rounds until all checks pass.

        Each rejected round (uniqueness verdict or ``check_callback`` failure)
        appends the assistant reply plus corrective feedback to the running
        conversation and retries. In debug mode no LLM is called at all.
        Raises StepExecutionError when the iteration budget is exhausted.
        """
        conversation = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        if self.debug_prompts:
            return self._run_in_debug_mode(
                conversation=conversation,
                debug_response_builder=debug_response_builder,
            )
        for iteration in range(1, self.max_iterations + 1):
            response = self._invoke_llm(conversation, iteration)
            assistant_msg = {"role": "assistant", "content": response}
            notes: dict[str, Any] = {"iteration": iteration}

            # Optional uniqueness gate: runs before the step-specific check.
            if self.uniqueness_agent is not None:
                unique, verdict = self.uniqueness_agent.evaluate(response)
                notes["uniqueness_verdict"] = verdict
                if not unique:
                    logger.info(
                        "%s uniqueness rejection (iteration %s): %s\nCandidate description:\n%s",
                        self.name,
                        iteration,
                        verdict,
                        response,
                    )
                    # Feed the verdict back to the model and retry.
                    conversation.extend([
                        assistant_msg,
                        {
                            "role": "user",
                            "content": f"Uniqueness review failed: {verdict}",
                        },
                    ])
                    continue

            check_passed = True
            feedback = ""
            if check_callback is not None:
                check_passed, feedback = check_callback(response, iteration)
                if feedback:
                    notes["check_feedback"] = feedback

            if not check_passed:
                logger.warning(
                    "%s check_callback failed at iteration %s. Feedback:\n%s",
                    self.name,
                    iteration,
                    feedback,
                )
                # Feed the callback feedback to the model and retry.
                conversation.extend([
                    assistant_msg,
                    {
                        "role": "user",
                        "content": feedback or "Check failed; please revise.",
                    },
                ])
                continue

            # Success: return the full transcript including the final reply.
            full_conversation = [*conversation, assistant_msg]
            logger.info(
                "%s succeeded at iteration %s",
                self.name,
                iteration,
            )
            return StepResult(
                name=self.name,
                content=response,
                iterations=iteration,
                notes=notes,
                conversation=full_conversation,
            )
        raise StepExecutionError(f"{self.name} failed after {self.max_iterations} attempts.")

    def _invoke_llm(self, conversation: list[dict[str, str]], iteration: int) -> str:
        """Run one Claude conversation round; requires a runtime config."""
        if self._claude_config is None:
            raise StepExecutionError(f"{self.name} is misconfigured: missing Claude runtime config.")
        return run_claude_conversation(
            conversation,
            system_prompt=self.system_prompt,
            config=self._claude_config,
            step_tag=self.name,
            iteration=iteration,
        )

    def _run_in_debug_mode(
        self,
        *,
        conversation: list[dict[str, str]],
        debug_response_builder: Callable[[str], str] | None = None,
    ) -> StepResult:
        """Produce a mock StepResult without calling any LLM (iterations=0)."""
        self._emit_debug_prompts(conversation)
        user_prompt = conversation[-1]["content"]
        builder = debug_response_builder or self._default_debug_response
        response = builder(user_prompt)
        notes: dict[str, Any] = {"debug_mode": True}
        # The uniqueness verdict is still recorded for inspection, but it
        # does not gate the debug result.
        if self.uniqueness_agent is not None:
            _unique, verdict = self.uniqueness_agent.evaluate(response)
            notes["uniqueness_verdict"] = verdict
        assistant_msg = {"role": "assistant", "content": response}
        full_conversation = [*conversation, assistant_msg]
        return StepResult(
            name=self.name,
            content=response,
            iterations=0,
            notes=notes,
            conversation=full_conversation,
        )

    def _default_debug_response(self, _: str) -> str:
        """Fallback mock reply used when no debug_response_builder is given."""
        return f"[DEBUG MOCK OUTPUT for {self.name}]"

    def _emit_debug_prompts(self, conversation: list[dict[str, str]]) -> None:
        """Log every message of the conversation for debug inspection."""
        header = f"\n=== DEBUG PROMPTS :: {self.name} ==="
        logger.info(header)
        for message in conversation:
            role = message.get("role", "unknown").upper()
            logger.info("[%s]\n%s", role, message.get("content", ""))

__init__(*, name, system_prompt, max_iterations=3, uniqueness_agent=None, debug_prompts=False, claude_runtime_config=None)

Configure shared settings for a single multi-step generation phase.

Source code in pare/scenarios/generator/agent/step_agents.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self,
    *,
    name: str,
    system_prompt: str,
    max_iterations: int = 3,
    uniqueness_agent: ScenarioUniquenessCheckAgent | None = None,
    debug_prompts: bool = False,
    claude_runtime_config: ClaudeAgentRuntimeConfig | None = None,
) -> None:
    """Record the shared configuration that every pipeline step relies on."""
    # Step identity and the system prompt used for every LLM round.
    self.system_prompt = system_prompt
    self.name = name
    # Optional uniqueness reviewer and the retry budget shared by subclasses.
    self.uniqueness_agent = uniqueness_agent
    self.max_iterations = max_iterations
    # Debug mode skips LLM calls; the runtime config drives real Claude calls.
    self._claude_config = claude_runtime_config
    self.debug_prompts = debug_prompts

StepEditAgent

Bases: BaseStepAgent

Unified multi-step scenario agent parametrized by step kind.

Source code in pare/scenarios/generator/agent/step_agents.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
class StepEditAgent(BaseStepAgent):
    """Unified multi-step scenario agent parametrized by step kind."""

    def __init__(
        self,
        *,
        step_name: str,
        step_kind: str,
        system_prompt: str,
        max_iterations: int,
        debug_prompts: bool = False,
        claude_runtime_config: ClaudeAgentRuntimeConfig | None = None,
        uniqueness_agent: ScenarioUniquenessCheckAgent | None = None,
    ) -> None:
        """Initialize a generic scenario step agent."""
        # step_kind selects the prompt builder in run(); everything else is
        # shared BaseStepAgent configuration.
        self.step_kind = step_kind
        super().__init__(
            name=step_name,
            system_prompt=system_prompt,
            max_iterations=max_iterations,
            uniqueness_agent=uniqueness_agent,
            debug_prompts=debug_prompts,
            claude_runtime_config=claude_runtime_config,
        )

    def run(  # noqa: C901
        self,
        *,
        scenario_metadata_path: str | None = None,
        scenario_description: str | None = None,
        scenario_file_path: str | None = None,
        apps_and_data: str | None = None,
        events_flow: str | None = None,
        check_callback: Callable[[str, int], tuple[bool, str]] | None = None,
    ) -> StepResult:
        """Dispatch to the appropriate per-step prompt builder.

        Builds the user prompt (and a debug-mode mock response builder) that
        matches ``self.step_kind``, then delegates retry/check handling to
        ``_run_with_prompt``. Raises StepExecutionError when a required
        keyword argument for the step kind is missing, or when the kind is
        unknown.
        """
        # "description": only needs the metadata path; falls back to the
        # shared default location when not provided.
        if self.step_kind == "description":
            metadata_path = (scenario_metadata_path or "").strip() or "pare/scenarios/scenario_metadata.json"
            user_prompt = SCENARIO_DESCRIPTION_USER_PROMPT.format(scenario_metadata_path=metadata_path)

            def debug_builder(_: str) -> str:
                return "[DEBUG SCENARIO DESCRIPTION | novel request]"

        # "apps_and_data": requires the Step 1 description plus the working
        # scenario file path.
        elif self.step_kind == "apps_and_data":
            if scenario_description is None or scenario_file_path is None:
                raise StepExecutionError(
                    "Apps & Data step requires scenario_description and scenario_file_path.",
                )
            user_prompt = APPS_AND_DATA_USER_PROMPT.format(
                scenario_description=scenario_description,
                scenario_file_path=scenario_file_path,
            )

            def debug_builder(_: str) -> str:
                return (
                    "[DEBUG APPS & DATA OUTPUT placeholder]\n"
                    f"# scenario_file_path: {scenario_file_path}\n"
                    "# (LLM call skipped)"
                )

        # "events_flow": additionally requires the Step 2 apps-and-data output.
        elif self.step_kind == "events_flow":
            if scenario_description is None or apps_and_data is None or scenario_file_path is None:
                raise StepExecutionError(
                    "Events Flow step requires scenario_description, apps_and_data, and scenario_file_path.",
                )
            user_prompt = EVENTS_FLOW_USER_PROMPT.format(
                scenario_description=scenario_description,
                apps_and_data=apps_and_data,
                scenario_file_path=scenario_file_path,
            )

            def debug_builder(_: str) -> str:
                return "[DEBUG EVENTS FLOW OUTPUT placeholder]\n# (LLM call skipped)"

        # "validation": requires the Step 3 events-flow output instead.
        elif self.step_kind == "validation":
            if scenario_description is None or events_flow is None or scenario_file_path is None:
                raise StepExecutionError(
                    "Validation step requires scenario_description, events_flow, and scenario_file_path.",
                )
            user_prompt = VALIDATION_USER_PROMPT.format(
                scenario_description=scenario_description,
                events_flow=events_flow,
                scenario_file_path=scenario_file_path,
            )

            def debug_builder(_: str) -> str:
                return "[DEBUG VALIDATION OUTPUT placeholder]\n# (LLM call skipped)"

        else:
            raise StepExecutionError(f"Unknown step kind: {self.step_kind!r}")

        return self._run_with_prompt(
            user_prompt=user_prompt,
            check_callback=check_callback,
            debug_response_builder=debug_builder,
        )

__init__(*, step_name, step_kind, system_prompt, max_iterations, debug_prompts=False, claude_runtime_config=None, uniqueness_agent=None)

Initialize a generic scenario step agent.

Source code in pare/scenarios/generator/agent/step_agents.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def __init__(
    self,
    *,
    step_name: str,
    step_kind: str,
    system_prompt: str,
    max_iterations: int,
    debug_prompts: bool = False,
    claude_runtime_config: ClaudeAgentRuntimeConfig | None = None,
    uniqueness_agent: ScenarioUniquenessCheckAgent | None = None,
) -> None:
    """Create a step agent whose behavior is selected by ``step_kind``.

    ``step_kind`` is stored locally and drives the prompt dispatch in
    ``run``; every other argument is forwarded to the base-class
    initializer unchanged.
    """
    # Record the dispatch key before delegating, so it is available even
    # if the base initializer inspects the instance.
    self.step_kind = step_kind
    super().__init__(
        name=step_name,
        system_prompt=system_prompt,
        max_iterations=max_iterations,
        debug_prompts=debug_prompts,
        claude_runtime_config=claude_runtime_config,
        uniqueness_agent=uniqueness_agent,
    )

run(*, scenario_metadata_path=None, scenario_description=None, scenario_file_path=None, apps_and_data=None, events_flow=None, check_callback=None)

Dispatch to the appropriate per-step prompt builder.

Source code in pare/scenarios/generator/agent/step_agents.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def run(  # noqa: C901
    self,
    *,
    scenario_metadata_path: str | None = None,
    scenario_description: str | None = None,
    scenario_file_path: str | None = None,
    apps_and_data: str | None = None,
    events_flow: str | None = None,
    check_callback: Callable[[str, int], tuple[bool, str]] | None = None,
) -> StepResult:
    """Dispatch to the appropriate per-step prompt builder.

    Builds the user prompt and a debug-mode response builder for the
    configured ``self.step_kind``, then delegates execution to
    ``_run_with_prompt``. Raises ``StepExecutionError`` when required
    inputs for the selected step are missing or the kind is unknown.
    """
    kind = self.step_kind

    if kind == "description":
        # Fall back to the project-standard metadata path when the caller
        # passes None or a blank string.
        metadata_path = (scenario_metadata_path or "").strip() or "pare/scenarios/scenario_metadata.json"
        user_prompt = SCENARIO_DESCRIPTION_USER_PROMPT.format(scenario_metadata_path=metadata_path)

        def debug_builder(_: str) -> str:
            return "[DEBUG SCENARIO DESCRIPTION | novel request]"

    elif kind == "apps_and_data":
        if None in (scenario_description, scenario_file_path):
            raise StepExecutionError(
                "Apps & Data step requires scenario_description and scenario_file_path.",
            )
        user_prompt = APPS_AND_DATA_USER_PROMPT.format(
            scenario_description=scenario_description,
            scenario_file_path=scenario_file_path,
        )

        def debug_builder(_: str) -> str:
            header = "[DEBUG APPS & DATA OUTPUT placeholder]\n"
            return header + f"# scenario_file_path: {scenario_file_path}\n" + "# (LLM call skipped)"

    elif kind == "events_flow":
        if None in (scenario_description, apps_and_data, scenario_file_path):
            raise StepExecutionError(
                "Events Flow step requires scenario_description, apps_and_data, and scenario_file_path.",
            )
        user_prompt = EVENTS_FLOW_USER_PROMPT.format(
            scenario_description=scenario_description,
            apps_and_data=apps_and_data,
            scenario_file_path=scenario_file_path,
        )

        def debug_builder(_: str) -> str:
            return "[DEBUG EVENTS FLOW OUTPUT placeholder]\n# (LLM call skipped)"

    elif kind == "validation":
        if None in (scenario_description, events_flow, scenario_file_path):
            raise StepExecutionError(
                "Validation step requires scenario_description, events_flow, and scenario_file_path.",
            )
        user_prompt = VALIDATION_USER_PROMPT.format(
            scenario_description=scenario_description,
            events_flow=events_flow,
            scenario_file_path=scenario_file_path,
        )

        def debug_builder(_: str) -> str:
            return "[DEBUG VALIDATION OUTPUT placeholder]\n# (LLM call skipped)"

    else:
        raise StepExecutionError(f"Unknown step kind: {kind!r}")

    return self._run_with_prompt(
        user_prompt=user_prompt,
        check_callback=check_callback,
        debug_response_builder=debug_builder,
    )

StepExecutionError

Bases: RuntimeError

Raised when a step cannot complete within the allotted attempts.

Source code in pare/scenarios/generator/agent/step_agents.py
26
27
class StepExecutionError(RuntimeError):
    """Signals that a step could not finish within its allotted attempts."""

StepResult dataclass

Container for a single step's outcome and trace metadata.

Source code in pare/scenarios/generator/agent/step_agents.py
30
31
32
33
34
35
36
37
38
@dataclass
class StepResult:
    """Container for a single step's outcome and trace metadata."""

    name: str  # Step identifier (presumably the agent's step_name — confirm against caller).
    content: str  # The step's final accepted output text.
    iterations: int  # Number of iterations the step consumed (per _run_with_prompt).
    notes: dict[str, Any]  # Free-form trace metadata recorded by the step.
    conversation: list[dict[str, str]]  # Message history as role/content string dicts.

Uniqueness and Summarization Agents

ScenarioUniquenessCheckAgent

Lightweight reviewer that enforces the Step 0 uniqueness requirement.

Source code in pare/scenarios/generator/agent/scenario_uniqueness_agent.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class ScenarioUniquenessCheckAgent:
    """Lightweight reviewer that enforces the Step 0 uniqueness requirement.

    The agent asks an LLM whether a candidate scenario description is novel
    and parses a PARES (unique) / RETRY (duplicate) verdict from the reply.
    Ambiguous or empty replies are treated as "not unique" so callers retry.
    """

    def __init__(
        self,
        historical_descriptions: list[dict[str, Any]] | None = None,
        *,
        scenario_metadata_path: str | None = None,
        debug_prompts: bool = False,
        claude_runtime_config: ClaudeAgentRuntimeConfig | None = None,
    ) -> None:
        """Configure the LLM engine and historical description buffer.

        Args:
            historical_descriptions: Previously accepted descriptions. NOTE(review):
                the list is kept by reference (not copied), so caller-side
                mutations are visible here — confirm this aliasing is intended.
            scenario_metadata_path: Metadata file path; falls back to the
                project-standard location when None or empty.
            debug_prompts: When True, `evaluate` only logs the prompts and
                returns a passing verdict without calling the LLM.
            claude_runtime_config: Runtime config; required for real LLM calls.
        """
        self.historical_descriptions: list[dict[str, Any]] = historical_descriptions or []
        self.scenario_metadata_path = scenario_metadata_path or "pare/scenarios/scenario_metadata.json"
        self.debug_prompts = debug_prompts
        self._claude_config = claude_runtime_config

    def evaluate(self, scenario_description: str) -> tuple[bool, str]:
        """Return (is_unique, verdict_text).

        A verdict line starting with PARES means unique (True); RETRY means
        duplicate (False). When no such line is found, returns False.
        """
        user_prompt = SCENARIO_UNIQUENESS_USER_PROMPT.format(
            scenario_description=scenario_description.strip(),
            scenario_metadata_path=self.scenario_metadata_path,
        )
        if self.debug_prompts:
            # Debug mode: surface the prompts for inspection and skip the LLM
            # call entirely, reporting the description as unique.
            self._emit_debug_prompts(
                system_prompt=SCENARIO_UNIQUENESS_SYSTEM_PROMPT,
                user_prompt=user_prompt,
            )
            verdict = "[DEBUG MODE] Scenario uniqueness check skipped."
            return True, verdict
        verdict = self._invoke_llm(
            system_prompt=SCENARIO_UNIQUENESS_SYSTEM_PROMPT,
            user_prompt=user_prompt,
            trace_tag="multi_step_step1_uniqueness",
        )
        text = verdict.strip()
        # Primary rule: look at the FIRST non-empty line so any later
        # "Comparison/Key overlap" analysis can't override a clear verdict.
        first_line = ""
        for line in text.splitlines():
            if line.strip():
                first_line = line.strip().lstrip("* ").strip()
                break
        if not first_line:
            # Entirely empty response: fail closed (treat as not unique).
            return False, text

        upper = first_line.upper()
        if upper.startswith("PARES"):
            return True, text
        if upper.startswith("RETRY"):
            return False, text

        # Fallback: if the model didn't follow the "first line is PARES/RETRY"
        # contract, scan subsequent non-empty lines for a line that *starts*
        # with PARES or RETRY. This still ignores any mention of those tokens
        # inside later prose or bullet points.
        lines = [line.strip() for line in text.splitlines() if line.strip()]
        for line in lines:
            normalized = line.upper().lstrip("* ").strip()
            if normalized.startswith("PARES"):
                return True, text
            if normalized.startswith("RETRY"):
                return False, text

        # No recognizable verdict anywhere: fail closed.
        return False, text

    def get_recent_history(self, limit: int = 8) -> str:
        """Return a human-friendly summary of previously accepted descriptions.

        Args:
            limit: Maximum number of recent entries to include.
        """
        return self._format_historical_descriptions(limit=limit)

    def _format_historical_descriptions(self, limit: int = 8) -> str:
        # Render the most recent entries, newest first, one bullet per entry.
        if not self.historical_descriptions:
            return "(none recorded yet)"
        recent = self.historical_descriptions[-limit:]
        lines: list[str] = []
        for entry in reversed(recent):
            # Flatten newlines and truncate long descriptions to 220 chars.
            description = (entry.get("description") or "").strip().replace("\n", " ")
            if len(description) > 220:
                description = f"{description[:220].rstrip()}..."
            timestamp = entry.get("timestamp") or "unknown time"
            lines.append(f"- {description} (logged {timestamp})")
        return "\n".join(lines)

    def _invoke_llm(self, *, system_prompt: str, user_prompt: str, trace_tag: str) -> str:
        # Guard: a real (non-debug) evaluation requires a Claude runtime config.
        if self._claude_config is None:
            raise TypeError("Scenario uniqueness check is misconfigured: missing Claude runtime config.")
        conversation = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        return run_claude_conversation(
            conversation,
            system_prompt=system_prompt,
            config=self._claude_config,
            step_tag=trace_tag,
            iteration=1,
        )

    def _emit_debug_prompts(self, *, system_prompt: str, user_prompt: str) -> None:
        # Log both prompts verbatim for offline inspection (debug mode only).
        logger.info("\n=== DEBUG PROMPTS :: Scenario Uniqueness Check ===")
        logger.info("[SYSTEM PROMPT]\n%s", system_prompt)
        logger.info("[USER PROMPT]\n%s", user_prompt)

__init__(historical_descriptions=None, *, scenario_metadata_path=None, debug_prompts=False, claude_runtime_config=None)

Configure the LLM engine and historical description buffer.

Source code in pare/scenarios/generator/agent/scenario_uniqueness_agent.py
19
20
21
22
23
24
25
26
27
28
29
30
31
def __init__(
    self,
    historical_descriptions: list[dict[str, Any]] | None = None,
    *,
    scenario_metadata_path: str | None = None,
    debug_prompts: bool = False,
    claude_runtime_config: ClaudeAgentRuntimeConfig | None = None,
) -> None:
    """Configure the LLM engine and historical description buffer.

    Args:
        historical_descriptions: Previously accepted descriptions. NOTE(review):
            kept by reference (not copied) — caller mutations are visible.
        scenario_metadata_path: Metadata file path; falls back to the
            project-standard location when None or empty.
        debug_prompts: When True, evaluation skips the LLM call.
        claude_runtime_config: Runtime config; required for real LLM calls.
    """
    self.historical_descriptions: list[dict[str, Any]] = historical_descriptions or []
    self.scenario_metadata_path = scenario_metadata_path or "pare/scenarios/scenario_metadata.json"
    self.debug_prompts = debug_prompts
    self._claude_config = claude_runtime_config

evaluate(scenario_description)

Return (is_unique, verdict_text).

Source code in pare/scenarios/generator/agent/scenario_uniqueness_agent.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def evaluate(self, scenario_description: str) -> tuple[bool, str]:
    """Return (is_unique, verdict_text)."""
    user_prompt = SCENARIO_UNIQUENESS_USER_PROMPT.format(
        scenario_description=scenario_description.strip(),
        scenario_metadata_path=self.scenario_metadata_path,
    )
    if self.debug_prompts:
        self._emit_debug_prompts(
            system_prompt=SCENARIO_UNIQUENESS_SYSTEM_PROMPT,
            user_prompt=user_prompt,
        )
        verdict = "[DEBUG MODE] Scenario uniqueness check skipped."
        return True, verdict
    verdict = self._invoke_llm(
        system_prompt=SCENARIO_UNIQUENESS_SYSTEM_PROMPT,
        user_prompt=user_prompt,
        trace_tag="multi_step_step1_uniqueness",
    )
    text = verdict.strip()
    # Primary rule: look at the FIRST non-empty line so any later
    # "Comparison/Key overlap" analysis can't override a clear verdict.
    first_line = ""
    for line in text.splitlines():
        if line.strip():
            first_line = line.strip().lstrip("* ").strip()
            break
    if not first_line:
        return False, text

    upper = first_line.upper()
    if upper.startswith("PARES"):
        return True, text
    if upper.startswith("RETRY"):
        return False, text

    # Fallback: if the model didn't follow the "first line is PARES/RETRY"
    # contract, scan subsequent non-empty lines for a line that *starts*
    # with PARES or RETRY. This still ignores any mention of those tokens
    # inside later prose or bullet points.
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    for line in lines:
        normalized = line.upper().lstrip("* ").strip()
        if normalized.startswith("PARES"):
            return True, text
        if normalized.startswith("RETRY"):
            return False, text

    return False, text

get_recent_history(limit=8)

Return a human-friendly summary of previously accepted descriptions.

Source code in pare/scenarios/generator/agent/scenario_uniqueness_agent.py
82
83
84
def get_recent_history(self, limit: int = 8) -> str:
    """Return a human-friendly summary of previously accepted descriptions."""
    return self._format_historical_descriptions(limit=limit)

SummaryGeneratingAgent

Agent for generating summaries of scenario code.

Source code in pare/scenarios/generator/agent/summary_generating_agent.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class SummaryGeneratingAgent:
    """Agent for generating summaries of scenario code."""

    def __init__(self, llm_engine: LLMEngine) -> None:
        """Initialize the summary generating agent.

        Args:
            llm_engine: The LLM engine to use for summary generation
        """
        self.llm_engine = llm_engine
        # Fixed system prompt shared by every summary request.
        self.system_prompt = DEFAULT_SUMMARY_GENERATOR_SYSTEM_PROMPT

    def generate_summary(self, scenario_code: str) -> str | None:
        """Generate a summary for the given scenario code.

        Args:
            scenario_code: The scenario Python code as a string

        Returns:
            The generated summary or None if generation failed
        """
        # Create the task message
        task_message = SUMMARY_TASK_TEMPLATE.format(scenario_code=scenario_code)

        # Create messages for the LLM
        messages = [{"role": "system", "content": self.system_prompt}, {"role": "user", "content": task_message}]

        logger.info("Generating summary for scenario code...")

        try:
            # Call the LLM; some engines return (text, metadata), others bare text.
            llm_output_tuple = self.llm_engine(
                messages, stop_sequences=[], additional_trace_tags=["scenario_summary_generation"], schema=None
            )
            if isinstance(llm_output_tuple, tuple) and len(llm_output_tuple) == 2:
                llm_output, _ = llm_output_tuple
            else:
                llm_output = llm_output_tuple

            if isinstance(llm_output, str):
                # Clean up the output - remove any code blocks or extra formatting.
                # The pattern matches a fence with an optional language tag and
                # trailing newline, so a single pass removes all fences.
                summary = llm_output.strip()
                summary = re.sub(r"```[a-z]*\n?", "", summary)
                summary = summary.strip()

                # Remove common prefixes that LLMs might add
                prefixes_to_remove = ["Summary:", "The scenario", "This scenario", "Scenario summary:", "Summary"]
                for prefix in prefixes_to_remove:
                    if summary.lower().startswith(prefix.lower()):
                        summary = summary[len(prefix) :].strip()
                        # Remove leading colon if present
                        if summary.startswith(":"):
                            summary = summary[1:].strip()
                        break

                # Lazy %-formatting: the argument is only rendered if the
                # log level is enabled.
                logger.info("Generated summary: %s...", summary[:100])
                return summary
            else:
                logger.warning("LLM output is not a string")
                return None

        except Exception:
            logger.exception("Error generating summary")
            return None

    def generate_summary_from_file(self, file_path: Path | str) -> tuple[str | None, str | None]:
        """Generate a summary for a scenario file and extract its scenario ID.

        Args:
            file_path: Path to the scenario Python file

        Returns:
            Tuple of (scenario_id, summary). Returns (None, None) if extraction/generation fails.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            logger.error("Scenario file not found: %s", file_path)
            return None, None

        # Read the scenario code
        try:
            scenario_code = file_path.read_text(encoding="utf-8")
        except Exception:
            logger.exception("Failed to read scenario file %s", file_path)
            return None, None

        # Extract scenario ID from the code
        scenario_id = self._extract_scenario_id(scenario_code)
        if not scenario_id:
            logger.warning("Could not extract scenario ID from %s", file_path)
            # Try to extract from filename as fallback
            scenario_id = file_path.stem

        # Generate summary
        summary = self.generate_summary(scenario_code)

        return scenario_id, summary

    def _extract_scenario_id(self, code: str) -> str | None:
        """Extract the scenario ID from scenario code.

        Args:
            code: The scenario Python code as a string

        Returns:
            The scenario ID or None if not found
        """
        # Look for @register_scenario decorator
        match = re.search(r'@register_scenario\(["\']([^"\']+)["\']\)', code)
        if match:
            return match.group(1).strip()
        return None

__init__(llm_engine)

Initialize the summary generating agent.

Parameters:

Name Type Description Default
llm_engine LLMEngine

The LLM engine to use for summary generation

required
Source code in pare/scenarios/generator/agent/summary_generating_agent.py
22
23
24
25
26
27
28
29
def __init__(self, llm_engine: LLMEngine) -> None:
    """Initialize the summary generating agent.

    Args:
        llm_engine: The LLM engine to use for summary generation
    """
    self.llm_engine = llm_engine
    # Fixed system prompt shared by every summary request.
    self.system_prompt = DEFAULT_SUMMARY_GENERATOR_SYSTEM_PROMPT

generate_summary(scenario_code)

Generate a summary for the given scenario code.

Parameters:

Name Type Description Default
scenario_code str

The scenario Python code as a string

required

Returns:

Type Description
str | None

The generated summary or None if generation failed

Source code in pare/scenarios/generator/agent/summary_generating_agent.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def generate_summary(self, scenario_code: str) -> str | None:
    """Generate a summary for the given scenario code.

    Args:
        scenario_code: The scenario Python code as a string

    Returns:
        The generated summary or None if generation failed
    """
    # Create the task message
    task_message = SUMMARY_TASK_TEMPLATE.format(scenario_code=scenario_code)

    # Create messages for the LLM
    messages = [{"role": "system", "content": self.system_prompt}, {"role": "user", "content": task_message}]

    logger.info("Generating summary for scenario code...")

    try:
        # Call the LLM; some engines return (text, metadata), others bare text.
        llm_output_tuple = self.llm_engine(
            messages, stop_sequences=[], additional_trace_tags=["scenario_summary_generation"], schema=None
        )
        if isinstance(llm_output_tuple, tuple) and len(llm_output_tuple) == 2:
            llm_output, _ = llm_output_tuple
        else:
            llm_output = llm_output_tuple

        if isinstance(llm_output, str):
            # Clean up the output - remove any code blocks or extra formatting.
            # The pattern matches a fence with an optional language tag and
            # trailing newline, so a single pass removes all fences.
            summary = llm_output.strip()
            summary = re.sub(r"```[a-z]*\n?", "", summary)
            summary = summary.strip()

            # Remove common prefixes that LLMs might add
            prefixes_to_remove = ["Summary:", "The scenario", "This scenario", "Scenario summary:", "Summary"]
            for prefix in prefixes_to_remove:
                if summary.lower().startswith(prefix.lower()):
                    summary = summary[len(prefix) :].strip()
                    # Remove leading colon if present
                    if summary.startswith(":"):
                        summary = summary[1:].strip()
                    break

            # Lazy %-formatting: argument rendered only if the level is enabled.
            logger.info("Generated summary: %s...", summary[:100])
            return summary
        else:
            logger.warning("LLM output is not a string")
            return None

    except Exception:
        logger.exception("Error generating summary")
        return None

generate_summary_from_file(file_path)

Generate a summary for a scenario file and extract its scenario ID.

Parameters:

Name Type Description Default
file_path Path | str

Path to the scenario Python file

required

Returns:

Type Description
tuple[str | None, str | None]

Tuple of (scenario_id, summary). Returns (None, None) if extraction/generation fails.

Source code in pare/scenarios/generator/agent/summary_generating_agent.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def generate_summary_from_file(self, file_path: Path | str) -> tuple[str | None, str | None]:
    """Generate a summary for a scenario file and extract its scenario ID.

    Args:
        file_path: Path to the scenario Python file

    Returns:
        Tuple of (scenario_id, summary). Returns (None, None) if extraction/generation fails.
    """
    # Normalize to Path so str inputs work transparently.
    file_path = Path(file_path)
    if not file_path.exists():
        logger.error(f"Scenario file not found: {file_path}")
        return None, None

    # Read the scenario code
    try:
        scenario_code = file_path.read_text(encoding="utf-8")
    except Exception:
        logger.exception(f"Failed to read scenario file {file_path}")
        return None, None

    # Extract scenario ID from the code
    scenario_id = self._extract_scenario_id(scenario_code)
    if not scenario_id:
        logger.warning(f"Could not extract scenario ID from {file_path}")
        # Try to extract from filename as fallback
        scenario_id = file_path.stem

    # Generate summary. Note: summary may still be None on LLM failure,
    # while scenario_id is non-None — callers should check both.
    summary = self.generate_summary(scenario_code)

    return scenario_id, summary

Generator Utilities

Scenario initializing all PARE apps (stateful wrappers around Meta ARE apps).

ScenarioWithAllPAREApps

Bases: PAREScenario

Scenario with ALL PARE applications initialized.

Initializes all core applications defined under pare.apps, which provide stateful, navigation-aware wrappers around Meta-ARE applications.

Initialized apps include: - Core: PAREAgentUserInterface, HomeScreenSystemApp - Communication: StatefulEmailApp, StatefulMessagingApp - Organization: StatefulCalendarApp, StatefulContactsApp - Commerce & logistics: StatefulShoppingApp, StatefulCabApp - Housing: StatefulApartmentApp

Source code in pare/scenarios/generator/utils/apps_init_instructions.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@register_scenario("scenario_with_all_pare_apps")
class ScenarioWithAllPAREApps(PAREScenario):
    """Scenario with ALL PARE applications initialized.

    Initializes all core applications defined under `pare.apps`, which provide
    stateful, navigation-aware wrappers around Meta-ARE applications.

    Initialized apps include:
    - Core: PAREAgentUserInterface, HomeScreenSystemApp
    - Communication: StatefulEmailApp, StatefulMessagingApp
    - Organization: StatefulCalendarApp, StatefulContactsApp
    - Commerce & logistics: StatefulShoppingApp, StatefulCabApp
    - Housing: StatefulApartmentApp
    - Personal organization: StatefulNotesApp, StatefulReminderApp
    """

    start_time = datetime(2025, 11, 18, 9, 0, 0, tzinfo=UTC).timestamp()
    status = ScenarioStatus.Draft
    is_benchmark_ready = False

    def init_and_populate_apps(self, *args: Any, **kwargs: Any) -> None:
        """Initialize and populate applications with data."""
        # =============================================================================
        # PARE APPS
        # =============================================================================
        self.agui = PAREAgentUserInterface()  # Proactive agent-user interface
        self.system = HomeScreenSystemApp(name="System")  # PARE system app with navigation helpers

        # Communication apps
        self.email = StatefulEmailApp(name="Emails")
        self.messaging = StatefulMessagingApp(name="Messages")

        # Organization and productivity apps
        self.calendar = StatefulCalendarApp(name="Calendar")
        self.contacts = StatefulContactsApp(name="Contacts")

        # Commerce, transport, housing, and personal organization apps
        self.shopping = StatefulShoppingApp(name="Shopping")
        self.cab = StatefulCabApp(name="Cab")
        self.apartment = StatefulApartmentApp(name="Apartment")
        self.note = StatefulNotesApp(name="Notes")
        self.reminder = StatefulReminderApp(name="Reminders")

        # =============================================================================
        # REGISTER ALL INITIALIZED APPLICATIONS
        # =============================================================================
        self.apps = [
            # Core PARE apps
            self.agui,
            self.system,
            # Communication apps
            self.email,
            self.messaging,
            # Organization and productivity apps
            self.calendar,
            self.contacts,
            # Commerce & logistics
            self.shopping,
            self.cab,
            # Housing and personal organization
            self.apartment,
            self.note,
            self.reminder,
        ]

    def build_events_flow(self) -> None:
        """Build the flow of events for the scenario."""
        # This scenario serves as an initialization example, so no specific
        # events are needed; all the work is done in init_and_populate_apps()
        # where all apps are initialized. (Unused get_typed_app() lookups that
        # previously lived here have been removed.)
        self.events: list[Any] = []  # Empty events list since this is just an initialization scenario

    def validate(self, env: AbstractEnvironment) -> ScenarioValidationResult:
        """Validate that all applications are properly initialized."""
        try:
            # Check that we have the expected number of apps
            # Core (2) + communication (2) + org (2) + shopping + cab + apartment + note + reminder = 11
            expected_app_count = 11
            actual_app_count = len(self.apps)

            if actual_app_count != expected_app_count:
                return ScenarioValidationResult(
                    success=False, exception=ValueError(f"Expected {expected_app_count} apps, got {actual_app_count}")
                )

            # Check that all required app types are present
            app_types = {app.__class__.__name__ for app in self.apps}
            required_apps = {
                "PAREAgentUserInterface",
                "HomeScreenSystemApp",
                "StatefulEmailApp",
                "StatefulMessagingApp",
                "StatefulCalendarApp",
                "StatefulContactsApp",
                "StatefulShoppingApp",
                "StatefulCabApp",
                "StatefulApartmentApp",
                "StatefulNotesApp",
                "StatefulReminderApp",
            }

            missing_apps = required_apps - app_types
            if missing_apps:
                return ScenarioValidationResult(
                    success=False, exception=ValueError(f"Missing required apps: {missing_apps}")
                )

            # Check that we can get tools from all apps
            tools = self.get_tools()
            if not tools:
                return ScenarioValidationResult(
                    success=False, exception=ValueError("No tools available from initialized apps")
                )

            return ScenarioValidationResult(success=True)

        except Exception as exc:
            # Any unexpected failure is reported as a validation error rather
            # than propagated to the caller.
            return ScenarioValidationResult(success=False, exception=exc)

build_events_flow()

Build the flow of events for the scenario.

Source code in pare/scenarios/generator/utils/apps_init_instructions.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
def build_events_flow(self) -> None:
    """Build the flow of events for the scenario."""
    # This scenario serves as an initialization example, so no specific
    # events are needed; all the work is done in init_and_populate_apps()
    # where all apps are initialized. (Unused get_typed_app() lookups that
    # previously lived here have been removed.)
    self.events: list[Any] = []  # Empty events list since this is just an initialization scenario

init_and_populate_apps(*args, **kwargs)

Initialize and populate applications with data.

Source code in pare/scenarios/generator/utils/apps_init_instructions.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def init_and_populate_apps(self, *args: Any, **kwargs: Any) -> None:
    """Initialize and populate applications with data.

    Creates every app instance used by the scenario, stores each one as an
    attribute on ``self``, and registers them all (core apps first) in
    ``self.apps``.
    """
    # Core PARE apps: proactive agent-user interface and the system app
    # with navigation helpers.
    self.agui = PAREAgentUserInterface()
    self.system = HomeScreenSystemApp(name="System")

    # Communication.
    self.email = StatefulEmailApp(name="Emails")
    self.messaging = StatefulMessagingApp(name="Messages")

    # Organization and productivity.
    self.calendar = StatefulCalendarApp(name="Calendar")
    self.contacts = StatefulContactsApp(name="Contacts")

    # Commerce, transport, housing, and personal organization.
    self.shopping = StatefulShoppingApp(name="Shopping")
    self.cab = StatefulCabApp(name="Cab")
    self.apartment = StatefulApartmentApp(name="Apartment")
    self.note = StatefulNotesApp(name="Notes")
    self.reminder = StatefulReminderApp(name="Reminders")

    # Register all initialized applications, grouped by purpose.
    core = [self.agui, self.system]
    communication = [self.email, self.messaging]
    organization = [self.calendar, self.contacts]
    commerce = [self.shopping, self.cab]
    personal = [self.apartment, self.note, self.reminder]
    self.apps = core + communication + organization + commerce + personal

validate(env)

Validate that all applications are properly initialized.

Source code in pare/scenarios/generator/utils/apps_init_instructions.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def validate(self, env: AbstractEnvironment) -> ScenarioValidationResult:
    """Validate that all applications are properly initialized.

    Checks, in order: the registered app count, the presence of every
    required app class, and that the initialized apps expose at least one
    tool. Any exception raised while validating is captured and returned
    as a failed result rather than propagated.
    """
    try:
        # The canonical set of app classes this scenario must register.
        required_apps = {
            "PAREAgentUserInterface",
            "HomeScreenSystemApp",
            "StatefulEmailApp",
            "StatefulMessagingApp",
            "StatefulCalendarApp",
            "StatefulContactsApp",
            "StatefulShoppingApp",
            "StatefulCabApp",
            "StatefulApartmentApp",
            "StatefulNotesApp",
            "StatefulReminderApp",
        }

        # Check that we have the expected number of apps. Derived from
        # required_apps (11 classes) so the two checks cannot drift apart.
        expected_app_count = len(required_apps)
        actual_app_count = len(self.apps)

        if actual_app_count != expected_app_count:
            return ScenarioValidationResult(
                success=False, exception=ValueError(f"Expected {expected_app_count} apps, got {actual_app_count}")
            )

        # Check that all required app types are present
        app_types = {app.__class__.__name__ for app in self.apps}
        missing_apps = required_apps - app_types
        if missing_apps:
            return ScenarioValidationResult(
                success=False, exception=ValueError(f"Missing required apps: {missing_apps}")
            )

        # Check that we can get tools from all apps
        tools = self.get_tools()
        if not tools:
            return ScenarioValidationResult(
                success=False, exception=ValueError("No tools available from initialized apps")
            )

        return ScenarioValidationResult(success=True)

    except Exception as exc:
        return ScenarioValidationResult(success=False, exception=exc)

compare_files(f1, f2, keep_strings=False, k=3)

Compare two files using multiple similarity metrics.

Source code in pare/scenarios/generator/utils/deduplicate_scenarios.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def compare_files(f1: str, f2: str, keep_strings: bool = False, k: int = 3) -> dict[str, float | int]:
    """Compare two files using multiple similarity metrics."""
    text_a = load_and_normalize(f1, keep_strings=keep_strings)
    text_b = load_and_normalize(f2, keep_strings=keep_strings)

    # Metric 1: difflib — a good general-purpose edit similarity.
    edit_similarity = difflib_ratio(text_a, text_b)

    # Metric 2: Jaccard over k-gram token shingles — robust to minor edits.
    tokens_a = tokens_from_text(text_a)
    tokens_b = tokens_from_text(text_b)
    shingle_similarity = jaccard(shingles(tokens_a, k=k), shingles(tokens_b, k=k))

    # Metric 3: cosine over token frequencies (bag-of-words style).
    # Note: this is often high (>0.8) since scenarios share framework
    # vocabulary (API class names, method names, common identifiers).
    # Use with caution for duplicate detection.
    vocab_similarity = cosine_counter(Counter(tokens_a), Counter(tokens_b))

    return {
        "difflib_ratio": edit_similarity,
        "jaccard_shingles": shingle_similarity,
        "cosine_tokens": vocab_similarity,
        "len_tokens_1": len(tokens_a),
        "len_tokens_2": len(tokens_b),
    }

cosine_counter(ca, cb)

Calculate cosine similarity between two Counter objects.

Source code in pare/scenarios/generator/utils/deduplicate_scenarios.py
81
82
83
84
85
86
87
88
89
def cosine_counter(ca: Counter[str], cb: Counter[str]) -> float:
    """Calculate cosine similarity between two Counter objects.

    Two empty counters are considered identical (1.0); if exactly one side
    has zero norm the similarity is 0.0.
    """
    if not ca and not cb:
        return 1.0
    # Counters return 0 for missing keys, so iterating ca covers the dot
    # product over the full key union.
    dot = sum(count * cb[token] for token, count in ca.items())
    norm_a = sum(v * v for v in ca.values()) ** 0.5
    norm_b = sum(v * v for v in cb.values()) ** 0.5
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

difflib_ratio(a, b)

Calculate edit similarity ratio using difflib.

Source code in pare/scenarios/generator/utils/deduplicate_scenarios.py
92
93
94
def difflib_ratio(a: str, b: str) -> float:
    """Calculate edit similarity ratio using difflib (autojunk disabled)."""
    matcher = difflib.SequenceMatcher(a=a, b=b, autojunk=False)
    return matcher.ratio()

jaccard(a, b)

Calculate Jaccard similarity between two sets.

Source code in pare/scenarios/generator/utils/deduplicate_scenarios.py
72
73
74
75
76
77
78
def jaccard(a: set[str], b: set[str]) -> float:
    """Calculate Jaccard similarity |a ∩ b| / |a ∪ b| between two sets.

    Two empty sets are considered identical (1.0).
    """
    if not a and not b:
        return 1.0
    overlap = a.intersection(b)
    combined = a.union(b)
    # combined is non-empty here, but guard the division anyway.
    return len(overlap) / max(len(combined), 1)

load_and_normalize(path, keep_strings=False)

Read a Python file and return a normalized text.

  • remove comments and (optionally) strings/docstrings
  • collapse whitespace
Source code in pare/scenarios/generator/utils/deduplicate_scenarios.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def load_and_normalize(path: str, keep_strings: bool = False) -> str:
    """Read a Python file and return a normalized text.

    - remove comments and (optionally) strings/docstrings
    - collapse whitespace
    """
    # Token types that carry no content we want to compare.
    skip_types = (
        tokenize.COMMENT,
        tokenize.NL,
        tokenize.NEWLINE,
        tokenize.ENCODING,
        tokenize.INDENT,
        tokenize.DEDENT,
    )
    pieces: list[str] = []
    last_was_identifier = False
    with open(path, "rb") as handle:
        for token in tokenize.tokenize(handle.readline):
            if token.type in skip_types:
                continue
            if token.type == tokenize.STRING and not keep_strings:
                # Skip string literals, including docstrings.
                continue
            if token.type == tokenize.NAME:
                # Keep adjacent identifiers/keywords separated by one space.
                if last_was_identifier:
                    pieces.append(" ")
                pieces.append(token.string)
                last_was_identifier = True
            else:
                pieces.append(token.string)
                last_was_identifier = False
    # Collapse whitespace to a single space for stability.
    return re.sub(r"\s+", " ", "".join(pieces)).strip()

main()

Main function to detect near-duplicate scenario classes.

Note: Cosine similarity (cosine_tokens) will typically be high (>0.8) for scenarios using the same API framework, even when their content and objectives differ significantly. This is expected behavior since all scenarios share common framework vocabulary (class names, method names, API terms).

The scenario generation agent uses different thresholds for different metrics: - difflib_ratio ≥0.8 (structural similarity) - jaccard_shingles ≥0.8 (pattern similarity) - cosine_tokens ≥0.93 (vocabulary similarity, higher threshold due to framework overlap)

For detecting true duplicates, consider using difflib_ratio or jaccard_shingles with a lower threshold, or use the 'max' metric which triggers on any high score.

Source code in pare/scenarios/generator/utils/deduplicate_scenarios.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def main() -> None:
    """CLI entry point: detect near-duplicate scenario classes.

    Note: Cosine similarity (cosine_tokens) will typically be high (>0.8) for
    scenarios using the same API framework, even when their content and
    objectives differ significantly, because all scenarios share common
    framework vocabulary (class names, method names, API terms).

    The scenario generation agent uses per-metric thresholds:
    - difflib_ratio ≥0.8 (structural similarity)
    - jaccard_shingles ≥0.8 (pattern similarity)
    - cosine_tokens ≥0.93 (vocabulary similarity; higher due to framework overlap)

    For detecting true duplicates, prefer difflib_ratio or jaccard_shingles
    with a lower threshold, or the 'max' metric which triggers on any high score.
    """
    parser = argparse.ArgumentParser(
        description="Detect near-duplicate scenario files using multiple similarity metrics.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python deduplicate_scenarios.py file1.py file2.py --threshold 0.8 --metric max
  python deduplicate_scenarios.py file1.py file2.py --metric difflib --threshold 0.85

Note: Cosine similarity often exceeds 0.8 for scenarios using the same API framework.
Consider using difflib or jaccard metrics for duplicate detection.
        """,
    )
    parser.add_argument("file1")
    parser.add_argument("file2")
    parser.add_argument(
        "--threshold",
        type=float,
        default=0.85,
        help="Similarity threshold for flagging duplicates (default: 0.85). "
        "Note: cosine similarity often exceeds 0.8 for scenarios using the same API framework. "
        "The agent uses different thresholds: difflib/jaccard ≥0.8, cosine ≥0.93.",
    )
    parser.add_argument(
        "--metric",
        choices=["difflib", "jaccard", "cosine", "max"],
        default="max",
        help="Which metric to use for the decision. 'max' = if any metric ≥ threshold.",
    )
    parser.add_argument(
        "--keep-strings", action="store_true", help="Keep string literals/docstrings in similarity (off by default)."
    )
    parser.add_argument("--k", type=int, default=3, help="Shingle size for Jaccard (default: 3).")
    args = parser.parse_args()

    if not os.path.exists(args.file1) or not os.path.exists(args.file2):
        print("File not found.", file=sys.stderr)
        sys.exit(2)

    scores = compare_files(args.file1, args.file2, keep_strings=args.keep_strings, k=args.k)
    print("=== Similarity Scores ===")
    print(f"difflib_ratio   : {scores['difflib_ratio']:.4f}")
    print(f"jaccard_shingles: {scores['jaccard_shingles']:.4f} (k={args.k})")
    print(f"cosine_tokens   : {scores['cosine_tokens']:.4f}")
    print(f"len(tokens)     : {scores['len_tokens_1']} vs {scores['len_tokens_2']}")

    # Decision: map the CLI metric name to its score key; 'max' takes the
    # highest of the three so any single high metric triggers.
    metric_keys = {
        "difflib": "difflib_ratio",
        "jaccard": "jaccard_shingles",
        "cosine": "cosine_tokens",
    }
    if args.metric in metric_keys:
        score = scores[metric_keys[args.metric]]
    else:  # max
        score = max(scores["difflib_ratio"], scores["jaccard_shingles"], scores["cosine_tokens"])

    is_dup = score >= args.threshold
    print("\nDecision:")
    print(f"metric={args.metric} score={score:.4f} threshold={args.threshold:.2f}")
    print("=> NEAR-DUPLICATE ✅" if is_dup else "=> Different enough ❌")

shingles(tokens, k=3)

Generate k-gram token shingles from a list of tokens.

Source code in pare/scenarios/generator/utils/deduplicate_scenarios.py
65
66
67
68
69
def shingles(tokens: list[str], k: int = 3) -> set[str]:
    """Generate k-gram token shingles from a list of tokens.

    An empty token list yields an empty set; fewer than k tokens yield a
    single shingle covering all of them.
    """
    if not tokens:
        return set()
    if len(tokens) < k:
        return {" ".join(tokens)}
    result: set[str] = set()
    for start in range(len(tokens) - k + 1):
        result.add(" ".join(tokens[start : start + k]))
    return result

tokens_from_text(text)

Extract identifiers and keywords from text.

  • identifiers & keywords only (ignore numbers and punctuation)

Note: This includes ALL identifiers including API class names, method names, and framework terms. Cosine similarity will be high for scenarios using the same API framework, even if their content differs significantly.

Source code in pare/scenarios/generator/utils/deduplicate_scenarios.py
53
54
55
56
57
58
59
60
61
62
def tokens_from_text(text: str) -> list[str]:
    """Extract identifiers and keywords from text.

    - identifiers & keywords only (ignore numbers and punctuation)

    Note: This includes ALL identifiers including API class names, method names,
    and framework terms. Cosine similarity will be high for scenarios using the
    same API framework, even if their content differs significantly.
    """
    identifier_pattern = re.compile(r"[A-Za-z_][A-Za-z_0-9]*")
    return identifier_pattern.findall(text)