Skip to content

Agents API

Package Exports

ProactiveAgent

Proactive agent wrapper that manages observe and execute agents.

The observer agent continuously monitors the environment and proposes goals to the user. The executor agent executes a goal once the user has confirmed it.

Source code in pare/agents/proactive/agent.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
class ProactiveAgent:
    """Proactive agent wrapper that manages observe and execute agents.

    Observer agent is responsible for continuous monitoring of the environment and proposing goals to the user.
    Executer agent is responsible for executing the confirmed goal.
    """

    def __init__(
        self,
        log_callback: Callable[[BaseAgentLog], None],
        pause_env: Callable[[], None] | None,
        resume_env: Callable[[float], None] | None,
        # ! FIXME: Observer agent doesn't need to be a base agent. A simple LLMEnginer call is enough. Infact observe agent should be a different protocol which users can extend to implement a rule based or statistical ML model based observer.
        observe_llm_engine: LLMEngine,
        observe_agent: BaseAgent,
        execute_llm_engine: LLMEngine,
        execute_agent: BaseAgent,
        time_manager: TimeManager,
        tools: list[Tool] | None = None,
        observe_max_iterations: int = 10,
        execute_max_iterations: int = 20,
        max_turns: int | None = None,
        simulated_generation_time_config: SimulatedGenerationTimeConfig | None = None,
    ) -> None:
        """Initializes the ProactiveAgent wrapper.

        Args:
            log_callback: Callback to log agent logs.
            pause_env: Callback to pause the environment.
            resume_env: Callback to resume the environment.
            observe_llm_engine: LLM engine to use for the observer agent.
            observe_agent: Observer agent to wrap.
            execute_llm_engine: LLM engine to use for the execute agent.
            execute_agent: Execute agent to wrap.
            time_manager: Time manager to use for the agent.
            tools: Tools to use for the agent.
            observe_max_iterations: Maximum number of iterations to run per turn for the observer agent.
            execute_max_iterations: Maximum number of iterations to run per turn for the execute agent.
            max_turns: Maximum number of turns to run.
            simulated_generation_time_config: Simulated generation time config to use for the agent.
        """
        # Wrapper Agent model arguments
        if tools is None:
            tools = []

        self.time_manager = time_manager
        self.max_turns = max_turns
        self.tools = tools
        self.observe_max_iterations = observe_max_iterations
        self.execute_max_iterations = execute_max_iterations
        self.mode = ProactiveAgentMode.OBSERVE
        self.pending_goal: str | None = None

        # Observer Agent arguments
        self.observe_llm_engine = observe_llm_engine
        self.observe_agent = observe_agent
        self.observe_agent.name = "observe_base_agent"
        self.observe_agent.max_iterations = observe_max_iterations
        self.observe_agent.llm_engine = self.observe_llm_engine
        self.observe_agent.time_manager = self.time_manager
        self.observe_agent.log_callback = log_callback
        self.observe_agent.role_dict = DEFAULT_PROACTIVE_STEP_2_ROLE
        self.observe_agent.message_dict = DEFAULT_PROACTIVE_STEP_2_MESSAGE
        self.observe_agent.termination_step = termination_step

        # Execute Agent arguments
        self.execute_llm_engine = execute_llm_engine
        self.execute_agent = execute_agent
        self.execute_agent.name = "execute_base_agent"
        self.execute_agent.max_iterations = execute_max_iterations
        self.execute_agent.llm_engine = self.execute_llm_engine
        self.execute_agent.time_manager = self.time_manager
        self.execute_agent.log_callback = log_callback
        self.execute_agent.role_dict = DEFAULT_PROACTIVE_STEP_2_ROLE
        self.execute_agent.message_dict = DEFAULT_PROACTIVE_STEP_2_MESSAGE
        self.execute_agent.termination_step = termination_step

        # Environment methods to handle simulation time.
        self.simulated_generation_time_config = simulated_generation_time_config
        self.pause_env = pause_env
        self.resume_env = resume_env
        self.observe_agent.simulated_generation_time_config = self.simulated_generation_time_config
        self.execute_agent.simulated_generation_time_config = self.simulated_generation_time_config

        # Tracks if both agents are initialized.
        self._initialized = False

    @property
    def agent_framework(self) -> str:
        """Name of the agent."""
        return "PAREProactiveAgent"

    @property
    def observe_model(self) -> str:
        """Name of the observe model."""
        return self.observe_llm_engine.model_name

    @property
    def execute_model(self) -> str:
        """Name of the execute model."""
        return self.execute_llm_engine.model_name

    def init_tools(self, scenario: Scenario) -> None:
        """Initialize the tools.

        Args:
            scenario: Scenario to initialize the tools for.
        """
        app_tools = self.remove_aui_irrelevant_tools(scenario.get_tools())
        logger.info(f"Found {len(app_tools)} tools: {[tool.name for tool in app_tools]}")

        are_simulation_tools = [AppToolAdapter(tool) for tool in app_tools]

        self.tools += are_simulation_tools

        observe_tool_names = ["PAREAgentUserInterface__wait", "PAREAgentUserInterface__send_message_to_user"]

        observe_tools: list[AppToolAdapter] = []
        execute_tools: list[AppToolAdapter] = []
        for tool in are_simulation_tools:
            if (
                tool.name in observe_tool_names
                or getattr(tool.app_tool.function, "__operation_type__", OperationType.READ) == OperationType.READ
            ):
                observe_tools.append(tool)
            if tool.name != "PAREAgentUserInterface__wait":
                execute_tools.append(tool)

        if len(observe_tools) == 0:
            raise ValueError("No observe tools found. The observe agent must have the send_message_to_user tool.")
        if len(execute_tools) == 0:
            raise ValueError(
                "No execute tools found. The execute agent must have at least the send_message_to_user tool."
            )
        self.observe_agent.tools = {tool.name: tool for tool in observe_tools}
        self.execute_agent.tools = {tool.name: tool for tool in execute_tools}

        logger.debug(f"Observe agent has {len(observe_tools)} tools: {[tool.name for tool in observe_tools]}")
        logger.debug(f"Execute agent has {len(execute_tools)} tools: {[tool.name for tool in execute_tools]}")

    def init_observe_system_prompt(self, scenario: Scenario) -> None:
        """Initialize the observe system prompt.

        Args:
            scenario: Scenario to initialize the observe system prompt for.
        """
        # ! NOTE: We don't need to check the additional system prompt here because that is meant for the user agent.
        notification_system_prompt = get_observe_notification_system_prompt(
            self.observe_agent.notification_system, scenario.apps
        )
        self.observe_agent.init_system_prompts["system_prompt"] = self.observe_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<notification_system_description>>", notification_system_prompt)

        date_str = datetime.fromtimestamp(scenario.start_time or 0, tz=UTC).strftime("%Y-%m-%d %H")
        self.observe_agent.init_system_prompts["system_prompt"] = self.observe_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<curent_time_description>>", f"Today's date in 'YYYY-MM-DD HH' format is {date_str}")

        self.observe_agent.init_system_prompts["system_prompt"] = self.observe_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<agent_reminder_description>>", "")

    def init_execute_system_prompt(self, scenario: Scenario) -> None:
        """Initialize the execute system prompt.

        Args:
            scenario: Scenario to initialize the execute system prompt for.
        """
        notification_system_prompt = get_execute_notification_system_prompt(
            self.execute_agent.notification_system, scenario.apps
        )
        self.execute_agent.init_system_prompts["system_prompt"] = self.execute_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<notification_system_description>>", notification_system_prompt)

        date_str = datetime.fromtimestamp(scenario.start_time or 0, tz=UTC).strftime("%Y-%m-%d %H")
        self.execute_agent.init_system_prompts["system_prompt"] = self.execute_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<curent_time_description>>", f"Today's date in 'YYYY-MM-DD HH' format is {date_str}")

        self.execute_agent.init_system_prompts["system_prompt"] = self.execute_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<agent_reminder_description>>", "")

    def init_notification_system(self, notification_system: BaseNotificationSystem) -> None:
        """Initialize the notification system.

        Args:
            notification_system: Notification system to initialize.
        """
        if notification_system is not None:
            logger.info(f"Setting notification system for Proactive Agent to provided one {notification_system}")
            self.observe_agent.notification_system = notification_system
            self.execute_agent.notification_system = notification_system

    def prepare_proactive_agent_run(
        self,
        scenario: Scenario,
        notification_system: BaseNotificationSystem | None = None,
        observe_agent_logs: list[BaseAgentLog] | None = None,
        execute_agent_logs: list[BaseAgentLog] | None = None,
    ) -> None:
        """Prepare the proactive agent run.

        Args:
            scenario: Scenario to run the turn for.
            notification_system: Notification system to use for the agent.
            observe_agent_logs: Initial agent logs to use for the observe agent.
            execute_agent_logs: Initial agent logs to use for the execute agent.
        """
        self.init_tools(scenario)
        self.init_notification_system(notification_system)
        self.init_observe_system_prompt(scenario)
        self.init_execute_system_prompt(scenario)
        # ! NOTE: We don't need to replay at all for our agents.
        if observe_agent_logs is not None and len(observe_agent_logs) > 0:
            self.observe_agent.replay(observe_agent_logs)
        if execute_agent_logs is not None and len(execute_agent_logs) > 0:
            self.execute_agent.replay(execute_agent_logs)

        if self.simulated_generation_time_config is not None and (self.pause_env is None or self.resume_env is None):
            raise RuntimeError(
                "Pause and resume environment functions must be provided if simulated generation time config is set"
            )
        self.observe_agent.pause_env = self.pause_env
        self.observe_agent.resume_env = self.resume_env
        self.execute_agent.pause_env = self.pause_env
        self.execute_agent.resume_env = self.resume_env
        self._initialized = True

    def remove_aui_irrelevant_tools(self, app_tools: list[AppTool]) -> list[AppTool]:
        """Remove irrelevant tools from the app tools.

        Args:
            app_tools: List of app tools.

        Returns:
            List of app tools.
        """
        aui_tool = next(tool for tool in app_tools if "PAREAgentUserInterface" in tool.name)

        if aui_tool is not None:
            aui: PAREAgentUserInterface = aui_tool.class_instance
            # We set this to True here because all the messages from the user are going to be received by the Agent as notifications
            # And thus handled as new tasks, instead of the Agent blocking when sending a message to the user waiting for a response.
            logger.warning("Setting wait_for_user_response to False in AgentUserInterface")
            aui.wait_for_user_response = False

            # Here we remove these tools, because all user messages will be injected to Agent
            # And thus he won't need to use these tools to get the messages.
            # FIXME: The name of the tools might be wrong here. Check in future.
            tools_to_remove = {
                "PAREAgentUserInterface__get_last_message_from_user",
                "PAREAgentUserInterface__get_last_message_from_agent",
                "PAREAgentUserInterface__get_last_unread_messages",
                "PAREAgentUserInterface__get_all_messages",
            }
            logger.warning(f"Removing tools {tools_to_remove} from app_tools")
            app_tools = [tool for tool in app_tools if tool.name not in tools_to_remove]
        return app_tools

    def get_notifications(self) -> tuple[list[Message], list[Message], list[Message]]:
        """Get new notifications from the custom state, NOT from the notification system.

        Notification system get_by_timestamp() method is destructive and removes messages from the queue. This creates an assymetric view of system notifications for the two agents.

        From the Proactive Agent perspective:
        - user_messages: messages sent by the user to the agent.
        - environment_notifications: Environment events like incoming email, messages etc.
        - env_stop_messages: Environment stop signals

        Returns:
            Tuple of (list of new user messages, list of new environment notifications, list of environment stop messages).
        """
        # new_messages = self.observe_agent.custom_state.get("notifications", [])
        new_messages = self.observe_agent.notification_system.message_queue.get_by_timestamp(
            timestamp=datetime.fromtimestamp(self.time_manager.time(), tz=UTC)
        )

        # Filter for USER_MESSAGE (User messages to Proactive Agent)
        # Note: We check against the string value to support both MessageType and PAREMessageType
        user_messages = [message for message in new_messages if message.message_type == PAREMessageType.USER_MESSAGE]

        env_notifications = [
            message
            for message in new_messages
            if message.message_type == PAREMessageType.ENVIRONMENT_NOTIFICATION_AGENT
        ]

        env_stop_messages = [
            message for message in new_messages if message.message_type == PAREMessageType.ENVIRONMENT_STOP
        ]
        # Reinsert the env notifications for user and agent + any extra messages back into the notification system.
        # This is important because the preprocessing step and the next agent will use the same notification system.
        # Here we don't need to reinsert the user messages because they are consumed while building the task.
        messages_to_put_back = [m for m in new_messages if m not in user_messages + env_stop_messages]

        logger.debug(
            f"Proactive agent get_notifications() -> message types to put back: {'; '.join([m.message_type.value for m in messages_to_put_back])}"
        )
        for message in messages_to_put_back:
            self.observe_agent.notification_system.message_queue.put(message)

        return user_messages, env_notifications, env_stop_messages

    def build_task_from_notifications(self, user_messages: list[Message]) -> str:
        """Build User Agent task from agent messages.

        User messages comes from the User Agent in the form of accept/reject responses.
        Environment notifications are handled separately by preprocessing step.

        Args:
            user_messages: List of user messages.

        Returns:
            Task string for the User Agent.
        """
        if len(user_messages) > 0:
            logger.debug(f"User Messages: {user_messages}")
        task = "\n".join([message.message for message in user_messages])
        return task

    def get_turn_ending_tool(self, agent: BaseAgent) -> dict[str, str | None]:
        """Get the turn ending tool from the agent logs.

        Args:
            agent: The agent to get the turn ending tool for.

        Returns:
            The turn ending tool name.
        """
        logs = agent.get_agent_logs()
        for log in reversed(logs):
            if isinstance(log, ToolCallLog):
                tool_name_lower = log.tool_name.lower()
                if "wait" in tool_name_lower:
                    return {"tool_name": "wait", "tool_arguments": None}
                elif "send_message_to_user" in tool_name_lower:
                    content = (
                        log.tool_arguments.get("content", "")
                        if isinstance(log.tool_arguments, dict)
                        else str(log.tool_arguments)
                    )
                    return {"tool_name": "send_message_to_user", "tool_arguments": content}
            if isinstance(log, TaskLog):
                break
        return {}

    def agent_loop(
        self,
        initial_task: str | None = None,
        reset: bool = True,
    ) -> str | MMObservation | None:
        """Execute one proactive agent turn either in observe or execute mode.

        Args:
            initial_task: Initial task to run the agent with.
            reset: Whether to reset the proactive agent.

        Returns:
            Result from the last agent turn execution.

        Raises:
            RuntimeError: If notification system is not set for either observe or execute agent.
        """
        if self.observe_agent.notification_system is None or self.execute_agent.notification_system is None:
            raise RuntimeError("Notification system not set for either observe or execute agent")

        # ? NOTE: Here also we need to put this notification in the custom state and not in the notification system.
        if initial_task is not None:
            self.observe_agent.custom_state.get("notifications", []).append(
                Message(
                    message_type=PAREMessageType.USER_MESSAGE,
                    message=initial_task,
                    timestamp=datetime.fromtimestamp(self.time_manager.time(), tz=UTC),
                )
            )

        logger.info("=" * 80)
        logger.info(f"Proactive-Agent Mode: {self.mode}")
        new_user_messages, new_env_notifications, env_stop_messages = self.get_notifications()
        logger.debug(f"New user messages: {new_user_messages}")
        logger.debug(f"New environment notifications: {new_env_notifications}")
        logger.debug(f"Environment stop messages: {env_stop_messages}")
        logger.info("=" * 80)

        if len(env_stop_messages) > 0:
            logger.warning(f"Environment stop message received - Stopping Agent: {env_stop_messages}")
            return None

        task = ""
        if self.mode == ProactiveAgentMode.OBSERVE:
            result = self._run_observe_mode(new_user_messages, new_env_notifications, reset)
            return result
        elif self.mode == ProactiveAgentMode.AWAITING_CONFIRMATION:
            accepted, _ = self._check_confirmation(new_user_messages)
            if accepted:
                self.mode = ProactiveAgentMode.EXECUTE
                return self._run_execute_mode(new_user_messages, new_env_notifications)
        elif self.mode == ProactiveAgentMode.EXECUTE:
            return self._run_execute_mode(new_user_messages, new_env_notifications)
        else:
            raise RuntimeError(f"Unknown mode: {self.mode}")
        time.sleep(2)
        return None

    def _check_confirmation(self, user_messages: list[Message]) -> tuple[bool, str | None]:
        """Check if user accepted or rejected the proposal.

        Args:
            user_messages: List of user messages.

        Returns:
            Tuple of (True, response) if user accepted the proposal, (False, None) if user rejected the proposal, (False, None) if response is unclear.
        """
        if not user_messages:
            logger.debug("No user response yet, still awaiting confirmation")
            return False, None

        response = user_messages[-1].message
        if "[ACCEPT]" in response:
            logger.info("User ACCEPTED the proposal")
            return True, response
        elif "[REJECT]" in response:
            logger.info("User REJECTED the proposal")
            self.mode = ProactiveAgentMode.OBSERVE
            return False, None
        else:
            logger.warning(f"Unclear response: {response}, still awaiting confirmation")
            return False, None

    def _run_observe_mode(
        self,
        user_messages: list[Message],
        env_notifications: list[Message],
        reset: bool = True,
    ) -> str | MMObservation | None:
        """Run the observe agent and check for proposals.

        Args:
            user_messages: List of user messages.
            env_notifications: List of environment notifications.
            reset: Whether to reset the observe agent.

        Returns:
            Result from the last agent turn execution.
        """
        # Reset the internal iterations counter, otherwise after first turn, the agent will exit. And if we increase the number of max_iterations, then the agent will take multiple turns.
        # ! FIXME: Find a better solution for this iterations issue.
        self.observe_agent.iterations = 0

        task = self.build_task_from_notifications(user_messages)
        attachments: list[Attachment] = [attachment for message in user_messages for attachment in message.attachments]
        result = self.observe_agent.run(
            task=task, hint=None, reset=reset, attachments=attachments if attachments else None
        )

        turn_end_reason = self.get_turn_ending_tool(self.observe_agent)
        if turn_end_reason.get("tool_name", "") == "send_message_to_user":
            logger.info(f"Proactive Agent sent a proposal: {turn_end_reason.get('tool_arguments', '')}")
            self.mode = ProactiveAgentMode.AWAITING_CONFIRMATION
            self.pending_goal = turn_end_reason.get("tool_arguments", "")
        elif turn_end_reason.get("tool_name", "") == "wait":
            logger.info("Proactive Agent waited for more information")

        return result

    def _run_execute_mode(
        self,
        user_messages: list[Message],
        env_notifications: list[Message],
    ) -> str | MMObservation | None:
        """Run the execute agent and check for completion.

        Args:
            user_messages: List of user messages.
            env_notifications: List of environment notifications.

        Returns:
            Result from the last agent turn execution.

        Raises:
            RuntimeError: If notification system is not set for either observe or execute agent.
        """
        if self.execute_agent.notification_system is None:
            raise RuntimeError("Notification system not set for execute agent")

        if not self.pending_goal:
            raise RuntimeError("Execute mode called without pending_goal")

        task = f"Proposed Goal: {self.pending_goal}"
        task += f"\nUser reply: {self.build_task_from_notifications(user_messages)}"
        attachments: list[Attachment] = [attachment for message in user_messages for attachment in message.attachments]
        self.execute_agent.initialize(attachments=attachments)
        result = self.execute_agent.run(task=task, hint=None, attachments=attachments)

        self.mode = ProactiveAgentMode.OBSERVE
        self.pending_goal = None

        return result

    # ==================== Metric Extraction Methods ====================

    def get_proposal_count(self) -> int:
        """Count the number of proposals made by the observe agent.

        Proposals are identified by calls to send_message_to_user tool.

        Returns:
            Number of proposals made.
        """
        count = 0
        for log in self.observe_agent.get_agent_logs():
            if isinstance(log, ToolCallLog) and "send_message_to_user" in log.tool_name.lower():
                count += 1
        return count

    def get_read_only_actions(self) -> int:
        """Count read-only actions from both observe and execute agents.

        Returns:
            Number of read-only tool calls.
        """
        count = 0
        for log in self.observe_agent.get_agent_logs():
            if isinstance(log, ToolCallLog):
                tool = self.observe_agent.tools.get(log.tool_name)
                # write_operation=False or None means read-only
                if tool is not None and not getattr(tool, "write_operation", False):
                    count += 1
        for log in self.execute_agent.get_agent_logs():
            if isinstance(log, ToolCallLog):
                tool = self.execute_agent.tools.get(log.tool_name)
                if tool is not None and not getattr(tool, "write_operation", False):
                    count += 1
        return count

    def get_write_actions(self) -> int:
        """Count write actions from both observe and execute agents.

        Returns:
            Number of write tool calls.
        """
        count = 0
        for log in self.observe_agent.get_agent_logs():
            if isinstance(log, ToolCallLog):
                tool = self.observe_agent.tools.get(log.tool_name)
                if tool is not None and getattr(tool, "write_operation", False):
                    count += 1
        for log in self.execute_agent.get_agent_logs():
            if isinstance(log, ToolCallLog):
                tool = self.execute_agent.tools.get(log.tool_name)
                if tool is not None and getattr(tool, "write_operation", False):
                    count += 1
        return count

agent_framework property

Name of the agent.

execute_model property

Name of the execute model.

observe_model property

Name of the observe model.

__init__(log_callback, pause_env, resume_env, observe_llm_engine, observe_agent, execute_llm_engine, execute_agent, time_manager, tools=None, observe_max_iterations=10, execute_max_iterations=20, max_turns=None, simulated_generation_time_config=None)

Initializes the ProactiveAgent wrapper.

Parameters:

Name Type Description Default
log_callback Callable[[BaseAgentLog], None]

Callback to log agent logs.

required
pause_env Callable[[], None] | None

Callback to pause the environment.

required
resume_env Callable[[float], None] | None

Callback to resume the environment.

required
observe_llm_engine LLMEngine

LLM engine to use for the observer agent.

required
observe_agent BaseAgent

Observer agent to wrap.

required
execute_llm_engine LLMEngine

LLM engine to use for the execute agent.

required
execute_agent BaseAgent

Execute agent to wrap.

required
time_manager TimeManager

Time manager to use for the agent.

required
tools list[Tool] | None

Tools to use for the agent.

None
observe_max_iterations int

Maximum number of iterations to run per turn for the observer agent.

10
execute_max_iterations int

Maximum number of iterations to run per turn for the execute agent.

20
max_turns int | None

Maximum number of turns to run.

None
simulated_generation_time_config SimulatedGenerationTimeConfig | None

Simulated generation time config to use for the agent.

None
Source code in pare/agents/proactive/agent.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def __init__(
    self,
    log_callback: Callable[[BaseAgentLog], None],
    pause_env: Callable[[], None] | None,
    resume_env: Callable[[float], None] | None,
    # ! FIXME: The observer does not need to be a BaseAgent; a plain LLMEngine call is enough. In fact the
    # ! observer should be its own protocol that users can extend with a rule-based or statistical ML model.
    observe_llm_engine: LLMEngine,
    observe_agent: BaseAgent,
    execute_llm_engine: LLMEngine,
    execute_agent: BaseAgent,
    time_manager: TimeManager,
    tools: list[Tool] | None = None,
    observe_max_iterations: int = 10,
    execute_max_iterations: int = 20,
    max_turns: int | None = None,
    simulated_generation_time_config: SimulatedGenerationTimeConfig | None = None,
) -> None:
    """Set up the wrapper and configure the two wrapped agents.

    Both agents receive the same time manager, log callback, role/message dictionaries,
    termination step and simulated generation time config; they differ only in name,
    LLM engine and iteration limit.

    Args:
        log_callback: Callback to log agent logs.
        pause_env: Callback to pause the environment.
        resume_env: Callback to resume the environment.
        observe_llm_engine: LLM engine assigned to the observer agent.
        observe_agent: Observer agent to wrap.
        execute_llm_engine: LLM engine assigned to the execute agent.
        execute_agent: Execute agent to wrap.
        time_manager: Time manager shared by the wrapper and both agents.
        tools: Initial tool list for the wrapper; an empty list is used when None.
        observe_max_iterations: Maximum number of iterations per turn for the observer agent.
        execute_max_iterations: Maximum number of iterations per turn for the execute agent.
        max_turns: Maximum number of turns to run.
        simulated_generation_time_config: Simulated generation time config handed to both agents.
    """
    # Wrapper-level state. The wrapper starts in observe mode with no goal pending.
    self.time_manager = time_manager
    self.max_turns = max_turns
    self.tools = [] if tools is None else tools
    self.observe_max_iterations = observe_max_iterations
    self.execute_max_iterations = execute_max_iterations
    self.mode = ProactiveAgentMode.OBSERVE
    self.pending_goal: str | None = None

    self.observe_llm_engine = observe_llm_engine
    self.observe_agent = observe_agent
    self.execute_llm_engine = execute_llm_engine
    self.execute_agent = execute_agent

    # Observer first, executor second: (agent, name, iteration limit, engine).
    agent_wiring = (
        (observe_agent, "observe_base_agent", observe_max_iterations, observe_llm_engine),
        (execute_agent, "execute_base_agent", execute_max_iterations, execute_llm_engine),
    )
    for wrapped, agent_name, iteration_limit, engine in agent_wiring:
        wrapped.name = agent_name
        wrapped.max_iterations = iteration_limit
        wrapped.llm_engine = engine
        wrapped.time_manager = time_manager
        wrapped.log_callback = log_callback
        wrapped.role_dict = DEFAULT_PROACTIVE_STEP_2_ROLE
        wrapped.message_dict = DEFAULT_PROACTIVE_STEP_2_MESSAGE
        wrapped.termination_step = termination_step

    # Environment hooks used to handle simulation time.
    self.simulated_generation_time_config = simulated_generation_time_config
    self.pause_env = pause_env
    self.resume_env = resume_env
    observe_agent.simulated_generation_time_config = simulated_generation_time_config
    execute_agent.simulated_generation_time_config = simulated_generation_time_config

    # Tracks if both agents are initialized.
    self._initialized = False

agent_loop(initial_task=None, reset=True)

Execute one proactive agent turn either in observe or execute mode.

Parameters:

Name Type Description Default
initial_task str | None

Initial task to run the agent with.

None
reset bool

Whether to reset the proactive agent.

True

Returns:

Type Description
str | MMObservation | None

Result from the last agent turn execution.

Raises:

Type Description
RuntimeError

If notification system is not set for either observe or execute agent.

Source code in pare/agents/proactive/agent.py
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def agent_loop(
    self,
    initial_task: str | None = None,
    reset: bool = True,
) -> str | MMObservation | None:
    """Execute one proactive agent turn either in observe or execute mode.

    Args:
        initial_task: Initial task to run the agent with.
        reset: Whether to reset the proactive agent.

    Returns:
        Result from the last agent turn execution.

    Raises:
        RuntimeError: If notification system is not set for either observe or execute agent.
    """
    if self.observe_agent.notification_system is None or self.execute_agent.notification_system is None:
        raise RuntimeError("Notification system not set for either observe or execute agent")

    # ? NOTE: Here also we need to put this notification in the custom state and not in the notification system.
    if initial_task is not None:
        self.observe_agent.custom_state.get("notifications", []).append(
            Message(
                message_type=PAREMessageType.USER_MESSAGE,
                message=initial_task,
                timestamp=datetime.fromtimestamp(self.time_manager.time(), tz=UTC),
            )
        )

    logger.info("=" * 80)
    logger.info(f"Proactive-Agent Mode: {self.mode}")
    new_user_messages, new_env_notifications, env_stop_messages = self.get_notifications()
    logger.debug(f"New user messages: {new_user_messages}")
    logger.debug(f"New environment notifications: {new_env_notifications}")
    logger.debug(f"Environment stop messages: {env_stop_messages}")
    logger.info("=" * 80)

    if len(env_stop_messages) > 0:
        logger.warning(f"Environment stop message received - Stopping Agent: {env_stop_messages}")
        return None

    task = ""
    if self.mode == ProactiveAgentMode.OBSERVE:
        result = self._run_observe_mode(new_user_messages, new_env_notifications, reset)
        return result
    elif self.mode == ProactiveAgentMode.AWAITING_CONFIRMATION:
        accepted, _ = self._check_confirmation(new_user_messages)
        if accepted:
            self.mode = ProactiveAgentMode.EXECUTE
            return self._run_execute_mode(new_user_messages, new_env_notifications)
    elif self.mode == ProactiveAgentMode.EXECUTE:
        return self._run_execute_mode(new_user_messages, new_env_notifications)
    else:
        raise RuntimeError(f"Unknown mode: {self.mode}")
    time.sleep(2)
    return None

build_task_from_notifications(user_messages)

Build User Agent task from agent messages.

User messages come from the User Agent in the form of accept/reject responses. Environment notifications are handled separately by the preprocessing step.

Parameters:

Name Type Description Default
user_messages list[Message]

List of user messages.

required

Returns:

Type Description
str

Task string for the User Agent.

Source code in pare/agents/proactive/agent.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
def build_task_from_notifications(self, user_messages: list[Message]) -> str:
    """Join the text of the given user messages into a single task string.

    The messages are logged at debug level when there is at least one of them.

    Args:
        user_messages: Messages whose ``message`` text should be combined.

    Returns:
        The message texts joined with newlines, in the order given; an empty string
        when there are no messages.
    """
    if user_messages:
        logger.debug(f"User Messages: {user_messages}")
    return "\n".join(entry.message for entry in user_messages)

get_notifications()

Get new notifications from the notification system's message queue and put back every message that is not consumed here.

The notification system's get_by_timestamp() method is destructive and removes messages from the queue. This creates an asymmetric view of system notifications for the two agents.

From the Proactive Agent perspective:

- user_messages: messages sent by the user to the agent.
- environment_notifications: environment events such as incoming emails, messages, etc.
- env_stop_messages: environment stop signals.

Returns:

Type Description
tuple[list[Message], list[Message], list[Message]]

Tuple of (list of new user messages, list of new environment notifications, list of environment stop messages).

Source code in pare/agents/proactive/agent.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def get_notifications(self) -> tuple[list[Message], list[Message], list[Message]]:
    """Fetch new messages from the notification system queue and split them by type.

    The messages are read from the observe agent's notification system message queue for
    the current time reported by the time manager. ``get_by_timestamp()`` is destructive
    and removes the returned messages from the queue, which would give the two agents an
    asymmetric view of system notifications. Every fetched message that is neither a user
    message nor an environment stop message is therefore put back on the queue before
    returning.

    From the Proactive Agent perspective:
    - user_messages: messages sent by the user to the agent.
    - environment_notifications: Environment events like incoming email, messages etc.
    - env_stop_messages: Environment stop signals

    Returns:
        Tuple of (list of new user messages, list of new environment notifications, list of environment stop messages).
        The environment notifications are returned and also re-queued.
    """
    queue = self.observe_agent.notification_system.message_queue
    new_messages = queue.get_by_timestamp(timestamp=datetime.fromtimestamp(self.time_manager.time(), tz=UTC))

    # User messages addressed to the Proactive Agent.
    user_messages = [message for message in new_messages if message.message_type == PAREMessageType.USER_MESSAGE]

    env_notifications = [
        message
        for message in new_messages
        if message.message_type == PAREMessageType.ENVIRONMENT_NOTIFICATION_AGENT
    ]

    env_stop_messages = [
        message for message in new_messages if message.message_type == PAREMessageType.ENVIRONMENT_STOP
    ]

    # Reinsert the env notifications for user and agent + any extra messages back into the notification system.
    # This is important because the preprocessing step and the next agent will use the same notification system.
    # Here we don't need to reinsert the user messages because they are consumed while building the task.
    # The consumed list is built once instead of being re-concatenated for every element.
    consumed = user_messages + env_stop_messages
    messages_to_put_back = [m for m in new_messages if m not in consumed]

    logger.debug(
        f"Proactive agent get_notifications() -> message types to put back: {'; '.join([m.message_type.value for m in messages_to_put_back])}"
    )
    for message in messages_to_put_back:
        queue.put(message)

    return user_messages, env_notifications, env_stop_messages

get_proposal_count()

Count the number of proposals made by the observe agent.

Proposals are identified by calls to send_message_to_user tool.

Returns:

Type Description
int

Number of proposals made.

Source code in pare/agents/proactive/agent.py
567
568
569
570
571
572
573
574
575
576
577
578
579
def get_proposal_count(self) -> int:
    """Return how many proposals the observe agent has made.

    A proposal is any tool call log of the observe agent whose tool name contains
    "send_message_to_user" (compared case-insensitively).

    Returns:
        Number of proposals made.
    """
    return sum(
        1
        for entry in self.observe_agent.get_agent_logs()
        if isinstance(entry, ToolCallLog) and "send_message_to_user" in entry.tool_name.lower()
    )

get_read_only_actions()

Count read-only actions from both observe and execute agents.

Returns:

Type Description
int

Number of read-only tool calls.

Source code in pare/agents/proactive/agent.py
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
def get_read_only_actions(self) -> int:
    """Count the read-only tool calls logged by the observe and execute agents.

    A tool call counts when its tool name resolves in the calling agent's ``tools``
    mapping and that tool's ``write_operation`` attribute is missing or falsy.
    Calls to tools that are not in the mapping are ignored.

    Returns:
        Number of read-only tool calls.
    """
    total = 0
    for wrapped in (self.observe_agent, self.execute_agent):
        for entry in wrapped.get_agent_logs():
            if not isinstance(entry, ToolCallLog):
                continue
            tool = wrapped.tools.get(entry.tool_name)
            if tool is None:
                continue
            # write_operation=False or None means read-only
            if not getattr(tool, "write_operation", False):
                total += 1
    return total

get_turn_ending_tool(agent)

Get the turn ending tool from the agent logs.

Parameters:

Name Type Description Default
agent BaseAgent

The agent to get the turn ending tool for.

required

Returns:

Type Description
dict[str, str | None]

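Dictionary with the turn-ending tool's name ("tool_name") and arguments ("tool_arguments"), or an empty dictionary if no turn-ending tool call is found.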

Source code in pare/agents/proactive/agent.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
def get_turn_ending_tool(self, agent: BaseAgent) -> dict[str, str | None]:
    """Find the most recent turn-ending tool call in an agent's logs.

    The logs are scanned from newest to oldest. The scan stops at the first TaskLog it
    meets (presumably the start of the current turn - confirm against the log producer).

    Args:
        agent: The agent whose logs are inspected.

    Returns:
        ``{"tool_name": "wait", "tool_arguments": None}`` for a tool whose name contains
        "wait"; ``{"tool_name": "send_message_to_user", "tool_arguments": <content>}`` for a
        tool whose name contains "send_message_to_user"; an empty dict when neither is found.
    """
    for entry in reversed(agent.get_agent_logs()):
        if isinstance(entry, ToolCallLog):
            lowered_name = entry.tool_name.lower()
            if "wait" in lowered_name:
                return {"tool_name": "wait", "tool_arguments": None}
            if "send_message_to_user" in lowered_name:
                arguments = entry.tool_arguments
                # Dict arguments carry the text under "content"; anything else is stringified.
                if isinstance(arguments, dict):
                    content = arguments.get("content", "")
                else:
                    content = str(arguments)
                return {"tool_name": "send_message_to_user", "tool_arguments": content}
        if isinstance(entry, TaskLog):
            break
    return {}

get_write_actions()

Count write actions from both observe and execute agents.

Returns:

Type Description
int

Number of write tool calls.

Source code in pare/agents/proactive/agent.py
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
def get_write_actions(self) -> int:
    """Count the write tool calls logged by the observe and execute agents.

    A tool call counts when its tool name resolves in the calling agent's ``tools``
    mapping and that tool has a truthy ``write_operation`` attribute.

    Returns:
        Number of write tool calls.
    """
    total = 0
    for wrapped in (self.observe_agent, self.execute_agent):
        for entry in wrapped.get_agent_logs():
            if not isinstance(entry, ToolCallLog):
                continue
            tool = wrapped.tools.get(entry.tool_name)
            if tool is not None and getattr(tool, "write_operation", False):
                total += 1
    return total

init_execute_system_prompt(scenario)

Initialize the execute system prompt.

Parameters:

Name Type Description Default
scenario Scenario

Scenario to initialize the execute system prompt for.

required
Source code in pare/agents/proactive/agent.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def init_execute_system_prompt(self, scenario: Scenario) -> None:
    """Fill in the placeholders of the execute agent's system prompt.

    Three placeholders in ``init_system_prompts["system_prompt"]`` are substituted in
    order: the notification system description, the current time description (derived
    from the scenario start time, 0 when unset, in UTC) and the agent reminder
    description (replaced with an empty string).

    Args:
        scenario: Scenario providing the apps and the start time.
    """
    notification_text = get_execute_notification_system_prompt(
        self.execute_agent.notification_system, scenario.apps
    )
    prompts = self.execute_agent.init_system_prompts

    def substitute(placeholder: str, replacement: str) -> None:
        # Rewrite the stored system prompt with one placeholder replaced.
        prompts["system_prompt"] = prompts["system_prompt"].replace(placeholder, replacement)

    substitute("<<notification_system_description>>", notification_text)

    date_str = datetime.fromtimestamp(scenario.start_time or 0, tz=UTC).strftime("%Y-%m-%d %H")
    # NOTE(review): the placeholder is spelled "curent" here; presumably it matches the prompt template - confirm.
    substitute("<<curent_time_description>>", f"Today's date in 'YYYY-MM-DD HH' format is {date_str}")

    substitute("<<agent_reminder_description>>", "")

init_notification_system(notification_system)

Initialize the notification system.

Parameters:

Name Type Description Default
notification_system BaseNotificationSystem

Notification system to initialize.

required
Source code in pare/agents/proactive/agent.py
248
249
250
251
252
253
254
255
256
257
def init_notification_system(self, notification_system: BaseNotificationSystem | None) -> None:
    """Attach a notification system to both wrapped agents.

    The observe and execute agents are given the same notification system object.
    Passing None is allowed and leaves both agents untouched.

    Args:
        notification_system: Notification system to attach, or None to keep the current one.
    """
    if notification_system is None:
        return

    logger.info(f"Setting notification system for Proactive Agent to provided one {notification_system}")
    self.observe_agent.notification_system = notification_system
    self.execute_agent.notification_system = notification_system

init_observe_system_prompt(scenario)

Initialize the observe system prompt.

Parameters:

Name Type Description Default
scenario Scenario

Scenario to initialize the observe system prompt for.

required
Source code in pare/agents/proactive/agent.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def init_observe_system_prompt(self, scenario: Scenario) -> None:
    """Fill in the placeholders of the observe agent's system prompt.

    Three placeholders in ``init_system_prompts["system_prompt"]`` are substituted in
    order: the notification system description, the current time description (derived
    from the scenario start time, 0 when unset, in UTC) and the agent reminder
    description (replaced with an empty string).

    Args:
        scenario: Scenario providing the apps and the start time.
    """
    # ! NOTE: We don't need to check the additional system prompt here because that is meant for the user agent.
    notification_text = get_observe_notification_system_prompt(
        self.observe_agent.notification_system, scenario.apps
    )
    prompts = self.observe_agent.init_system_prompts

    def substitute(placeholder: str, replacement: str) -> None:
        # Rewrite the stored system prompt with one placeholder replaced.
        prompts["system_prompt"] = prompts["system_prompt"].replace(placeholder, replacement)

    substitute("<<notification_system_description>>", notification_text)

    date_str = datetime.fromtimestamp(scenario.start_time or 0, tz=UTC).strftime("%Y-%m-%d %H")
    # NOTE(review): the placeholder is spelled "curent" here; presumably it matches the prompt template - confirm.
    substitute("<<curent_time_description>>", f"Today's date in 'YYYY-MM-DD HH' format is {date_str}")

    substitute("<<agent_reminder_description>>", "")

init_tools(scenario)

Initialize the tools.

Parameters:

Name Type Description Default
scenario Scenario

Scenario to initialize the tools for.

required
Source code in pare/agents/proactive/agent.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def init_tools(self, scenario: Scenario) -> None:
    """Collect the scenario tools and distribute them to the two wrapped agents.

    The scenario tools are filtered through ``remove_aui_irrelevant_tools`` and wrapped in
    ``AppToolAdapter``. The observe agent receives the wait and send_message_to_user tools
    plus every tool whose function has a READ ``__operation_type__`` (a missing attribute
    is treated as READ). The execute agent receives every tool except the wait tool.

    Args:
        scenario: Scenario to initialize the tools for.

    Raises:
        ValueError: If either agent ends up with no tools at all.
    """
    app_tools = self.remove_aui_irrelevant_tools(scenario.get_tools())
    logger.info(f"Found {len(app_tools)} tools: {[tool.name for tool in app_tools]}")

    are_simulation_tools = [AppToolAdapter(tool) for tool in app_tools]

    # Rebind to a new list instead of using `+=`: `+=` would extend in place the very list
    # object that was handed to the constructor and may be shared with the caller.
    self.tools = [*self.tools, *are_simulation_tools]

    observe_tool_names = ["PAREAgentUserInterface__wait", "PAREAgentUserInterface__send_message_to_user"]

    observe_tools: list[AppToolAdapter] = []
    execute_tools: list[AppToolAdapter] = []
    for tool in are_simulation_tools:
        if (
            tool.name in observe_tool_names
            or getattr(tool.app_tool.function, "__operation_type__", OperationType.READ) == OperationType.READ
        ):
            observe_tools.append(tool)
        if tool.name != "PAREAgentUserInterface__wait":
            execute_tools.append(tool)

    if not observe_tools:
        raise ValueError("No observe tools found. The observe agent must have the send_message_to_user tool.")
    if not execute_tools:
        raise ValueError(
            "No execute tools found. The execute agent must have at least the send_message_to_user tool."
        )
    self.observe_agent.tools = {tool.name: tool for tool in observe_tools}
    self.execute_agent.tools = {tool.name: tool for tool in execute_tools}

    logger.debug(f"Observe agent has {len(observe_tools)} tools: {[tool.name for tool in observe_tools]}")
    logger.debug(f"Execute agent has {len(execute_tools)} tools: {[tool.name for tool in execute_tools]}")

prepare_proactive_agent_run(scenario, notification_system=None, observe_agent_logs=None, execute_agent_logs=None)

Prepare the proactive agent run.

Parameters:

Name Type Description Default
scenario Scenario

Scenario to run the turn for.

required
notification_system BaseNotificationSystem | None

Notification system to use for the agent.

None
observe_agent_logs list[BaseAgentLog] | None

Initial agent logs to use for the observe agent.

None
execute_agent_logs list[BaseAgentLog] | None

Initial agent logs to use for the execute agent.

None
Source code in pare/agents/proactive/agent.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def prepare_proactive_agent_run(
    self,
    scenario: Scenario,
    notification_system: BaseNotificationSystem | None = None,
    observe_agent_logs: list[BaseAgentLog] | None = None,
    execute_agent_logs: list[BaseAgentLog] | None = None,
) -> None:
    """Get both wrapped agents ready for a run on the given scenario.

    Initializes tools, notification system and both system prompts, replays any
    supplied logs into the matching agent, hands the pause/resume callbacks to both
    agents and finally marks the wrapper as initialized.

    Args:
        scenario: Scenario the agents are prepared for.
        notification_system: Notification system to use for the agents.
        observe_agent_logs: Logs to replay into the observe agent; skipped when None or empty.
        execute_agent_logs: Logs to replay into the execute agent; skipped when None or empty.

    Raises:
        RuntimeError: If a simulated generation time config is set but the pause or the
            resume callback is missing.
    """
    self.init_tools(scenario)
    self.init_notification_system(notification_system)
    self.init_observe_system_prompt(scenario)
    self.init_execute_system_prompt(scenario)

    # ! NOTE: We don't need to replay at all for our agents.
    replay_plan = (
        (self.observe_agent, observe_agent_logs),
        (self.execute_agent, execute_agent_logs),
    )
    for wrapped, prior_logs in replay_plan:
        if prior_logs:
            wrapped.replay(prior_logs)

    callbacks_missing = self.pause_env is None or self.resume_env is None
    if self.simulated_generation_time_config is not None and callbacks_missing:
        raise RuntimeError(
            "Pause and resume environment functions must be provided if simulated generation time config is set"
        )

    for wrapped in (self.observe_agent, self.execute_agent):
        wrapped.pause_env = self.pause_env
        wrapped.resume_env = self.resume_env
    self._initialized = True

remove_aui_irrelevant_tools(app_tools)

Remove irrelevant tools from the app tools.

Parameters:

Name Type Description Default
app_tools list[AppTool]

List of app tools.

required

Returns:

Type Description
list[AppTool]

List of app tools.

Source code in pare/agents/proactive/agent.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def remove_aui_irrelevant_tools(self, app_tools: list[AppTool]) -> list[AppTool]:
    """Drop the message-reading tools of the agent-user interface from the tool list.

    The first tool whose name contains "PAREAgentUserInterface" is used to reach the
    interface instance, whose ``wait_for_user_response`` flag is switched off. When no
    such tool exists the list is returned unchanged.

    Args:
        app_tools: List of app tools.

    Returns:
        The tools without the agent-user-interface message-reading tools, or the input
        list itself when no agent-user-interface tool is present.
    """
    # A default of None is required here: without it next() raises StopIteration when
    # no agent-user-interface tool is present, and the None check below is never reached.
    aui_tool = next((tool for tool in app_tools if "PAREAgentUserInterface" in tool.name), None)
    if aui_tool is None:
        return app_tools

    aui: PAREAgentUserInterface = aui_tool.class_instance
    # We set this to False here because all the messages from the user are going to be received by the Agent as
    # notifications and thus handled as new tasks, instead of the Agent blocking when sending a message to the
    # user waiting for a response.
    logger.warning("Setting wait_for_user_response to False in AgentUserInterface")
    aui.wait_for_user_response = False

    # Here we remove these tools, because all user messages will be injected to the Agent,
    # so it won't need to use these tools to get the messages.
    # FIXME: The name of the tools might be wrong here. Check in future.
    tools_to_remove = {
        "PAREAgentUserInterface__get_last_message_from_user",
        "PAREAgentUserInterface__get_last_message_from_agent",
        "PAREAgentUserInterface__get_last_unread_messages",
        "PAREAgentUserInterface__get_all_messages",
    }
    logger.warning(f"Removing tools {tools_to_remove} from app_tools")
    return [tool for tool in app_tools if tool.name not in tools_to_remove]

UserAgent

User agent wrapper that simulates realistic user behavior on a mobile phone.

Wraps a Meta-ARE BaseAgent and configured for single-action turns. Manages notification polling, task building and tool refreshing. Based on meta-are/are/simulation/agents/default_agent/are_simulation_main.py

Source code in pare/agents/user/agent.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
class UserAgent:
    """User agent wrapper that simulates realistic user behavior on a mobile phone.

    Wraps a Meta-ARE BaseAgent configured for single-action turns.
    Manages notification polling, task building and tool refreshing.
    Based on meta-are/are/simulation/agents/default_agent/are_simulation_main.py
    """

    def __init__(
        self,
        log_callback: Callable[[BaseAgentLog], None],
        pause_env: Callable[[], None] | None,
        resume_env: Callable[[float], None] | None,
        llm_engine: LLMEngine,
        base_agent: BaseAgent,
        time_manager: TimeManager,
        max_iterations: int = 1,  # We want to run one agent turn per step for user
        max_turns: int | None = None,
        simulated_generation_time_config: SimulatedGenerationTimeConfig | None = None,
    ) -> None:
        """Initializes the UserAgent wrapper.

        The wrapped base agent is configured in place: its name, LLM engine, time manager,
        iteration limit, log callback, role/message dictionaries and simulated generation
        time config are overwritten with the values given here.

        Args:
            log_callback: Callback to log agent logs.
            pause_env: Callback to pause the environment.
            resume_env: Callback to resume the environment.
            llm_engine: LLM engine to use for the agent.
            base_agent: Base agent to wrap.
            time_manager: Time manager to use for the agent.
            max_iterations: Maximum number of iterations to run per turn.
            max_turns: Maximum number of turns to run.
            simulated_generation_time_config: Simulated generation time config to use for the agent.
        """
        # Wrapper Agent model arguments
        self.llm_engine = llm_engine
        self.time_manager = time_manager
        self.max_iterations = max_iterations
        self.max_turns = max_turns
        # Populated by init_tools(); None until the first call.
        self.tools: list[AppTool] | None = None

        # Built React Agent arguments
        self.react_agent = base_agent
        self.react_agent.name = "user_base_agent"
        self.react_agent.llm_engine = self.llm_engine
        self.react_agent.time_manager = self.time_manager
        self.react_agent.max_iterations = self.max_iterations
        self.react_agent.log_callback = log_callback
        self.react_agent.role_dict = DEFAULT_USER_STEP_2_ROLE
        self.react_agent.message_dict = DEFAULT_USER_STEP_2_MESSAGE

        # Environment methods to handle simulation time.
        # pause_env / resume_env are only handed to the base agent in prepare_user_agent_run().
        self.simulated_generation_time_config = simulated_generation_time_config
        self.pause_env = pause_env
        self.resume_env = resume_env
        self.react_agent.simulated_generation_time_config = self.simulated_generation_time_config

        # ! NO SUB-AGENTS SUPPORTED YET.

        # Set to True by prepare_user_agent_run(); agent_loop() refuses to run before that.
        self._initialized = False

    @property
    def agent_framework(self) -> str:
        """Name of the agent framework (the constant "PAREUserAgent")."""
        return "PAREUserAgent"

    @property
    def model(self) -> str:
        """Name of the model, as reported by the LLM engine."""
        return self.llm_engine.model_name

    def init_tools(self, tools: list[AppTool]) -> None:
        """Initialize the tools.

        Filters the tools through remove_aui_irrelevant_tools(), wraps each remaining tool
        in an AppToolAdapter, stores the adapters on this wrapper and installs them on the
        base agent keyed by tool name.

        Args:
            tools: Tools to initialize.
        """
        user_tools = self.remove_aui_irrelevant_tools(tools)
        are_simulation_tools = [AppToolAdapter(tool) for tool in user_tools]
        self.tools = are_simulation_tools
        # Replaces (does not extend) the base agent's tool mapping.
        self.react_agent.tools = {tool.name: tool for tool in self.tools}
        self.react_agent.init_tools()
        logger.debug(f"Initialized {len(self.tools)} tools: {[tool.name for tool in self.tools]}")

    def remove_aui_irrelevant_tools(self, tools: list[AppTool]) -> list[AppTool]:
        """Remove irrelevant tools from the tools.

        Currently only the tool named "PAREAgentUserInterface__send_message_to_agent"
        is dropped; every other tool is kept in its original order.

        Args:
            tools: Tools to remove irrelevant tools from.

        Returns:
            New list containing the tools that were not removed.
        """
        return [
            tool
            for tool in tools
            if tool.name
            not in [
                "PAREAgentUserInterface__send_message_to_agent",
            ]
        ]

    def init_system_prompt(self, scenario: Scenario) -> None:
        """Initialize the system prompt.

        Fills the placeholders of the base agent's "system_prompt" entry in place, in this
        order: task description, notification system description, current time
        description, agent reminder description (replaced by an empty string) and
        available apps.

        Args:
            scenario: Scenario to initialize the system prompt for.
        """
        additional_system_prompt = scenario.additional_system_prompt
        logger.debug(f"Additional System Prompt: {additional_system_prompt}")

        task_description = additional_system_prompt if additional_system_prompt is not None else ""
        self.react_agent.init_system_prompts["system_prompt"] = self.react_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<task_description>>", task_description)

        notification_system_prompt = get_notification_system_prompt(self.react_agent.notification_system, scenario.apps)
        self.react_agent.init_system_prompts["system_prompt"] = self.react_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<notification_system_description>>", notification_system_prompt)

        # A missing/zero start time falls back to the Unix epoch; only hour precision is rendered.
        date_str = datetime.fromtimestamp(scenario.start_time or 0, tz=UTC).strftime("%Y-%m-%d %H")

        # NOTE(review): the placeholder is spelled "curent"; presumably it matches the prompt
        # template verbatim, so it must not be "corrected" here without changing the template.
        self.react_agent.init_system_prompts["system_prompt"] = self.react_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<curent_time_description>>", f"Today's date in 'YYYY-MM-DD HH' format is {date_str}")

        self.react_agent.init_system_prompts["system_prompt"] = self.react_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<agent_reminder_description>>", "")

        available_apps = format_available_apps(scenario.apps)
        self.react_agent.init_system_prompts["system_prompt"] = self.react_agent.init_system_prompts[
            "system_prompt"
        ].replace("<<available_apps>>", available_apps)

    def init_notification_system(self, notification_system: BaseNotificationSystem | None) -> None:
        """Initialize the notification system.

        Args:
            notification_system: Notification system to install on the base agent.
                When None, the base agent's current notification system is left untouched.
        """
        if notification_system is not None:
            logger.debug(f"Setting notification system for User Agent to provided one {notification_system}")
            self.react_agent.notification_system = notification_system

    def prepare_user_agent_run(
        self,
        scenario: Scenario,
        notification_system: BaseNotificationSystem | None = None,
        initial_agent_logs: list[BaseAgentLog] | None = None,
    ) -> None:
        """Prepare the user agent run.

        Initializes tools, notification system and system prompt (in that order), optionally
        replays initial logs, hands the pause/resume callbacks to the base agent and marks
        the wrapper as initialized.

        Args:
            scenario: Scenario to run the turn for.
            notification_system: Notification system to use for the agent.
            initial_agent_logs: Initial agent logs to use for the agent.

        Raises:
            RuntimeError: If simulated generation time config is set but the pause or resume
                environment function is missing.
        """
        self.init_tools(scenario.get_user_tools())
        # The notification system is set before the prompt because init_system_prompt reads it.
        self.init_notification_system(notification_system)
        self.init_system_prompt(scenario)
        # ! NOTE: We don't need to replay at all for our agents.
        # Sync the base agent time manager
        if initial_agent_logs is not None and len(initial_agent_logs) > 0:
            self.react_agent.replay(initial_agent_logs)

        # Pause/resume env functions
        if self.simulated_generation_time_config is not None and (self.pause_env is None or self.resume_env is None):
            raise RuntimeError(
                "Pause and resume environment functions must be provided if simulated generation time config is set"
            )
        self.react_agent.pause_env = self.pause_env
        self.react_agent.resume_env = self.resume_env
        self._initialized = True

    def get_notifications(self) -> tuple[list[Message], list[Message], list[Message]]:
        """Fetch pending messages from the notification system and split them by type.

        Queries the message queue with the current simulation time. Messages that are
        neither agent messages nor environment stop messages are put back on the queue.

        Returns:
            Tuple of (list of agent messages, list of environment notifications, list of environment stop messages).
        """
        # new_messages = self.react_agent.custom_state.get("notifications", [])
        new_messages = self.react_agent.notification_system.message_queue.get_by_timestamp(
            timestamp=datetime.fromtimestamp(self.time_manager.time(), tz=UTC)
        )

        agent_messages = [message for message in new_messages if message.message_type == PAREMessageType.AGENT_MESSAGE]
        env_notifications = [
            message for message in new_messages if message.message_type == PAREMessageType.ENVIRONMENT_NOTIFICATION_USER
        ]
        env_stop_messages = [
            message for message in new_messages if message.message_type == PAREMessageType.ENVIRONMENT_STOP
        ]

        # Reinsert the env notifications for user and agent and any extra messages back into the notification system.
        # This is important because the preprocessing step and the next agent will use the same notification system.
        # ! NOTE: DO NOT REINSERT THE AGENT MESSAGES AS THEY HAVE BEEN HANDLED USING TASK LOG.
        messages_to_put_back = [m for m in new_messages if m not in env_stop_messages and m not in agent_messages]

        for message in messages_to_put_back:
            self.react_agent.notification_system.message_queue.put(message)

        return agent_messages, env_notifications, env_stop_messages

    def build_task_from_notifications(self, agent_messages: list[Message]) -> str:
        """Build User Agent task from agent messages.

        The task is the text of every agent message, joined with newlines.
        Environment notifications are handled separately by the preprocessing step.

        Args:
            agent_messages: List of agent messages.

        Returns:
            Task string for the User Agent (empty when there are no agent messages).
        """
        if len(agent_messages) > 0:
            logger.debug(f"Agent Messages: {agent_messages}")
        task = "\n".join([message.message for message in agent_messages])
        return task

    def agent_loop(
        self,
        current_tools: list[AppTool],
        current_app: App | None = None,
        current_state: AppState | None = None,
        reset: bool = True,
    ) -> str | MMObservation | None:
        """Execute one user agent turn.

        This is a completely synchronous function where the agent runs on notifications.

        Args:
            current_tools: Current tools to use for the turn.
            current_app: Current active app in the environment for the user.
            current_state: Current active state in the active app for the user.
            reset: Whether to reset the user agent.

        Returns:
            Result from the last agent turn execution, or an empty string when an
            environment stop message was received and the agent did not run.

        Raises:
            RuntimeError: If user agent is not initialized, notification system is not set,
                or the base agent ends the turn in the FAILED running state.
        """
        result = ""

        if not self._initialized:
            raise RuntimeError("User agent must be initialized before running a turn.")

        if self.react_agent.notification_system is None:
            raise RuntimeError("Notification system not set")

        # inject current app and state into the custom state of the react agent.
        self.react_agent.custom_state["current_app"] = current_app
        self.react_agent.custom_state["current_state"] = current_state
        # Reset the internal iterations counter, otherwise after first turn, the agent will exit. And if we increase the number of max_iterations, then the agent will take multiple turns.
        # ! FIXME: Find a better solution for this iterations issue.
        self.react_agent.iterations = 0

        # Tools are refreshed every turn from the tools passed in by the caller.
        self.init_tools(current_tools)

        agent_messages, _, env_stop_messages = self.get_notifications()
        task = self.build_task_from_notifications(agent_messages)
        attachments: list[Attachment] = [attachment for message in agent_messages for attachment in message.attachments]
        if len(env_stop_messages) > 0:
            logger.warning(f"Environment stop message received - Stopping User Agent: {env_stop_messages}")
            return result

        # User will take action no matter what.
        result = self.react_agent.run(
            task=task, hint=None, reset=reset, attachments=attachments if attachments else None
        )
        running_state = self.react_agent.custom_state.get("running_state", None)
        if running_state == RunningState.FAILED:
            agent_logs = self.react_agent.get_agent_logs()
            error_message = f"Last User Agent log: {agent_logs[-1]}" if len(agent_logs) > 0 else "No User Agent logs"
            raise RuntimeError(f"User agent failed. {error_message}")

        return result

    # ==================== Metric Extraction Methods ====================

    def get_acceptance_count(self) -> int:
        """Count the number of proposal acceptances from the user.

        Acceptances are identified by tool call logs whose tool name contains
        "accept_proposal" (case-insensitive).

        Returns:
            Number of acceptances.
        """
        count = 0
        for log in self.react_agent.get_agent_logs():
            if isinstance(log, ToolCallLog) and "accept_proposal" in log.tool_name.lower():
                count += 1
        return count

    def get_read_only_actions(self) -> int:
        """Count read-only actions from the user agent.

        Only tool calls whose tool name is present in the base agent's current tool
        mapping are counted; calls to tools no longer in that mapping are skipped.

        Returns:
            Number of read-only tool calls.
        """
        count = 0
        for log in self.react_agent.get_agent_logs():
            if isinstance(log, ToolCallLog):
                tool = self.react_agent.tools.get(log.tool_name)
                # write_operation=False or None means read-only
                if tool is not None and not getattr(tool, "write_operation", False):
                    count += 1
        return count

    def get_write_actions(self) -> int:
        """Count write actions from the user agent.

        Only tool calls whose tool name is present in the base agent's current tool
        mapping are counted; a tool counts as a write tool when its "write_operation"
        attribute is truthy.

        Returns:
            Number of write tool calls.
        """
        count = 0
        for log in self.react_agent.get_agent_logs():
            if isinstance(log, ToolCallLog):
                tool = self.react_agent.tools.get(log.tool_name)
                if tool is not None and getattr(tool, "write_operation", False):
                    count += 1
        return count

agent_framework property

Name of the agent.

model property

Name of the model.

__init__(log_callback, pause_env, resume_env, llm_engine, base_agent, time_manager, max_iterations=1, max_turns=None, simulated_generation_time_config=None)

Initializes the UserAgent wrapper.

Parameters:

Name Type Description Default
log_callback Callable[[BaseAgentLog], None]

Callback to log agent logs.

required
pause_env Callable[[], None] | None

Callback to pause the environment.

required
resume_env Callable[[float], None] | None

Callback to resume the environment.

required
llm_engine LLMEngine

LLM engine to use for the agent.

required
base_agent BaseAgent

Base agent to wrap.

required
time_manager TimeManager

Time manager to use for the agent.

required
max_iterations int

Maximum number of iterations to run per turn.

1
max_turns int | None

Maximum number of turns to run.

None
simulated_generation_time_config SimulatedGenerationTimeConfig | None

Simulated generation time config to use for the agent.

None
Source code in pare/agents/user/agent.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def __init__(
    self,
    log_callback: Callable[[BaseAgentLog], None],
    pause_env: Callable[[], None] | None,
    resume_env: Callable[[float], None] | None,
    llm_engine: LLMEngine,
    base_agent: BaseAgent,
    time_manager: TimeManager,
    max_iterations: int = 1,  # one agent turn per step for the user
    max_turns: int | None = None,
    simulated_generation_time_config: SimulatedGenerationTimeConfig | None = None,
) -> None:
    """Set up the wrapper and configure the wrapped base agent in place.

    Args:
        log_callback: Callback that receives agent logs.
        pause_env: Callback that pauses the environment.
        resume_env: Callback that resumes the environment.
        llm_engine: LLM engine driving the agent.
        base_agent: Base agent being wrapped; several of its attributes are overwritten.
        time_manager: Time manager shared with the base agent.
        max_iterations: Maximum number of iterations to run per turn.
        max_turns: Maximum number of turns to run.
        simulated_generation_time_config: Simulated generation time config for the agent.
    """
    # State kept on the wrapper itself.
    self.llm_engine = llm_engine
    self.time_manager = time_manager
    self.max_iterations = max_iterations
    self.max_turns = max_turns
    self.tools: list[AppTool] | None = None  # filled in by init_tools()
    self.simulated_generation_time_config = simulated_generation_time_config
    # Kept here and handed to the base agent later, in prepare_user_agent_run().
    self.pause_env = pause_env
    self.resume_env = resume_env

    # Configure the wrapped ReAct agent.
    agent = base_agent
    self.react_agent = agent
    agent.name = "user_base_agent"
    agent.llm_engine = llm_engine
    agent.time_manager = time_manager
    agent.max_iterations = max_iterations
    agent.log_callback = log_callback
    agent.role_dict = DEFAULT_USER_STEP_2_ROLE
    agent.message_dict = DEFAULT_USER_STEP_2_MESSAGE
    agent.simulated_generation_time_config = simulated_generation_time_config

    # ! NO SUB-AGENTS SUPPORTED YET.

    # agent_loop() refuses to run until prepare_user_agent_run() flips this flag.
    self._initialized = False

agent_loop(current_tools, current_app=None, current_state=None, reset=True)

Execute one user agent turn.

This is completely synchronous function where the agent runs on notifications.

Parameters:

Name Type Description Default
current_tools list[AppTool]

Current tools to use for the turn.

required
current_app App | None

Current active app in the environment for the user.

None
current_state AppState | None

Current active state in the active app for the user.

None
reset bool

Whether to reset the user agent.

True

Returns:

Type Description
str | MMObservation | None

Result from the last agent turn execution.

Raises:

Type Description
RuntimeError

If user agent is not initialized or notification system is not set.

Source code in pare/agents/user/agent.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def agent_loop(
    self,
    current_tools: list[AppTool],
    current_app: App | None = None,
    current_state: AppState | None = None,
    reset: bool = True,
) -> str | MMObservation | None:
    """Run a single, fully synchronous user agent turn driven by notifications.

    Args:
        current_tools: Tools available to the user for this turn.
        current_app: App currently active for the user in the environment.
        current_state: State currently active inside that app.
        reset: Whether to reset the user agent before running.

    Returns:
        Whatever the base agent's run returned, or an empty string when an
        environment stop message was received and the agent did not run.

    Raises:
        RuntimeError: If the user agent is not initialized, the notification system
            is not set, or the base agent finished in the FAILED running state.
    """
    if not self._initialized:
        raise RuntimeError("User agent must be initialized before running a turn.")

    agent = self.react_agent
    if agent.notification_system is None:
        raise RuntimeError("Notification system not set")

    # Expose the active app and state to the base agent through its custom state.
    agent.custom_state["current_app"] = current_app
    agent.custom_state["current_state"] = current_state
    # Without zeroing the counter the agent would exit after its first turn; raising
    # max_iterations instead would let it take several turns per step.
    # ! FIXME: Find a better solution for this iterations issue.
    agent.iterations = 0

    # Tools are refreshed on every turn.
    self.init_tools(current_tools)

    agent_messages, _, stop_messages = self.get_notifications()
    task = self.build_task_from_notifications(agent_messages)
    attachments: list[Attachment] = []
    for message in agent_messages:
        attachments.extend(message.attachments)

    if stop_messages:
        logger.warning(f"Environment stop message received - Stopping User Agent: {stop_messages}")
        return ""

    # The user always acts, even when the task is empty.
    outcome = agent.run(task=task, hint=None, reset=reset, attachments=attachments or None)

    if agent.custom_state.get("running_state") == RunningState.FAILED:
        agent_logs = agent.get_agent_logs()
        if len(agent_logs) > 0:
            error_message = f"Last User Agent log: {agent_logs[-1]}"
        else:
            error_message = "No User Agent logs"
        raise RuntimeError(f"User agent failed. {error_message}")

    return outcome

build_task_from_notifications(agent_messages)

Build User Agent task from agent messages.

User messages come from the User Agent in the form of accept/reject responses. Environment notifications are handled separately by the preprocessing step.

Parameters:

Name Type Description Default
agent_messages list[Message]

List of agent messages.

required

Returns:

Type Description
str

Task string for the User Agent.

Source code in pare/agents/user/agent.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def build_task_from_notifications(self, agent_messages: list[Message]) -> str:
    """Turn the received agent messages into the task text for the User Agent.

    The task is the text of every message, in order, separated by newlines.
    Environment notifications are not part of the task; they are handled by the
    preprocessing step.

    Args:
        agent_messages: Agent messages to fold into the task.

    Returns:
        The task string; empty when there are no agent messages.
    """
    if agent_messages:
        logger.debug(f"Agent Messages: {agent_messages}")
    return "\n".join(entry.message for entry in agent_messages)

get_acceptance_count()

Count the number of proposal acceptances from the user.

Acceptances are identified by calls to accept_proposal tool.

Returns:

Type Description
int

Number of acceptances.

Source code in pare/agents/user/agent.py
343
344
345
346
347
348
349
350
351
352
353
354
355
def get_acceptance_count(self) -> int:
    """Return how many times the user accepted a proposal.

    An acceptance is a tool call log whose tool name contains "accept_proposal",
    compared case-insensitively.

    Returns:
        Number of acceptances.
    """
    return sum(
        1
        for entry in self.react_agent.get_agent_logs()
        if isinstance(entry, ToolCallLog) and "accept_proposal" in entry.tool_name.lower()
    )

get_notifications()

Get agent messages, user-facing environment notifications, and ENVIRONMENT_STOP messages from the notification system.

Returns:

Type Description
tuple[list[Message], list[Message], list[Message]]

Tuple of (list of agent messages, list of environment notifications, list of environment stop messages).

Source code in pare/agents/user/agent.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def get_notifications(self) -> tuple[list[Message], list[Message], list[Message]]:
    """Get ENVIRONMENT_STOP notifications from the notification system.

    Returns:
        Tuple of (list of agent messages, list of environment notifications, list of environment stop messages).
    """
    # new_messages = self.react_agent.custom_state.get("notifications", [])
    new_messages = self.react_agent.notification_system.message_queue.get_by_timestamp(
        timestamp=datetime.fromtimestamp(self.time_manager.time(), tz=UTC)
    )

    agent_messages = [message for message in new_messages if message.message_type == PAREMessageType.AGENT_MESSAGE]
    env_notifications = [
        message for message in new_messages if message.message_type == PAREMessageType.ENVIRONMENT_NOTIFICATION_USER
    ]
    env_stop_messages = [
        message for message in new_messages if message.message_type == PAREMessageType.ENVIRONMENT_STOP
    ]

    # Reinsert the env notifications for user and agent and any extra messages back into the notification system.
    # This is important because the preprocessing step and the next agent will use the same notification system.
    # ! NOTE: DO NOT REINSERT THE AGENT MESSAGES AS THEY HAVE BEEN HANDLED USING TASK LOG.
    messages_to_put_back = [m for m in new_messages if m not in env_stop_messages and m not in agent_messages]

    for message in messages_to_put_back:
        self.react_agent.notification_system.message_queue.put(message)

    return agent_messages, env_notifications, env_stop_messages

get_read_only_actions()

Count read-only actions from the user agent.

Returns:

Type Description
int

Number of read-only tool calls.

Source code in pare/agents/user/agent.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def get_read_only_actions(self) -> int:
    """Return how many logged tool calls used a read-only tool.

    A call is counted only when its tool name resolves in the base agent's current
    tool mapping and that tool's "write_operation" attribute is missing or falsy.

    Returns:
        Number of read-only tool calls.
    """
    total = 0
    for entry in self.react_agent.get_agent_logs():
        if not isinstance(entry, ToolCallLog):
            continue
        tool = self.react_agent.tools.get(entry.tool_name)
        if tool is None:
            continue
        # write_operation=False or None means read-only
        if not getattr(tool, "write_operation", False):
            total += 1
    return total

get_write_actions()

Count write actions from the user agent.

Returns:

Type Description
int

Number of write tool calls.

Source code in pare/agents/user/agent.py
372
373
374
375
376
377
378
379
380
381
382
383
384
def get_write_actions(self) -> int:
    """Return how many logged tool calls used a write tool.

    A call is counted only when its tool name resolves in the base agent's current
    tool mapping and that tool's "write_operation" attribute is truthy.

    Returns:
        Number of write tool calls.
    """
    total = 0
    for entry in self.react_agent.get_agent_logs():
        if not isinstance(entry, ToolCallLog):
            continue
        tool = self.react_agent.tools.get(entry.tool_name)
        if tool is None:
            continue
        if getattr(tool, "write_operation", False):
            total += 1
    return total

init_notification_system(notification_system)

Initialize the notification system.

Parameters:

Name Type Description Default
notification_system BaseNotificationSystem

Notification system to initialize.

required
Source code in pare/agents/user/agent.py
193
194
195
196
197
198
199
200
201
def init_notification_system(self, notification_system: BaseNotificationSystem | None) -> None:
    """Install the given notification system on the base agent.

    None is an accepted value: prepare_user_agent_run() forwards its optional
    argument unchanged, and in that case the base agent keeps whatever notification
    system it already has.

    Args:
        notification_system: Notification system to install, or None to keep the
            current one.
    """
    if notification_system is None:
        return
    logger.debug(f"Setting notification system for User Agent to provided one {notification_system}")
    self.react_agent.notification_system = notification_system

init_system_prompt(scenario)

Initialize the system prompt.

Parameters:

Name Type Description Default
scenario Scenario

Scenario to initialize the system prompt for.

required
Source code in pare/agents/user/agent.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def init_system_prompt(self, scenario: Scenario) -> None:
    """Fill the placeholders of the base agent's system prompt for a scenario.

    The "system_prompt" entry of the base agent's init_system_prompts is rewritten in
    place, one placeholder at a time, in this order: task description, notification
    system description, current time description, agent reminder description (replaced
    by an empty string) and available apps.

    Args:
        scenario: Scenario providing the additional system prompt, apps and start time.
    """

    def substitute(placeholder: str, value: str) -> None:
        # Write back after every substitution so later steps see earlier results.
        prompts = self.react_agent.init_system_prompts
        prompts["system_prompt"] = prompts["system_prompt"].replace(placeholder, value)

    additional_system_prompt = scenario.additional_system_prompt
    logger.debug(f"Additional System Prompt: {additional_system_prompt}")

    if additional_system_prompt is None:
        task_description = ""
    else:
        task_description = additional_system_prompt
    substitute("<<task_description>>", task_description)

    substitute(
        "<<notification_system_description>>",
        get_notification_system_prompt(self.react_agent.notification_system, scenario.apps),
    )

    # A missing or zero start time falls back to the Unix epoch; only hour precision is shown.
    start = datetime.fromtimestamp(scenario.start_time or 0, tz=UTC)
    date_str = start.strftime("%Y-%m-%d %H")
    # The placeholder spelling ("curent") is reproduced exactly as the original code has it.
    substitute("<<curent_time_description>>", f"Today's date in 'YYYY-MM-DD HH' format is {date_str}")

    substitute("<<agent_reminder_description>>", "")

    substitute("<<available_apps>>", format_available_apps(scenario.apps))

init_tools(tools)

Initialize the tools.

Parameters:

Name Type Description Default
tools list[AppTool]

Tools to initialize.

required
Source code in pare/agents/user/agent.py
128
129
130
131
132
133
134
135
136
137
138
139
def init_tools(self, tools: list[AppTool]) -> None:
    """Adapt the given tools and install them on the wrapper and the base agent.

    Tools rejected by remove_aui_irrelevant_tools() are dropped; the rest are wrapped
    in AppToolAdapter and replace the base agent's tool mapping, keyed by tool name.

    Args:
        tools: Tools to initialize.
    """
    adapted = [AppToolAdapter(candidate) for candidate in self.remove_aui_irrelevant_tools(tools)]
    self.tools = adapted

    by_name = {}
    for adapter in self.tools:
        by_name[adapter.name] = adapter
    self.react_agent.tools = by_name

    self.react_agent.init_tools()
    logger.debug(f"Initialized {len(self.tools)} tools: {[tool.name for tool in self.tools]}")

prepare_user_agent_run(scenario, notification_system=None, initial_agent_logs=None)

Prepare the user agent run.

Parameters:

Name Type Description Default
scenario Scenario

Scenario to run the turn for.

required
notification_system BaseNotificationSystem | None

Notification system to use for the agent.

None
initial_agent_logs list[BaseAgentLog] | None

Initial agent logs to use for the agent.

None

Raises:

Type Description
RuntimeError

If simulated generation time config is set but the pause or resume environment function is not provided.

Source code in pare/agents/user/agent.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def prepare_user_agent_run(
    self,
    scenario: Scenario,
    notification_system: BaseNotificationSystem | None = None,
    initial_agent_logs: list[BaseAgentLog] | None = None,
) -> None:
    """Get the user agent ready to run turns for a scenario.

    Initializes tools, notification system and system prompt (in that order), replays
    any initial logs, passes the pause/resume callbacks to the base agent and marks
    the wrapper as initialized.

    Args:
        scenario: Scenario the agent will run in.
        notification_system: Notification system to use for the agent.
        initial_agent_logs: Agent logs to replay on the base agent before running.

    Raises:
        RuntimeError: If simulated generation time config is set but the pause or
            resume environment function is missing.
    """
    self.init_tools(scenario.get_user_tools())
    # Must come before the prompt: init_system_prompt reads the notification system.
    self.init_notification_system(notification_system)
    self.init_system_prompt(scenario)

    # ! NOTE: We don't need to replay at all for our agents.
    # Sync the base agent time manager
    if initial_agent_logs:
        self.react_agent.replay(initial_agent_logs)

    # Simulated generation time needs both callbacks to pause and resume the environment.
    callbacks_missing = self.pause_env is None or self.resume_env is None
    if callbacks_missing and self.simulated_generation_time_config is not None:
        raise RuntimeError(
            "Pause and resume environment functions must be provided if simulated generation time config is set"
        )

    agent = self.react_agent
    agent.pause_env = self.pause_env
    agent.resume_env = self.resume_env
    self._initialized = True

remove_aui_irrelevant_tools(tools)

Remove irrelevant tools from the tools.

Parameters:

Name Type Description Default
tools list[AppTool]

Tools to remove irrelevant tools from.

required

Returns:

Type Description
list[AppTool]

List of tools.

Source code in pare/agents/user/agent.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def remove_aui_irrelevant_tools(self, tools: list[AppTool]) -> list[AppTool]:
    """Return ``tools`` without the agent-user-interface tools that are filtered out.

    Args:
        tools: Tools to filter. The input list is not modified.

    Returns:
        A new list holding, in their original order, every tool whose
        ``name`` is not on the exclusion list below.
    """
    # Names of the tools that get dropped; currently a single entry.
    excluded_names = ("PAREAgentUserInterface__send_message_to_agent",)

    kept: list[AppTool] = []
    for candidate in tools:
        if candidate.name in excluded_names:
            continue
        kept.append(candidate)
    return kept

Agent Configuration

PAREAgentConfig

Bases: ABC

Abstract class for PARE agent configurations.

Source code in pare/agents/pare_agent_config.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class PAREAgentConfig(ABC):
    """Interface that every PARE agent configuration implements.

    All five methods are abstract, so this class cannot be instantiated
    directly; concrete configurations must override each of them.
    """

    @abstractmethod
    def get_agent_name(self) -> str | None:
        """Return the agent name (the signature also permits ``None``)."""

    @abstractmethod
    def get_model_dump(self) -> dict[str, Any]:
        """Return the agent configuration dumped as a dictionary."""

    @abstractmethod
    def get_base_agent_configs(self) -> dict[str, ARESimulationReactBaseAgentConfig]:
        """Return the base agent configurations as a string-keyed dictionary."""

    @abstractmethod
    def get_model_json_schema(self) -> dict[str, Any]:
        """Return the JSON schema of the agent configuration as a dictionary."""

    @abstractmethod
    def validate_model(self, agent_config_dict: dict[str, Any]) -> PAREAgentConfig:
        """Validate ``agent_config_dict`` and return the resulting configuration."""

get_agent_name() abstractmethod

Get the name of the agent.

Source code in pare/agents/pare_agent_config.py
13
14
15
16
@abstractmethod
def get_agent_name(self) -> str | None:
    """Return the agent name (the signature also permits ``None``).

    Abstract: concrete configurations must override this method.
    """

get_base_agent_configs() abstractmethod

Get the base agent configurations.

Source code in pare/agents/pare_agent_config.py
23
24
25
26
@abstractmethod
def get_base_agent_configs(self) -> dict[str, ARESimulationReactBaseAgentConfig]:
    """Return the base agent configurations as a string-keyed dictionary.

    Abstract: concrete configurations must override this method.
    """

get_model_dump() abstractmethod

Get the model dump of the agent configuration.

Source code in pare/agents/pare_agent_config.py
18
19
20
21
@abstractmethod
def get_model_dump(self) -> dict[str, Any]:
    """Return the agent configuration dumped as a dictionary.

    Abstract: concrete configurations must override this method.
    """

get_model_json_schema() abstractmethod

Get the JSON schema of the agent configuration.

Source code in pare/agents/pare_agent_config.py
28
29
30
31
@abstractmethod
def get_model_json_schema(self) -> dict[str, Any]:
    """Return the JSON schema of the agent configuration as a dictionary.

    Abstract: concrete configurations must override this method.
    """

validate_model(agent_config_dict) abstractmethod

Validate the agent configuration model.

Source code in pare/agents/pare_agent_config.py
33
34
35
36
@abstractmethod
def validate_model(self, agent_config_dict: dict[str, Any]) -> PAREAgentConfig:
    """Validate ``agent_config_dict`` and return the resulting configuration.

    Abstract: concrete configurations must override this method.
    """

ProactiveObserveExecuteAgentConfig

Bases: BaseModel, PAREAgentConfig

Proactive agent configuration for PARE.

Source code in pare/agents/pare_agent_config.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class ProactiveObserveExecuteAgentConfig(BaseModel, PAREAgentConfig):
    """Proactive agent configuration for PARE."""

    # Agent type name; defaults to "observe-execute".
    agent_name: str = Field(default="observe-execute")
    # Base config for the observe agent; the default is built with max_iterations=5.
    observe_base_agent_config: ARESimulationReactBaseAgentConfig = Field(
        default_factory=lambda: ARESimulationReactBaseAgentConfig(max_iterations=5)
    )
    # Base config for the execute agent; the default is built with max_iterations=10.
    execute_base_agent_config: ARESimulationReactBaseAgentConfig = Field(
        default_factory=lambda: ARESimulationReactBaseAgentConfig(max_iterations=10)
    )
    # Optional turn limit, None by default.
    # NOTE(review): what None means to the consumer is not visible here -- confirm.
    max_turns: int | None = Field(default=None)

    def get_agent_name(self) -> str | None:
        """Return the configured ``agent_name``."""
        return self.agent_name

    def get_base_agent_configs(self) -> dict[str, ARESimulationReactBaseAgentConfig]:
        """Return the observe and execute base configs under the keys ``"observe"`` and ``"execute"``."""
        return {"observe": self.observe_base_agent_config, "execute": self.execute_base_agent_config}

    def get_model_dump(self) -> dict[str, Any]:
        """Return this configuration as a dictionary, as produced by ``self.model_dump()``."""
        return self.model_dump()

    def get_model_json_schema(self) -> dict[str, Any]:
        """Return the JSON schema of this configuration, as produced by ``self.model_json_schema()``."""
        return self.model_json_schema()

    def validate_model(self, agent_config_dict: dict[str, Any]) -> PAREAgentConfig:
        """Validate ``agent_config_dict`` and return it as a new configuration instance.

        Validation goes through ``type(self).model_validate``, so the result has
        the same concrete class as ``self``; the field values of ``self`` are not used.

        Args:
            agent_config_dict: Dictionary to validate.

        Returns:
            The validated configuration instance.
        """
        return type(self).model_validate(agent_config_dict)

get_agent_name()

Get the name of the agent.

Source code in pare/agents/pare_agent_config.py
81
82
83
def get_agent_name(self) -> str | None:
    """Return the configured ``agent_name``."""
    return self.agent_name

get_base_agent_configs()

Get the base agent configurations for the observe and execute agents.

Source code in pare/agents/pare_agent_config.py
85
86
87
def get_base_agent_configs(self) -> dict[str, ARESimulationReactBaseAgentConfig]:
    """Return the observe and execute base configs under the keys ``"observe"`` and ``"execute"``."""
    return {"observe": self.observe_base_agent_config, "execute": self.execute_base_agent_config}

get_model_dump()

Return this configuration as a dictionary, as produced by `model_dump()`.

Source code in pare/agents/pare_agent_config.py
89
90
91
def get_model_dump(self) -> dict[str, Any]:
    """Return this configuration as a dictionary, as produced by ``self.model_dump()``."""
    return self.model_dump()

get_model_json_schema()

Return the JSON schema of this configuration, as produced by `model_json_schema()`.

Source code in pare/agents/pare_agent_config.py
93
94
95
def get_model_json_schema(self) -> dict[str, Any]:
    """Return the JSON schema of this configuration, as produced by ``self.model_json_schema()``."""
    return self.model_json_schema()

validate_model(agent_config_dict)

Validate `agent_config_dict` and return it as a new configuration instance of the same class.

Source code in pare/agents/pare_agent_config.py
97
98
99
def validate_model(self, agent_config_dict: dict[str, Any]) -> PAREAgentConfig:
    """Validate ``agent_config_dict`` and return it as a new configuration instance.

    Validation goes through ``type(self).model_validate``, so the result has
    the same concrete class as ``self``; the field values of ``self`` are not used.
    """
    return type(self).model_validate(agent_config_dict)

UserDefaultAgentConfig

Bases: BaseModel, PAREAgentConfig

User agent configuration for PARE.

Source code in pare/agents/pare_agent_config.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class UserDefaultAgentConfig(BaseModel, PAREAgentConfig):
    """User agent configuration for PARE."""

    # Agent type name; defaults to "default".
    agent_name: str = Field(default="default")
    # Base config for the user agent; the default is built with max_iterations=1.
    base_agent_config: ARESimulationReactBaseAgentConfig = Field(
        default_factory=lambda: ARESimulationReactBaseAgentConfig(max_iterations=1)
    )
    # Optional turn limit, None by default.
    # NOTE(review): what None means to the consumer is not visible here -- confirm.
    max_turns: int | None = Field(default=None)

    def get_agent_name(self) -> str | None:
        """Return the configured ``agent_name``."""
        return self.agent_name

    def get_base_agent_configs(self) -> dict[str, ARESimulationReactBaseAgentConfig]:
        """Return the user agent's base config under the single key ``"user"``."""
        return {"user": self.base_agent_config}

    def get_model_dump(self) -> dict[str, Any]:
        """Return this configuration as a dictionary, as produced by ``self.model_dump()``."""
        return self.model_dump()

    def get_model_json_schema(self) -> dict[str, Any]:
        """Return the JSON schema of this configuration, as produced by ``self.model_json_schema()``."""
        return self.model_json_schema()

    def validate_model(self, agent_config_dict: dict[str, Any]) -> PAREAgentConfig:
        """Validate ``agent_config_dict`` and return it as a new configuration instance.

        Validation goes through ``type(self).model_validate``, so the result has
        the same concrete class as ``self``; the field values of ``self`` are not used.

        Args:
            agent_config_dict: Dictionary to validate.

        Returns:
            The validated configuration instance.
        """
        return type(self).model_validate(agent_config_dict)

get_agent_name()

Get the name of the agent.

Source code in pare/agents/pare_agent_config.py
48
49
50
def get_agent_name(self) -> str | None:
    """Return the configured ``agent_name``."""
    return self.agent_name

get_base_agent_configs()

Get the base agent configurations for the user agent.

Source code in pare/agents/pare_agent_config.py
52
53
54
def get_base_agent_configs(self) -> dict[str, ARESimulationReactBaseAgentConfig]:
    """Return the user agent's base config under the single key ``"user"``."""
    return {"user": self.base_agent_config}

get_model_dump()

Return this configuration as a dictionary, as produced by `model_dump()`.

Source code in pare/agents/pare_agent_config.py
56
57
58
def get_model_dump(self) -> dict[str, Any]:
    """Return this configuration as a dictionary, as produced by ``self.model_dump()``."""
    return self.model_dump()

get_model_json_schema()

Return the JSON schema of this configuration, as produced by `model_json_schema()`.

Source code in pare/agents/pare_agent_config.py
60
61
62
def get_model_json_schema(self) -> dict[str, Any]:
    """Return the JSON schema of this configuration, as produced by ``self.model_json_schema()``."""
    return self.model_json_schema()

validate_model(agent_config_dict)

Validate `agent_config_dict` and return it as a new configuration instance of the same class.

Source code in pare/agents/pare_agent_config.py
64
65
66
def validate_model(self, agent_config_dict: dict[str, Any]) -> PAREAgentConfig:
    """Validate ``agent_config_dict`` and return it as a new configuration instance.

    Validation goes through ``type(self).model_validate``, so the result has
    the same concrete class as ``self``; the field values of ``self`` are not used.
    """
    return type(self).model_validate(agent_config_dict)

Agent Config Builder

AbstractAgentConfigBuilder

Bases: ABC

Abstract class for building agent configs.

Source code in pare/agents/agent_config_builder.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class AbstractAgentConfigBuilder(ABC):
    """Interface for builders that create a ``PAREAgentConfig`` from an agent name."""

    @abstractmethod
    def build(
        self,
        agent_name: str,
    ) -> PAREAgentConfig:
        """Build a config for the specified agent.

        Abstract: the body is only this docstring, so concrete builders must
        override it.

        Args:
            agent_name: Name of the agent; it determines which config type is built.

        Returns:
            An instance of the config.
        """

build(agent_name) abstractmethod

Build a config for the specified agent.

Parameters:

Name Type Description Default
agent_name str

Name of the agent that affects the config type.

required

Returns:

Type Description
PAREAgentConfig

An instance of the config.

Source code in pare/agents/agent_config_builder.py
16
17
18
19
20
21
22
23
24
25
26
27
28
@abstractmethod
def build(
    self,
    agent_name: str,
) -> PAREAgentConfig:
    """Build a config for the specified agent.

    Abstract: the body is only this docstring, so concrete builders must
    override it.

    Args:
        agent_name: Name of the agent; it determines which config type is built.

    Returns:
        An instance of the config.
    """

ProactiveAgentConfigBuilder

Bases: AbstractAgentConfigBuilder

Builder for proactive agent configurations (`ProactiveObserveExecuteAgentConfig`).

Source code in pare/agents/agent_config_builder.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class ProactiveAgentConfigBuilder(AbstractAgentConfigBuilder):
    """Creates proactive agent configurations, selected by agent name."""

    def build(
        self,
        agent_name: str,
    ) -> PAREAgentConfig:
        """Create the proactive agent configuration registered under ``agent_name``.

        Args:
            agent_name: Proactive agent type; only ``"observe-execute"`` is handled.

        Returns:
            A ProactiveObserveExecuteAgentConfig whose observe and execute base
            configs carry the default proactive prompts (with hints) and
            iteration limits of 5 and 10 respectively.

        Raises:
            ValueError: If the agent name is not recognized.
        """
        # Guard clause: reject every name other than the single supported type.
        if agent_name != "observe-execute":
            raise ValueError(f"Unknown proactive agent type: {agent_name}")

        observe_config = ARESimulationReactBaseAgentConfig(
            system_prompt=str(DEFAULT_PROACTIVE_OBSERVE_PROMPT_WITH_HINTS),
            max_iterations=5,
        )
        execute_config = ARESimulationReactBaseAgentConfig(
            system_prompt=str(DEFAULT_PROACTIVE_EXECUTE_PROMPT_WITH_HINTS),
            max_iterations=10,
        )
        return ProactiveObserveExecuteAgentConfig(
            agent_name=agent_name,
            observe_base_agent_config=observe_config,
            execute_base_agent_config=execute_config,
        )

build(agent_name)

Build the correct ProactiveAgentConfig based on the agent name.

Parameters:

Name Type Description Default
agent_name str

Name of the proactive agent type.

required

Returns:

Type Description
PAREAgentConfig

A configured ProactiveObserveExecuteAgentConfig instance.

Raises:

Type Description
ValueError

If the agent name is not recognized.

Source code in pare/agents/agent_config_builder.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def build(
    self,
    agent_name: str,
) -> PAREAgentConfig:
    """Create the proactive agent configuration registered under ``agent_name``.

    Args:
        agent_name: Proactive agent type; only ``"observe-execute"`` is handled.

    Returns:
        A ProactiveObserveExecuteAgentConfig whose observe and execute base
        configs carry the default proactive prompts (with hints) and
        iteration limits of 5 and 10 respectively.

    Raises:
        ValueError: If the agent name is not recognized.
    """
    # Guard clause: reject every name other than the single supported type.
    if agent_name != "observe-execute":
        raise ValueError(f"Unknown proactive agent type: {agent_name}")

    observe_config = ARESimulationReactBaseAgentConfig(
        system_prompt=str(DEFAULT_PROACTIVE_OBSERVE_PROMPT_WITH_HINTS),
        max_iterations=5,
    )
    execute_config = ARESimulationReactBaseAgentConfig(
        system_prompt=str(DEFAULT_PROACTIVE_EXECUTE_PROMPT_WITH_HINTS),
        max_iterations=10,
    )
    return ProactiveObserveExecuteAgentConfig(
        agent_name=agent_name,
        observe_base_agent_config=observe_config,
        execute_base_agent_config=execute_config,
    )

UserAgentConfigBuilder

Bases: AbstractAgentConfigBuilder

Builder for user agent configurations (`UserDefaultAgentConfig`).

Source code in pare/agents/agent_config_builder.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class UserAgentConfigBuilder(AbstractAgentConfigBuilder):
    """Creates user agent configurations, selected by agent name."""

    def build(
        self,
        agent_name: str,
    ) -> PAREAgentConfig:
        """Create the user agent configuration registered under ``agent_name``.

        Args:
            agent_name: User agent type; only ``"default"`` is handled.

        Returns:
            A UserDefaultAgentConfig whose base config carries the default user
            agent system prompt and an iteration limit of 1.

        Raises:
            ValueError: If the agent name is not recognized.
        """
        # Guard clause: reject every name other than the single supported type.
        if agent_name != "default":
            raise ValueError(f"Unknown user agent type: {agent_name}")

        user_base_config = ARESimulationReactBaseAgentConfig(
            system_prompt=str(DEFAULT_USER_AGENT_SYSTEM_PROMPT),
            max_iterations=1,
        )
        return UserDefaultAgentConfig(
            agent_name=agent_name,
            base_agent_config=user_base_config,
        )

build(agent_name)

Build the correct UserAgentConfig based on the agent name.

Parameters:

Name Type Description Default
agent_name str

Name of the user agent type.

required

Returns:

Type Description
PAREAgentConfig

A configured UserDefaultAgentConfig instance.

Raises:

Type Description
ValueError

If the agent name is not recognized.

Source code in pare/agents/agent_config_builder.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def build(
    self,
    agent_name: str,
) -> PAREAgentConfig:
    """Create the user agent configuration registered under ``agent_name``.

    Args:
        agent_name: User agent type; only ``"default"`` is handled.

    Returns:
        A UserDefaultAgentConfig whose base config carries the default user
        agent system prompt and an iteration limit of 1.

    Raises:
        ValueError: If the agent name is not recognized.
    """
    # Guard clause: reject every name other than the single supported type.
    if agent_name != "default":
        raise ValueError(f"Unknown user agent type: {agent_name}")

    user_base_config = ARESimulationReactBaseAgentConfig(
        system_prompt=str(DEFAULT_USER_AGENT_SYSTEM_PROMPT),
        max_iterations=1,
    )
    return UserDefaultAgentConfig(
        agent_name=agent_name,
        base_agent_config=user_base_config,
    )

Agent Builder

Agent builders for PARE agents.

This module contains builders for creating UserAgent and ProactiveAgent instances from their respective configurations.

AbstractAgentBuilder

Bases: ABC

Abstract class for building PARE agents.

Source code in pare/agents/agent_builder.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class AbstractAgentBuilder(ABC):
    """Interface for builders that turn a ``PAREAgentConfig`` into an agent instance.

    Both methods are abstract; their bodies are only docstrings, so concrete
    builders must override them.
    """

    @abstractmethod
    def list_agents(self) -> list[str]:
        """List all available agent types.

        Returns:
            A list of agent names that this builder can create.
        """

    @abstractmethod
    def build(
        self,
        agent_config: PAREAgentConfig,
        env: StateAwareEnvironmentWrapper | None = None,
        mock_responses: list[str] | None = None,
    ) -> Any:  # noqa: ANN401
        """Build an agent from config.

        The return type is ``Any`` at this level; concrete builders narrow it
        to the agent class they produce.

        Args:
            agent_config: Configuration for the agent to be built.
            env: Optional environment in which the agent will operate.
            mock_responses: Optional list of mock responses for testing.

        Returns:
            An instance of the agent.
        """

build(agent_config, env=None, mock_responses=None) abstractmethod

Build an agent from config.

Parameters:

Name Type Description Default
agent_config PAREAgentConfig

Configuration for the agent to be built.

required
env StateAwareEnvironmentWrapper | None

Optional environment in which the agent will operate.

None
mock_responses list[str] | None

Optional list of mock responses for testing.

None

Returns:

Type Description
Any

An instance of the agent.

Source code in pare/agents/agent_builder.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@abstractmethod
def build(
    self,
    agent_config: PAREAgentConfig,
    env: StateAwareEnvironmentWrapper | None = None,
    mock_responses: list[str] | None = None,
) -> Any:  # noqa: ANN401
    """Build an agent from config.

    Abstract: the body is only this docstring. The return type is ``Any`` at
    this level; concrete builders narrow it to the agent class they produce.

    Args:
        agent_config: Configuration for the agent to be built.
        env: Optional environment in which the agent will operate.
        mock_responses: Optional list of mock responses for testing.

    Returns:
        An instance of the agent.
    """

list_agents() abstractmethod

List all available agent types.

Returns:

Type Description
list[str]

A list of agent names that this builder can create.

Source code in pare/agents/agent_builder.py
25
26
27
28
29
30
31
@abstractmethod
def list_agents(self) -> list[str]:
    """List all available agent types.

    Abstract: the body is only this docstring, so concrete builders must
    override it.

    Returns:
        A list of agent names that this builder can create.
    """

ProactiveAgentBuilder

Bases: AbstractAgentBuilder

Builder for ProactiveAgent instances.

Source code in pare/agents/agent_builder.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class ProactiveAgentBuilder(AbstractAgentBuilder):
    """Builder for ProactiveAgent instances."""

    def __init__(self, llm_engine_builder: LLMEngineBuilder | None = None) -> None:
        """Initialize the ProactiveAgentBuilder.

        Args:
            llm_engine_builder: Optional LLM engine builder. If not provided,
                a default LLMEngineBuilder will be used.
        """
        self.llm_engine_builder = llm_engine_builder or LLMEngineBuilder()

    def list_agents(self) -> list[str]:
        """List all available proactive agent types.

        Returns:
            A list of proactive agent names that this builder can create.
        """
        return ["observe-execute"]

    def build(
        self,
        agent_config: PAREAgentConfig,
        env: StateAwareEnvironmentWrapper | None = None,
        mock_responses: list[str] | None = None,
    ) -> ProactiveAgent:
        """Build a ProactiveAgent from config.

        Args:
            agent_config: Configuration for the proactive agent.
                Must be a ProactiveObserveExecuteAgentConfig.
            env: Environment in which the agent will operate.
            mock_responses: Optional list of mock responses for testing.

        Returns:
            A configured ProactiveAgent instance.

        Raises:
            TypeError: If agent_config is not a ProactiveObserveExecuteAgentConfig.
            ValueError: If the agent name is not supported or required environment components are missing.
        """
        match agent_config.get_agent_name():
            case "observe-execute":
                from pare.agents.agent_factory import create_observe_execute_proactive_agent

                if env is None:
                    raise ValueError("Environment must be provided")
                if env.time_manager is None:
                    raise ValueError("Time manager must be provided")
                if env.append_to_world_logs is None:
                    raise ValueError("Log callback must be provided")

                base_agent_configs = agent_config.get_base_agent_configs()
                observe_llm_engine = self.llm_engine_builder.create_engine(
                    engine_config=base_agent_configs["observe"].llm_engine_config,
                    mock_responses=mock_responses,
                )
                execute_llm_engine = self.llm_engine_builder.create_engine(
                    engine_config=base_agent_configs["execute"].llm_engine_config,
                    mock_responses=mock_responses,
                )

                if isinstance(agent_config, ProactiveObserveExecuteAgentConfig):
                    return create_observe_execute_proactive_agent(
                        agent_config=agent_config,
                        env=env,
                        observe_llm_engine=observe_llm_engine,
                        execute_llm_engine=execute_llm_engine,
                    )
                else:
                    raise TypeError(
                        f"Agent {agent_config.get_agent_name()} requires a ProactiveObserveExecuteAgentConfig"
                    )

            case _:
                raise ValueError(f"Unknown proactive agent type: {agent_config.get_agent_name()}")

__init__(llm_engine_builder=None)

Initialize the ProactiveAgentBuilder.

Parameters:

Name Type Description Default
llm_engine_builder LLMEngineBuilder | None

Optional LLM engine builder. If not provided, a default LLMEngineBuilder will be used.

None
Source code in pare/agents/agent_builder.py
125
126
127
128
129
130
131
132
def __init__(self, llm_engine_builder: LLMEngineBuilder | None = None) -> None:
    """Store the LLM engine builder used to create the agents' engines.

    Args:
        llm_engine_builder: Engine builder to use. When omitted (or falsy),
            a fresh default ``LLMEngineBuilder`` is created instead.
    """
    if not llm_engine_builder:
        # Same fallback rule as ``x or default``: any falsy value gets the default.
        llm_engine_builder = LLMEngineBuilder()
    self.llm_engine_builder = llm_engine_builder

build(agent_config, env=None, mock_responses=None)

Build a ProactiveAgent from config.

Parameters:

Name Type Description Default
agent_config PAREAgentConfig

Configuration for the proactive agent. Must be a ProactiveObserveExecuteAgentConfig.

required
env StateAwareEnvironmentWrapper | None

Environment in which the agent will operate.

None
mock_responses list[str] | None

Optional list of mock responses for testing.

None

Returns:

Type Description
ProactiveAgent

A configured ProactiveAgent instance.

Raises:

Type Description
TypeError

If agent_config is not a ProactiveObserveExecuteAgentConfig.

ValueError

If the agent name is not supported or required environment components are missing.

Source code in pare/agents/agent_builder.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def build(
    self,
    agent_config: PAREAgentConfig,
    env: StateAwareEnvironmentWrapper | None = None,
    mock_responses: list[str] | None = None,
) -> ProactiveAgent:
    """Build a ProactiveAgent from config.

    Args:
        agent_config: Configuration for the proactive agent.
            Must be a ProactiveObserveExecuteAgentConfig.
        env: Environment in which the agent will operate.
        mock_responses: Optional list of mock responses for testing.

    Returns:
        A configured ProactiveAgent instance.

    Raises:
        TypeError: If agent_config is not a ProactiveObserveExecuteAgentConfig.
        ValueError: If the agent name is not supported or required environment components are missing.
    """
    match agent_config.get_agent_name():
        case "observe-execute":
            from pare.agents.agent_factory import create_observe_execute_proactive_agent

            if env is None:
                raise ValueError("Environment must be provided")
            if env.time_manager is None:
                raise ValueError("Time manager must be provided")
            if env.append_to_world_logs is None:
                raise ValueError("Log callback must be provided")

            base_agent_configs = agent_config.get_base_agent_configs()
            observe_llm_engine = self.llm_engine_builder.create_engine(
                engine_config=base_agent_configs["observe"].llm_engine_config,
                mock_responses=mock_responses,
            )
            execute_llm_engine = self.llm_engine_builder.create_engine(
                engine_config=base_agent_configs["execute"].llm_engine_config,
                mock_responses=mock_responses,
            )

            if isinstance(agent_config, ProactiveObserveExecuteAgentConfig):
                return create_observe_execute_proactive_agent(
                    agent_config=agent_config,
                    env=env,
                    observe_llm_engine=observe_llm_engine,
                    execute_llm_engine=execute_llm_engine,
                )
            else:
                raise TypeError(
                    f"Agent {agent_config.get_agent_name()} requires a ProactiveObserveExecuteAgentConfig"
                )

        case _:
            raise ValueError(f"Unknown proactive agent type: {agent_config.get_agent_name()}")

list_agents()

List all available proactive agent types.

Returns:

Type Description
list[str]

A list of proactive agent names that this builder can create.

Source code in pare/agents/agent_builder.py
134
135
136
137
138
139
140
def list_agents(self) -> list[str]:
    """List all available proactive agent types.

    Returns:
        A new list with the proactive agent names this builder can create;
        currently only ``"observe-execute"``.
    """
    return ["observe-execute"]

UserAgentBuilder

Bases: AbstractAgentBuilder

Builder for UserAgent instances.

Source code in pare/agents/agent_builder.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class UserAgentBuilder(AbstractAgentBuilder):
    """Builder for UserAgent instances."""

    def __init__(self, llm_engine_builder: LLMEngineBuilder | None = None) -> None:
        """Initialize the UserAgentBuilder.

        Args:
            llm_engine_builder: Optional LLM engine builder. If not provided,
                a default LLMEngineBuilder will be used.
        """
        self.llm_engine_builder = llm_engine_builder or LLMEngineBuilder()

    def list_agents(self) -> list[str]:
        """List all available user agent types.

        Returns:
            A list of user agent names that this builder can create.
        """
        return ["default"]

    def build(
        self,
        agent_config: PAREAgentConfig,
        env: StateAwareEnvironmentWrapper | None = None,
        mock_responses: list[str] | None = None,
    ) -> UserAgent:
        """Build a UserAgent from config.

        Args:
            agent_config: Configuration for the user agent. Must be a UserAgentConfig.
            env: Environment in which the agent will operate.
            mock_responses: Optional list of mock responses for testing.

        Returns:
            A configured UserAgent instance.

        Raises:
            TypeError: If agent_config is not a UserDefaultAgentConfig.
            ValueError: If the agent name is not supported or required environment components are missing.
        """
        match agent_config.get_agent_name():
            case "default":
                from pare.agents.agent_factory import create_default_user_agent

                if env is None:
                    raise ValueError("Environment must be provided")
                if env.time_manager is None:
                    raise ValueError("Time manager must be provided")
                if env.append_to_world_logs is None:
                    raise ValueError("Log callback must be provided")

                base_agent_config = agent_config.get_base_agent_configs()["user"]
                llm_engine = self.llm_engine_builder.create_engine(
                    engine_config=base_agent_config.llm_engine_config,
                    mock_responses=mock_responses,
                )

                if isinstance(agent_config, UserDefaultAgentConfig):
                    return create_default_user_agent(
                        agent_config=agent_config,
                        env=env,
                        llm_engine=llm_engine,
                    )
                else:
                    raise TypeError(f"Agent {agent_config.get_agent_name()} requires a UserDefaultAgentConfig")

            case _:
                raise ValueError(f"Unknown user agent type: {agent_config.get_agent_name()}")

__init__(llm_engine_builder=None)

Initialize the UserAgentBuilder.

Parameters:

Name Type Description Default
llm_engine_builder LLMEngineBuilder | None

Optional LLM engine builder. If not provided, a default LLMEngineBuilder will be used.

None
Source code in pare/agents/agent_builder.py
55
56
57
58
59
60
61
62
def __init__(self, llm_engine_builder: LLMEngineBuilder | None = None) -> None:
    """Store the LLM engine builder used to create the user agent's engine.

    Args:
        llm_engine_builder: Engine builder to use. When omitted (or falsy),
            a fresh default ``LLMEngineBuilder`` is created instead.
    """
    if not llm_engine_builder:
        # Same fallback rule as ``x or default``: any falsy value gets the default.
        llm_engine_builder = LLMEngineBuilder()
    self.llm_engine_builder = llm_engine_builder

build(agent_config, env=None, mock_responses=None)

Build a UserAgent from config.

Parameters:

Name Type Description Default
agent_config PAREAgentConfig

Configuration for the user agent. Must be a UserDefaultAgentConfig.

required
env StateAwareEnvironmentWrapper | None

Environment in which the agent will operate.

None
mock_responses list[str] | None

Optional list of mock responses for testing.

None

Returns:

Type Description
UserAgent

A configured UserAgent instance.

Raises:

Type Description
TypeError

If agent_config is not a UserDefaultAgentConfig.

ValueError

If the agent name is not supported or required environment components are missing.

Source code in pare/agents/agent_builder.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def build(
    self,
    agent_config: PAREAgentConfig,
    env: StateAwareEnvironmentWrapper | None = None,
    mock_responses: list[str] | None = None,
) -> UserAgent:
    """Build a UserAgent from config.

    Args:
        agent_config: Configuration for the user agent. Must be a UserAgentConfig.
        env: Environment in which the agent will operate.
        mock_responses: Optional list of mock responses for testing.

    Returns:
        A configured UserAgent instance.

    Raises:
        TypeError: If agent_config is not a UserDefaultAgentConfig.
        ValueError: If the agent name is not supported or required environment components are missing.
    """
    match agent_config.get_agent_name():
        case "default":
            from pare.agents.agent_factory import create_default_user_agent

            if env is None:
                raise ValueError("Environment must be provided")
            if env.time_manager is None:
                raise ValueError("Time manager must be provided")
            if env.append_to_world_logs is None:
                raise ValueError("Log callback must be provided")

            base_agent_config = agent_config.get_base_agent_configs()["user"]
            llm_engine = self.llm_engine_builder.create_engine(
                engine_config=base_agent_config.llm_engine_config,
                mock_responses=mock_responses,
            )

            if isinstance(agent_config, UserDefaultAgentConfig):
                return create_default_user_agent(
                    agent_config=agent_config,
                    env=env,
                    llm_engine=llm_engine,
                )
            else:
                raise TypeError(f"Agent {agent_config.get_agent_name()} requires a UserDefaultAgentConfig")

        case _:
            raise ValueError(f"Unknown user agent type: {agent_config.get_agent_name()}")

list_agents()

List all available user agent types.

Returns:

Type Description
list[str]

A list of user agent names that this builder can create.

Source code in pare/agents/agent_builder.py
64
65
66
67
68
69
70
def list_agents(self) -> list[str]:
    """Report which user agent types this builder knows how to create.

    Returns:
        A newly created list of supported agent names; currently only
        ``"default"``.
    """
    supported_agents = ["default"]
    return supported_agents

Agent Factory

Factory functions for creating PARE agents.

This module contains factory functions that create BaseAgent instances with the appropriate prompts, presteps, and configurations, then wrap them in UserAgent or ProactiveAgent wrappers.

create_default_user_agent(agent_config, env, llm_engine)

Create a default UserAgent with the given configuration.

Parameters:

Name Type Description Default
agent_config UserDefaultAgentConfig

Configuration for the user agent.

required
env StateAwareEnvironmentWrapper

Environment in which the agent will operate.

required
llm_engine LLMEngine

Pre-built LLM engine for the agent.

required

Returns:

Type Description
UserAgent

A configured UserAgent instance.

Source code in pare/agents/agent_factory.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def create_default_user_agent(
    agent_config: UserDefaultAgentConfig,
    env: StateAwareEnvironmentWrapper,
    llm_engine: LLMEngine,
) -> UserAgent:
    """Assemble the default UserAgent: a BaseAgent wrapped in a UserAgent.

    The ``"user"`` entry of ``agent_config.get_base_agent_configs()`` supplies
    the iteration limit, system prompt, logger flag and simulated generation
    time settings. The environment supplies the log callback, pause/resume
    hooks and the time manager.

    Args:
        agent_config: Configuration for the user agent.
        env: Environment in which the agent will operate.
        llm_engine: Pre-built LLM engine, shared by the BaseAgent and the
            UserAgent wrapper.

    Returns:
        A configured UserAgent instance.
    """
    user_config = agent_config.get_base_agent_configs()["user"]

    # Build the BaseAgent's collaborators first, then the agent itself.
    pre_steps = [get_user_agent_pre_step()]
    executor = JsonActionExecutor(
        tools={},
        use_custom_logger=user_config.use_custom_logger,
    )
    prompts = {"system_prompt": str(user_config.system_prompt)}

    inner_agent = BaseAgent(
        llm_engine=llm_engine,
        tools={},  # Will be set by UserAgent.init_tools()
        max_iterations=user_config.max_iterations,
        conditional_pre_steps=pre_steps,
        action_executor=executor,
        system_prompts=prompts,
        use_custom_logger=user_config.use_custom_logger,
    )

    return UserAgent(
        log_callback=env.append_to_world_logs,
        pause_env=env.pause,
        resume_env=env.resume_with_offset,
        llm_engine=llm_engine,
        base_agent=inner_agent,
        time_manager=env.time_manager,
        max_iterations=user_config.max_iterations,
        max_turns=agent_config.max_turns,
        simulated_generation_time_config=user_config.simulated_generation_time_config,
    )

create_observe_execute_proactive_agent(agent_config, env, observe_llm_engine, execute_llm_engine)

Create an observe-execute ProactiveAgent with the given configuration.

Parameters:

Name Type Description Default
agent_config ProactiveObserveExecuteAgentConfig

Configuration for the proactive agent.

required
env StateAwareEnvironmentWrapper

Environment in which the agent will operate.

required
observe_llm_engine LLMEngine

Pre-built LLM engine for the observe agent.

required
execute_llm_engine LLMEngine

Pre-built LLM engine for the execute agent.

required

Returns:

Type Description
ProactiveAgent

A configured ProactiveAgent instance.

Source code in pare/agents/agent_factory.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def create_observe_execute_proactive_agent(
    agent_config: ProactiveObserveExecuteAgentConfig,
    env: StateAwareEnvironmentWrapper,
    observe_llm_engine: LLMEngine,
    execute_llm_engine: LLMEngine,
) -> ProactiveAgent:
    """Assemble an observe-execute ProactiveAgent from its two BaseAgents.

    The ``"observe"`` and ``"execute"`` entries of
    ``agent_config.get_base_agent_configs()`` each configure one BaseAgent.
    Both are wrapped, together with the environment hooks, in a ProactiveAgent.

    Args:
        agent_config: Configuration for the proactive agent.
        env: Environment in which the agent will operate.
        observe_llm_engine: Pre-built LLM engine for the observe agent.
        execute_llm_engine: Pre-built LLM engine for the execute agent.

    Returns:
        A configured ProactiveAgent instance.
    """

    def _make_base_agent(engine, role_config):
        # Both roles are assembled the same way; only the engine and the
        # per-role config differ.
        return BaseAgent(
            llm_engine=engine,
            tools={},  # Will be set by ProactiveAgent.init_tools()
            max_iterations=role_config.max_iterations,
            conditional_pre_steps=[get_proactive_agent_pre_step()],
            action_executor=JsonActionExecutor(
                tools={},
                use_custom_logger=role_config.use_custom_logger,
            ),
            system_prompts={"system_prompt": str(role_config.system_prompt)},
            use_custom_logger=role_config.use_custom_logger,
        )

    role_configs = agent_config.get_base_agent_configs()
    observe_config = role_configs["observe"]
    execute_config = role_configs["execute"]

    observer = _make_base_agent(observe_llm_engine, observe_config)
    executor = _make_base_agent(execute_llm_engine, execute_config)

    return ProactiveAgent(
        log_callback=env.append_to_world_logs,
        pause_env=env.pause,
        resume_env=env.resume_with_offset,
        observe_llm_engine=observe_llm_engine,
        observe_agent=observer,
        execute_llm_engine=execute_llm_engine,
        execute_agent=executor,
        time_manager=env.time_manager,
        tools=[],
        observe_max_iterations=observe_config.max_iterations,
        execute_max_iterations=execute_config.max_iterations,
        max_turns=agent_config.max_turns,
        # Only the observe config's simulated generation time settings are passed on.
        simulated_generation_time_config=observe_config.simulated_generation_time_config,
    )

Agent Log Types

PARE-specific agent log types.

AgentMessageLog dataclass

Bases: BaseAgentLog

Log entry for messages from ProactiveAgent to UserAgent.

Used by UserAgent preprocessing to store AGENT_MESSAGE notifications. These are proposals/messages sent via send_message_to_user tool.

Source code in pare/agents/agent_log.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@dataclass
class AgentMessageLog(BaseAgentLog):
    """Record of a message that the ProactiveAgent addressed to the UserAgent.

    UserAgent preprocessing stores AGENT_MESSAGE notifications in this form.
    These are the proposals/messages sent through the send_message_to_user tool.
    """

    # Message text; may embed ``<|attachment:N|>`` placeholders.
    content: str
    # Attachments accompanying the message; defaults to a new empty list per instance.
    attachments: list[Attachment] = field(default_factory=list)

    def get_type(self) -> str:
        """Identify this entry as an ``"agent_message"`` log."""
        return "agent_message"

    def get_content_for_llm(self) -> str | None:
        """Give the LLM the stored content verbatim."""
        return self.content

    def get_content_for_llm_no_attachment(self) -> str | None:
        """Give the content with every ``<|attachment:N|>`` placeholder removed."""
        placeholder = r"<\|attachment:(\d+)\|>"
        return re.sub(placeholder, "", self.content)

    def get_attachments_for_llm(self) -> list[Attachment]:
        """Give the attachment list stored on this entry (not a copy)."""
        return self.attachments

get_attachments_for_llm()

Return attachments for LLM.

Source code in pare/agents/agent_log.py
121
122
123
def get_attachments_for_llm(self) -> list[Attachment]:
    """Give the attachment list stored on this entry.

    Returns:
        The ``attachments`` attribute itself, not a copy.
    """
    attached = self.attachments
    return attached

get_content_for_llm()

Return content to be sent to LLM.

Source code in pare/agents/agent_log.py
112
113
114
def get_content_for_llm(self) -> str | None:
    """Provide the text of this log entry that should be shown to the LLM.

    Returns:
        The ``content`` attribute, unchanged (attachment placeholders included).
    """
    llm_text = self.content
    return llm_text

get_content_for_llm_no_attachment()

Return content without attachment placeholders.

Source code in pare/agents/agent_log.py
116
117
118
119
def get_content_for_llm_no_attachment(self) -> str | None:
    """Provide the content with every ``<|attachment:N|>`` placeholder removed.

    Only the placeholder tokens (``N`` being one or more digits) are deleted.
    Surrounding text, including whitespace next to a placeholder, is untouched.
    """
    placeholder = r"<\|attachment:(\d+)\|>"
    return re.sub(placeholder, "", self.content)

get_type()

Return log type identifier.

Source code in pare/agents/agent_log.py
125
126
127
def get_type(self) -> str:
    """Identify this entry's log type.

    Returns:
        The constant string ``"agent_message"``.
    """
    log_type = "agent_message"
    return log_type

AvailableToolsLog dataclass

Bases: BaseAgentLog

Log entry for all available tools for user agent at the current state.

Used by UserAgent preprocessing to store AVAILABLE_TOOLS notifications. These are tools that are available to the user agent at the current state.

Source code in pare/agents/agent_log.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@dataclass
class AvailableToolsLog(BaseAgentLog):
    """Record of the tools the user agent can use at the current state.

    UserAgent preprocessing stores AVAILABLE_TOOLS notifications in this form.
    """

    # Text listing the available tools, as passed to the LLM.
    content: str

    def get_type(self) -> str:
        """Identify this entry as an ``"available_tools"`` log."""
        return "available_tools"

    def get_content_for_llm(self) -> str | None:
        """Give the LLM the stored content verbatim."""
        return self.content

get_content_for_llm()

Return content to be sent to LLM.

Source code in pare/agents/agent_log.py
140
141
142
def get_content_for_llm(self) -> str | None:
    """Provide the text of this log entry that should be shown to the LLM.

    Returns:
        The ``content`` attribute, unchanged.
    """
    llm_text = self.content
    return llm_text

get_type()

Return log type identifier.

Source code in pare/agents/agent_log.py
144
145
146
def get_type(self) -> str:
    """Identify this entry's log type.

    Returns:
        The constant string ``"available_tools"``.
    """
    log_type = "available_tools"
    return log_type

CurrentAppStateLog dataclass

Bases: BaseAgentLog

Log entry holding the current app state for the user agent.

Used by UserAgent preprocessing to store CURRENT_APP_STATE notifications. Each notification captures the app's current state.

Source code in pare/agents/agent_log.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@dataclass
class CurrentAppStateLog(BaseAgentLog):
    """Record of the current app state for the user agent.

    UserAgent preprocessing stores CURRENT_APP_STATE notifications in this form.
    """

    # Text describing the app state, as passed to the LLM.
    content: str

    def get_type(self) -> str:
        """Identify this entry as a ``"current_app_state"`` log."""
        return "current_app_state"

    def get_content_for_llm(self) -> str | None:
        """Give the LLM the stored content verbatim."""
        return self.content

get_content_for_llm()

Return content to be sent to LLM.

Source code in pare/agents/agent_log.py
178
179
180
def get_content_for_llm(self) -> str | None:
    """Provide the text of this log entry that should be shown to the LLM.

    Returns:
        The ``content`` attribute, unchanged.
    """
    llm_text = self.content
    return llm_text

get_type()

Return log type identifier.

Source code in pare/agents/agent_log.py
182
183
184
def get_type(self) -> str:
    """Identify this entry's log type.

    Returns:
        The constant string ``"current_app_state"``.
    """
    log_type = "current_app_state"
    return log_type

PAREAgentLog dataclass

Bases: BaseAgentLog

Base class for all PARE Agent Logs.

Source code in pare/agents/agent_log.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@dataclass
class PAREAgentLog(BaseAgentLog):
    """Base class for all PARE Agent Logs."""

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> PAREAgentLog:
        """Create a PAREAgentLog from a dictionary.

        The ``log_type`` entry selects the concrete log class. The optional
        ``id`` entry becomes the log's id; a random UUID hex string is used when
        it is absent. All remaining entries are passed to the class constructor
        as keyword arguments. For ``"subagent"`` logs the ``children`` entries
        are converted recursively.

        Args:
            d: Serialized log. It is left unmodified; a shallow copy is consumed.

        Returns:
            The reconstructed log instance.

        Raises:
            ValueError: If ``log_type`` is missing (or ``None``) or is not a
                known log type.
        """
        # Work on a shallow copy: popping the bookkeeping keys from the
        # caller's dict would make a second call on the same dict fail with
        # "Log type is not specified".
        payload = dict(d)
        log_type = payload.pop("log_type", None)
        log_id = payload.pop("id", uuid.uuid4().hex)
        if log_type is None:
            raise ValueError("Log type is not specified")

        # Several legacy/alias names map to the same class (e.g. "llm_output",
        # "raw_plan", "raw_facts").
        log_type_map = {
            "system_prompt": SystemPromptLog,
            "task": TaskLog,
            "llm_input": LLMInputLog,
            "llm_output": LLMOutputThoughtActionLog,
            "llm_output_thought_action": LLMOutputThoughtActionLog,
            "rationale": RationaleLog,
            "tool_call": ToolCallLog,
            "observation": ObservationLog,
            "step": StepLog,
            "subagent": SubagentLog,
            "final_answer": FinalAnswerLog,
            "error": ErrorLog,
            "thought": ThoughtLog,
            "plan": PlanLog,
            "facts": FactsLog,
            "replan": ReplanLog,
            "refacts": RefactsLog,
            "stop": StopLog,
            "action": ActionLog,
            "end_task": EndTaskLog,
            "raw_plan": LLMOutputPlanLog,
            "llm_output_plan": LLMOutputPlanLog,
            "raw_facts": LLMOutputFactsLog,
            "llm_output_facts": LLMOutputFactsLog,
            "agent_user_interface": AgentUserInterfaceLog,
            "available_tools": AvailableToolsLog,
            "current_app_state": CurrentAppStateLog,
            "agent_message": AgentMessageLog,
            "user_action": UserActionLog,
            "environment_notifications": EnvironmentNotificationLog,
            "hint": HintLog,
            "task_reminder": TaskReminderLog,
        }

        log_class = log_type_map.get(log_type)
        if log_class is None:
            raise ValueError(f"Unknown agent log type: {log_type}")

        log = log_class(**payload)
        if log_type == "subagent":
            # The constructor received the raw child dicts; replace them with
            # parsed log objects.
            log.children = [PAREAgentLog.from_dict(child) for child in payload["children"]]
        log.id = log_id
        return log

from_dict(d) classmethod

Create a PAREAgentLog from a dictionary.

Source code in pare/agents/agent_log.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@classmethod
def from_dict(cls, d: dict[str, Any]) -> PAREAgentLog:
    """Create a PAREAgentLog from a dictionary.

    The ``log_type`` entry selects the concrete log class. The optional ``id``
    entry becomes the log's id; a random UUID hex string is used when it is
    absent. All remaining entries are passed to the class constructor as
    keyword arguments. For ``"subagent"`` logs the ``children`` entries are
    converted recursively.

    Args:
        d: Serialized log. It is left unmodified; a shallow copy is consumed.

    Returns:
        The reconstructed log instance.

    Raises:
        ValueError: If ``log_type`` is missing (or ``None``) or is not a known
            log type.
    """
    # Work on a shallow copy: popping the bookkeeping keys from the caller's
    # dict would make a second call on the same dict fail with
    # "Log type is not specified".
    payload = dict(d)
    log_type = payload.pop("log_type", None)
    log_id = payload.pop("id", uuid.uuid4().hex)
    if log_type is None:
        raise ValueError("Log type is not specified")

    # Several legacy/alias names map to the same class (e.g. "llm_output",
    # "raw_plan", "raw_facts").
    log_type_map = {
        "system_prompt": SystemPromptLog,
        "task": TaskLog,
        "llm_input": LLMInputLog,
        "llm_output": LLMOutputThoughtActionLog,
        "llm_output_thought_action": LLMOutputThoughtActionLog,
        "rationale": RationaleLog,
        "tool_call": ToolCallLog,
        "observation": ObservationLog,
        "step": StepLog,
        "subagent": SubagentLog,
        "final_answer": FinalAnswerLog,
        "error": ErrorLog,
        "thought": ThoughtLog,
        "plan": PlanLog,
        "facts": FactsLog,
        "replan": ReplanLog,
        "refacts": RefactsLog,
        "stop": StopLog,
        "action": ActionLog,
        "end_task": EndTaskLog,
        "raw_plan": LLMOutputPlanLog,
        "llm_output_plan": LLMOutputPlanLog,
        "raw_facts": LLMOutputFactsLog,
        "llm_output_facts": LLMOutputFactsLog,
        "agent_user_interface": AgentUserInterfaceLog,
        "available_tools": AvailableToolsLog,
        "current_app_state": CurrentAppStateLog,
        "agent_message": AgentMessageLog,
        "user_action": UserActionLog,
        "environment_notifications": EnvironmentNotificationLog,
        "hint": HintLog,
        "task_reminder": TaskReminderLog,
    }

    log_class = log_type_map.get(log_type)
    if log_class is None:
        raise ValueError(f"Unknown agent log type: {log_type}")

    log = log_class(**payload)
    if log_type == "subagent":
        # The constructor received the raw child dicts; replace them with
        # parsed log objects.
        log.children = [PAREAgentLog.from_dict(child) for child in payload["children"]]
    log.id = log_id
    return log

UserActionLog dataclass

Bases: BaseAgentLog

Log entry for user actions observed by the proactive agent.

Each log represents a list of user actions since the last step. Multiple logs accumulate in the agent history to show full action sequence.

Source code in pare/agents/agent_log.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@dataclass
class UserActionLog(BaseAgentLog):
    """Log entry carrying user actions as observed by the proactive agent.

    One entry covers the user actions performed since the last step. The full
    action sequence is the accumulation of these entries in the agent history.
    """

    # Text describing the observed user actions, as passed to the LLM.
    content: str

    def get_type(self) -> str:
        """Identify this entry as a ``"user_action"`` log."""
        return "user_action"

    def get_content_for_llm(self) -> str | None:
        """Give the LLM the stored content verbatim."""
        return self.content

get_content_for_llm()

Return content to be sent to LLM.

Source code in pare/agents/agent_log.py
159
160
161
def get_content_for_llm(self) -> str | None:
    """Provide the text of this log entry that should be shown to the LLM.

    Returns:
        The ``content`` attribute, unchanged.
    """
    llm_text = self.content
    return llm_text

get_type()

Return log type identifier.

Source code in pare/agents/agent_log.py
163
164
165
def get_type(self) -> str:
    """Identify this entry's log type.

    Returns:
        The constant string ``"user_action"``.
    """
    log_type = "user_action"
    return log_type