import mcp
from crewai import Agent, Task, Crew
from crewai_tools import SerperDevTool, FileReadTool, FileWriteTool
# Configuration
MCP_ENDPOINT = "<<MCP_ENDPOINT>>"
CORVIC_API_TOKEN = "<<YOUR_CORVIC_API_TOKEN>>"
headers = {
"Authorization": f"Bearer {CORVIC_API_TOKEN}",
"Content-Type": "application/json"
}
# Create Corvic tool for CrewAI
async def query_corvic(query: str) -> str:
"""Query Corvic agent using MCP."""
async with mcp.ClientSession(
transport=mcp.SSEClientTransport(MCP_ENDPOINT, headers=headers)
) as session:
await session.initialize()
query_result = await session.call_tool(
"query",
arguments={"query_content": query}
)
return query_result.content[0].text if query_result.content else ""
# Create CrewAI tools
serper_tool = SerperDevTool()
file_write_tool = FileWriteTool()
# Create analyst agent
analyst = Agent(
role='Data Analyst',
goal='Analyze video game sales data using Corvic',
backstory='Expert in analyzing structured data using Corvic AI',
tools=[], # Corvic tool will be called directly
verbose=True
)
# Create researcher agent
researcher = Agent(
role='Content Researcher',
goal='Find relevant content about top game genres',
backstory='Expert in researching gaming industry trends',
tools=[serper_tool],
verbose=True
)
# Create writer agent
writer = Agent(
role='Technical Writer',
goal='Write comprehensive reports',
backstory='Expert technical writer',
tools=[file_write_tool],
verbose=True
)
# Create tasks
analysis_task = Task(
description='Group all the data by Genre and find the top titles by global sales. Provide the top 1 genre in a tabular format.',
agent=analyst,
expected_output='A tabular summary of the top genre by global sales'
)
research_task = Task(
description='Search for content related to the top genre identified in the analysis',
agent=researcher,
expected_output='Curated content recommendations'
)
write_task = Task(
description='Write the analysis results and research findings to results_with_content.md',
agent=writer,
expected_output='A markdown file with the complete analysis'
)
# Create crew
crew = Crew(
agents=[analyst, researcher, writer],
tasks=[analysis_task, research_task, write_task]
)
# Execute (note: you'll need to integrate Corvic query into the analyst's execution)
result = crew.kickoff()