[docs]classAwsRedshiftConnectionMethodEnum(BetterStrEnum):""" Supported connection methods for AWS Redshift. """redshift_connector="redshift_connector"sqlalchemy="sqlalchemy"
[docs]classAWSRedshiftConnection(BaseConnection):""" Configures AWS Redshift connections for data warehouse access. Provides multiple connection methods for different Redshift deployment types: :param type: DO NOT set this field manually, it is automatically set to "aws_redshift". :param method: Connection library to use - "redshift_connector" or "sqlalchemy" **Direct Connection Parameters (username/password authentication):** :param host: Redshift cluster endpoint hostname :param port: Redshift cluster port (usually 5439) :param database: Target database name :param username: Database username :param password: Database password **IAM-based Authentication for Redshift Cluster:** :param cluster_identifier: Redshift cluster identifier for IAM authentication :param database: Target database name :param boto_session_kwargs: AWS credentials and session configuration **IAM-based Authentication for Redshift Serverless:** :param namespace_name: Redshift Serverless namespace name :param workgroup_name: Redshift Serverless workgroup name :param boto_session_kwargs: AWS credentials and session configuration **Additional Configuration:** :param redshift_connector_kwargs: Additional parameters for the redshift-connector library """# fmt: offtype:T.Literal["aws_redshift"]=Field(default=ConnectionTypeEnum.AWS_REDSHIFT.value)method:str=Field()host:T.Optional[str]=Field(default=None)port:T.Optional[int]=Field(default=None)database:T.Optional[str]=Field(default=None)username:T.Optional[str]=Field(default=None)password:T.Optional[str]=Field(default=None)cluster_identifier:T.Optional[str]=Field(default=None)namespace_name:T.Optional[str]=Field(default=None)workgroup_name:T.Optional[str]=Field(default=None)boto_session_kwargs:T.Optional["BotoSessionKwargs"]=Field(default=None)redshift_connector_kwargs:T.Optional[dict[str,T.Any]]=Field(default=None)# fmt: on@field_validator("method",mode="after")@classmethoddefcheck_method(cls,value:str)->str:# pragma: no coverifAwsRedshiftConnectionMethodEnum.is_valid_value(value)isFalse:raiseValueError(f"{value} is not a valid value of {AwsRedshiftConnectionMethodEnum}")returnvalue@cached_propertydefbsm(self)->"BotoSesManager":returnself.boto_session_kwargs.get_bsm()@cached_propertydef_use_what(self)->T.Literal["redshift_connector","sqlalchemy"]:raiseNotImplementedError
[docs]defget_rs_conn(self)->"redshift_connector.Connection":""" Returns a Redshift connection object using the redshift_connector library. """returnredshift_connector.connect(**self.redshift_connector_kwargs,)
@cached_propertydefrs_conn(self)->"redshift_connector.Connection":""" Returns a cached Redshift connection object using the redshift_connector library. """returnself.get_rs_conn()
[docs]defget_sa_engine(self)->"sa.Engine":""" Returns a SQLAlchemy engine for connecting to AWS Redshift. """if((self.hostisnotNone)and(self.portisnotNone)and(self.databaseisnotNone)and(self.usernameisnotNone)and(self.passwordisnotNone)):params=aws_rs.RedshiftClusterConnectionParams(host=self.host,port=self.port,database=self.database,username=self.username,password=self.password,)returnparams.get_engine()# Redshift Cluster with IAMif((self.cluster_identifierisnotNone)and(self.databaseisnotNone)and(self.boto_session_kwargsisnotNone)):params=aws_rs.RedshiftClusterConnectionParams.new(redshift_client=self.bsm.redshift_client,cluster_identifier=self.cluster_identifier,db_name=self.database,)returnparams.get_engine()# Redshift Serverless with IAMif((self.namespace_nameisnotNone)and(self.workgroup_nameisnotNone)and(self.boto_session_kwargsisnotNone)):params=aws_rs.RedshiftServerlessConnectionParams.new(redshift_serverless_client=self.bsm.redshiftserverless_client,namespace_name=self.namespace_name,workgroup_name=self.workgroup_name,)returnparams.get_engine()raiseValueError("Cannot create SQLAlchemy engine for AWS Redshift")
@cached_propertydefsa_engine(self)->"sa.Engine":""" Returns a cached SQLAlchemy engine for connecting to AWS Redshift. """returnself.get_sa_engine()