from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import mcp
import httpx
from typing import Optional
app = FastAPI()
# Configuration
MCP_ENDPOINT = "<<MCP_ENDPOINT>>"
CORVIC_API_TOKEN = "<<YOUR_CORVIC_API_TOKEN>>"
headers = {
"Authorization": f"Bearer {CORVIC_API_TOKEN}",
"Content-Type": "application/json"
}
class QueryRequest(BaseModel):
query: str
response_url: Optional[str] = None
class QueryResponse(BaseModel):
answer: str
status: str
async def process_query_and_respond(query: str, response_url: Optional[str] = None):
"""Process query with Corvic and optionally send to webhook."""
try:
# Query Corvic agent
async with mcp.ClientSession(
transport=mcp.SSEClientTransport(MCP_ENDPOINT, headers=headers)
) as session:
await session.initialize()
query_result = await session.call_tool(
"query",
arguments={"query_content": query}
)
answer = query_result.content[0].text if query_result.content else ""
# If response_url is provided, send async response
if response_url:
async with httpx.AsyncClient() as client:
await client.post(
response_url,
json={"text": answer},
timeout=30.0
)
return {
"answer": answer,
"status": "success"
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/query", response_model=QueryResponse)
async def slack_handler(request: QueryRequest):
"""
General-purpose endpoint for querying Corvic agent.
Can be used with Slack, Discord, or any HTTP client.
"""
result = await process_query_and_respond(
query=request.query,
response_url=request.response_url
)
return QueryResponse(**result)
@app.post("/slack/command")
async def slack_command(request: Request):
"""
Slack slash command handler.
Expects Slack's form-encoded data.
"""
form_data = await request.form()
query = form_data.get("text", "")
response_url = form_data.get("response_url")
if not query:
return {"text": "Please provide a query."}
# Process asynchronously for Slack
if response_url:
# Return immediate acknowledgment
# Process query in background
import asyncio
asyncio.create_task(
process_query_and_respond(query, response_url)
)
return {"text": "Processing your query..."}
else:
# Synchronous response
result = await process_query_and_respond(query)
return {"text": result["answer"]}
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}