Architecture of a Multi-Index Agent

After a successful single-index agent creation, I am pondering about creating a multi-index agent, the benefits are obvious:

  1. Scalability: Add new indexes without changing core code
  2. Maintainability: Each index config is isolated
  3. Flexibility: Easy to modify methodology for one index without affecting others
  4. Testability: Test each index independently
  5. Reusability: Share common logic (QC, processing rules, weighting strategies)
  6. Version Control: Track methodology changes in config files
  7. User-Friendly: Users select which index to run from UI
  8. Audit Trail: Track which config was used for each run

It also comes with the following:

  • More complex initial setup
  • May be over-engineered if only 2-3 indexes
  • Requires good documentation
  • Config files need to be well-maintained

The steps are

  1. Configuration-Driven Methodology System: Instead of hard-coding index rules, use configuration files:

config/
├── indexes/
│ ├── ev_index.yaml # Current EV Index
│ ├── renewable_energy_index.yaml
│ ├── tech_index.yaml
│ └── …
├── processing_rules/
│ ├── ev_filters.py
│ ├── renewable_filters.py
│ └── base_filters.py
└── weighting_strategies/
├── geographic_split.py
├── market_cap_weighted.py
└── equal_weighted.py

2. Abstract Base Classes Pattern, to create a hierarchy of abstract classes:

  # index_base.py
  from abc import ABC, abstractmethod

  class BaseIndex(ABC):
      """Base class for all indexes"""

      def __init__(self, config_path: str):
          self.config = self.load_config(config_path)
          self.processor = self.get_processor()
          self.weighting_engine = self.get_weighting_engine()
          self.qc_tools = self.get_qc_tools()

      @abstractmethod
      def load_config(self, config_path: str) -> dict:
          """Load index-specific configuration"""
          pass

      @abstractmethod
      def get_processor(self) -> 'BaseProcessor':
          """Return appropriate data processor"""
          pass

      @abstractmethod
      def get_weighting_engine(self) -> 'BaseWeighting':
          """Return appropriate weighting engine"""
          pass

      @abstractmethod
      def get_qc_tools(self) -> 'BaseQC':
          """Return appropriate QC tools"""
          pass

      def execute_query(self, query_date: str):
          """Common execution logic"""
          # 1. Run main SQL query
          raw_df = self.run_main_query(query_date)

          # 2. Process data using processor
          processed = self.processor.process_data(raw_df)

          # 3. Select components
          final_index = self.processor.apply_selection(processed)

          # 4. Apply weighting
          weighted_index = self.weighting_engine.apply_weighting(final_index)

          # 5. Run QC
          qc_results = self.qc_tools.run_checks(weighted_index)

          # 6. Generate additional tabs
          additional_tabs = self.generate_additional_tabs(final_index, query_date)

          return {
              'raw': raw_df,
              'processed': processed,
              'final_index': final_index,
              'weighted_index': weighted_index,
              'qc_results': qc_results,
              **additional_tabs
          }


  class BaseProcessor(ABC):
      """Base class for data processing"""

      @abstractmethod
      def process_data(self, df: pd.DataFrame) -> pd.DataFrame:
          """Apply processing rules"""
          pass

      @abstractmethod
      def apply_selection(self, df: pd.DataFrame) -> pd.DataFrame:
          """Select index components"""
          pass


  class BaseWeighting(ABC):
      """Base class for weighting strategies"""

      @abstractmethod
      def apply_weighting(self, df: pd.DataFrame) -> pd.DataFrame:
          """Apply weighting methodology"""
          pass


  class BaseQC(ABC):
      """Base class for QC checks"""

      @abstractmethod
      def run_checks(self, df: pd.DataFrame) -> dict:
          """Run QC checks"""
          pass

3. Index Registry / Factory Pattern to centralize index management

 # index_registry.py
  class IndexRegistry:
      """Registry for all available indexes"""

      _indexes = {}

      @classmethod
      def register(cls, index_code: str, index_class: type):
          """Register a new index"""
          cls._indexes[index_code] = index_class

      @classmethod
      def get_index(cls, index_code: str, config_path: str = None):
          """Get index instance by code"""
          if index_code not in cls._indexes:
              raise ValueError(f"Index {index_code} not registered")

          if config_path is None:
              config_path = f"config/indexes/{index_code.lower()}.yaml"

          return cls._indexes[index_code](config_path)

      @classmethod
      def list_indexes(cls) -> list:
          """List all registered indexes"""
          return list(cls._indexes.keys())


  # Register indexes
  IndexRegistry.register("EV_IDX", EVIndex)
  IndexRegistry.register("RENEWABLE_IDX", RenewableEnergyIndex)
  IndexRegistry.register("TECH_IDX", TechIndex)

4. Modular Processing Rules, make the rules pluggable

 # processing_rules.py
  class ProcessingRule(ABC):
      """Base class for processing rules"""

      @abstractmethod
      def apply(self, df: pd.DataFrame) -> pd.DataFrame:
          pass

      @abstractmethod
      def get_description(self) -> str:
          pass


  class RemoveEntityRule(ProcessingRule):
      """Remove specific entities"""

      def __init__(self, entity_ids: list):
          self.entity_ids = entity_ids

      def apply(self, df: pd.DataFrame) -> pd.DataFrame:
          return df[~df['factset_entity_id'].isin(self.entity_ids)]

      def get_description(self) -> str:
          return f"Remove entities: {', '.join(self.entity_ids)}"


  class MinimumThresholdRule(ProcessingRule):
      """Apply minimum thresholds"""

      def __init__(self, thresholds: dict):
          self.thresholds = thresholds

      def apply(self, df: pd.DataFrame) -> pd.DataFrame:
          for field, min_value in self.thresholds.items():
              df = df[df[field] >= min_value]
          return df


  # Build processing pipeline from config
  def build_processor_from_config(config: dict) -> BaseProcessor:
      rules = []
      for rule_config in config['processing_rules']:
          rule_type = rule_config['name']
          rule = RuleFactory.create(rule_type, rule_config)
          rules.append(rule)

      return ConfigurableProcessor(rules)

5. Generic QC Framework, QC checks with index-specific parameters:

# qc_framework.py
  class QCCheck(ABC):
      """Base class for QC checks"""

      @abstractmethod
      def run(self, df: pd.DataFrame) -> dict:
          """Run the check"""
          pass


  class TotalWeightCheck(QCCheck):
      """Check total weight equals target"""

      def __init__(self, target: float = 1.0, tolerance: float = 1e-6):
          self.target = target
          self.tolerance = tolerance

      def run(self, df: pd.DataFrame) -> dict:
          actual = df['final_weight'].sum()
          passed = abs(actual - self.target) < self.tolerance

          return {
              'name': 'Total Weight',
              'passed': passed,
              'expected': self.target,
              'actual': actual,
              'deviation': abs(actual - self.target)
          }


  class ComponentCountCheck(QCCheck):
      """Check component count"""

      def __init__(self, expected_count: int):
          self.expected_count = expected_count

      def run(self, df: pd.DataFrame) -> dict:
          actual = len(df)
          passed = actual == self.expected_count

          return {
              'name': 'Component Count',
              'passed': passed,
              'expected': self.expected_count,
              'actual': actual
          }


  # Build QC suite from config
  def build_qc_from_config(config: dict) -> BaseQC:
      checks = []
      for check_name, params in config['qc']['checks'].items():
          check = QCCheckFactory.create(check_name, params)
          checks.append(check)

      return ConfigurableQC(checks)

6. Modified Agent Architecture

  # generic_agent.py
  class GenericIndexAgent:
      """Generic agent for all indexes"""

      def __init__(self):
          self.api_key = os.getenv('ANTHROPIC_API_KEY')
          self.client = anthropic.Anthropic(api_key=self.api_key)

          # Current index (set by user)
          self.current_index = None
          self.current_index_code = None

          # Available indexes
          self.available_indexes = IndexRegistry.list_indexes()

      def set_index(self, index_code: str):
          """Set the active index"""
          self.current_index = IndexRegistry.get_index(index_code)
          self.current_index_code = index_code
          logger.info(f"Switched to index: {index_code}")

      def execute_query(self, query_date: str = None):
          """Execute query for current index"""
          if self.current_index is None:
              raise ValueError("No index selected. Use set_index() first.")

          return self.current_index.execute_query(query_date)

      def list_available_indexes(self) -> list:
          """List all available indexes"""
          return [
              {
                  'code': code,
                  'name': IndexRegistry.get_index(code).config['index']['name'],
                  'description': IndexRegistry.get_index(code).config['index']['description']
              }
              for code in self.available_indexes
          ]

7. Update UI

 # app.py additions
  def display_index_selector():
      """Display index selection dropdown"""
      with st.sidebar:
          st.markdown("### Select Index")

          indexes = st.session_state.agent.list_available_indexes()

          index_options = {
              idx['name']: idx['code']
              for idx in indexes
          }

          selected_index_name = st.selectbox(
              "Choose index to run",
              options=list(index_options.keys()),
              help="Select which index methodology to use"
          )

          selected_code = index_options[selected_index_name]

          if st.session_state.get('current_index_code') != selected_code:
              st.session_state.agent.set_index(selected_code)
              st.session_state.current_index_code = selected_code
              st.success(f"Switched to: {selected_index_name}")
              st.rerun()

8. Database-Backed configuration, it’s official then store all configs in database

 -- Index configuration table
  CREATE TABLE index_configurations (
      index_code VARCHAR(50) PRIMARY KEY,
      index_name VARCHAR(255),
      description TEXT,
      config_json TEXT,  -- Store YAML/JSON config
      active BOOLEAN DEFAULT TRUE,
      created_date DATE,
      last_modified DATE
  );

  -- Index run history
  CREATE TABLE index_run_history (
      run_id BIGINT PRIMARY KEY,
      index_code VARCHAR(50),
      query_date DATE,
      run_timestamp TIMESTAMP,
      components_count INT,
      qc_passed BOOLEAN,
      output_file_path VARCHAR(500)
  );

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.