Source code for mcp_ohmy_sql.config.define

# -*- coding: utf-8 -*-

"""
mcp_ohmy_sql Configuration System

This module defines the configuration system for the mcp_ohmy_sql MCP server, providing
a flexible JSON-based configuration that supports multiple databases and schemas.

The configuration system is built on Pydantic models for validation and type safety,
ensuring robust configuration handling for production deployments.
"""

import typing as T
import json
from pathlib import Path
from functools import cached_property

from pydantic import BaseModel, Field, field_validator

from ..constants import DbTypeEnum
from ..lazy_import import sa


[docs] class Settings(BaseModel): """ Global settings for the MCP server. This class is currently empty but reserved for future global configuration options such as query timeout limits, result size limits, logging levels, and other server-wide settings. Example: In JSON configuration:: { "settings": {} } """
# enable_cache_for_schema: bool = Field(default=False) # cache_for_schema_expires: int = Field(default=3600) # enable_cache_for_query: bool = Field(default=False) # cache_for_query_expires: int = Field(default=600)
[docs] class TableFilter(BaseModel): """ Table filtering configuration for database schemas. Provides include/exclude patterns to control which tables are accessible through the MCP server. Supports wildcards for flexible filtering. :param include: List of table names or patterns to include. If empty, includes all tables not in exclude list. Supports wildcards with '*'. :param exclude: List of table names or patterns to exclude. Supports wildcards with '*'. Applied after include filtering. **Examples**: Include specific tables only:: { "include": ["users", "orders", "products"], "exclude": [] } Exclude system and temporary tables:: { "include": [], "exclude": ["pg_*", "information_schema", "tmp_*", "_backup_*"] } Mixed filtering:: { "include": ["sales_*", "customer_*"], "exclude": ["*_temp", "*_staging"] } .. note:: When both include and exclude are specified, tables must be in the include list AND not in the exclude list to be accessible. """ include: list[str] = Field( default_factory=list, description="List of table names or patterns to include (supports wildcards)", ) exclude: list[str] = Field( default_factory=list, description="List of table names or patterns to exclude (supports wildcards)", )
[docs] class Schema(BaseModel): """ Database schema configuration. Defines a specific schema within a database and its table filtering rules. Each database can have multiple schemas, allowing fine-grained control over which parts of the database are accessible. :param name: Schema name. If None, uses the database's default schema. Some databases (like SQLite) don't have explicit schemas. :param table_filter: :class:`TableFilter` rules for this schema. **Examples**: Default schema with filtering:: { "table_filter": { "exclude": ["_migrations", "temp_*"] } } Named schema with specific tables:: { "name": "reporting", "table_filter": { "include": ["sales_summary", "customer_metrics"], "exclude": [] } } Multiple schemas for different purposes:: [ { "name": "public", "table_filter": {"exclude": ["audit_*"]} }, { "name": "analytics", "table_filter": {"include": ["fact_*", "dim_*"]} } ] """ name: T.Optional[str] = Field( default=None, description="Schema name. If None, uses database default schema" ) table_filter: TableFilter = Field( default_factory=TableFilter, description="Table filtering rules for this schema" )
from .sqlalchemy import SqlalchemyConnection from .aws_redshift import AWSRedshiftConnection T_CONNECTION = T.Union[ SqlalchemyConnection, AWSRedshiftConnection, ]
[docs] class Database(BaseModel): """ Database configuration definition. Represents a single database connection with its schemas and access rules. Each database must have a unique identifier and can contain multiple schemas with different filtering rules. :param identifier: Unique identifier for this database. Used in MCP tools to reference specific databases. Must be unique across all databases in the configuration. :param description: Human-readable description of the database purpose or contents. Useful for documentation and understanding the database role. :param db_type: Database type identifier (e.g., 'sqlite', 'postgres', 'mysql', 'aws_redshift'). Must match a valid :class:`~mcp_ohmy_sql.constants.DbTypeEnum` value. :param connection: Database connection configuration. The specific type depends on the database type ( :class:`~mcp_ohmy_sql.config.sqlalchemy.SqlalchemyConnection`, :class:`~mcp_ohmy_sql.config.aws_redshift.AWSRedshiftConnection`, etc.). :param schemas: List of :class:`Schema` configurations for this database. Each schema can have its own table filtering rules. **Examples**: SQLite database:: { "identifier": "app_db", "description": "Main application database", "db_type": "sqlite", "connection": { ... }, "schemas": [ { "name": null, "table_filter": { "exclude": ["migrations", "temp_*"] } } ] } .. note:: The connection field uses Pydantic's `discriminator feature <https://docs.pydantic.dev/latest/concepts/unions/#discriminated-unions>`_ to automatically select the appropriate connection type based on the "type" field in the connection configuration. """ identifier: str = Field(description="Unique identifier for this database") description: str = Field( default="", description="Human-readable description of the database" ) db_type: str = Field(description="Database type (must match DbTypeEnum values)") connection: T_CONNECTION = Field( discriminator="type", description="Database connection configuration" ) schemas: list[Schema] = Field( description="List of schema configurations for this database" )
[docs] @field_validator("db_type", mode="after") @classmethod def check_name(cls, value: str) -> str: # pragma: no cover """ Validate the db_type field. """ if DbTypeEnum.is_valid_value(value) is False: raise ValueError(f"{value} is not a valid value of {DbTypeEnum}") return value
@property def db_type_enum(self) -> DbTypeEnum: """ Get the database type as an :class:`~mcp_ohmy_sql.constants.DbTypeEnum`. """ return DbTypeEnum.get_by_value(self.db_type) @cached_property def schemas_mapping(self) -> dict[str, Schema]: """ Create a mapping of schema names to Schema objects. """ mapping = {schema.name: schema for schema in self.schemas} if len(mapping) != len(self.schemas): raise ValueError( "Duplicate schema names found in database configuration! " "Each schema name must be unique." ) return mapping @cached_property def sa_metadata(self) -> "sa.MetaData": """ Create SQLAlchemy metadata for this database. """ metadata = sa.MetaData() for schema in self.schemas: metadata.reflect( self.connection.sa_engine, schema=schema.name, views=True, ) return metadata
[docs] class Config(BaseModel): """ Root configuration object for the mcp_ohmy_sql MCP server. This is the main configuration class that contains all settings, database connections, and server configuration. It provides methods for loading and validating configuration from JSON files. :param version: Configuration schema version. Currently must be "0.1.1". Used for backward compatibility and migration handling. :param settings: Global server :class:`Settings`. For features like query timeouts, result limits, etc. :param databases: List of :class:`Database` configurations. Each database must have a unique identifier and can contain multiple schemas. **Configuration File Structure**: The JSON configuration file should follow this structure:: { "version": "0.1.1", "settings": {...}, "databases": [ { "identifier": "my first database", ... } { "identifier": "my second database", ... } ] } **Usage**: Load from environment variable:: import os from pathlib import Path config_path = Path(os.environ["MCP_OHMY_SQL_CONFIG"]) config = Config.load(config_path) Access databases:: # Get all databases for db in config.databases: print(f"Database: {db.identifier}") # Get specific database db = config.databases_mapping["my_db"] # Get database schemas for schema in db.schemas: print(f"Schema: {schema.name}") **Validation**: The configuration is validated when loaded using Pydantic. Common validation errors include: - Missing required fields (version, databases) - Invalid version number - Duplicate database identifiers - Invalid database types - Invalid connection configurations **Environment Loading**: The typical usage pattern is to load configuration from an environment variable that points to the JSON configuration file:: export MCP_OHMY_SQL_CONFIG=/path/to/config.json This allows different configurations for different environments (development, staging, production) without code changes. **Troubleshooting**: Common configuration issues: 1. **File not found**: Check MCP_OHMY_SQL_CONFIG environment variable 2. **JSON syntax errors**: Validate JSON with a JSON linter 3. **Validation errors**: Check field names and types match the schema 4. **Connection errors**: Verify database URLs and credentials 5. **Permission errors**: Ensure file is readable by the process """ version: str = Field(description="Configuration schema version (currently '0.1.1')") settings: Settings = Field( default_factory=Settings, description="Global server settings" ) databases: list[Database] = Field(description="List of database configurations")
[docs] @classmethod def load(cls, path: Path) -> "Config": """ Load configuration from a JSON file. Reads and parses a JSON configuration file, validates it against the configuration schema, and returns a Config object. Provides detailed error messages for common configuration problems. :paaram path: Path to the JSON configuration file. Must be readable by the current process. :returns: :class:`Config` Validated configuration object ready for use. :raises: If file cannot be read, JSON is invalid, or validation fails. Error messages include specific details about the failure to help with troubleshooting. This method performs three validation steps: 1. File system access (can read the file) 2. JSON parsing (valid JSON syntax) 3. Schema validation (matches expected structure and types) """ try: s = path.read_text() except Exception as e: # pragma: no cover raise Exception( f"Failed to read configuration content from {path}! Error: {e!r}" ) try: dct = json.loads(s) except Exception as e: # pragma: no cover raise Exception( f"Failed to load configuration from {path}! Check your JSON content! Error: {e!r}" ) try: config = cls(**dct) except Exception as e: # pragma: no cover raise Exception( f"Configuration Validation failed! Check your JSON content! Error: {e!r}" ) return config
@cached_property def databases_mapping(self) -> dict[str, Database]: """ Create a mapping of database identifiers to Database objects. """ mapping = {db.identifier: db for db in self.databases} if len(mapping) != len(self.databases): raise ValueError( "Duplicate database identifiers found in configuration! " "Each database identifier must be unique." ) return mapping