Reference: Building Auto Agents in FinRobot

FinRobot has three folders (agents, data_source, and functional) along with the utils and toolkits Python files.

The agents folder contains the agent library (agent_library.py):

from finrobot.data_source import *
from finrobot.functional import *
from textwrap import dedent

library = [
    {
        "name": "Software_Developer",
        "profile": "As a Software Developer for this position, you must be able to work collaboratively in a group chat environment to complete tasks assigned by a leader or colleague, primarily using Python programming expertise, excluding the need for code interpretation skills.",
    },
    {
        "name": "Data_Analyst",
        "profile": "As a Data Analyst for this position, you must be adept at analyzing data using Python, completing tasks assigned by leaders or colleagues, and collaboratively solving problems in a group chat setting with professionals of various roles. Reply 'TERMINATE' when everything is done.",
    },
    {
        "name": "Programmer",
        "profile": "As a Programmer for this position, you should be proficient in Python, able to effectively collaborate and solve problems within a group chat environment, and complete tasks assigned by leaders or colleagues without requiring expertise in code interpretation.",
    },
    {
        "name": "Accountant",
        "profile": "As an accountant in this position, one should possess a strong proficiency in accounting principles, the ability to effectively collaborate within team environments, such as group chats, to solve tasks, and have a basic understanding of Python for limited coding tasks, all while being able to follow directives from leaders and colleagues.",
    },
    {   
        "name": "Statistician",
        "profile": "As a Statistician, the applicant should possess a strong background in statistics or mathematics, proficiency in Python for data analysis, the ability to work collaboratively in a team setting through group chats, and readiness to tackle and solve tasks delegated by supervisors or peers.",
    },
    {
        "name": "IT_Specialist",
        "profile": "As an IT Specialist, you should possess strong problem-solving skills, be able to effectively collaborate within a team setting through group chats, complete tasks assigned by leaders or colleagues, and have proficiency in Python programming, excluding the need for code interpretation expertise.",
    },
    {
        "name": "Artificial_Intelligence_Engineer",
        "profile": "As an Artificial Intelligence Engineer, you should be adept in Python, able to fulfill tasks assigned by leaders or colleagues, and capable of collaboratively solving problems in a group chat with diverse professionals.",
    },
    {
        "name": "Financial_Analyst",
        "profile": "As a Financial Analyst, one must possess strong analytical and problem-solving abilities, be proficient in Python for data analysis, have excellent communication skills to collaborate effectively in group chats, and be capable of completing assignments delegated by leaders or colleagues.",
    },
    {
        "name": "Market_Analyst",
        "profile": "As a Market Analyst, one must possess strong analytical and problem-solving abilities, collect necessary financial information and aggregate them based on client's requirement. For coding tasks, only use the functions you have been provided with. Reply TERMINATE when the task is done.",
        "toolkits": [
            FinnHubUtils.get_company_profile,
            FinnHubUtils.get_company_news,
            FinnHubUtils.get_basic_financials,
            YFinanceUtils.get_stock_data,
        ],
    },
    {
        "name": "Expert_Investor",
        "profile": dedent(
            f"""
            Role: Expert Investor
            Department: Finance
            Primary Responsibility: Generation of Customized Financial Analysis Reports

            Role Description:
            As an Expert Investor within the finance domain, your expertise is harnessed to develop bespoke Financial Analysis Reports that cater to specific client requirements. This role demands a deep dive into financial statements and market data to unearth insights regarding a company's financial performance and stability. Engaging directly with clients to gather essential information and continuously refining the report with their feedback ensures the final product precisely meets their needs and expectations.

            Key Objectives:

            Analytical Precision: Employ meticulous analytical prowess to interpret financial data, identifying underlying trends and anomalies.
            Effective Communication: Simplify and effectively convey complex financial narratives, making them accessible and actionable to non-specialist audiences.
            Client Focus: Dynamically tailor reports in response to client feedback, ensuring the final analysis aligns with their strategic objectives.
            Adherence to Excellence: Maintain the highest standards of quality and integrity in report generation, following established benchmarks for analytical rigor.
            Performance Indicators:
            The efficacy of the Financial Analysis Report is measured by its utility in providing clear, actionable insights. This encompasses aiding corporate decision-making, pinpointing areas for operational enhancement, and offering a lucid evaluation of the company's financial health. Success is ultimately reflected in the report's contribution to informed investment decisions and strategic planning.

            Reply TERMINATE when everything is settled.
            """
        ),
        "toolkits": [
            FMPUtils.get_sec_report,  # Retrieve SEC report url and filing date
            IPythonUtils.display_image,  # Display image in IPython
            TextUtils.check_text_length,  # Check text length
            ReportLabUtils.build_annual_report,  # Build annual report in designed pdf format
            ReportAnalysisUtils,  # Expert Knowledge for Report Analysis
            ReportChartUtils,  # Expert Knowledge for Report Chart Plotting
        ],
    },
]
library = {d["name"]: d for d in library}
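
The last line of the library re-keys the list by agent name, so a profile can be looked up directly by name. Here is a minimal sketch of that lookup, assuming a trimmed stand-in library (shortened profiles, no toolkits). The `lookup` helper is hypothetical; it only mirrors the `_Shadow` suffix handling that `FinRobot.__init__` does in workflow.py.

```python
# Stand-in entries; the real library holds full profiles and toolkits.
entries = [
    {"name": "Data_Analyst", "profile": "Analyze data using Python."},
    {"name": "Market_Analyst", "profile": "Collect and aggregate financial information."},
]

# Same re-keying as in the agent library: name -> full config dict.
library = {d["name"]: d for d in entries}


def lookup(agent_name: str) -> dict:
    """Hypothetical helper: resolve a name, ignoring a '_Shadow' suffix."""
    base_name = agent_name.replace("_Shadow", "")
    if base_name not in library:
        raise KeyError(f"FinRobot {base_name} not found in agent library.")
    return library[base_name]


print(lookup("Market_Analyst")["profile"])
print(lookup("Data_Analyst_Shadow")["name"])
```

Because the dict is keyed by name, two entries with the same name would silently overwrite each other, so names in the list need to be unique.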

prompts.py includes:

from textwrap import dedent


leader_system_message = dedent(
    """
    You are the leader of the following group members:
    
    {group_desc}
    
    As a group leader, you are responsible for coordinating the team's efforts to achieve the project's objectives. You must ensure that the team is working together effectively and efficiently. 

    - Summarize the status of the whole project progess each time you respond.
    - End your response with an order to one of your team members to progress the project, if the objective has not been achieved yet.
    - Orders should be follow the format: \"[<name of staff>] <order>\".
    - Orders need to be detailed, including necessary time period information, stock information or instruction from higher level leaders. 
    - Make only one order at a time.
    - After receiving feedback from a team member, check the results of the task, and make sure it has been well completed before proceding to th next order.

    Reply "TERMINATE" in the end when everything is done.
    """
)
role_system_message = dedent(
    """
    As a {title}, your reponsibilities are as follows:
    {responsibilities}

    Reply "TERMINATE" in the end when everything is done.
    """
)
order_template = dedent(
    """
    Follow leader's order and complete the following task with your group members:

    {order}

    For coding tasks, provide python scripts and executor will run it for you.
    Save your results or any intermediate data locally and let group leader know how to read them.
    DO NOT include "TERMINATE" in your response until you have received the results from the execution of the Python scripts.
    If the task cannot be done currently or need assistance from other members, report the reasons or requirements to group leader ended with TERMINATE. 
"""
)
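
These templates are plain strings with `str.format` placeholders. The sketch below shows how a title and a list of responsibilities could be filled into a role prompt. `role_template` is a stand-in with the same placeholders as `role_system_message`, and `build_role_prompt` and the "Risk Analyst" role are hypothetical names used only for illustration.

```python
from textwrap import dedent

# Stand-in template with the same placeholders as role_system_message.
role_template = dedent(
    """
    As a {title}, your responsibilities are as follows:
    {responsibilities}

    Reply "TERMINATE" in the end when everything is done.
    """
)


def build_role_prompt(title, responsibilities):
    """Hypothetical helper: render a list of responsibilities as ' - ' bullets."""
    if isinstance(responsibilities, list):
        responsibilities = "\n".join(f" - {r}" for r in responsibilities)
    return role_template.format(
        title=title, responsibilities=responsibilities
    ).strip()


prompt = build_role_prompt(
    "Risk Analyst", ["Estimate volatility", "Flag concentration risk"]
)
print(prompt)
```

The bullet formatting follows what `_preprocess_config` in workflow.py does before calling `role_system_message.format(...)`.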

utils.py includes:

import re
from .prompts import order_template


def instruction_trigger(sender):
    # Check if the last message contains the path to the instruction text file
    return "instruction & resources saved to" in sender.last_message()["content"]


def instruction_message(recipient, messages, sender, config):
    # Extract the path to the instruction text file from the last message
    full_order = recipient.chat_messages_for_summary(sender)[-1]["content"]
    txt_path = full_order.replace("instruction & resources saved to ", "").strip()
    with open(txt_path, "r") as f:
        instruction = f.read() + "\n\nReply TERMINATE at the end of your response."
    return instruction


def order_trigger(sender, name, pattern):
    # print(pattern)
    # print(sender.name)
    return sender.name == name and pattern in sender.last_message()["content"]


def order_message(pattern, recipient, messages, sender, config):
    full_order = recipient.chat_messages_for_summary(sender)[-1]["content"]
    pattern = rf"\[{pattern}\](?::)?\s*(.+?)(?=\n\[|$)"
    match = re.search(pattern, full_order, re.DOTALL)
    if match:
        order = match.group(1).strip()
    else:
        order = full_order
    return order_template.format(order=order)
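
To make the regular expression in `order_message` concrete, here is a small self-contained sketch of the extraction step on a made-up leader reply. `extract_order` is a hypothetical helper name, and the `re.escape` call is an addition; the original interpolates the agent name into the pattern directly.

```python
import re


def extract_order(agent_name: str, leader_reply: str) -> str:
    """Return the text after '[agent_name]' up to the next '[...]' line or the end."""
    # re.escape is an addition here; the original interpolates the name directly.
    pattern = rf"\[{re.escape(agent_name)}\](?::)?\s*(.+?)(?=\n\[|$)"
    match = re.search(pattern, leader_reply, re.DOTALL)
    # Fall back to the whole reply when no order for this agent is found.
    return match.group(1).strip() if match else leader_reply


reply = (
    "Status: company profile collected, news still missing.\n"
    "[Market_Analyst]: Fetch AAPL news for 2024-01-01 to 2024-03-31."
)
print(extract_order("Market_Analyst", reply))
print(extract_order("Data_Analyst", reply))  # no match, so the full reply is returned
```

The optional colon and the lookahead for a following `[` line mean the leader can write either `[Name] order` or `[Name]: order`, and only the text addressed to that agent is forwarded.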

workflow.py includes:

from .agent_library import library
from typing import Any, Callable, Dict, List, Optional, Annotated
import autogen
from autogen.cache import Cache
from autogen import (
    ConversableAgent,
    AssistantAgent,
    UserProxyAgent,
    GroupChat,
    GroupChatManager,
    register_function,
)
from collections import defaultdict
from functools import partial
from abc import ABC, abstractmethod
from ..toolkits import register_toolkits
from ..functional.rag import get_rag_function
from .utils import *
from .prompts import leader_system_message, role_system_message


class FinRobot(AssistantAgent):

    def __init__(
        self,
        agent_config: str | Dict[str, Any],
        system_message: str | None = None,  # overwrites previous config
        toolkits: List[Callable | dict | type] = [],  # overwrites previous config
        proxy: UserProxyAgent | None = None,
        **kwargs,
    ):
        orig_name = ""
        if isinstance(agent_config, str):
            orig_name = agent_config
            name = orig_name.replace("_Shadow", "")
            assert name in library, f"FinRobot {name} not found in agent library."
            agent_config = library[name]

        agent_config = self._preprocess_config(agent_config)

        assert agent_config, f"agent_config is required."
        assert agent_config.get("name", ""), f"name needs to be in config."

        name = orig_name if orig_name else agent_config["name"]
        default_system_message = agent_config.get("profile", None)
        default_toolkits = agent_config.get("toolkits", [])

        system_message = system_message or default_system_message
        self.toolkits = toolkits or default_toolkits

        name = name.replace(" ", "_").strip()

        super().__init__(
            name, system_message, description=agent_config["description"], **kwargs
        )

        if proxy is not None:
            self.register_proxy(proxy)

    def _preprocess_config(self, config):

        role_prompt, leader_prompt, responsibilities = "", "", ""

        if "responsibilities" in config:
            title = config["title"] if "title" in config else config.get("name", "")
            if "name" not in config:
                config["name"] = config["title"]
            responsibilities = config["responsibilities"]
            responsibilities = (
                "\n".join([f" - {r}" for r in responsibilities])
                if isinstance(responsibilities, list)
                else responsibilities
            )
            role_prompt = role_system_message.format(
                title=title,
                responsibilities=responsibilities,
            )

        name = config.get("name", "")
        description = (
            f"Name: {name}\nResponsibility:\n{responsibilities}"
            if responsibilities
            else f"Name: {name}"
        )
        config["description"] = description.strip()

        if "group_desc" in config:
            group_desc = config["group_desc"]
            leader_prompt = leader_system_message.format(group_desc=group_desc)

        config["profile"] = (
            (role_prompt + "\n\n").strip()
            + (leader_prompt + "\n\n").strip()
            + config.get("profile", "")
        ).strip()

        return config

    def register_proxy(self, proxy):
        register_toolkits(self.toolkits, self, proxy)


class SingleAssistantBase(ABC):

    def __init__(
        self,
        agent_config: str | Dict[str, Any],
        llm_config: Dict[str, Any] = {},
    ):
        self.assistant = FinRobot(
            agent_config=agent_config,
            llm_config=llm_config,
            proxy=None,
        )

    @abstractmethod
    def chat(self):
        pass

    @abstractmethod
    def reset(self):
        pass


class SingleAssistant(SingleAssistantBase):

    def __init__(
        self,
        agent_config: str | Dict[str, Any],
        llm_config: Dict[str, Any] = {},
        is_termination_msg=lambda x: x.get("content", "")
        and x.get("content", "").endswith("TERMINATE"),
        human_input_mode="NEVER",
        max_consecutive_auto_reply=10,
        code_execution_config={
            "work_dir": "coding",
            "use_docker": False,
        },
        **kwargs,
    ):
        super().__init__(agent_config, llm_config=llm_config)
        self.user_proxy = UserProxyAgent(
            name="User_Proxy",
            is_termination_msg=is_termination_msg,
            human_input_mode=human_input_mode,
            max_consecutive_auto_reply=max_consecutive_auto_reply,
            code_execution_config=code_execution_config,
            **kwargs,
        )
        self.assistant.register_proxy(self.user_proxy)

    def chat(self, message: str, use_cache=False, **kwargs):
        with Cache.disk() as cache:
            self.user_proxy.initiate_chat(
                self.assistant,
                message=message,
                cache=cache if use_cache else None,
                **kwargs,
            )

        print("Current chat finished. Resetting agents ...")
        self.reset()

    def reset(self):
        self.user_proxy.reset()
        self.assistant.reset()


class SingleAssistantRAG(SingleAssistant):

    def __init__(
        self,
        agent_config: str | Dict[str, Any],
        llm_config: Dict[str, Any] = {},
        is_termination_msg=lambda x: x.get("content", "")
        and x.get("content", "").endswith("TERMINATE"),
        human_input_mode="NEVER",
        max_consecutive_auto_reply=10,
        code_execution_config={
            "work_dir": "coding",
            "use_docker": False,
        },
        retrieve_config={},
        rag_description="",
        **kwargs,
    ):
        super().__init__(
            agent_config,
            llm_config=llm_config,
            is_termination_msg=is_termination_msg,
            human_input_mode=human_input_mode,
            max_consecutive_auto_reply=max_consecutive_auto_reply,
            code_execution_config=code_execution_config,
            **kwargs,
        )
        assert retrieve_config, "retrieve config cannot be empty for RAG Agent."
        rag_func, rag_assistant = get_rag_function(retrieve_config, rag_description)
        self.rag_assistant = rag_assistant
        register_function(
            rag_func,
            caller=self.assistant,
            executor=self.user_proxy,
            description=rag_description if rag_description else rag_func.__doc__,
        )

    def reset(self):
        super().reset()
        self.rag_assistant.reset()


class SingleAssistantShadow(SingleAssistant):

    def __init__(
        self,
        agent_config: str | Dict[str, Any],
        llm_config: Dict[str, Any] = {},
        is_termination_msg=lambda x: x.get("content", "")
        and x.get("content", "").endswith("TERMINATE"),
        human_input_mode="NEVER",
        max_consecutive_auto_reply=10,
        code_execution_config={
            "work_dir": "coding",
            "use_docker": False,
        },
        **kwargs,
    ):
        super().__init__(
            agent_config,
            llm_config=llm_config,
            is_termination_msg=is_termination_msg,
            human_input_mode=human_input_mode,
            max_consecutive_auto_reply=max_consecutive_auto_reply,
            code_execution_config=code_execution_config,
            **kwargs,
        )
        if isinstance(agent_config, dict):
            agent_config_shadow = agent_config.copy()
            agent_config_shadow["name"] = agent_config["name"] + "_Shadow"
            agent_config_shadow["toolkits"] = []
        else:
            agent_config_shadow = agent_config + "_Shadow"

        # Use the shadow config built above so the shadow agent gets its own name.
        self.assistant_shadow = FinRobot(
            agent_config_shadow,
            toolkits=[],
            llm_config=llm_config,
            proxy=None,
        )
        self.assistant.register_nested_chats(
            [
                {
                    "sender": self.assistant,
                    "recipient": self.assistant_shadow,
                    "message": instruction_message,
                    "summary_method": "last_msg",
                    "max_turns": 2,
                    "silent": True,  # mute the chat summary
                }
            ],
            trigger=instruction_trigger,
        )


"""
Multi Agent Workflows
"""


class MultiAssistantBase(ABC):

    def __init__(
        self,
        group_config: str | dict,
        agent_configs: List[
            Dict[str, Any] | str | ConversableAgent
        ] = [],  # overwrites previous config
        llm_config: Dict[str, Any] = {},
        user_proxy: UserProxyAgent | None = None,
        is_termination_msg=lambda x: x.get("content", "")
        and x.get("content", "").endswith("TERMINATE"),
        human_input_mode="NEVER",
        max_consecutive_auto_reply=10,
        code_execution_config={
            "work_dir": "coding",
            "use_docker": False,
        },
        **kwargs,
    ):
        self.group_config = group_config
        self.llm_config = llm_config
        if user_proxy is None:
            self.user_proxy = UserProxyAgent(
                name="User_Proxy",
                is_termination_msg=is_termination_msg,
                human_input_mode=human_input_mode,
                max_consecutive_auto_reply=max_consecutive_auto_reply,
                code_execution_config=code_execution_config,
                **kwargs,
            )
        else:
            self.user_proxy = user_proxy
        self.agent_configs = agent_configs or group_config.get("agents", [])
        assert self.agent_configs, f"agent_configs is required."
        self.agents = []
        self._init_agents()
        self.representative = self._get_representative()

    def _init_single_agent(self, agent_config):
        if isinstance(agent_config, ConversableAgent):
            return agent_config
        else:
            return FinRobot(
                agent_config,
                llm_config=self.llm_config,
                proxy=self.user_proxy,
            )

    def _init_agents(self):

        agent_dict = defaultdict(list)
        for c in self.agent_configs:
            agent = self._init_single_agent(c)
            agent_dict[agent.name].append(agent)

        # add index indicator for duplicate name/title
        for name, agent_list in agent_dict.items():
            if len(agent_list) == 1:
                self.agents.append(agent_list[0])
                continue
            for idx, agent in enumerate(agent_list):
                agent._name = f"{name}_{idx+1}"
                self.agents.append(agent)

    @abstractmethod
    def _get_representative(self) -> ConversableAgent:
        pass

    def chat(self, message: str, use_cache=False, **kwargs):
        with Cache.disk() as cache:
            self.user_proxy.initiate_chat(
                self.representative,
                message=message,
                cache=cache if use_cache else None,
                **kwargs,
            )
        print("Current chat finished. Resetting agents ...")
        self.reset()

    def reset(self):
        self.user_proxy.reset()
        self.representative.reset()
        for agent in self.agents:
            agent.reset()


class MultiAssistant(MultiAssistantBase):
    """
    Group Chat Workflow with multiple agents.
    """

    def _get_representative(self):

        def custom_speaker_selection_func(
            last_speaker: autogen.Agent, groupchat: autogen.GroupChat
        ):
            """Define a customized speaker selection function.
            A recommended way is to define a transition for each speaker in the groupchat.

            Returns:
                Return an `Agent` class or a string from ['auto', 'manual', 'random', 'round_robin'] to select a default method to use.
            """
            messages = groupchat.messages
            if len(messages) <= 1:
                return groupchat.agents[0]
            if last_speaker is self.user_proxy:
                return groupchat.agent_by_name(messages[-2]["name"])
            elif "tool_calls" in messages[-1] or messages[-1]["content"].endswith(
                "TERMINATE"
            ):
                return self.user_proxy
            else:
                return groupchat.next_agent(last_speaker, groupchat.agents[:-1])

        self.group_chat = GroupChat(
            self.agents + [self.user_proxy],
            messages=[],
            speaker_selection_method=custom_speaker_selection_func,
            send_introductions=True,
        )
        manager_name = (self.group_config.get("name", "") + "_chat_manager").strip("_")
        manager = GroupChatManager(
            self.group_chat, name=manager_name, llm_config=self.llm_config
        )
        return manager


class MultiAssistantWithLeader(MultiAssistantBase):
    """
    Leader based Workflow with multiple agents connected to a leader agent through nested chats.

    Group config has to follow the following structure:
    {
        "leader": {
            "title": "Leader Title",
            "responsibilities": ["responsibility 1", "responsibility 2"]
        },
        "agents": [
            {
                "title": "Employee Title",
                "responsibilities": ["responsibility 1", "responsibility 2"]
            }, ...
        ]
    }
    """

    def _get_representative(self):

        assert (
            "leader" in self.group_config and "agents" in self.group_config
        ), "Leader and Agents has to be explicitly defined in config."

        assert (
            self.agent_configs
        ), "At least one agent has to be defined in the group config."

        # We consider only two situations for now: all same name / title or all different
        need_suffix = (
            len(set([c["title"] for c in self.agent_configs if isinstance(c, dict)]))
            == 1
        )

        group_desc = ""
        for i, c in enumerate(self.agent_configs):
            if isinstance(c, ConversableAgent):
                group_desc += c.description + "\n\n"
            else:
                name = c["title"] if "title" in c else c.get("name", "")
                name = name.replace(" ", "_").strip() + (
                    f"_{i+1}" if need_suffix else ""
                )
                # Join into a single string; a trailing comma here would create a tuple.
                responsibilities = "\n".join(
                    [f" - {r}" for r in c.get("responsibilities", [])]
                )
                group_desc += f"Name: {name}\nResponsibility:\n{responsibilities}\n\n"

        self.leader_config = self.group_config["leader"]
        self.leader_config["group_desc"] = group_desc.strip()

        # Initialize Leader
        leader = self._init_single_agent(self.leader_config)

        # Register Leader - Agents connections
        for agent in self.agents:
            self.user_proxy.register_nested_chats(
                [
                    {
                        "sender": self.user_proxy,
                        "recipient": agent,
                        "message": partial(order_message, agent.name),
                        "summary_method": "reflection_with_llm",
                        "max_turns": 10,
                        "max_consecutive_auto_reply": 3,
                    }
                ],
                trigger=partial(
                    order_trigger, name=leader.name, pattern=f"[{agent.name}]"
                ),
            )
        return leader
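
One detail of `MultiAssistantBase._init_agents` that is easy to miss: agents that share a name are grouped and given an index suffix. The sketch below reproduces that logic with a hypothetical `StubAgent` dataclass instead of real AutoGen agents. `assign_unique_names` is also a made-up name, and it sets `name` directly where the original sets `agent._name`.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class StubAgent:
    """Hypothetical stand-in for an agent; only the name matters here."""

    name: str


def assign_unique_names(agents):
    """Group agents by name and suffix duplicates with _1, _2, ..."""
    groups = defaultdict(list)
    for agent in agents:
        groups[agent.name].append(agent)

    result = []
    for name, group in groups.items():
        if len(group) == 1:
            # Unique names are kept unchanged.
            result.append(group[0])
            continue
        for idx, agent in enumerate(group, start=1):
            agent.name = f"{name}_{idx}"
            result.append(agent)
    return result


team = [StubAgent("Data_Analyst"), StubAgent("Accountant"), StubAgent("Data_Analyst")]
print([a.name for a in assign_unique_names(team)])
```

Grouping also reorders the team: agents with the same name end up next to each other, in the order in which each name first appeared.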

The functional folder contains analyzer.py, charting.py, coding.py, quantitative.py, rag.py, ragquery.py, reportlab.py, and text.py.

The key file is coding.py, which defines how Python code is executed:

import os
from typing_extensions import Annotated
from IPython import get_ipython

default_path = "coding/"


class IPythonUtils:

    def exec_python(cell: Annotated[str, "Valid Python cell to execute."]) -> str:
        """
        run cell in ipython and return the execution result.
        """
        ipython = get_ipython()
        result = ipython.run_cell(cell)
        log = str(result.result)
        if result.error_before_exec is not None:
            log += f"\n{result.error_before_exec}"
        if result.error_in_exec is not None:
            log += f"\n{result.error_in_exec}"
        return log

    def display_image(
        image_path: Annotated[str, "Path to image file to display."]
    ) -> str:
        """
        Display image in Jupyter Notebook.
        """
        log = __class__.exec_python(
            f"from IPython.display import Image, display\n\ndisplay(Image(filename='{image_path}'))"
        )
        if not log:
            return "Image displayed successfully"
        else:
            return log


class CodingUtils:  # Borrowed from https://microsoft.github.io/autogen/docs/notebooks/agentchat_function_call_code_writing

    def list_dir(directory: Annotated[str, "Directory to check."]) -> str:
        """
        List files in choosen directory.
        """
        files = os.listdir(default_path + directory)
        return str(files)

    def see_file(filename: Annotated[str, "Name and path of file to check."]) -> str:
        """
        Check the contents of a chosen file.
        """
        with open(default_path + filename, "r") as file:
            lines = file.readlines()
        formatted_lines = [f"{i+1}:{line}" for i, line in enumerate(lines)]
        file_contents = "".join(formatted_lines)

        return file_contents

    def modify_code(
        filename: Annotated[str, "Name and path of file to change."],
        start_line: Annotated[int, "Start line number to replace with new code."],
        end_line: Annotated[int, "End line number to replace with new code."],
        new_code: Annotated[
            str,
            "New piece of code to replace old code with. Remember about providing indents.",
        ],
    ) -> str:
        """
        Replace old piece of code with new one. Proper indentation is important.
        """
        with open(default_path + filename, "r+") as file:
            file_contents = file.readlines()
            file_contents[start_line - 1 : end_line] = [new_code + "\n"]
            file.seek(0)
            file.truncate()
            file.write("".join(file_contents))
        return "Code modified"

    def create_file_with_code(
        filename: Annotated[str, "Name and path of file to create."],
        code: Annotated[str, "Code to write in the file."],
    ) -> str:
        """
        Create a new file with provided code.
        """
        directory = os.path.dirname(default_path + filename)
        os.makedirs(directory, exist_ok=True)
        with open(default_path + filename, "w") as file:
            file.write(code)
        return "File created successfully"
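
The line arithmetic in `modify_code` is worth a closer look: `start_line` and `end_line` are 1-indexed and inclusive, and the whole range is replaced by a single new entry. Here is a minimal in-memory sketch of that slice assignment, with no file I/O. `replace_lines` is a hypothetical helper name.

```python
def replace_lines(lines, start_line, end_line, new_code):
    """Replace 1-indexed, inclusive lines start_line..end_line with new_code."""
    updated = list(lines)  # work on a copy so the input stays untouched
    updated[start_line - 1 : end_line] = [new_code + "\n"]
    return updated


source = ["def add(a, b):\n", "    result = a - b\n", "    return result\n"]

# Fix only line 2.
fixed = replace_lines(source, 2, 2, "    result = a + b")
print("".join(fixed))

# Collapse lines 2-3 into one line; the list gets shorter.
shorter = replace_lines(source, 2, 3, "    return a + b")
print("".join(shorter))
```

Because the replacement is one list entry, a multi-line `new_code` string still works once the list is joined back into text, but the line numbers of everything after it shift.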

To see how these agents work together, look at imm_agent_mplfinance.ipynb, which sets up a multimodal agent with GPT-4V using the MultimodalConversableAgent provided in AutoGen. This task needs several agents:
a regular LLM agent as data provider, which calls the charting function and provides instructions for the multimodal agent;
a multimodal agent as market analyst, which extracts information from the chart and analyzes the future trend of the stock;
a user proxy, which executes the Python functions and controls the conversation.

The next step is to follow the same thread and create a simpler QC agent.
