# LocalMate/app/agent/mmca_agent.py
# Cuong2004's picture
# fix intent and add plan
# 51ba917
"""Multi-Modal Contextual Agent (MMCA) - ReAct Agent with MCP Tools.
Implements the Agent-Centric Orchestration pattern:
1. Parse user intent
2. Select appropriate MCP tool(s)
3. Execute tool(s) with logging
4. Synthesize final response with workflow trace
Supports multiple LLM providers: Google (Gemini) and MegaLLM (DeepSeek).
"""
import json
import re
import time
from dataclasses import dataclass, field
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.mcp.tools import mcp_tools
from app.shared.integrations.gemini_client import GeminiClient
from app.shared.integrations.megallm_client import MegaLLMClient
from app.shared.logger import agent_logger, AgentWorkflow, WorkflowStep
from app.shared.prompts import (
MMCA_SYSTEM_PROMPT as SYSTEM_PROMPT,
GREETING_SYSTEM_PROMPT,
INTENT_SYSTEM_PROMPT,
build_greeting_prompt,
build_intent_prompt,
build_synthesis_prompt,
)
# Default (lat, lng) coordinates for Da Nang, used by the nearby-places tool
# when the request carries neither coordinates nor a geocodable location name.
DANANG_CENTER = (16.0544, 108.2022)
# SYSTEM_PROMPT is MMCA_SYSTEM_PROMPT, imported under that alias from app.shared.prompts
@dataclass
class ChatMessage:
    """One turn of the conversation kept in the agent's internal history."""

    # Speaker of the turn: "user" or "assistant".
    role: str
    # Text of the turn.
    content: str
@dataclass
class ToolCall:
"""Tool call with arguments and results."""
tool_name: str
arguments: dict
result: list | None = None
duration_ms: float = 0
@dataclass
class ChatResult:
    """Everything a single chat turn produces: the reply plus its trace."""

    # Final reply text shown to the user.
    response: str
    # Step-by-step workflow trace recorded while handling the message.
    workflow: AgentWorkflow
    # Names of the tools that were used for this turn.
    tools_used: list[str] = field(default_factory=list)
    # End-to-end handling time in milliseconds.
    total_duration_ms: float = 0
    # Executed ToolCall objects, each carrying its results.
    tool_results: list = field(default_factory=list)
    # Place IDs the LLM selected for the reply.
    selected_place_ids: list[str] = field(default_factory=list)
class MMCAAgent:
"""
Multi-Modal Contextual Agent with Logging and Workflow Tracing.
Implements ReAct (Reasoning + Acting) pattern:
1. Observe: Parse user message and intent
2. Think: Decide which tool(s) to use
3. Act: Execute MCP tools
4. Synthesize: Generate final response
Supports multiple LLM providers:
- Google: Gemini models
- MegaLLM: DeepSeek models (OpenAI-compatible)
"""
def __init__(self, provider: str = "MegaLLM", model: str | None = None):
    """
    Set up the tool registry, the empty history and the LLM client.

    Args:
        provider: "Google" selects GeminiClient; any other value selects
            MegaLLMClient.
        model: Model name forwarded unchanged to the client (None included;
            presumably the client then picks its own default - confirm).
    """
    # Exact match on "Google"; every other provider string means MegaLLM.
    client_class = GeminiClient if provider == "Google" else MegaLLMClient

    self.provider = provider
    self.model = model
    self.tools = mcp_tools
    self.conversation_history: list[ChatMessage] = []
    self.llm_client = client_class(model=model)

    agent_logger.workflow_step("Agent initialized", f"Provider: {provider}, Model: {model}")
async def chat(
    self,
    message: str,
    db: AsyncSession,
    image_url: str | None = None,
    history: str | None = None,
) -> ChatResult:
    """
    Process a chat message and return the response with its workflow trace.

    The message goes through three stages: tool planning (LLM-based intent
    analysis), sequential execution of the planned tools, and synthesis of
    the final reply from the tool results. Every stage is logged and
    recorded as a step on the returned workflow.

    Args:
        message: User's natural language message.
        db: Database session handed to the tools that query the database.
        image_url: Optional image URL; when set, planning goes straight to
            visual search.
        history: Optional conversation history string, forwarded to the
            synthesis prompt.

    Returns:
        ChatResult with the reply text, workflow trace, executed tool calls,
        selected place IDs and the total duration in milliseconds.
    """
    # perf_counter() is monotonic, so the measured durations cannot jump or
    # go negative when the wall clock is adjusted while a request is running.
    start_time = time.perf_counter()

    # Initialize workflow tracking
    workflow = AgentWorkflow(query=message)

    # Log incoming request (message truncated to keep log lines short)
    agent_logger.api_request(
        endpoint="/chat",
        method="POST",
        body={"message": message[:100], "has_image": bool(image_url), "has_history": bool(history)},
    )

    # Add user message to internal history
    self.conversation_history.append(ChatMessage(role="user", content=message))

    # Step 1: Analyze intent and plan tools (LLM-based)
    workflow.add_step(WorkflowStep(
        step_name="Intent Analysis",
        purpose="Phân tích câu hỏi để chọn tool phù hợp",
    ))
    agent_logger.workflow_step("Step 1: Intent Analysis", message[:80])
    tool_calls = await self._plan_tool_calls(message, image_url)

    # An empty plan means the planner classified the message as a greeting.
    planned_names = [tc.tool_name for tc in tool_calls]
    intent = " + ".join(planned_names) if tool_calls else "greeting"
    workflow.intent_detected = intent
    agent_logger.workflow_step("Intent detected", intent)
    workflow.add_step(WorkflowStep(
        step_name="Tool Planning",
        purpose=f"Chọn {len(tool_calls)} tool(s) để thực thi",
        output_summary=", ".join(planned_names),
    ))

    # Step 2: Execute tools one after another
    agent_logger.workflow_step("Step 2: Execute Tools", f"{len(tool_calls)} tool(s)")
    tool_results = []
    for tool_call in tool_calls:
        tool_start = time.perf_counter()
        agent_logger.tool_call(tool_call.tool_name, tool_call.arguments)
        result = await self._execute_tool(tool_call, db)
        result.duration_ms = (time.perf_counter() - tool_start) * 1000
        result_count = len(result.result) if result.result else 0
        agent_logger.tool_result(
            tool_call.tool_name,
            result_count,
            result.result[0] if result.result else None,
        )
        # Add to workflow
        workflow.add_step(WorkflowStep(
            step_name=f"Execute {tool_call.tool_name}",
            tool_name=tool_call.tool_name,
            purpose=self._get_tool_purpose(tool_call.tool_name),
            input_summary=json.dumps(tool_call.arguments, ensure_ascii=False)[:100],
            result_count=result_count,
            duration_ms=result.duration_ms,
        ))
        tool_results.append(result)

    # Step 3: Synthesize response with history context
    agent_logger.workflow_step("Step 3: Synthesize Response")
    llm_start = time.perf_counter()
    response, selected_place_ids = await self._synthesize_response(message, tool_results, image_url, history)
    llm_duration = (time.perf_counter() - llm_start) * 1000
    agent_logger.llm_response(self.provider, response[:100], tokens=None)
    workflow.add_step(WorkflowStep(
        step_name="LLM Synthesis",
        purpose="Tổng hợp kết quả và tạo phản hồi",
        duration_ms=llm_duration,
    ))

    # Add assistant response to internal history
    self.conversation_history.append(ChatMessage(role="assistant", content=response))

    # Calculate total duration
    total_duration = (time.perf_counter() - start_time) * 1000
    workflow.total_duration_ms = total_duration

    # Log complete
    agent_logger.api_response(
        "/chat",
        200,
        {"response_len": len(response), "places": len(selected_place_ids)},
        total_duration,
    )
    return ChatResult(
        response=response,
        workflow=workflow,
        tools_used=workflow.tools_used,
        total_duration_ms=total_duration,
        tool_results=tool_results,
        selected_place_ids=selected_place_ids,
    )
def _get_tool_purpose(self, tool_name: str) -> str:
"""Get human-readable purpose for tool."""
purposes = {
"retrieve_context_text": "Tìm kiếm semantic trong văn bản (review, mô tả)",
"retrieve_similar_visuals": "Tìm địa điểm có hình ảnh tương tự",
"find_nearby_places": "Tìm địa điểm gần vị trí được nhắc đến",
"search_social_media": "Tìm kiếm thông tin từ mạng xã hội (news, trends)",
}
return purposes.get(tool_name, tool_name)
async def _plan_tool_calls(
    self,
    message: str,
    image_url: str | None = None,
) -> list[ToolCall]:
    """
    Decide which tools to run for a message.

    An attached image short-circuits planning to a single visual search.
    Otherwise the LLM is asked for an intent plan, which is parsed by
    _parse_intent_response; that parser returns an empty list for
    greetings/small-talk. Any failure while asking or parsing falls back
    to one text search over the original message.

    Returns:
        ToolCall objects carrying tool_name and arguments (possibly empty).
    """
    # Fast path: an image always means visual search, no LLM round-trip.
    if image_url:
        visual_search = ToolCall(
            tool_name="retrieve_similar_visuals",
            arguments={"image_url": image_url, "limit": 5},
        )
        return [visual_search]

    # Built outside the try block: a failure here is not swallowed.
    intent_prompt = build_intent_prompt(message, has_image=bool(image_url))

    try:
        # Low temperature keeps the JSON output consistent.
        llm_reply = await self.llm_client.generate(
            prompt=intent_prompt,
            temperature=0.2,
            system_instruction=INTENT_SYSTEM_PROMPT,
        )
        agent_logger.workflow_step("LLM Intent Detection", llm_reply[:200])
        return self._parse_intent_response(llm_reply, message)
    except Exception as e:
        agent_logger.error(f"Intent detection failed: {e}", None)
        # Fallback to text search
        text_search = ToolCall(
            tool_name="retrieve_context_text",
            arguments={"query": message, "limit": 5},
        )
        return [text_search]
def _parse_intent_response(self, response: str, original_message: str) -> list[ToolCall]:
    """Parse the LLM intent-detection reply into a list of ToolCall objects.

    The reply is expected to contain a JSON object, optionally wrapped in a
    Markdown code fence, with an optional "is_greeting" flag and a "tools"
    list of {"name": ..., "arguments": {...}} entries.

    Args:
        response: Raw text returned by the LLM.
        original_message: The user's message, used as the default query.

    Returns:
        An empty list when the reply flags a greeting. Otherwise the
        recognised tool calls with their default arguments filled in. A
        single text search over original_message is returned when the reply
        cannot be parsed, is not a JSON object, or names no usable tool.
    """
    # Tuple (not set) so that an unhashable "name" value cannot raise.
    valid_tools = (
        "retrieve_context_text",
        "find_nearby_places",
        "search_social_media",
        "retrieve_similar_visuals",
    )

    def text_search_fallback() -> list[ToolCall]:
        return [ToolCall(
            tool_name="retrieve_context_text",
            arguments={"query": original_message, "limit": 5},
        )]

    try:
        # Prefer the contents of a fenced code block when there is one.
        fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response, re.DOTALL)
        if fenced:
            response = fenced.group(1)
        # Trim any prose around the outermost JSON object.
        json_start = response.find('{')
        json_end = response.rfind('}')
        if json_start != -1 and json_end != -1:
            response = response[json_start:json_end + 1]
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError) as e:
        agent_logger.error(f"Failed to parse intent JSON: {e}", None)
        return text_search_fallback()

    # Valid JSON that is not an object (e.g. "[]" or "null") carries no plan.
    if not isinstance(data, dict):
        agent_logger.error(
            f"Failed to parse intent JSON: expected an object, got {type(data).__name__}",
            None,
        )
        return text_search_fallback()

    # Check if greeting
    if data.get("is_greeting", False):
        return []

    tools = data.get("tools", [])
    if not isinstance(tools, list):
        tools = []

    tool_calls = []
    for tool in tools:
        # Skip malformed entries instead of discarding the whole plan.
        if not isinstance(tool, dict):
            continue
        name = tool.get("name")
        if name not in valid_tools:
            continue
        arguments = tool.get("arguments")
        if not isinstance(arguments, dict):
            arguments = {}

        # Ensure required arguments
        if name == "retrieve_context_text":
            arguments.setdefault("query", original_message)
            arguments.setdefault("limit", 5)
        elif name == "find_nearby_places":
            # A location name, if any, is geocoded in the execute step.
            arguments.setdefault("max_distance_km", 3.0)
            arguments.setdefault("limit", 5)
        elif name == "search_social_media":
            arguments.setdefault("query", original_message)
            arguments.setdefault("limit", 5)
            arguments.setdefault("freshness", "pw")

        tool_calls.append(ToolCall(tool_name=name, arguments=arguments))

    # If no tools selected, default to text search
    return tool_calls or text_search_fallback()
async def _execute_tool(
    self,
    tool_call: ToolCall,
    db: AsyncSession,
) -> ToolCall:
    """Run one planned tool and store its rows on the same ToolCall.

    The tool's records are converted to plain dicts and assigned to
    tool_call.result. An unrecognised tool name leaves result untouched.
    Any exception raised while running the tool is logged and stored as a
    single {"error": ...} row instead of being propagated.

    Returns:
        The tool_call that was passed in, mutated in place.
    """

    def to_rows(records, *field_names):
        # One dict per record, keys in the order the fields are listed.
        return [
            {field_name: getattr(record, field_name) for field_name in field_names}
            for record in records
        ]

    try:
        tool_name = tool_call.tool_name
        args = tool_call.arguments

        if tool_name == "retrieve_context_text":
            records = await self.tools.retrieve_context_text(
                db=db,
                query=args.get("query", ""),
                limit=args.get("limit", 10),
            )
            tool_call.result = to_rows(
                records,
                "place_id", "name", "category", "rating",
                "similarity", "description", "source_text",
            )

        elif tool_name == "retrieve_similar_visuals":
            records = await self.tools.retrieve_similar_visuals(
                db=db,
                image_url=args.get("image_url"),
                limit=args.get("limit", 10),
            )
            tool_call.result = to_rows(
                records,
                "place_id", "name", "category", "rating",
                "similarity", "matched_images", "image_url",
            )

        elif tool_name == "find_nearby_places":
            lat = args.get("lat")
            lng = args.get("lng")
            if lat is None or lng is None:
                # No explicit coordinates: geocode the location name if one
                # was given, otherwise (or on a failed lookup) use the
                # Da Nang default.
                location = args.get("location", "")
                coords = None
                if location:
                    coords = await self.tools.get_location_coordinates(location)
                lat, lng = coords if coords else DANANG_CENTER

            records = await self.tools.find_nearby_places(
                lat=lat,
                lng=lng,
                max_distance_km=args.get("max_distance_km", 5.0),
                category=args.get("category"),
                limit=args.get("limit", 10),
            )
            tool_call.result = to_rows(
                records,
                "place_id", "name", "category",
                "distance_km", "rating", "description",
            )

        elif tool_name == "search_social_media":
            records = await self.tools.search_social_media(
                query=args.get("query", ""),
                limit=args.get("limit", 10),
                freshness=args.get("freshness", "pw"),
                platforms=args.get("platforms"),
            )
            tool_call.result = to_rows(
                records,
                "title", "url", "description", "age", "platform",
            )

    except Exception as e:
        agent_logger.error(f"Tool execution failed: {tool_call.tool_name}", e)
        tool_call.result = [{"error": str(e)}]

    return tool_call
async def _synthesize_response(
    self,
    message: str,
    tool_results: list[ToolCall],
    image_url: str | None = None,
    history: str | None = None,
) -> tuple[str, list[str]]:
    """
    Build the final reply from the tool results and conversation history.

    With no tool results (greeting case) a greeting prompt is sent and no
    places are selected. Otherwise the tool results are serialised into the
    synthesis prompt and the LLM is expected to answer with a JSON object
    holding "response" and "selected_place_ids".

    Args:
        message: The user's message.
        tool_results: Executed tool calls; empty for greetings.
        image_url: Accepted for interface compatibility; not used here.
        history: Optional conversation history string for the prompt.

    Returns:
        Tuple of (response_text, selected_place_ids). The IDs are limited to
        those present in the tool results. When the reply holds no parsable
        JSON object, the raw LLM text is returned with an empty ID list.
    """
    # If no tool results (greeting case), return simple response
    if not tool_results:
        greeting_prompt = build_greeting_prompt(message, history)
        greeting = await self.llm_client.generate(
            prompt=greeting_prompt,
            temperature=0.7,
            system_instruction=GREETING_SYSTEM_PROMPT,
        )
        return greeting, []

    # Collect the place IDs on offer and the JSON context for the prompt.
    all_place_ids = []
    context_parts = []
    for tool_call in tool_results:
        if not tool_call.result:
            continue
        for item in tool_call.result:
            if isinstance(item, dict) and 'place_id' in item:
                all_place_ids.append(item['place_id'])
        context_parts.append(
            f"Kết quả từ {tool_call.tool_name}:\n{json.dumps(tool_call.result, ensure_ascii=False, indent=2)}"
        )
    context = "\n\n".join(context_parts)

    # Generate response using LLM with JSON format for place selection
    prompt = build_synthesis_prompt(message, context, history)
    agent_logger.llm_call(self.provider, self.model or "default", prompt[:100])
    raw_response = await self.llm_client.generate(
        prompt=prompt,
        temperature=0.7,
        system_instruction=SYSTEM_PROMPT,
    )

    # Locate the JSON payload: fenced code block first, then bare braces.
    fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', raw_response, re.DOTALL)
    if fenced:
        json_str = fenced.group(1)
    else:
        json_start = raw_response.find('{')
        json_end = raw_response.rfind('}')
        if json_start == -1 or json_end == -1:
            # No JSON found, return raw response
            return raw_response, []
        json_str = raw_response[json_start:json_end + 1]

    try:
        data = json.loads(json_str)
    except json.JSONDecodeError as e:
        agent_logger.error("Failed to parse synthesis JSON", e)
        # Fallback: return raw response with no places
        return raw_response, []

    # The caller slices and measures the reply, so it must be a string;
    # a null or structured "response" falls back to the raw LLM text.
    text_response = data.get("response", raw_response)
    if not isinstance(text_response, str):
        text_response = raw_response

    # "selected_place_ids": null (or any non-list) means no selection.
    selected_ids = data.get("selected_place_ids", [])
    if not isinstance(selected_ids, list):
        selected_ids = []

    # Keep only IDs that actually came back from the tools. List membership
    # is used on purpose: it tolerates unhashable junk values.
    valid_ids = [pid for pid in selected_ids if pid in all_place_ids]
    return text_response, valid_ids
def _extract_location(self, message: str) -> str | None:
"""Extract location name from message using pattern matching."""
known_locations = {
"mỹ khê": "My Khe Beach",
"my khe": "My Khe Beach",
"bãi biển mỹ khê": "My Khe Beach",
"cầu rồng": "Dragon Bridge",
"cau rong": "Dragon Bridge",
"dragon bridge": "Dragon Bridge",
"bà nà": "Ba Na Hills",
"ba na": "Ba Na Hills",
"bà nà hills": "Ba Na Hills",
"sơn trà": "Son Tra Peninsula",
"son tra": "Son Tra Peninsula",
"hội an": "Hoi An",
"hoi an": "Hoi An",
"ngũ hành sơn": "Marble Mountains",
"ngu hanh son": "Marble Mountains",
"marble mountains": "Marble Mountains",
}
message_lower = message.lower()
for pattern, location in known_locations.items():
if pattern in message_lower:
return location
return None
def _extract_category(self, message: str) -> str | None:
"""Extract place category from message."""
categories = {
"cafe": ["cafe", "cà phê", "coffee"],
"restaurant": ["nhà hàng", "quán ăn", "restaurant", "ăn"],
"beach": ["bãi biển", "beach", "biển"],
"attraction": ["điểm tham quan", "du lịch", "attraction"],
"hotel": ["khách sạn", "hotel", "lưu trú"],
"bar": ["bar", "pub", "quán bar"],
}
message_lower = message.lower()
for category, keywords in categories.items():
if any(kw in message_lower for kw in keywords):
return category
return None
def clear_history(self) -> None:
    """Forget all stored conversation turns.

    The attribute is rebound to a new empty list, so a list obtained from
    it earlier keeps its contents.
    """
    fresh_history = []
    self.conversation_history = fresh_history
# Default agent instance, created at import time with the constructor
# defaults (provider "MegaLLM", model None).
mmca_agent = MMCAAgent()