# -*- coding: utf-8 -*-importtypingasTfromtabulateimporttabulatefrom..lazy_importimportsa,sa_exctry:# pragma: no coverfromrichimportprintasrprintexceptImportError:# pragma: no coverpass
def format_result(
    result: T.Union["sa.CursorResult", "sa.Result"],
) -> str:
    """
    Render the rows of an executed SQL query as a Markdown (pipe) table.

    .. note::

        Markdown tables are the preferred way to hand SQL query results to
        LLMs, balancing token efficiency, comprehension, and maintainability.

        - Token Efficiency: Uses 24% fewer tokens than JSON, reducing API
          costs and fitting more data within context limits
        - Natural LLM Comprehension: Aligns with LLM training data patterns,
          enabling better understanding compared to nested JSON/XML structures
        - Balanced Readability: Maintains both machine parsability and human
          readability for seamless debugging and maintenance

    :param result: executed result object; all remaining rows are fetched
        from it.

    :return: the Markdown table text, or the literal string ``"No result"``
        when the query produced no rows.
    """
    fetched = result.fetchall()
    if not fetched:
        return "No result"

    # The first row of the table is the header row (column names),
    # followed by one plain list per record.
    table = [result.keys()]
    table.extend(list(row) for row in fetched)

    return tabulate(
        table,
        headers="firstrow",
        tablefmt="pipe",
        floatfmt=".4f",
    )
def ensure_valid_select_query(query: str) -> None:
    """
    Ensure the query is a valid SELECT statement.

    The check is case-insensitive and ignores leading/trailing whitespace.
    The ``SELECT`` keyword must be followed by at least one whitespace
    character (space, tab, newline, ...), so multi-line queries such as
    ``"SELECT\\n    id\\nFROM t"`` are accepted.

    :param query: raw SQL text to validate.

    :raises ValueError: if the query does not start with the ``SELECT``
        keyword followed by whitespace.
    """
    keyword = "SELECT"
    normalized = query.upper().strip()
    # Look at the single character after the keyword instead of hard-coding
    # a space: this accepts "SELECT\n..." / "SELECT\t..." while still
    # rejecting a bare "SELECT" (empty slice -> isspace() is False) and
    # identifiers such as "SELECTED".
    following = normalized[len(keyword):len(keyword) + 1]
    if not (normalized.startswith(keyword) and following.isspace()):
        raise ValueError("Invalid query: must start with 'SELECT '")
def execute_count_query(
    engine: "sa.Engine",
    query: str,
    params: T.Optional[dict[str, T.Any]] = None,
) -> int:
    """
    Executes a SQL SELECT query and returns the count of rows.

    The query is wrapped as a subquery and counted with ``COUNT(*)``, so the
    rows themselves are never fetched.

    TODO: this function is used in query optimizer, we are not using it yet.

    :param engine: engine used to open the connection; its dialect name
        selects how the counting statement is built.
    :param query: SQL text; must pass :func:`ensure_valid_select_query`.
    :param params: optional bind parameters passed to ``execute``.

    :return: the number of rows the query would return.

    :raises ValueError: if ``query`` is not a SELECT statement.
    """
    ensure_valid_select_query(query)
    # A trailing ";" would end up inside the parenthesised subquery and
    # become a syntax error, so drop it for every dialect, not only SQLite.
    query = query.strip().rstrip(";").rstrip()
    # use engine.dialect.name is the most reliable way to detect database type
    if engine.dialect.name == "sqlite":
        count_query = f"SELECT COUNT(*) FROM ({query}) AS subquery"
        count_stmt = sa.text(count_query)
    else:  # pragma: no cover
        raw_stmt = sa.text(query)
        # text() yields a TextClause; .columns() converts it into a
        # TextualSelect, which is the construct that provides .subquery().
        subq = raw_stmt.columns().subquery("anon_subq")
        count_stmt = sa.select(sa.func.count()).select_from(subq)

    with engine.connect() as connection:
        result = connection.execute(count_stmt, params)
        count = result.fetchone()[0]
    return count
def execute_select_query(
    engine: "sa.Engine",
    query: str,
    params: T.Optional[dict[str, T.Any]] = None,
) -> str:
    """
    Executes a SQL SELECT query and returns the result formatted as a
    Markdown table.

    Failures are reported through the returned string instead of being
    raised: validation, execution, and formatting errors each come back as a
    message starting with ``"Error"``.

    :param engine: engine used to open the connection.
    :param query: SQL text; must pass :func:`ensure_valid_select_query`.
    :param params: optional bind parameters passed to ``execute``.

    :return: the Markdown table produced by :func:`format_result`, or an
        error message.
    """
    # The annotation for ``engine`` is a string, like the other functions in
    # this module, so ``sa`` is not touched when the function is defined.
    try:
        ensure_valid_select_query(query)
    except ValueError as e:  # pragma: no cover
        return f"Error: {e}"

    stmt = sa.text(query)
    with engine.connect() as connection:
        try:
            result = connection.execute(stmt, params)
        except sa_exc.OperationalError as e:  # pragma: no cover
            return f"Error executing query: {e._message()}"
        except Exception as e:  # pragma: no cover
            return f"Error executing query: {e}"

        # Format while the connection is still open, because format_result
        # fetches the rows from the result.
        try:
            text = format_result(result)
        except Exception as e:  # pragma: no cover
            return f"Error formatting result: {e}"

        return text